You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by tm...@apache.org on 2015/02/10 21:10:52 UTC
[14/19] incubator-reef git commit: [REEF-136] Harmonize namespaces
and folder names in Org.Apache.REEF projects
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/ReefMessageProtoObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/ReefMessageProtoObserver.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/ReefMessageProtoObserver.cs
new file mode 100644
index 0000000..d1a9f8d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/ReefMessageProtoObserver.cs
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+using System.Threading;
+using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator
+{
+ /// <summary>
+ /// Observer for REEFMessage remote messages: logs every message it receives, keeps a
+ /// running message count, and forwards the message to the single subscribed observer, if any.
+ /// </summary>
+ public class ReefMessageProtoObserver :
+ IObserver<IRemoteMessage<REEFMessage>>,
+ IObservable<IRemoteMessage<REEFMessage>>,
+ IDisposable
+ {
+ // Number of received messages between two throughput log lines.
+ private const int PrintBatchSize = 100000;
+
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(ReefMessageProtoObserver));
+ private volatile IObserver<IRemoteMessage<REEFMessage>> _observer = null;
+ private long _count = 0;
+ private DateTime _begin;
+ private DateTime _origBegin;
+
+ /// <summary>
+ /// No-op: completion of the incoming stream is ignored.
+ /// </summary>
+ public void OnCompleted()
+ {
+ }
+
+ /// <summary>
+ /// No-op: errors on the incoming stream are ignored and not forwarded.
+ /// </summary>
+ /// <param name="error">the reported error (unused)</param>
+ public void OnError(Exception error)
+ {
+ }
+
+ /// <summary>
+ /// Logs the received message, updates the throughput statistics and forwards the
+ /// message to the subscribed observer (when there is one).
+ /// </summary>
+ /// <param name="value">the remote message to process</param>
+ public void OnNext(IRemoteMessage<REEFMessage> value)
+ {
+ REEFMessage remoteEvent = value.Message;
+ IRemoteIdentifier id = value.Identifier;
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "receive a ReefMessage from {0} Driver at {1}.", remoteEvent, id));
+
+ if (remoteEvent.evaluatorControl != null && remoteEvent.evaluatorControl.context_control != null)
+ {
+ var contextControl = remoteEvent.evaluatorControl.context_control;
+ string context_message = null;
+ string task_message = null;
+
+ if (contextControl.context_message != null)
+ {
+ context_message = contextControl.context_message.ToString();
+ }
+ if (contextControl.task_message != null)
+ {
+ task_message = ByteUtilities.ByteArrarysToString(contextControl.task_message);
+ }
+
+ // Only the first matching kind of control request is logged.
+ if (!(string.IsNullOrEmpty(context_message) && string.IsNullOrEmpty(task_message)))
+ {
+ LOGGER.Log(Level.Info,
+ string.Format(CultureInfo.InvariantCulture, "Control protobuf with context message [{0}] and task message [{1}]", context_message, task_message));
+ }
+ else if (contextControl.remove_context != null)
+ {
+ LOGGER.Log(Level.Info,
+ string.Format(CultureInfo.InvariantCulture, "Control protobuf to remove context {0}", contextControl.remove_context.context_id));
+ }
+ else if (contextControl.add_context != null)
+ {
+ LOGGER.Log(Level.Info,
+ string.Format(CultureInfo.InvariantCulture, "Control protobuf to add a context on top of {0}", contextControl.add_context.parent_context_id));
+ }
+ else if (contextControl.start_task != null)
+ {
+ LOGGER.Log(Level.Info,
+ string.Format(CultureInfo.InvariantCulture, "Control protobuf to start an task in {0}", contextControl.start_task.context_id));
+ }
+ else if (contextControl.stop_task != null)
+ {
+ LOGGER.Log(Level.Info, "Control protobuf to stop task");
+ }
+ else if (contextControl.suspend_task != null)
+ {
+ LOGGER.Log(Level.Info, "Control protobuf to suspend task");
+ }
+ }
+
+ // Decide "first message" from the atomically incremented value instead of a separate,
+ // unsynchronized read of _count, so exactly one caller initializes the timestamps.
+ long count = Interlocked.Increment(ref _count);
+ if (count == 1)
+ {
+ // UTC keeps the elapsed-time arithmetic independent of local clock/DST changes.
+ _begin = DateTime.UtcNow;
+ _origBegin = _begin;
+ }
+
+ if (count % PrintBatchSize == 0)
+ {
+ DateTime end = DateTime.UtcNow;
+ double seconds = (end - _begin).TotalSeconds;
+ // Skip the report when no measurable time elapsed; dividing by zero would yield infinity.
+ if (seconds > 0)
+ {
+ long eventsPerSecond = (long)(PrintBatchSize / seconds);
+ LOGGER.Log(Level.Info,
+ string.Format(CultureInfo.InvariantCulture, "Received {0} messages so far; last {1} messages arrived at {2} messages/second.", count, PrintBatchSize, eventsPerSecond));
+ }
+ _begin = end;
+ }
+
+ // Copy the volatile field once so a concurrent Dispose() cannot null it between check and call.
+ var observer = _observer;
+ if (observer != null)
+ {
+ observer.OnNext(value);
+ }
+ }
+
+ /// <summary>
+ /// Registers the observer that receives forwarded messages. Only one observer is kept.
+ /// </summary>
+ /// <param name="observer">the observer to forward messages to</param>
+ /// <returns>
+ /// this instance (disposing it removes the observer), or null when an observer is
+ /// already registered, in which case the new observer is ignored.
+ /// </returns>
+ public IDisposable Subscribe(IObserver<IRemoteMessage<REEFMessage>> observer)
+ {
+ if (_observer != null)
+ {
+ return null;
+ }
+ _observer = observer;
+ return this;
+ }
+
+ /// <summary>
+ /// Removes the subscribed observer; later messages are still logged and counted but not forwarded.
+ /// </summary>
+ public void Dispose()
+ {
+ _observer = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/CloseEventImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/CloseEventImpl.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/CloseEventImpl.cs
new file mode 100644
index 0000000..39bda77
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/CloseEventImpl.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
+{
+ /// <summary>
+ /// ICloseEvent implementation that carries an optional byte[] payload.
+ /// </summary>
+ public class CloseEventImpl : ICloseEvent
+ {
+ /// <summary>
+ /// Creates a close event without a payload.
+ /// </summary>
+ public CloseEventImpl()
+ {
+ Value = Optional<byte[]>.Empty();
+ }
+
+ /// <summary>
+ /// Creates a close event wrapping the given payload.
+ /// </summary>
+ /// <param name="bytes">the payload; passed to Optional.OfNullable, so null is accepted</param>
+ public CloseEventImpl(byte[] bytes)
+ {
+ Value = Optional<byte[]>.OfNullable(bytes);
+ }
+
+ /// <summary>
+ /// The payload of the event.
+ /// </summary>
+ // Auto-property: the previous hand-written accessors referred to the property itself
+ // (getter returned Value, setter evaluated Value), which recursed without end on any access.
+ public Optional<byte[]> Value { get; set; }
+
+ public override string ToString()
+ {
+ // Concatenation calls ToString() on Value and tolerates a null assigned through the setter.
+ return "CloseEvent{value=" + Value + "}";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/DriverMessageImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/DriverMessageImpl.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/DriverMessageImpl.cs
new file mode 100644
index 0000000..548e13a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/DriverMessageImpl.cs
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
+{
+ /// <summary>
+ /// IDriverMessage implementation that carries an optional byte[] payload.
+ /// </summary>
+ public class DriverMessageImpl : IDriverMessage
+ {
+ // Assigned only in the constructors.
+ private readonly Optional<byte[]> _value;
+
+ /// <summary>
+ /// Creates a driver message without a payload.
+ /// </summary>
+ public DriverMessageImpl()
+ {
+ _value = Optional<byte[]>.Empty();
+ }
+
+ /// <summary>
+ /// Creates a driver message wrapping the given payload.
+ /// </summary>
+ /// <param name="bytes">the payload; passed to Optional.OfNullable, so null is accepted</param>
+ public DriverMessageImpl(byte[] bytes)
+ {
+ _value = Optional<byte[]>.OfNullable(bytes);
+ }
+
+ /// <summary>
+ /// The payload of the message.
+ /// </summary>
+ public Optional<byte[]> Message
+ {
+ get
+ {
+ return _value;
+ }
+ }
+
+ public override string ToString()
+ {
+ // Only decode when a payload exists: the default constructor stores an empty Optional.
+ // NOTE(review): assumes the value of an empty Optional is not safe to decode - confirm against Optional/ByteUtilities.
+ string payload = _value.IsPresent() ? ByteUtilities.ByteArrarysToString(_value.Value) : string.Empty;
+ return "DriverMessage [value=" + payload + "]";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs
new file mode 100644
index 0000000..0a19106
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
+{
+ /// <summary>
+ /// Suspend event that carries an optional byte[] payload.
+ /// </summary>
+ // NOTE(review): this type is declared as ICloseEvent, not ISuspendEvent, so overloads that
+ // take ICloseEvent are the ones selected for it. Looks like a copy/paste slip - confirm
+ // before changing, since it alters the public type relationship.
+ public class SuspendEventImpl : ICloseEvent
+ {
+ /// <summary>
+ /// Creates a suspend event without a payload.
+ /// </summary>
+ public SuspendEventImpl()
+ {
+ Value = Optional<byte[]>.Empty();
+ }
+
+ /// <summary>
+ /// Creates a suspend event wrapping the given payload.
+ /// </summary>
+ /// <param name="bytes">the payload; passed to Optional.OfNullable, so null is accepted</param>
+ public SuspendEventImpl(byte[] bytes)
+ {
+ Value = Optional<byte[]>.OfNullable(bytes);
+ }
+
+ /// <summary>
+ /// The payload of the event.
+ /// </summary>
+ // Auto-property: the previous hand-written accessors referred to the property itself
+ // (getter returned Value, setter evaluated Value), which recursed without end on any access.
+ public Optional<byte[]> Value { get; set; }
+
+ public override string ToString()
+ {
+ // Concatenation calls ToString() on Value and tolerates a null assigned through the setter.
+ return "SuspendEvent{value=" + Value + "}";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs
new file mode 100644
index 0000000..da83300
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
+{
+ /// <summary>
+ /// Exception that records the ids of the task and context it was raised for.
+ /// </summary>
+ public class TaskClientCodeException : Exception
+ {
+ /// <summary>
+ /// construct the exception that caused the Task to fail
+ /// </summary>
+ /// <param name="taskId"> the id of the failed task.</param>
+ /// <param name="contextId"> the id of the context the failed Task was executing in.</param>
+ /// <param name="message"> the error message </param>
+ /// <param name="cause"> the exception that caused the Task to fail.</param>
+ public TaskClientCodeException(
+ string taskId,
+ string contextId,
+ string message,
+ Exception cause)
+ : base(message, cause)
+ {
+ TaskId = taskId;
+ ContextId = contextId;
+ }
+
+ /// <summary>
+ /// Id of the task passed to the constructor.
+ /// </summary>
+ public string TaskId { get; private set; }
+
+ /// <summary>
+ /// Id of the context passed to the constructor.
+ /// </summary>
+ public string ContextId { get; private set; }
+
+ /// <summary>
+ /// Placeholder: currently ignores the configuration and returns an empty string.
+ /// </summary>
+ /// <param name="c">the configuration (unused for now)</param>
+ /// <returns>always string.Empty</returns>
+ public static string GetTaskIdentifier(IConfiguration c)
+ {
+ // TODO: update after TANG is available
+ return string.Empty;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs
new file mode 100644
index 0000000..74fc320
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Common.Tasks.Events;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
+{
+ /// <summary>
+ /// Holds the task start/stop handlers and notifies them with the configured start/stop events.
+ /// </summary>
+ public class TaskLifeCycle
+ {
+ private readonly HashSet<IObserver<ITaskStop>> _taskStopHandlers;
+ private readonly HashSet<IObserver<ITaskStart>> _taskStartHandlers;
+ private readonly ITaskStart _taskStart;
+ private readonly ITaskStop _taskStop;
+
+ // INJECT
+ public TaskLifeCycle(
+ HashSet<IObserver<ITaskStop>> taskStopHandlers,
+ HashSet<IObserver<ITaskStart>> taskStartHandlers,
+ TaskStartImpl taskStart,
+ TaskStopImpl taskStop)
+ {
+ _taskStopHandlers = taskStopHandlers;
+ _taskStartHandlers = taskStartHandlers;
+ _taskStop = taskStop;
+ _taskStart = taskStart;
+ }
+
+ /// <summary>
+ /// Creates a life cycle with empty handler sets and no start/stop events.
+ /// </summary>
+ public TaskLifeCycle()
+ : this(new HashSet<IObserver<ITaskStop>>(), new HashSet<IObserver<ITaskStart>>(), null, null)
+ {
+ }
+
+ /// <summary>
+ /// Passes the start event to every registered start handler.
+ /// </summary>
+ public void Start()
+ {
+ foreach (var handler in _taskStartHandlers)
+ {
+ handler.OnNext(_taskStart);
+ }
+ }
+
+ /// <summary>
+ /// Passes the stop event to every registered stop handler.
+ /// </summary>
+ public void Stop()
+ {
+ foreach (var handler in _taskStopHandlers)
+ {
+ handler.OnNext(_taskStop);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
new file mode 100644
index 0000000..721adf7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
+{
+ /// <summary>
+ /// Injects a task from the given injector, runs it, and records its outcome in a TaskStatus.
+ /// Also receives close, suspend and driver-message requests for the task.
+ /// </summary>
+ public class TaskRuntime : IObserver<ICloseEvent>, IObserver<ISuspendEvent>, IObserver<IDriverMessage>
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskRuntime));
+
+ private readonly ITask _task;
+
+ private readonly IInjector _injector;
+
+ // The memento given by the task configuration
+ private readonly Optional<byte[]> _memento;
+
+ private readonly HeartBeatManager _heartBeatManager;
+
+ private readonly TaskStatus _currentStatus;
+
+ private readonly INameClient _nameClient;
+
+ /// <summary>
+ /// Injects the task, and optionally a task message source and a name client, then
+ /// creates the status object for the task.
+ /// </summary>
+ /// <param name="taskInjector">injector used to obtain the task and its optional collaborators</param>
+ /// <param name="contextId">id of the context, passed to the task status</param>
+ /// <param name="taskId">id of the task, passed to the task status</param>
+ /// <param name="heartBeatManager">heartbeat manager passed to the task status</param>
+ /// <param name="memento">optional memento string; converted to bytes and handed to the task when it runs</param>
+ public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager, string memento = null)
+ {
+ _injector = taskInjector;
+ _heartBeatManager = heartBeatManager;
+
+ Optional<ISet<ITaskMessageSource>> messageSources = Optional<ISet<ITaskMessageSource>>.Empty();
+ try
+ {
+ _task = _injector.GetInstance<ITask>();
+ }
+ catch (Exception e)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to inject task.", LOGGER);
+ }
+ try
+ {
+ ITaskMessageSource taskMessageSource = _injector.GetInstance<ITaskMessageSource>();
+ messageSources = Optional<ISet<ITaskMessageSource>>.Of(new HashSet<ITaskMessageSource>() { taskMessageSource });
+ // Logged only after a successful injection; it used to be logged even when the injection failed.
+ LOGGER.Log(Level.Info, "task message source injected");
+ }
+ catch (Exception e)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot inject task message source with error: " + e.StackTrace, LOGGER);
+ // do not rethrow since this is benign
+ }
+ try
+ {
+ _nameClient = _injector.GetInstance<INameClient>();
+ _heartBeatManager.EvaluatorSettings.NameClient = _nameClient;
+ }
+ catch (InjectionException)
+ {
+ LOGGER.Log(Level.Warning, "Cannot inject name client from task configuration.");
+ // do not rethrow since user is not required to provide name client
+ }
+
+ _currentStatus = new TaskStatus(_heartBeatManager, contextId, taskId, messageSources);
+ _memento = memento == null ?
+ Optional<byte[]>.Empty() : Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(memento));
+ }
+
+ public string TaskId
+ {
+ get { return _currentStatus.TaskId; }
+ }
+
+ public string ContextId
+ {
+ get { return _currentStatus.ContextId; }
+ }
+
+ /// <summary>
+ /// Marks the task status as running.
+ /// </summary>
+ public void Initialize()
+ {
+ _currentStatus.SetRunning();
+ }
+
+ /// <summary>
+ /// Run the task
+ /// </summary>
+ /// <remarks>
+ /// Blocks until the task call returns. The result or the failure is recorded in the task
+ /// status; exceptions are not propagated to the caller. The task is disposed at most once.
+ /// </remarks>
+ public void Start()
+ {
+ // Set before the success-path dispose so a failure in (or after) that dispose
+ // does not trigger a second Dispose() call from the catch block.
+ bool disposeAttempted = false;
+ try
+ {
+ LOGGER.Log(Level.Info, "Call Task");
+ if (_currentStatus.IsNotRunning())
+ {
+ var e = new InvalidOperationException("TaskRuntime not in Running state, instead it is in state " + _currentStatus.State);
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+ }
+ byte[] result;
+ byte[] taskMemento = _memento.IsPresent() ? _memento.Value : null;
+ System.Threading.Tasks.Task<byte[]> runTask = new System.Threading.Tasks.Task<byte[]>(() => RunTask(taskMemento));
+ try
+ {
+ runTask.Start();
+ runTask.Wait();
+ }
+ catch (Exception e)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Exception thrown during task running.", LOGGER);
+ }
+ result = runTask.Result;
+
+ LOGGER.Log(Level.Info, "Task Call Finished");
+ disposeAttempted = true;
+ DisposeTask();
+ _currentStatus.SetResult(result);
+ if (result != null && result.Length > 0)
+ {
+ LOGGER.Log(Level.Info, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result));
+ }
+ }
+ catch (Exception e)
+ {
+ if (!disposeAttempted)
+ {
+ DisposeTask();
+ }
+ LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Task failed caused by exception [{0}]", e));
+ _currentStatus.SetException(e);
+ }
+ }
+
+ public TaskState GetTaskState()
+ {
+ return _currentStatus.State;
+ }
+
+ /// <summary>
+ /// Called by heartbeat manager
+ /// </summary>
+ /// <returns> current TaskStatusProto </returns>
+ public TaskStatusProto GetStatusProto()
+ {
+ return _currentStatus.ToProto();
+ }
+
+ public bool HasEnded()
+ {
+ return _currentStatus.HasEnded();
+ }
+
+ /// <summary>
+ /// get ID of the task.
+ /// </summary>
+ /// <returns>ID of the task.</returns>
+ public string GetActicityId()
+ {
+ return _currentStatus.TaskId;
+ }
+
+ /// <summary>
+ /// Requests the task to close. Ignored (with a warning) when the task is not running.
+ /// </summary>
+ /// <param name="message">payload wrapped into the close event</param>
+ public void Close(byte[] message)
+ {
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId));
+ if (_currentStatus.IsNotRunning())
+ {
+ LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State));
+ }
+ else
+ {
+ try
+ {
+ OnNext(new CloseEventImpl(message));
+ _currentStatus.SetCloseRequested();
+ }
+ catch (Exception e)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", LOGGER);
+
+ _currentStatus.SetException(
+ new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e));
+ }
+ }
+ }
+
+ /// <summary>
+ /// Requests the task to suspend. Ignored (with a warning) when the task is not running.
+ /// </summary>
+ /// <param name="message">payload wrapped into the suspend event</param>
+ public void Suspend(byte[] message)
+ {
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId));
+
+ if (_currentStatus.IsNotRunning())
+ {
+ LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State));
+ }
+ else
+ {
+ try
+ {
+ // NOTE(review): overload resolution depends on the interface SuspendEventImpl declares;
+ // if it is declared as ICloseEvent this binds to OnNext(ICloseEvent) - confirm.
+ OnNext(new SuspendEventImpl(message));
+ _currentStatus.SetSuspendRequested();
+ }
+ catch (Exception e)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", LOGGER);
+ _currentStatus.SetException(
+ new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e));
+ }
+ }
+ }
+
+ /// <summary>
+ /// Delivers a driver message to the task. Ignored (with a warning) when the task is not running.
+ /// </summary>
+ /// <param name="message">payload wrapped into the driver message</param>
+ public void Deliver(byte[] message)
+ {
+ if (_currentStatus.IsNotRunning())
+ {
+ LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State));
+ }
+ else
+ {
+ try
+ {
+ OnNext(new DriverMessageImpl(message));
+ }
+ catch (Exception e)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", LOGGER);
+ _currentStatus.SetException(
+ new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e));
+ }
+ }
+ }
+
+ /// <summary>
+ /// Handles a close event; currently only logs the call.
+ /// </summary>
+ public void OnNext(ICloseEvent value)
+ {
+ LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)");
+ // TODO: send a heartbeat
+ }
+
+ void IObserver<ICloseEvent>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<IDriverMessage>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<IDriverMessage>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<ISuspendEvent>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<ISuspendEvent>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<ICloseEvent>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>
+ /// Handles a suspend event; currently only logs the call.
+ /// </summary>
+ public void OnNext(ISuspendEvent value)
+ {
+ LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)");
+ // TODO: send a heartbeat
+ }
+
+ /// <summary>
+ /// Injects an IDriverMessageHandler and lets it handle the message. A failure to inject
+ /// the handler is rethrown; a failure inside the handler is recorded in the task status.
+ /// </summary>
+ public void OnNext(IDriverMessage value)
+ {
+ IDriverMessageHandler messageHandler = null;
+ LOGGER.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)");
+ try
+ {
+ messageHandler = _injector.GetInstance<IDriverMessageHandler>();
+ }
+ catch (Exception e)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Received Driver message, but unable to inject handler for driver message ", LOGGER);
+ }
+ if (messageHandler != null)
+ {
+ try
+ {
+ messageHandler.Handle(value);
+ }
+ catch (Exception e)
+ {
+ Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Exception throw when handling driver message: " + e, LOGGER);
+ _currentStatus.RecordExecptionWithoutHeartbeat(e);
+ }
+ }
+ }
+
+ // Invokes the task with the given memento and returns its result.
+ private byte[] RunTask(byte[] memento)
+ {
+ return _task.Call(memento);
+ }
+
+ // Disposes the task if one was injected.
+ private void DisposeTask()
+ {
+ if (_task != null)
+ {
+ _task.Dispose();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStartImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStartImpl.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStartImpl.cs
new file mode 100644
index 0000000..1c3c734
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStartImpl.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Tasks.Events;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
+{
+ /// <summary>
+ /// ITaskStart implementation that holds an id.
+ /// </summary>
+ public class TaskStartImpl : ITaskStart
+ {
+ private string _id;
+
+ //INJECT
+ public TaskStartImpl(string id)
+ {
+ _id = id;
+ }
+
+ /// <summary>
+ /// The id given at construction; may be replaced through the setter.
+ /// </summary>
+ public string Id
+ {
+ get { return _id; }
+ set { _id = value; }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskState.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskState.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskState.cs
new file mode 100644
index 0000000..7829364
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskState.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
+{
+ /// <summary>
+ /// States a task can be in. Every member has an explicitly assigned numeric value (0-7).
+ /// </summary>
+ // NOTE(review): presumably these values must stay stable if they are persisted or mapped
+ // to protocol states elsewhere - confirm before renumbering or reordering.
+ public enum TaskState
+ {
+ Init = 0,
+
+ Running = 1,
+
+ CloseRequested = 2,
+
+ SuspendRequested = 3,
+
+ Suspended = 4,
+
+ Failed = 5,
+
+ Done = 6,
+
+ Killed = 7
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
new file mode 100644
index 0000000..d62ef39
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
+{
+    /// <summary>
+    /// Holds the state, result and last exception of one task and publishes them
+    /// to the HeartBeatManager as TaskStatusProto messages.
+    /// </summary>
+    public class TaskStatus
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStatus));
+        private readonly TaskLifeCycle _taskLifeCycle;
+        private readonly HeartBeatManager _heartBeatManager;
+        private readonly Optional<ISet<ITaskMessageSource>> _evaluatorMessageSources;
+
+        // Identity of the task; assigned once in the constructor and never changed afterwards.
+        private readonly string _taskId;
+        private readonly string _contextId;
+
+        private Optional<Exception> _lastException = Optional<Exception>.Empty();
+        private Optional<byte[]> _result = Optional<byte[]>.Empty();
+        private TaskState _state;
+
+        /// <summary>
+        /// Creates the status holder for one task; the task starts in TaskState.Init.
+        /// </summary>
+        /// <param name="heartBeatManager">Receiver of the status messages produced by this instance.</param>
+        /// <param name="contextId">Written to the context_id field of every status message.</param>
+        /// <param name="taskId">Written to the task_id field of every status message.</param>
+        /// <param name="evaluatorMessageSources">Optional sources polled for task messages while the task is Running.</param>
+        public TaskStatus(HeartBeatManager heartBeatManager, string contextId, string taskId, Optional<ISet<ITaskMessageSource>> evaluatorMessageSources)
+        {
+            _contextId = contextId;
+            _taskId = taskId;
+            _heartBeatManager = heartBeatManager;
+            _taskLifeCycle = new TaskLifeCycle();
+            _evaluatorMessageSources = evaluatorMessageSources;
+            State = TaskState.Init;
+        }
+
+        /// <summary>
+        /// Current state of the task. Assigning a value that IsLegalStateTransition rejects
+        /// logs an error and hands an InvalidOperationException to Exceptions.Throw;
+        /// the stored state is not modified in that case.
+        /// </summary>
+        public TaskState State
+        {
+            get
+            {
+                return _state;
+            }
+
+            set
+            {
+                if (IsLegalStateTransition(_state, value))
+                {
+                    _state = value;
+                }
+                else
+                {
+                    string message = string.Format(CultureInfo.InvariantCulture, "Illegal state transition from [{0}] to [{1}]", _state, value);
+                    LOGGER.Log(Level.Error, message);
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(message), LOGGER);
+                }
+            }
+        }
+
+        /// <summary>Identifier of the task, as passed to the constructor.</summary>
+        public string TaskId
+        {
+            get { return _taskId; }
+        }
+
+        /// <summary>Identifier of the context, as passed to the constructor.</summary>
+        public string ContextId
+        {
+            get { return _contextId; }
+        }
+
+        /// <summary>
+        /// Records the exception, moves the task to Failed, sends a heartbeat and
+        /// afterwards clears the stored exception.
+        /// </summary>
+        /// <param name="e">The exception to report.</param>
+        public void SetException(Exception e)
+        {
+            RecordExecptionWithoutHeartbeat(e);
+            Heartbeat();
+            _lastException = Optional<Exception>.Empty();
+        }
+
+        /// <summary>
+        /// Stores the task result, advances the state (Running or CloseRequested become Done,
+        /// SuspendRequested becomes Suspended, any other state is left as it is), stops the
+        /// task life cycle and sends a heartbeat.
+        /// </summary>
+        /// <param name="result">Result bytes; wrapped with Optional.OfNullable, so null is accepted.</param>
+        public void SetResult(byte[] result)
+        {
+            _result = Optional<byte[]>.OfNullable(result);
+            if (State == TaskState.Running)
+            {
+                State = TaskState.Done;
+            }
+            else if (State == TaskState.SuspendRequested)
+            {
+                State = TaskState.Suspended;
+            }
+            else if (State == TaskState.CloseRequested)
+            {
+                State = TaskState.Done;
+            }
+            _taskLifeCycle.Stop();
+            Heartbeat();
+        }
+
+        /// <summary>
+        /// Starts the task life cycle and moves the task from Init to Running.
+        /// Does nothing unless the task is currently in Init. Any exception raised on the way
+        /// is logged and reported through SetException.
+        /// </summary>
+        public void SetRunning()
+        {
+            LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning");
+            if (_state == TaskState.Init)
+            {
+                try
+                {
+                    _taskLifeCycle.Start();
+                    // Need to send an INIT heartbeat to the driver prompting it to create a RunningTask event.
+                    // The heartbeat is therefore sent while the state is still Init, before switching to Running.
+                    LOGGER.Log(Level.Info, "Sending task INIT heartbeat");
+                    Heartbeat();
+                    State = TaskState.Running;
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Cannot set task status to running.", LOGGER);
+                    SetException(e);
+                }
+            }
+        }
+
+        /// <summary>Requests the CloseRequested state; no heartbeat is sent.</summary>
+        public void SetCloseRequested()
+        {
+            State = TaskState.CloseRequested;
+        }
+
+        /// <summary>Requests the SuspendRequested state; no heartbeat is sent.</summary>
+        public void SetSuspendRequested()
+        {
+            State = TaskState.SuspendRequested;
+        }
+
+        /// <summary>Moves the task to Killed and sends a heartbeat.</summary>
+        public void SetKilled()
+        {
+            State = TaskState.Killed;
+            Heartbeat();
+        }
+
+        /// <summary>Returns true for every state other than Running.</summary>
+        public bool IsNotRunning()
+        {
+            return _state != TaskState.Running;
+        }
+
+        /// <summary>Returns true when the state is Done, Suspended, Failed or Killed.</summary>
+        public bool HasEnded()
+        {
+            switch (_state)
+            {
+                case TaskState.Done:
+                case TaskState.Suspended:
+                case TaskState.Failed:
+                case TaskState.Killed:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
+        /// <summary>
+        /// Builds the status message for the current state. The result field carries the task
+        /// result if one is present, otherwise the text of the last exception if one is present;
+        /// when neither is present and the task is Running, pending task messages are attached.
+        /// </summary>
+        /// <returns>A new TaskStatusProto describing this task.</returns>
+        public TaskStatusProto ToProto()
+        {
+            // May flip the state to Failed and discard the result; must run before the state is read below.
+            Check();
+            TaskStatusProto taskStatusProto = new TaskStatusProto()
+            {
+                context_id = _contextId,
+                task_id = _taskId,
+                state = GetProtoState(),
+            };
+            if (_result.IsPresent())
+            {
+                taskStatusProto.result = ByteUtilities.CopyBytesFrom(_result.Value);
+            }
+            else if (_lastException.IsPresent())
+            {
+                // The exception is shipped as the bytes of its ToString() text rather than as a serialized object.
+                byte[] error = ByteUtilities.StringToByteArrays(_lastException.Value.ToString());
+                taskStatusProto.result = ByteUtilities.CopyBytesFrom(error);
+            }
+            else if (_state == TaskState.Running)
+            {
+                foreach (TaskMessage message in GetMessages())
+                {
+                    TaskStatusProto.TaskMessageProto taskMessageProto = new TaskStatusProto.TaskMessageProto()
+                    {
+                        source_id = message.MessageSourceId,
+                        message = ByteUtilities.CopyBytesFrom(message.Message),
+                    };
+                    taskStatusProto.task_message.Add(taskMessageProto);
+                }
+            }
+            return taskStatusProto;
+        }
+
+        /// <summary>
+        /// Stores the exception unless one is already stored, moves the task to Failed and
+        /// stops the task life cycle. Unlike SetException, no heartbeat is sent.
+        /// </summary>
+        /// <param name="e">The exception to record.</param>
+        internal void RecordExecptionWithoutHeartbeat(Exception e)
+        {
+            if (!_lastException.IsPresent())
+            {
+                _lastException = Optional<Exception>.Of(e);
+            }
+            State = TaskState.Failed;
+            _taskLifeCycle.Stop();
+        }
+
+        /// <summary>
+        /// Decides whether the state may change from one value to another.
+        /// </summary>
+        /// <param name="from">The current state.</param>
+        /// <param name="to">The requested state.</param>
+        /// <returns>True when the transition is allowed.</returns>
+        private static bool IsLegalStateTransition(TaskState from, TaskState to)
+        {
+            switch (from)
+            {
+                case TaskState.Init:
+                    switch (to)
+                    {
+                        case TaskState.Init:
+                        case TaskState.Running:
+                        case TaskState.Failed:
+                        case TaskState.Killed:
+                        case TaskState.Done:
+                            return true;
+                        default:
+                            return false;
+                    }
+                case TaskState.Running:
+                    switch (to)
+                    {
+                        case TaskState.CloseRequested:
+                        case TaskState.SuspendRequested:
+                        case TaskState.Failed:
+                        case TaskState.Killed:
+                        case TaskState.Done:
+                            return true;
+                        default:
+                            return false;
+                    }
+                case TaskState.CloseRequested:
+                    switch (to)
+                    {
+                        case TaskState.Failed:
+                        case TaskState.Killed:
+                        case TaskState.Done:
+                            return true;
+                        default:
+                            return false;
+                    }
+                case TaskState.SuspendRequested:
+                    switch (to)
+                    {
+                        case TaskState.Failed:
+                        case TaskState.Killed:
+                        case TaskState.Suspended:
+                            return true;
+                        default:
+                            return false;
+                    }
+
+                // Failed, Done and Killed accept any target state. Suspended is not listed
+                // explicitly and is handled by the default label in the same way.
+                case TaskState.Failed:
+                case TaskState.Done:
+                case TaskState.Killed:
+                default:
+                    return true;
+            }
+        }
+
+        /// <summary>
+        /// Resolves the case where both a result and an exception are stored: the exception wins,
+        /// the task becomes Failed and the result is dropped after being logged.
+        /// </summary>
+        private void Check()
+        {
+            if (_result.IsPresent() && _lastException.IsPresent())
+            {
+                LOGGER.Log(Level.Warning, "Both task result and exception are present, the expcetion will take over. Thrown away result:" + ByteUtilities.ByteArrarysToString(_result.Value));
+                State = TaskState.Failed;
+                _result = Optional<byte[]>.Empty();
+            }
+        }
+
+        /// <summary>Sends the current status message to the heartbeat manager.</summary>
+        private void Heartbeat()
+        {
+            _heartBeatManager.OnNext(ToProto());
+        }
+
+        /// <summary>
+        /// Maps the internal state to the protocol state. CloseRequested and SuspendRequested
+        /// are both reported as RUNNING.
+        /// </summary>
+        private State GetProtoState()
+        {
+            switch (_state)
+            {
+                case TaskState.Init:
+                    return Protobuf.ReefProtocol.State.INIT;
+                case TaskState.CloseRequested:
+                case TaskState.SuspendRequested:
+                case TaskState.Running:
+                    return Protobuf.ReefProtocol.State.RUNNING;
+                case TaskState.Done:
+                    return Protobuf.ReefProtocol.State.DONE;
+                case TaskState.Suspended:
+                    return Protobuf.ReefProtocol.State.SUSPEND;
+                case TaskState.Failed:
+                    return Protobuf.ReefProtocol.State.FAILED;
+                case TaskState.Killed:
+                    return Protobuf.ReefProtocol.State.KILLED;
+                default:
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Unknown state: " + _state), LOGGER);
+                    break;
+            }
+            return Protobuf.ReefProtocol.State.FAILED; //this line should not be reached as default case will throw exception
+        }
+
+        /// <summary>
+        /// Collects the messages currently offered by the message sources.
+        /// </summary>
+        /// <returns>One entry per source that has a message; empty when no sources were supplied.</returns>
+        private ICollection<TaskMessage> GetMessages()
+        {
+            List<TaskMessage> result = new List<TaskMessage>();
+            if (_evaluatorMessageSources.IsPresent())
+            {
+                foreach (ITaskMessageSource source in _evaluatorMessageSources.Value)
+                {
+                    Optional<TaskMessage> taskMessageOptional = source.Message;
+                    if (taskMessageOptional.IsPresent())
+                    {
+                        result.Add(taskMessageOptional.Value);
+                    }
+                }
+            }
+            return result;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStopImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStopImpl.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStopImpl.cs
new file mode 100644
index 0000000..8ad8a5b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStopImpl.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Tasks.Events;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
+{
+    /// <summary>
+    /// ITaskStop implementation that simply carries an identifier.
+    /// </summary>
+    public class TaskStopImpl : ITaskStop
+    {
+        /// <summary>The identifier supplied at construction time; it can be reassigned.</summary>
+        public string Id { get; set; }
+
+        //INJECT
+        /// <summary>Creates the event object with the given identifier.</summary>
+        /// <param name="id">Value stored in <see cref="Id"/>.</param>
+        public TaskStopImpl(string id)
+        {
+            this.Id = id;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/EvaluatorConfigurations.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/EvaluatorConfigurations.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/EvaluatorConfigurations.cs
new file mode 100644
index 0000000..66f726d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/EvaluatorConfigurations.cs
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.IO;
+using System.Linq;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Formats.AvroConfigurationDataContract;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Utils
+{
+    /// <summary>
+    /// Reads an Avro configuration file and exposes selected settings from it.
+    /// Each setting is looked up on first access and cached afterwards.
+    /// </summary>
+    public class EvaluatorConfigurations
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorConfigurations));
+
+        private AvroConfiguration _avroConfiguration;
+
+        private string _configFile;
+
+        private string _applicationId;
+
+        private string _evaluatorId;
+
+        private string _taskConfiguration;
+
+        private string _rootContextConfiguration;
+
+        private string _rootServiceConfiguration;
+
+        /// <summary>
+        /// Loads the configuration from the given file.
+        /// </summary>
+        /// <param name="configFile">
+        /// Path of the configuration file. A null, empty or whitespace value is reported with an
+        /// ArgumentNullException and a missing file with a FileNotFoundException, both through Exceptions.Throw.
+        /// </param>
+        public EvaluatorConfigurations(string configFile)
+        {
+            using (LOGGER.LogFunction("EvaluatorConfigurations::EvaluatorConfigurations"))
+            {
+                if (string.IsNullOrWhiteSpace(configFile))
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("configFile"), LOGGER);
+                }
+                if (!File.Exists(configFile))
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new FileNotFoundException("cannot find file " + configFile), LOGGER);
+                }
+                _configFile = configFile;
+                _avroConfiguration = new AvroConfigurationSerializer().AvroDeseriaizeFromFile(_configFile);
+            }
+        }
+
+        /// <summary>Value of the setting whose key ends with Constants.TaskConfiguration.</summary>
+        public string TaskConfiguration
+        {
+            get
+            {
+                if (_taskConfiguration == null)
+                {
+                    _taskConfiguration = GetSettingValue(Constants.TaskConfiguration);
+                }
+                return _taskConfiguration;
+            }
+        }
+
+        /// <summary>Value of the setting whose key ends with Constants.EvaluatorIdentifier.</summary>
+        public string EvaluatorId
+        {
+            get
+            {
+                if (_evaluatorId == null)
+                {
+                    _evaluatorId = GetSettingValue(Constants.EvaluatorIdentifier);
+                }
+                return _evaluatorId;
+            }
+        }
+
+        /// <summary>Value of the setting whose key ends with Constants.ApplicationIdentifier.</summary>
+        public string ApplicationId
+        {
+            get
+            {
+                if (_applicationId == null)
+                {
+                    _applicationId = GetSettingValue(Constants.ApplicationIdentifier);
+                }
+                return _applicationId;
+            }
+        }
+
+        /// <summary>Value of the setting whose key ends with Constants.RootContextConfiguration.</summary>
+        public string RootContextConfiguration
+        {
+            get
+            {
+                if (_rootContextConfiguration == null)
+                {
+                    _rootContextConfiguration = GetSettingValue(Constants.RootContextConfiguration);
+                }
+                return _rootContextConfiguration;
+            }
+        }
+
+        /// <summary>Value of the setting whose key ends with Constants.RootServiceConfiguration.</summary>
+        public string RootServiceConfiguration
+        {
+            get
+            {
+                if (_rootServiceConfiguration == null)
+                {
+                    _rootServiceConfiguration = GetSettingValue(Constants.RootServiceConfiguration);
+                }
+                return _rootServiceConfiguration;
+            }
+        }
+
+        /// <summary>
+        /// Finds the single binding whose key ends with the given text, ignoring case.
+        /// </summary>
+        /// <param name="settingKey">Suffix to look for in the binding keys.</param>
+        /// <returns>The value of the matching binding, or an empty string when no binding matches.</returns>
+        private string GetSettingValue(string settingKey)
+        {
+            // SingleOrDefault raises an exception if more than one binding matches.
+            ConfigurationEntry match = _avroConfiguration.Bindings
+                .Where(entry => entry.key.EndsWith(settingKey, StringComparison.OrdinalIgnoreCase))
+                .SingleOrDefault();
+            if (match != null)
+            {
+                return match.value;
+            }
+
+            return string.Empty;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/RemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/RemoteManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/RemoteManager.cs
new file mode 100644
index 0000000..c129d04
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/RemoteManager.cs
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Utils
+{
+ /// <summary>
+ /// Empty class: it declares no members in this file.
+ /// </summary>
+ public class RemoteManager
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/MachineStatus.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/MachineStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/MachineStatus.cs
new file mode 100644
index 0000000..54aca4c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/MachineStatus.cs
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.Globalization;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Runtime
+{
+    /// <summary>
+    /// Reports CPU and memory figures for the current machine and the current process,
+    /// read from performance counters and from the Process object of this process.
+    /// All numbers are formatted with the invariant culture.
+    /// </summary>
+    public class MachineStatus
+    {
+        // Divisor applied to byte counts (and to the raw process CPU counter) before display.
+        private const double OneMillion = 1000000.0;
+
+        private static readonly PerformanceCounter _cpuCounter;
+
+        private static readonly PerformanceCounter _ramCounter;
+
+        private static readonly PerformanceCounter _processCpuCounter;
+
+        private static readonly Process _process;
+
+        // Cleared after the first failure in ToString() so that later calls skip the counters.
+        private static bool _checkStatus;
+
+        static MachineStatus()
+        {
+            _checkStatus = true;
+            _process = Process.GetCurrentProcess();
+            string processName = _process.ProcessName;
+
+            _cpuCounter = new PerformanceCounter()
+            {
+                CategoryName = "Processor",
+                CounterName = "% Processor Time",
+                InstanceName = "_Total",
+            };
+
+            _ramCounter = new PerformanceCounter()
+            {
+                CategoryName = "Memory",
+                CounterName = "Available MBytes"
+            };
+
+            _processCpuCounter = new PerformanceCounter()
+            {
+                CategoryName = "Process",
+                CounterName = "% Processor Time",
+                InstanceName = processName
+            };
+        }
+
+        /// <summary>Next sample of the machine-wide "% Processor Time" counter, followed by "%".</summary>
+        public static string CurrentNodeCpuUsage
+        {
+            get
+            {
+                return _cpuCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "%";
+            }
+        }
+
+        /// <summary>Next sample of the "Available MBytes" memory counter, followed by "MB".</summary>
+        public static string AvailableMemory
+        {
+            get
+            {
+                return _ramCounter.NextValue().ToString(CultureInfo.InvariantCulture) + "MB";
+            }
+        }
+
+        /// <summary>Working set of the current process divided by one million, followed by "MB".</summary>
+        public static string CurrentProcessMemoryUsage
+        {
+            get
+            {
+                return DivideByOneMillion(_process.WorkingSet64) + "MB";
+            }
+        }
+
+        /// <summary>Peak working set of the current process divided by one million, followed by "MB".</summary>
+        public static string PeakProcessMemoryUsage
+        {
+            get
+            {
+                return DivideByOneMillion(_process.PeakWorkingSet64) + "MB";
+            }
+        }
+
+        // this may not be accurate if there are multiple evaluator processes running on a single machine
+        // NOTE(review): this reads the counter's RawValue rather than NextValue(); whether
+        // RawValue / 1,000,000 is a meaningful percentage should be confirmed.
+        /// <summary>Raw value of the per-process "% Processor Time" counter divided by one million, followed by "%".</summary>
+        public static string CurrentProcessCpuUsage
+        {
+            get
+            {
+                return DivideByOneMillion(_processCpuCounter.RawValue) + "%";
+            }
+        }
+
+        /// <summary>
+        /// Produces a two-line, human-readable summary of machine and process usage.
+        /// If reading the figures fails once, the failure is logged as a warning and every
+        /// later call returns a fixed "no information" text without touching the counters.
+        /// </summary>
+        /// <returns>The summary, or a text explaining why no figures are available.</returns>
+        public override string ToString()
+        {
+            string info = "No machine status information retrieved. Could be due to lack of admin right to get the info.";
+            if (_checkStatus)
+            {
+                try
+                {
+                    // Refresh the cached process data before the memory properties are read.
+                    _process.Refresh();
+                    info = string.Format(
+                        CultureInfo.InvariantCulture,
+                        "current node is running at [{0}] CPU usage and with [{1}] memory available.{2} current evaluator process is using [{3}] of CPU and [{4}] of memory, with a peak memory usage of [{5}]",
+                        CurrentNodeCpuUsage,
+                        AvailableMemory,
+                        Environment.NewLine,
+                        CurrentProcessCpuUsage,
+                        CurrentProcessMemoryUsage,
+                        PeakProcessMemoryUsage);
+                }
+                catch (Exception e)
+                {
+                    _checkStatus = false; // It only takes one exception to switch the checking off for good.
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot obtain machine status due to error", Logger.GetLogger(typeof(MachineStatus)));
+                    // we do not want to crash the evaluator just because we cannot get the information.
+                    info = "Cannot obtain machine status due to error " + e.Message;
+                }
+            }
+
+            return info;
+        }
+
+        // Divides in double precision (no intermediate float cast) and formats culture-independently.
+        private static string DivideByOneMillion(long value)
+        {
+            return (value / OneMillion).ToString(CultureInfo.InvariantCulture);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Services/IService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Services/IService.cs b/lang/cs/Org.Apache.REEF.Common/Services/IService.cs
new file mode 100644
index 0000000..fa5b5d7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Services/IService.cs
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Common.Services
+{
+ /// <summary>
+ /// Interface without members. ServiceConfiguration uses it as the element type
+ /// of the set of services it binds.
+ /// </summary>
+ public interface IService
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Services/ServiceConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Services/ServiceConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/Services/ServiceConfiguration.cs
new file mode 100644
index 0000000..af5070b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Services/ServiceConfiguration.cs
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static field, typical usage in configurations")]
+
+namespace Org.Apache.REEF.Common.Services
+{
+    /// <summary>
+    /// Configuration module for services. A configuration built here can be handed over together with a
+    /// ContextConfiguration to form a context. Unlike bindings made in the ContextConfiguration, the ones
+    /// made here are passed on to child contexts.
+    /// </summary>
+    public class ServiceConfiguration : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// The set of services to instantiate. Every class listed here is instantiated in the context, and the
+        /// resulting references are made available to child contexts and tasks.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly OptionalParameter<IService> Services = new OptionalParameter<IService>();
+
+        /// <summary>Creates an empty builder; TangConfig is not assigned.</summary>
+        public ServiceConfiguration()
+        {
+        }
+
+        /// <summary>Creates an instance whose TangConfig is parsed from the given string.</summary>
+        /// <param name="config">Configuration text handed to AvroConfigurationSerializer.FromString.</param>
+        public ServiceConfiguration(string config)
+        {
+            AvroConfigurationSerializer serializer = new AvroConfigurationSerializer();
+            TangConfig = serializer.FromString(config);
+        }
+
+        /// <summary>
+        /// Builds a new module on every access, with <see cref="Services"/> bound as entries of the ServicesSet parameter.
+        /// </summary>
+        public static ConfigurationModule ConfigurationModule
+        {
+            get
+            {
+                var builder = new ServiceConfiguration().BindSetEntry(GenericType<ServicesSet>.Class, Services);
+                return builder.Build();
+            }
+        }
+
+        /// <summary>Configuration parsed by the string constructor; left unassigned by the parameterless one.</summary>
+        public IConfiguration TangConfig { get; private set; }
+    }
+
+    /// <summary>
+    /// Holder for the set of IService instances supplied through the ServicesSet named parameter.
+    /// </summary>
+    public class InjectedServices
+    {
+        /// <summary>The set received by the constructor; it can be reassigned.</summary>
+        public ISet<IService> Services { get; set; }
+
+        /// <summary>Stores the supplied set of services.</summary>
+        /// <param name="services">Set annotated with the ServicesSet named parameter.</param>
+        [Inject]
+        public InjectedServices([Parameter(typeof(ServicesSet))] ISet<IService> services)
+        {
+            this.Services = services;
+        }
+    }
+
+    /// <summary>
+    /// Named parameter whose value is a set of IService instances.
+    /// </summary>
+    [NamedParameter("Set of services", "servicesSet", "")]
+    internal class ServicesSet : Name<ISet<IService>>
+    {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Services/ServicesConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Services/ServicesConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Common/Services/ServicesConfigurationOptions.cs
new file mode 100644
index 0000000..f4afd7b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Services/ServicesConfigurationOptions.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Services
+{
+ /// <summary>
+ /// Container for the named parameters declared below.
+ /// </summary>
+ public class ServicesConfigurationOptions
+ {
+ /// <summary>
+ /// String-valued named parameter declared with the short name "services" and the default value "services".
+ /// </summary>
+ [NamedParameter("Services", "services", "services")]
+ public class Services : Name<string>
+ {
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverMessageHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverMessageHandler.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverMessageHandler.cs
new file mode 100644
index 0000000..5e34136
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverMessageHandler.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Tasks.Defaults
+{
+    /// <summary>
+    /// IDriverMessageHandler used when no other handler is bound: every message it receives
+    /// is answered by handing an InvalidOperationException to Exceptions.Throw.
+    /// </summary>
+    public class DefaultDriverMessageHandler : IDriverMessageHandler
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultDriverMessageHandler));
+
+        [Inject]
+        public DefaultDriverMessageHandler()
+        {
+        }
+
+        /// <summary>
+        /// Rejects the message.
+        /// </summary>
+        /// <param name="message">The driver message; its string form is included in the error text.</param>
+        public void Handle(IDriverMessage message)
+        {
+            // A separator keeps the fixed text and the message's string form apart in the error text.
+            Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("No DriverMessage handler bound. Message received: " + message), LOGGER);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultTaskMessageSource.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultTaskMessageSource.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultTaskMessageSource.cs
new file mode 100644
index 0000000..2929a59
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultTaskMessageSource.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common.Tasks.Defaults
+{
+    /// <summary>
+    /// ITaskMessageSource that always offers a message: a fixed source id together with
+    /// a text that contains the local time at which the message was generated.
+    /// </summary>
+    public class DefaultTaskMessageSource : ITaskMessageSource
+    {
+        [Inject]
+        public DefaultTaskMessageSource()
+        {
+        }
+
+        /// <summary>
+        /// Returns a freshly built message on every read. Values assigned to the property are discarded.
+        /// </summary>
+        public Optional<TaskMessage> Message
+        {
+            get
+            {
+                string generatedAt = DateTime.Now.ToString(CultureInfo.InvariantCulture);
+                byte[] payload = ByteUtilities.StringToByteArrays("default message generated at " + generatedAt);
+                return Optional<TaskMessage>.Of(TaskMessage.From("defaultSourceId", payload));
+            }
+
+            set
+            {
+                // Nothing is stored; the getter always builds its own message.
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ICloseEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ICloseEvent.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ICloseEvent.cs
new file mode 100644
index 0000000..b26f255
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ICloseEvent.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common.Tasks.Events
+{
+ /// <summary>
+ /// Event type that carries an optional byte array.
+ /// </summary>
+ public interface ICloseEvent
+ {
+ /// <summary>Optional byte payload of the event; readable and writable.</summary>
+ Optional<byte[]> Value { get; set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/Events/IDriverMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Events/IDriverMessage.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/IDriverMessage.cs
new file mode 100644
index 0000000..15a706e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/IDriverMessage.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common.Tasks.Events
+{
+ /// <summary>
+ /// Read-only contract exposing an optional byte-array message.
+ /// Instances are consumed by <c>IDriverMessageHandler.Handle</c>.
+ /// NOTE(review): presumably a message sent by the driver to a task - confirm against the sender.
+ /// </summary>
+ public interface IDriverMessage
+ {
+ /// <summary>
+ /// Gets the optional byte-array content of the message.
+ /// </summary>
+ Optional<byte[]> Message { get; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs
new file mode 100644
index 0000000..36356b7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Common.Tasks.Events
+{
+ /// <summary>
+ /// Event contract that declares no members; it only identifies the event type.
+ /// NOTE(review): presumably raised when a task is asked to suspend - confirm against the dispatching code.
+ /// </summary>
+ public interface ISuspendEvent
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ITaskStart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ITaskStart.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ITaskStart.cs
new file mode 100644
index 0000000..aeb376d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ITaskStart.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Common.Tasks.Events
+{
+ /// <summary>
+ /// Event contract that carries a string identifier.
+ /// NOTE(review): presumably raised when a task starts, with Id being the task identifier - confirm.
+ /// </summary>
+ public interface ITaskStart
+ {
+ /// <summary>
+ /// Gets or sets the identifier associated with this event.
+ /// </summary>
+ string Id { get; set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ITaskStop.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ITaskStop.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ITaskStop.cs
new file mode 100644
index 0000000..b80b38b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ITaskStop.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Common.Tasks.Events
+{
+ /// <summary>
+ /// Event contract that exposes a read-only string identifier.
+ /// NOTE(review): presumably raised when a task stops, with Id being the task identifier - confirm.
+ /// </summary>
+ public interface ITaskStop
+ {
+ /// <summary>
+ /// Gets the identifier associated with this event.
+ /// </summary>
+ string Id { get; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs
new file mode 100644
index 0000000..0b8e7e7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Tasks.Events;
+
+namespace Org.Apache.REEF.Common.Tasks
+{
+ //[DefaultImplementation(typeof(DefaultTaskMessageSource))]
+ // NOTE(review): the disabled attribute above names DefaultTaskMessageSource, which is the
+ // default implementation declared for ITaskMessageSource; it looks like a copy-paste leftover - confirm before enabling.
+ /// <summary>
+ /// Handler contract for <see cref="IDriverMessage"/> instances.
+ /// </summary>
+ public interface IDriverMessageHandler
+ {
+ /// <summary>
+ /// Handles the given message.
+ /// </summary>
+ /// <param name="message">the message to handle.</param>
+ void Handle(IDriverMessage message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/IRunningTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/IRunningTask.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/IRunningTask.cs
new file mode 100644
index 0000000..236da5a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/IRunningTask.cs
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common.Tasks
+{
+ /// <summary>
+ /// Represents a running Task
+ /// </summary>
+ public interface IRunningTask : IIdentifiable, IDisposable
+ {
+ /// <summary>
+ /// Sends the message to the running task.
+ /// </summary>
+ /// <param name="message">the message bytes that are sent to the Task.</param>
+ void OnNext(byte[] message);
+
+ /// <summary>
+ /// Signal the task to suspend.
+ /// </summary>
+ /// <param name="message">a message that is sent to the Task.</param>
+ void Suspend(byte[] message);
+
+ /// <summary>
+ /// Signal the task to suspend, without an accompanying message.
+ /// </summary>
+ void Suspend();
+
+ /// <summary>
+ /// Signal the task to shut down.
+ /// </summary>
+ /// <param name="message">a message that is sent to the Task.</param>
+ void Dispose(byte[] message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs
new file mode 100644
index 0000000..37a1d46
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.REEF.Common.Tasks
+{
+ /// <summary>
+ /// Contract for a disposable task exposing a single byte-array-in, byte-array-out entry point.
+ /// </summary>
+ public interface ITask : IDisposable
+ {
+ /// <summary>
+ /// Invokes the task.
+ /// </summary>
+ /// <param name="memento">byte-array input handed to the task.
+ /// NOTE(review): presumably serialized state or configuration, and may be null - confirm against callers.</param>
+ /// <returns>a byte array produced by the task.</returns>
+ byte[] Call(byte[] memento);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Tasks/ITaskMessageSource.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/ITaskMessageSource.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/ITaskMessageSource.cs
new file mode 100644
index 0000000..d9830c4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/ITaskMessageSource.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Tasks.Defaults;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Common.Tasks
+{
+ /// <summary>
+ /// Contract for an object that exposes an optional <c>TaskMessage</c>.
+ /// The attribute below declares <c>DefaultTaskMessageSource</c> as the default implementation.
+ /// </summary>
+ [DefaultImplementation(typeof(DefaultTaskMessageSource))]
+ public interface ITaskMessageSource
+ {
+ /// <summary>
+ /// Gets or sets the optional task message offered by this source.
+ /// </summary>
+ Optional<TaskMessage> Message { get; set; }
+ }
+}
\ No newline at end of file