You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/29 21:43:10 UTC

[26/31] incubator-reef git commit: [REEF-97] Add the REEF.NET code base

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/CloseEventImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/CloseEventImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/CloseEventImpl.cs
new file mode 100644
index 0000000..e3a13b8
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/CloseEventImpl.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Utilities;
+
+namespace Org.Apache.Reef.Common
+{
+    /// <summary>
+    /// <see cref="ICloseEvent"/> implementation: a close request that carries an
+    /// optional byte[] payload.
+    /// </summary>
+    public class CloseEventImpl : ICloseEvent
+    {
+        /// <summary>
+        /// Creates a close event with an empty payload.
+        /// </summary>
+        public CloseEventImpl()
+        {
+            Value = Optional<byte[]>.Empty();
+        }
+
+        /// <summary>
+        /// Creates a close event whose payload is <c>Optional.OfNullable(bytes)</c>.
+        /// </summary>
+        /// <param name="bytes">the payload bytes handed to <c>OfNullable</c></param>
+        public CloseEventImpl(byte[] bytes)
+        {
+            Value = Optional<byte[]>.OfNullable(bytes);
+        }
+
+        /// <summary>
+        /// The payload attached to this event.
+        /// </summary>
+        /// <remarks>
+        /// Declared as an auto-property so that the value is actually stored. Hand-written
+        /// accessors that refer to the property itself (a getter returning <c>Value</c>, or a
+        /// setter that reads <c>Value</c> and assigns it to the implicit <c>value</c>
+        /// parameter) recurse without bound on every access, including the assignments made
+        /// by the constructors, and never store anything.
+        /// </remarks>
+        public Optional<byte[]> Value { get; set; }
+
+        /// <summary>
+        /// Renders the event as <c>CloseEvent{value=...}</c> using the payload's own ToString.
+        /// </summary>
+        public override string ToString()
+        {
+            return "CloseEvent{value=" + Value.ToString() + "}";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/DriverMessageImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/DriverMessageImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/DriverMessageImpl.cs
new file mode 100644
index 0000000..034df50
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/DriverMessageImpl.cs
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Utilities;
+
+namespace Org.Apache.Reef.Common.Runtime.Evaluator
+{
+    /// <summary>
+    /// <see cref="IDriverMessage"/> implementation that keeps the message payload as an
+    /// optional byte[].
+    /// </summary>
+    public class DriverMessageImpl : IDriverMessage
+    {
+        // Payload of the message; assigned exactly once by whichever constructor runs.
+        private readonly Optional<byte[]> _payload;
+
+        /// <summary>
+        /// Creates a message with an empty payload.
+        /// </summary>
+        public DriverMessageImpl()
+        {
+            _payload = Optional<byte[]>.Empty();
+        }
+
+        /// <summary>
+        /// Creates a message whose payload is <c>Optional.OfNullable(bytes)</c>.
+        /// </summary>
+        /// <param name="bytes">the raw message bytes handed to <c>OfNullable</c></param>
+        public DriverMessageImpl(byte[] bytes)
+        {
+            _payload = Optional<byte[]>.OfNullable(bytes);
+        }
+
+        /// <summary>
+        /// The payload this message was constructed with.
+        /// </summary>
+        public Optional<byte[]> Message
+        {
+            get { return _payload; }
+        }
+
+        /// <summary>
+        /// Renders the message as <c>DriverMessage [value=...]</c>, converting the payload
+        /// with <c>ByteUtilities.ByteArrarysToString</c>.
+        /// </summary>
+        public override string ToString()
+        {
+            // NOTE(review): the payload is read without checking whether it is present;
+            // confirm how Optional.Value and ByteArrarysToString behave for an empty message.
+            byte[] raw = _payload.Value;
+            string text = ByteUtilities.ByteArrarysToString(raw);
+            return "DriverMessage [value=" + text + "]";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/SuspendEventImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/SuspendEventImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/SuspendEventImpl.cs
new file mode 100644
index 0000000..c75b09f
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/SuspendEventImpl.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Utilities;
+
+namespace Org.Apache.Reef.Common
+{
+    /// <summary>
+    /// Suspend request that carries an optional byte[] payload.
+    /// </summary>
+    /// <remarks>
+    /// NOTE(review): this type implements <see cref="ICloseEvent"/>, not a suspend-specific
+    /// interface. That is kept as-is so existing callers keep compiling; confirm whether it
+    /// is intended.
+    /// </remarks>
+    public class SuspendEventImpl : ICloseEvent
+    {
+        /// <summary>
+        /// Creates a suspend event with an empty payload.
+        /// </summary>
+        public SuspendEventImpl()
+        {
+            Value = Optional<byte[]>.Empty();
+        }
+
+        /// <summary>
+        /// Creates a suspend event whose payload is <c>Optional.OfNullable(bytes)</c>.
+        /// </summary>
+        /// <param name="bytes">the payload bytes handed to <c>OfNullable</c></param>
+        public SuspendEventImpl(byte[] bytes)
+        {
+            Value = Optional<byte[]>.OfNullable(bytes);
+        }
+
+        /// <summary>
+        /// The payload attached to this event.
+        /// </summary>
+        /// <remarks>
+        /// Declared as an auto-property so that the value is actually stored. Hand-written
+        /// accessors that refer to the property itself (a getter returning <c>Value</c>, or a
+        /// setter that reads <c>Value</c> and assigns it to the implicit <c>value</c>
+        /// parameter) recurse without bound on every access, including the assignments made
+        /// by the constructors, and never store anything.
+        /// </remarks>
+        public Optional<byte[]> Value { get; set; }
+
+        /// <summary>
+        /// Renders the event as <c>SuspendEvent{value=...}</c> using the payload's own ToString.
+        /// </summary>
+        public override string ToString()
+        {
+            return "SuspendEvent{value=" + Value.ToString() + "}";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskClientCodeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskClientCodeException.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskClientCodeException.cs
new file mode 100644
index 0000000..65b8be9
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskClientCodeException.cs
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.Reef.Tang.Interface;
+
+namespace Org.Apache.Reef.Common.Task
+{
+    /// <summary>
+    /// Exception that records the ids of the task and of the context it belongs to,
+    /// together with an error message and the underlying cause.
+    /// </summary>
+    public class TaskClientCodeException : Exception
+    {
+        /// <summary>
+        /// Builds the exception describing what made the Task fail.
+        /// </summary>
+        /// <param name="taskId">id of the failed task</param>
+        /// <param name="contextId">id of the context the failed Task was executing in</param>
+        /// <param name="message">the error message, forwarded to the base exception</param>
+        /// <param name="cause">the exception that caused the Task to fail, forwarded as inner exception</param>
+        public TaskClientCodeException(string taskId, string contextId, string message, Exception cause)
+            : base(message, cause)
+        {
+            TaskId = taskId;
+            ContextId = contextId;
+        }
+
+        /// <summary>
+        /// Id of the task, as passed to the constructor.
+        /// </summary>
+        public string TaskId { get; private set; }
+
+        /// <summary>
+        /// Id of the context, as passed to the constructor.
+        /// </summary>
+        public string ContextId { get; private set; }
+
+        /// <summary>
+        /// Placeholder for extracting a task identifier from a configuration.
+        /// </summary>
+        /// <param name="c">the configuration; currently not inspected</param>
+        /// <returns>always the empty string for now</returns>
+        public static string GetTaskIdentifier(IConfiguration c)
+        {
+            // TODO: update after TANG is available
+            return string.Empty;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskLifeCycle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskLifeCycle.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskLifeCycle.cs
new file mode 100644
index 0000000..30acc2e
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskLifeCycle.cs
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Wake;
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.Reef.Common
+{
+    /// <summary>
+    /// Holds the start and stop handlers of a task and pushes the corresponding
+    /// start/stop event to each of them on request.
+    /// </summary>
+    public class TaskLifeCycle
+    {
+        // Event instances handed to the handlers; both stay null when the
+        // parameterless constructor is used.
+        private readonly ITaskStart _taskStart;
+        private readonly ITaskStop _taskStop;
+
+        // Handlers notified by Start() and Stop() respectively.
+        private readonly HashSet<IObserver<ITaskStart>> _taskStartHandlers;
+        private readonly HashSet<IObserver<ITaskStop>> _taskStopHandlers;
+
+        /// <summary>
+        /// Creates a life cycle with the given handlers and events.
+        /// </summary>
+        /// <param name="taskStopHandlers">handlers notified by <see cref="Stop"/></param>
+        /// <param name="taskStartHandlers">handlers notified by <see cref="Start"/></param>
+        /// <param name="taskStart">event passed to every start handler</param>
+        /// <param name="taskStop">event passed to every stop handler</param>
+        // INJECT
+        public TaskLifeCycle(
+            HashSet<IObserver<ITaskStop>> taskStopHandlers,
+            HashSet<IObserver<ITaskStart>> taskStartHandlers,
+            TaskStartImpl taskStart,
+            TaskStopImpl taskStop)
+        {
+            _taskStart = taskStart;
+            _taskStop = taskStop;
+            _taskStartHandlers = taskStartHandlers;
+            _taskStopHandlers = taskStopHandlers;
+        }
+
+        /// <summary>
+        /// Creates a life cycle with empty handler sets and no events.
+        /// </summary>
+        public TaskLifeCycle()
+        {
+            _taskStopHandlers = new HashSet<IObserver<ITaskStop>>();
+            _taskStartHandlers = new HashSet<IObserver<ITaskStart>>();
+        }
+
+        /// <summary>
+        /// Calls OnNext with the start event on every registered start handler.
+        /// </summary>
+        public void Start()
+        {
+            foreach (var observer in _taskStartHandlers)
+            {
+                observer.OnNext(_taskStart);
+            }
+        }
+
+        /// <summary>
+        /// Calls OnNext with the stop event on every registered stop handler.
+        /// </summary>
+        public void Stop()
+        {
+            foreach (var observer in _taskStopHandlers)
+            {
+                observer.OnNext(_taskStop);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskRuntime.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskRuntime.cs
new file mode 100644
index 0000000..05d6eec
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskRuntime.cs
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Common.Runtime.Evaluator;
+using Org.Apache.Reef.Common.Task;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Exceptions;
+using Org.Apache.Reef.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Common
+{
+    public class TaskRuntime : IObserver<ICloseEvent>, IObserver<ISuspendEvent>, IObserver<IDriverMessage>
+    {
+        // Logger shared by all TaskRuntime instances.
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskRuntime));
+        
+        // The task instance obtained from the injector in the constructor.
+        private readonly ITask _task;
+
+        // The injector passed to the constructor; also used later to resolve the
+        // driver message handler.
+        private readonly IInjector _injector;
+
+        // The memento given by the task configuration
+        private readonly Optional<byte[]> _memento;
+
+        // Heartbeat manager passed to the constructor and handed on to the TaskStatus.
+        private readonly HeartBeatManager _heartBeatManager;
+
+        // Status tracker of this task; the state queries and transitions in this
+        // class are delegated to it.
+        private readonly TaskStatus _currentStatus;
+
+        // Name client resolved from the injector; stays null when that injection
+        // throws an InjectionException.
+        private readonly INameClient _nameClient;
+
+        /// <summary>
+        /// Resolves the task and its optional collaborators from the injector and creates
+        /// the status tracker for the task.
+        /// </summary>
+        /// <param name="taskInjector">injector used to instantiate the task, the task message source and the name client</param>
+        /// <param name="contextId">id of the context, passed on to the TaskStatus</param>
+        /// <param name="taskId">id of the task, passed on to the TaskStatus</param>
+        /// <param name="heartBeatManager">heartbeat manager passed on to the TaskStatus; its evaluator settings receive the name client</param>
+        /// <param name="memento">optional memento string; converted to bytes when it is not null</param>
+        public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager, string memento = null)
+        {
+            _injector = taskInjector;
+            _heartBeatManager = heartBeatManager;
+
+            Optional<ISet<ITaskMessageSource>> messageSources = Optional<ISet<ITaskMessageSource>>.Empty();
+            try
+            {
+                _task = _injector.GetInstance<ITask>();
+            }
+            catch (Exception e)
+            {
+                // The task itself is mandatory: log and escalate.
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to inject task.", LOGGER);
+            }
+            try
+            {
+                ITaskMessageSource taskMessageSource = _injector.GetInstance<ITaskMessageSource>();
+                messageSources = Optional<ISet<ITaskMessageSource>>.Of(new HashSet<ITaskMessageSource>() { taskMessageSource });
+
+                // Only report success when the injection really succeeded; logging this
+                // unconditionally after the catch block would contradict the warning
+                // emitted on failure.
+                LOGGER.Log(Level.Info, "task message source injected");
+            }
+            catch (Exception e)
+            {
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot inject task message source with error: " + e.StackTrace, LOGGER);
+                // do not rethrow since this is benign
+            }
+            try
+            {
+                _nameClient = _injector.GetInstance<INameClient>();
+                _heartBeatManager.EvaluatorSettings.NameClient = _nameClient;
+            }
+            catch (InjectionException)
+            {
+                LOGGER.Log(Level.Warning, "Cannot inject name client from task configuration.");
+                // do not rethrow since user is not required to provide name client
+            }
+
+            _currentStatus = new TaskStatus(_heartBeatManager, contextId, taskId, messageSources);
+            _memento = memento == null ?
+                Optional<byte[]>.Empty() : Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(memento));
+        }
+
+        /// <summary>
+        /// Id of the task, as reported by the current task status.
+        /// </summary>
+        public string TaskId
+        {
+            get
+            {
+                return _currentStatus.TaskId;
+            }
+        }
+
+        /// <summary>
+        /// Id of the context, as reported by the current task status.
+        /// </summary>
+        public string ContextId
+        {
+            get
+            {
+                return _currentStatus.ContextId;
+            }
+        }
+
+        /// <summary>
+        /// Asks the task status to switch to the running state.
+        /// </summary>
+        public void Initialize()
+        {
+            this._currentStatus.SetRunning();
+        }
+
+        /// <summary>
+        /// Runs the task to completion and records the outcome on the task status.
+        /// </summary>
+        /// <remarks>
+        /// The task is executed on a System.Threading.Tasks.Task and this method blocks until
+        /// it has finished. On success the result is handed to SetResult; an exception raised
+        /// inside the try block is handed to SetException instead of being rethrown.
+        /// The task is disposed at most once, whichever path is taken.
+        /// </remarks>
+        public void Start()
+        {
+            // Set just before the task is disposed on the success path, so the failure path
+            // does not dispose it a second time when a later step (or Dispose itself) throws.
+            bool taskDisposed = false;
+            try
+            {
+                LOGGER.Log(Level.Info, "Call Task");
+                if (_currentStatus.IsNotRunning())
+                {
+                    var e = new InvalidOperationException("TaskRuntime not in Running state, instead it is in state " + _currentStatus.State);
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                byte[] taskMemento = _memento.IsPresent() ? _memento.Value : null;
+                System.Threading.Tasks.Task<byte[]> runTask = new System.Threading.Tasks.Task<byte[]>(() => RunTask(taskMemento));
+                try
+                {
+                    runTask.Start();
+                    runTask.Wait();
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Exception thrown during task running.", LOGGER);
+                }
+                byte[] result = runTask.Result;
+
+                LOGGER.Log(Level.Info, "Task Call Finished");
+                if (_task != null)
+                {
+                    taskDisposed = true;
+                    _task.Dispose();
+                }
+                _currentStatus.SetResult(result);
+                if (result != null && result.Length > 0)
+                {
+                    LOGGER.Log(Level.Info, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result));
+                }
+            }
+            catch (Exception e)
+            {
+                if (_task != null && !taskDisposed)
+                {
+                    taskDisposed = true;
+                    _task.Dispose();
+                }
+                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Task failed caused by exception [{0}]", e));
+                _currentStatus.SetException(e);
+            }
+        }
+
+        /// <summary>
+        /// Returns the state currently held by the task status.
+        /// </summary>
+        /// <returns>the current <see cref="TaskState"/></returns>
+        public TaskState GetTaskState()
+        {
+            TaskState current = _currentStatus.State;
+            return current;
+        }
+
+        /// <summary>
+        /// Converts the current task status to its protocol-buffer form.
+        /// Called by heartbeat manager.
+        /// </summary>
+        /// <returns>the current TaskStatusProto</returns>
+        public TaskStatusProto GetStatusProto()
+        {
+            TaskStatusProto proto = _currentStatus.ToProto();
+            return proto;
+        }
+
+        /// <summary>
+        /// Reports whether the task status considers the task ended.
+        /// </summary>
+        /// <returns>the value of <c>HasEnded()</c> on the current task status</returns>
+        public bool HasEnded()
+        {
+            bool ended = _currentStatus.HasEnded();
+            return ended;
+        }
+
+        /// <summary>
+        /// Method-style accessor for the id of the task.
+        /// </summary>
+        /// <returns>the task id held by the current task status</returns>
+        public string GetActicityId()
+        {
+            string id = _currentStatus.TaskId;
+            return id;
+        }
+
+        /// <summary>
+        /// Handles a close request: dispatches a close event and marks the status as
+        /// close-requested. The request is ignored (with a warning) when the task is not running.
+        /// </summary>
+        /// <param name="message">payload wrapped into the close event</param>
+        public void Close(byte[] message)
+        {
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId));
+
+            if (_currentStatus.IsNotRunning())
+            {
+                string ignored = string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State);
+                LOGGER.Log(Level.Warning, ignored);
+                return;
+            }
+
+            try
+            {
+                CloseEventImpl closeEvent = new CloseEventImpl(message);
+                OnNext(closeEvent);
+                _currentStatus.SetCloseRequested();
+            }
+            catch (Exception closeError)
+            {
+                // Log the failure, then record it on the status wrapped with task and context ids.
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(closeError, Level.Error, "Error during Close.", LOGGER);
+
+                TaskClientCodeException wrapped = new TaskClientCodeException(TaskId, ContextId, "Error during Close().", closeError);
+                _currentStatus.SetException(wrapped);
+            }
+        }
+
+        /// <summary>
+        /// Handles a suspend request: dispatches a suspend event and marks the status as
+        /// suspend-requested. The request is ignored (with a warning) when the task is not running.
+        /// </summary>
+        /// <param name="message">payload wrapped into the suspend event</param>
+        public void Suspend(byte[] message)
+        {
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId));
+
+            if (_currentStatus.IsNotRunning())
+            {
+                string ignored = string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State);
+                LOGGER.Log(Level.Warning, ignored);
+                return;
+            }
+
+            try
+            {
+                // NOTE(review): SuspendEventImpl is declared as an ICloseEvent, so this call
+                // appears to resolve to OnNext(ICloseEvent) rather than OnNext(ISuspendEvent);
+                // confirm that this is intended.
+                SuspendEventImpl suspendEvent = new SuspendEventImpl(message);
+                OnNext(suspendEvent);
+                _currentStatus.SetSuspendRequested();
+            }
+            catch (Exception suspendError)
+            {
+                // Log the failure, then record it on the status wrapped with task and context ids.
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(suspendError, Level.Error, "Error during Suspend.", LOGGER);
+
+                TaskClientCodeException wrapped = new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", suspendError);
+                _currentStatus.SetException(wrapped);
+            }
+        }
+
+        /// <summary>
+        /// Delivers a driver message to the task by dispatching a driver-message event.
+        /// The message is ignored (with a warning) when the task is not running.
+        /// </summary>
+        /// <param name="message">payload wrapped into the driver message</param>
+        public void Deliver(byte[] message)
+        {
+            if (_currentStatus.IsNotRunning())
+            {
+                string ignored = string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State);
+                LOGGER.Log(Level.Warning, ignored);
+                return;
+            }
+
+            try
+            {
+                DriverMessageImpl driverMessage = new DriverMessageImpl(message);
+                OnNext(driverMessage);
+            }
+            catch (Exception deliveryError)
+            {
+                // Log the failure, then record it on the status wrapped with task and context ids.
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(deliveryError, Level.Error, "Error during message delivery.", LOGGER);
+
+                TaskClientCodeException wrapped = new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", deliveryError);
+                _currentStatus.SetException(wrapped);
+            }
+        }
+
+        /// <summary>
+        /// Receives a close event; at present it only logs that the event arrived.
+        /// </summary>
+        /// <param name="value">the close event (not inspected yet)</param>
+        public void OnNext(ICloseEvent value)
+        {
+            const string trace = "TaskRuntime::OnNext(ICloseEvent value)";
+            LOGGER.Log(Level.Info, trace);
+            // TODO: send a heartbeat
+        }
+
+        // The OnError/OnCompleted halves of the three observer interfaces are not
+        // implemented: each of them throws NotImplementedException when invoked.
+
+        // --- IObserver<ICloseEvent> ---
+        void IObserver<ICloseEvent>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<ICloseEvent>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        // --- IObserver<ISuspendEvent> ---
+        void IObserver<ISuspendEvent>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<ISuspendEvent>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        // --- IObserver<IDriverMessage> ---
+        void IObserver<IDriverMessage>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<IDriverMessage>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Receives a suspend event; at present it only logs that the event arrived.
+        /// </summary>
+        /// <param name="value">the suspend event (not inspected yet)</param>
+        public void OnNext(ISuspendEvent value)
+        {
+            const string trace = "TaskRuntime::OnNext(ISuspendEvent value)";
+            LOGGER.Log(Level.Info, trace);
+            // TODO: send a heartbeat
+        }
+
+        /// <summary>
+        /// Receives a driver message: resolves an IDriverMessageHandler from the injector
+        /// and lets it handle the message.
+        /// </summary>
+        /// <param name="value">the driver message passed on to the handler</param>
+        public void OnNext(IDriverMessage value)
+        {
+            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)");
+
+            IDriverMessageHandler handler = null;
+            try
+            {
+                handler = _injector.GetInstance<IDriverMessageHandler>();
+            }
+            catch (Exception injectionError)
+            {
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(injectionError, Level.Error, "Received Driver message, but unable to inject handler for driver message ", LOGGER);
+            }
+
+            if (handler == null)
+            {
+                // Nothing to dispatch to.
+                return;
+            }
+
+            try
+            {
+                handler.Handle(value);
+            }
+            catch (Exception handlerError)
+            {
+                // A failing handler is logged and recorded on the status; it is not rethrown.
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(handlerError, Level.Warning, "Exception throw when handling driver message: " + handlerError, LOGGER);
+                _currentStatus.RecordExecptionWithoutHeartbeat(handlerError);
+            }
+        }
+
+        /// <summary>
+        /// Invokes the task's Call method with the given memento.
+        /// </summary>
+        /// <param name="memento">memento bytes passed to the task</param>
+        /// <returns>whatever the task's Call returns</returns>
+        private byte[] RunTask(byte[] memento)
+        {
+            byte[] output = _task.Call(memento);
+            return output;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStartImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStartImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStartImpl.cs
new file mode 100644
index 0000000..c4047b8
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStartImpl.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+
+namespace Org.Apache.Reef.Common
+{
+    /// <summary>
+    /// <see cref="ITaskStart"/> implementation that carries an id string.
+    /// </summary>
+    public class TaskStartImpl : ITaskStart
+    {
+        /// <summary>
+        /// The id supplied at construction; it can be reassigned through the public setter.
+        /// </summary>
+        public string Id { get; set; }
+
+        /// <summary>
+        /// Creates the event with the given id.
+        /// </summary>
+        /// <param name="id">value stored in <see cref="Id"/></param>
+        //INJECT
+        public TaskStartImpl(string id)
+        {
+            this.Id = id;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskState.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskState.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskState.cs
new file mode 100644
index 0000000..9e3bcb4
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskState.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Common
+{
+    /// <summary>
+    /// States a task can be in. Every member has an explicitly assigned numeric
+    /// value, running from 0 to 7 in declaration order.
+    /// </summary>
+    public enum TaskState
+    {
+        Init = 0,
+        Running = 1,
+
+        // Pending requests.
+        CloseRequested = 2,
+        SuspendRequested = 3,
+
+        Suspended = 4,
+        Failed = 5,
+        Done = 6,
+        Killed = 7
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStatus.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStatus.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStatus.cs
new file mode 100644
index 0000000..639a7d0
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStatus.cs
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Common
+{
+    /// <summary>
+    /// Holds the state, result and last exception of one task, enforces legal
+    /// state transitions, and pushes status updates to the <see cref="HeartBeatManager"/>.
+    /// </summary>
+    public class TaskStatus
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStatus));
+        private readonly TaskLifeCycle _taskLifeCycle;
+        private readonly HeartBeatManager _heartBeatManager;
+        private readonly Optional<ISet<ITaskMessageSource>> _evaluatorMessageSources;
+        private readonly string _taskId;
+        private readonly string _contextId;
+
+        private Optional<Exception> _lastException = Optional<Exception>.Empty();
+        private Optional<byte[]> _result = Optional<byte[]>.Empty();
+        private TaskState _state;
+
+        /// <summary>
+        /// Creates a status tracker in the <see cref="TaskState.Init"/> state with a fresh <see cref="TaskLifeCycle"/>.
+        /// </summary>
+        /// <param name="heartBeatManager">receiver of the status protos produced by this instance.</param>
+        /// <param name="contextId">identifier reported as the proto's context_id.</param>
+        /// <param name="taskId">identifier reported as the proto's task_id.</param>
+        /// <param name="evaluatorMessageSources">optional message sources polled while the task is running.</param>
+        public TaskStatus(HeartBeatManager heartBeatManager, string contextId, string taskId, Optional<ISet<ITaskMessageSource>> evaluatorMessageSources)
+        {
+            _contextId = contextId;
+            _taskId = taskId;
+            _heartBeatManager = heartBeatManager;
+            _taskLifeCycle = new TaskLifeCycle();
+            _evaluatorMessageSources = evaluatorMessageSources;
+            State = TaskState.Init;
+        }
+
+        /// <summary>
+        /// Current task state. The setter accepts only transitions approved by
+        /// <see cref="IsLegalStateTransition"/>; any other transition is logged and reported
+        /// as an <see cref="InvalidOperationException"/> through Exceptions.Throw.
+        /// </summary>
+        public TaskState State
+        {
+            get
+            {
+                return _state;
+            }
+
+            set
+            {
+                if (IsLegalStateTransition(_state, value))
+                {
+                    _state = value;
+                }
+                else
+                {
+                    string message = string.Format(CultureInfo.InvariantCulture, "Illegal state transition from [{0}] to [{1}]", _state, value);
+                    LOGGER.Log(Level.Error, message);
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(message), LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Task identifier supplied to the constructor.
+        /// </summary>
+        public string TaskId
+        {
+            get { return _taskId; }
+        }
+
+        /// <summary>
+        /// Context identifier supplied to the constructor.
+        /// </summary>
+        public string ContextId
+        {
+            get { return _contextId; }
+        }
+
+        /// <summary>
+        /// Records the exception (moving to Failed and stopping the life cycle), sends a
+        /// heartbeat carrying it, and then clears the stored exception.
+        /// </summary>
+        /// <param name="e">the exception to report.</param>
+        public void SetException(Exception e)
+        {
+            RecordExecptionWithoutHeartbeat(e);
+            Heartbeat();
+            _lastException = Optional<Exception>.Empty();
+        }
+
+        /// <summary>
+        /// Stores the task result, advances the state (Running or CloseRequested to Done,
+        /// SuspendRequested to Suspended, any other state unchanged), stops the life cycle
+        /// and sends a heartbeat.
+        /// </summary>
+        /// <param name="result">result bytes; null is stored as an empty optional.</param>
+        public void SetResult(byte[] result)
+        {
+            _result = Optional<byte[]>.OfNullable(result);
+            if (State == TaskState.Running)
+            {
+                State = TaskState.Done;
+            }
+            else if (State == TaskState.SuspendRequested)
+            {
+                State = TaskState.Suspended;
+            }
+            else if (State == TaskState.CloseRequested)
+            {
+                State = TaskState.Done;
+            }
+            _taskLifeCycle.Stop();
+            Heartbeat();
+        }
+
+        /// <summary>
+        /// Starts the life cycle and moves from Init to Running. Does nothing when the
+        /// state is not Init. Any exception raised on the way is reported via <see cref="SetException"/>.
+        /// </summary>
+        public void SetRunning()
+        {
+            LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning");
+            if (_state == TaskState.Init)
+            {
+                try
+                {
+                    _taskLifeCycle.Start();
+                    // Need to send an INIT heartbeat to the driver prompting it to create a RunningTask event.
+                    // The heartbeat is sent before the state changes so that it still reports INIT.
+                    LOGGER.Log(Level.Info, "Sending task INIT heartbeat");
+                    Heartbeat();
+                    State = TaskState.Running;
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Cannot set task status to running.", LOGGER);
+                    SetException(e);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Moves to CloseRequested; no heartbeat is sent.
+        /// </summary>
+        public void SetCloseRequested()
+        {
+            State = TaskState.CloseRequested;
+        }
+
+        /// <summary>
+        /// Moves to SuspendRequested; no heartbeat is sent.
+        /// </summary>
+        public void SetSuspendRequested()
+        {
+            State = TaskState.SuspendRequested;
+        }
+
+        /// <summary>
+        /// Moves to Killed and sends a heartbeat.
+        /// </summary>
+        public void SetKilled()
+        {
+            State = TaskState.Killed;
+            Heartbeat();
+        }
+
+        /// <summary>
+        /// Returns true for every state other than Running (including the *Requested states).
+        /// </summary>
+        public bool IsNotRunning()
+        {
+            return _state != TaskState.Running;
+        }
+
+        /// <summary>
+        /// Returns true when the state is Done, Suspended, Failed or Killed.
+        /// </summary>
+        public bool HasEnded()
+        {
+            switch (_state)
+            {
+                case TaskState.Done:
+                case TaskState.Suspended:
+                case TaskState.Failed:
+                case TaskState.Killed:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
+        /// <summary>
+        /// Builds the status proto for the current state. The payload is, in order of
+        /// precedence: the result, the string form of the last exception, or (only while
+        /// Running) the messages collected from the evaluator message sources.
+        /// </summary>
+        /// <returns>a new <see cref="TaskStatusProto"/> describing this task.</returns>
+        public TaskStatusProto ToProto()
+        {
+            // Resolve a result/exception conflict first; this may change the state reported below.
+            Check();
+            TaskStatusProto taskStatusProto = new TaskStatusProto()
+            {
+                context_id = _contextId,
+                task_id = _taskId,
+                state = GetProtoState(),
+            };
+            if (_result.IsPresent())
+            {
+                taskStatusProto.result = ByteUtilities.CopyBytesFrom(_result.Value);
+            }
+            else if (_lastException.IsPresent())
+            {
+                // The exception travels in the result field as the bytes of its ToString() text.
+                byte[] error = ByteUtilities.StringToByteArrays(_lastException.Value.ToString());
+                taskStatusProto.result = ByteUtilities.CopyBytesFrom(error);
+            }
+            else if (_state == TaskState.Running)
+            {
+                foreach (TaskMessage message in GetMessages())
+                {
+                    TaskStatusProto.TaskMessageProto taskMessageProto = new TaskStatusProto.TaskMessageProto()
+                    {
+                        source_id = message.MessageSourceId,
+                        message = ByteUtilities.CopyBytesFrom(message.Message),
+                    };
+                    taskStatusProto.task_message.Add(taskMessageProto);
+                }
+            }
+            return taskStatusProto;
+        }
+
+        /// <summary>
+        /// Stores the exception unless one is already stored, moves to Failed and stops
+        /// the life cycle. Unlike <see cref="SetException"/>, no heartbeat is sent.
+        /// </summary>
+        /// <param name="e">the exception to record.</param>
+        internal void RecordExecptionWithoutHeartbeat(Exception e)
+        {
+            if (!_lastException.IsPresent())
+            {
+                _lastException = Optional<Exception>.Of(e);
+            }
+            State = TaskState.Failed;
+            _taskLifeCycle.Stop();
+        }
+
+        /// <summary>
+        /// Decides whether the state may change from <paramref name="from"/> to <paramref name="to"/>.
+        /// </summary>
+        /// <param name="from">current state; never null because the backing field is a plain enum.</param>
+        /// <param name="to">requested state.</param>
+        /// <returns>true when the transition is permitted.</returns>
+        private static bool IsLegalStateTransition(TaskState from, TaskState to)
+        {
+            switch (from)
+            {
+                case TaskState.Init:
+                    switch (to)
+                    {
+                        case TaskState.Init:
+                        case TaskState.Running:
+                        case TaskState.Failed:
+                        case TaskState.Killed:
+                        case TaskState.Done:
+                            return true;
+                        default:
+                            return false;
+                    }
+                case TaskState.Running:
+                    switch (to)
+                    {
+                        case TaskState.CloseRequested:
+                        case TaskState.SuspendRequested:
+                        case TaskState.Failed:
+                        case TaskState.Killed:
+                        case TaskState.Done:
+                            return true;
+                        default:
+                            return false;
+                    }
+                case TaskState.CloseRequested:
+                    switch (to)
+                    {
+                        case TaskState.Failed:
+                        case TaskState.Killed:
+                        case TaskState.Done:
+                            return true;
+                        default:
+                            return false;
+                    }
+                case TaskState.SuspendRequested:
+                    switch (to)
+                    {
+                        case TaskState.Failed:
+                        case TaskState.Killed:
+                        case TaskState.Suspended:
+                            return true;
+                        default:
+                            return false;
+                    }
+
+                // Ended states place no restriction on the next state; Suspended is
+                // not listed explicitly and is handled by the default label.
+                case TaskState.Failed:
+                case TaskState.Done:
+                case TaskState.Killed:
+                default:
+                    return true;
+            }
+        }
+
+        /// <summary>
+        /// If both a result and an exception are stored, the exception wins: the result is
+        /// logged and discarded and the state becomes Failed.
+        /// </summary>
+        private void Check()
+        {
+            if (_result.IsPresent() && _lastException.IsPresent())
+            {
+                LOGGER.Log(Level.Warning, "Both task result and exception are present, the expcetion will take over. Thrown away result:" + ByteUtilities.ByteArrarysToString(_result.Value));
+                State = TaskState.Failed;
+                _result = Optional<byte[]>.Empty();
+            }
+        }
+
+        /// <summary>
+        /// Hands the current status proto to the heartbeat manager.
+        /// </summary>
+        private void Heartbeat()
+        {
+            _heartBeatManager.OnNext(ToProto());
+        }
+
+        /// <summary>
+        /// Maps the internal state to the protobuf state. CloseRequested and
+        /// SuspendRequested are both reported as RUNNING.
+        /// </summary>
+        private State GetProtoState()
+        {
+            switch (_state)
+            {
+                case TaskState.Init:
+                    return ProtoBuf.ReefServiceProto.State.INIT;
+                case TaskState.CloseRequested:
+                case TaskState.SuspendRequested:
+                case TaskState.Running:
+                    return ProtoBuf.ReefServiceProto.State.RUNNING;
+                case TaskState.Done:
+                    return ProtoBuf.ReefServiceProto.State.DONE;
+                case TaskState.Suspended:
+                    return ProtoBuf.ReefServiceProto.State.SUSPEND;
+                case TaskState.Failed:
+                    return ProtoBuf.ReefServiceProto.State.FAILED;
+                case TaskState.Killed:
+                    return ProtoBuf.ReefServiceProto.State.KILLED;
+                default:
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Unknown state: " + _state), LOGGER);
+                    break;
+            }
+            return ProtoBuf.ReefServiceProto.State.FAILED; //this line should not be reached as default case will throw exception
+        }
+
+        /// <summary>
+        /// Collects the messages currently offered by the evaluator message sources.
+        /// </summary>
+        /// <returns>a new, possibly empty, list of messages.</returns>
+        private ICollection<TaskMessage> GetMessages()
+        {
+            List<TaskMessage> result = new List<TaskMessage>();
+            if (_evaluatorMessageSources.IsPresent())
+            {
+                foreach (ITaskMessageSource source in _evaluatorMessageSources.Value)
+                {
+                    // Read the property once and reuse the returned optional.
+                    Optional<TaskMessage> taskMessageOptional = source.Message;
+                    if (taskMessageOptional.IsPresent())
+                    {
+                        result.Add(taskMessageOptional.Value);
+                    }
+                }
+            }
+            return result;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStopImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStopImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStopImpl.cs
new file mode 100644
index 0000000..2c7e75e
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStopImpl.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Task;
+using Org.Apache.Reef.Tasks.Events;
+
+namespace Org.Apache.Reef.Common
+{
+    public class TaskStopImpl : ITaskStop // ITaskStop implementation whose only state is an identifier.
+    {
+        // Marked "INJECT" by the author. NOTE(review): no [Inject] attribute is applied here; confirm whether one is intended.
+        public TaskStopImpl(string id)
+        {
+            Id = id;
+        }
+
+        public string Id { get; set; } // Initialized from the constructor argument; publicly settable.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/EvaluatorConfigurations.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/EvaluatorConfigurations.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/EvaluatorConfigurations.cs
new file mode 100644
index 0000000..3154541
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/EvaluatorConfigurations.cs
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Formats;
+using System;
+using System.IO;
+using System.Linq;
+
+namespace Org.Apache.Reef.Common
+{
+    /// <summary>
+    /// Loads an evaluator configuration file through <see cref="AvroConfigurationSerializer"/>
+    /// and exposes selected settings as string properties. Each property is looked up on
+    /// first access and the value is then kept in a field.
+    /// </summary>
+    public class EvaluatorConfigurations
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorConfigurations));
+
+        private AvroConfiguration _avroConfiguration;
+        private string _configFile;
+
+        // Cached setting values; null until the matching property is first read.
+        private string _applicationId;
+        private string _evaluatorId;
+        private string _taskConfiguration;
+        private string _rootContextConfiguration;
+        private string _rootServiceConfiguration;
+
+        /// <summary>
+        /// Validates the path and deserializes the configuration file.
+        /// </summary>
+        /// <param name="configFile">path of the configuration file; must be non-blank and must exist.</param>
+        public EvaluatorConfigurations(string configFile)
+        {
+            using (LOGGER.LogFunction("EvaluatorConfigurations::EvaluatorConfigurations"))
+            {
+                bool pathMissing = string.IsNullOrWhiteSpace(configFile);
+                if (pathMissing)
+                {
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("configFile"), LOGGER);
+                }
+
+                bool fileExists = File.Exists(configFile);
+                if (!fileExists)
+                {
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new FileNotFoundException("cannot find file " + configFile), LOGGER);
+                }
+
+                _configFile = configFile;
+                _avroConfiguration = new AvroConfigurationSerializer().AvroDeseriaizeFromFile(_configFile);
+            }
+        }
+
+        /// <summary>
+        /// Value of the binding whose key ends with Constants.TaskConfiguration, or an empty string.
+        /// </summary>
+        public string TaskConfiguration
+        {
+            get
+            {
+                if (_taskConfiguration == null)
+                {
+                    _taskConfiguration = GetSettingValue(Reef.Evaluator.Constants.TaskConfiguration);
+                }
+                return _taskConfiguration;
+            }
+        }
+
+        /// <summary>
+        /// Value of the binding whose key ends with Constants.EvaluatorIdentifier, or an empty string.
+        /// </summary>
+        public string EvaluatorId
+        {
+            get
+            {
+                if (_evaluatorId == null)
+                {
+                    _evaluatorId = GetSettingValue(Reef.Evaluator.Constants.EvaluatorIdentifier);
+                }
+                return _evaluatorId;
+            }
+        }
+
+        /// <summary>
+        /// Value of the binding whose key ends with Constants.ApplicationIdentifier, or an empty string.
+        /// </summary>
+        public string ApplicationId
+        {
+            get
+            {
+                if (_applicationId == null)
+                {
+                    _applicationId = GetSettingValue(Reef.Evaluator.Constants.ApplicationIdentifier);
+                }
+                return _applicationId;
+            }
+        }
+
+        /// <summary>
+        /// Value of the binding whose key ends with Constants.RootContextConfiguration, or an empty string.
+        /// </summary>
+        public string RootContextConfiguration
+        {
+            get
+            {
+                if (_rootContextConfiguration == null)
+                {
+                    _rootContextConfiguration = GetSettingValue(Reef.Evaluator.Constants.RootContextConfiguration);
+                }
+                return _rootContextConfiguration;
+            }
+        }
+
+        /// <summary>
+        /// Value of the binding whose key ends with Constants.RootServiceConfiguration, or an empty string.
+        /// </summary>
+        public string RootServiceConfiguration
+        {
+            get
+            {
+                if (_rootServiceConfiguration == null)
+                {
+                    _rootServiceConfiguration = GetSettingValue(Reef.Evaluator.Constants.RootServiceConfiguration);
+                }
+                return _rootServiceConfiguration;
+            }
+        }
+
+        /// <summary>
+        /// Finds the single binding whose key ends with <paramref name="settingKey"/>
+        /// (ordinal, case-insensitive) and returns its value.
+        /// </summary>
+        /// <param name="settingKey">suffix to match against binding keys.</param>
+        /// <returns>the binding's value, or an empty string when no binding matches.</returns>
+        private string GetSettingValue(string settingKey)
+        {
+            // SingleOrDefault throws if more than one binding matches the suffix.
+            ConfigurationEntry match = _avroConfiguration.Bindings.SingleOrDefault(
+                entry => entry.key.EndsWith(settingKey, StringComparison.OrdinalIgnoreCase));
+
+            return match == null ? string.Empty : match.value;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/RemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/RemoteManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/RemoteManager.cs
new file mode 100644
index 0000000..4e68186
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/RemoteManager.cs
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Common
+{
+    public class RemoteManager // Empty type: declares no members. NOTE(review): presumably a placeholder; confirm intent.
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/services/IService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/services/IService.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/services/IService.cs
new file mode 100644
index 0000000..cbcebf8
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/services/IService.cs
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Services
+{
+    public interface IService // Marker interface with no members; ServiceConfiguration.Services and ServicesSet are typed on it.
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServiceConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServiceConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServiceConfiguration.cs
new file mode 100644
index 0000000..c51ef40
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServiceConfiguration.cs
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Tang.Formats;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Tang.Util;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static field, typical usage in configurations")]
+
+namespace Org.Apache.Reef.Services
+{
+    /// <summary>
+    /// Configuration module for services. The configuration created here can be passed alongside a ContextConfiguration
+    /// to form a context. Unlike bindings made in the ContextConfiguration, those made here are passed along
+    /// to child contexts.
+    /// </summary>
+    public class ServiceConfiguration : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// A set of services to instantiate. All classes given here will be instantiated in the context, and their references
+        /// will be made available to child contexts and tasks.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly OptionalParameter<IService> Services = new OptionalParameter<IService>();
+
+        public ServiceConfiguration() // Leaves TangConfig unassigned; used by the ConfigurationModule property below.
+            : base()
+        {
+        }
+
+        public ServiceConfiguration(string config) // Deserializes the config string with AvroConfigurationSerializer into TangConfig.
+        {
+            TangConfig = new AvroConfigurationSerializer().FromString(config);
+        }
+
+        public static ConfigurationModule ConfigurationModule // Builds a new module on every access, binding Services as entries of ServicesSet.
+        {
+            get
+            {
+                return new ServiceConfiguration()
+                    .BindSetEntry(GenericType<ServicesSet>.Class, Services)
+                    .Build();
+            }
+        }
+
+        public IConfiguration TangConfig { get; private set; } // Assigned only by the string constructor.
+    }
+
+    public class InjectedServices // Holder for the set of services supplied under the ServicesSet named parameter.
+    {
+        [Inject]
+        public InjectedServices([Parameter(typeof(ServicesSet))] ISet<IService> services) // services: the set bound to ServicesSet.
+        {
+            Services = services;
+        }
+
+        public ISet<IService> Services { get; set; } // The set passed to the constructor; publicly settable.
+    }
+
+    [NamedParameter("Set of services", "servicesSet", "")]
+    class ServicesSet : Name<ISet<IService>> // Named parameter for a set of IService; no access modifier, so the type is internal.
+    {      
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServicesConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServicesConfigurationOptions.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServicesConfigurationOptions.cs
new file mode 100644
index 0000000..30a38de
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServicesConfigurationOptions.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Services
+{
+    public class ServicesConfigurationOptions // Container type for the nested Services named parameter.
+    {
+        [NamedParameter("Services", "services", "services")]
+        public class Services : Name<string> // String-valued named parameter; has no members of its own.
+        {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IDriverMessageHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IDriverMessageHandler.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IDriverMessageHandler.cs
new file mode 100644
index 0000000..a0111b8
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IDriverMessageHandler.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Defaults;
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Tasks
+{
+    //[DefaultImplementation(typeof(DefaultTaskMessageSource))] -- NOTE(review): disabled; DefaultTaskMessageSource looks like the wrong type for this interface, confirm before re-enabling.
+    public interface IDriverMessageHandler
+    {
+        void Handle(IDriverMessage message); // Handles one IDriverMessage; presumably a message sent by the driver — confirm against callers.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IRunningTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IRunningTask.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IRunningTask.cs
new file mode 100644
index 0000000..92a6887
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IRunningTask.cs
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities;
+using System;
+
+namespace Org.Apache.Reef.Common.Task
+{
+    /// <summary>
+    /// Represents a running Task
+    /// </summary>
+    public interface IRunningTask : IIdentifiable, IDisposable
+    {
+        /// <summary>
+        /// Sends the message to the running task.
+        /// </summary>
+        /// <param name="message">the message bytes to send to the Task.</param>
+        void OnNext(byte[] message);
+
+        /// <summary>
+        ///  Signal the task to suspend.
+        /// </summary>
+        /// <param name="message">a message that is sent to the Task.</param>
+        void Suspend(byte[] message);
+
+        /// <summary>
+        /// Signal the task to suspend, without an accompanying message.
+        /// </summary>
+        void Suspend();
+
+        /// <summary>
+        /// Signal the task to shut down.
+        /// </summary>
+        /// <param name="message">a message that is sent to the Task.</param>
+        void Dispose(byte[] message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITask.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITask.cs
new file mode 100644
index 0000000..3655a4b
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITask.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Tasks
+{
+    public interface ITask : IDisposable // a disposable unit of work with a single entry point, Call
+    {
+        byte[] Call(byte[] memento); // takes a byte[] memento and returns a byte[] result; NOTE(review): null-ability of either is not specified here -- confirm
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITaskMessageSource.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITaskMessageSource.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITaskMessageSource.cs
new file mode 100644
index 0000000..589a445
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITaskMessageSource.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Defaults;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Tasks
+{
+    [DefaultImplementation(typeof(DefaultTaskMessageSource))] // declares DefaultTaskMessageSource as the default implementation
+    public interface ITaskMessageSource // a source of an optional TaskMessage
+    {
+        Optional<TaskMessage> Message { get; set; } // the message wrapped in Optional; presumably empty when there is nothing to send -- TODO confirm
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfiguration.cs
new file mode 100644
index 0000000..6fb386b
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfiguration.cs
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Formats;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Tang.Util;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static readonly field, typical usage in configurations")]
+
+namespace Org.Apache.Reef.Tasks
+{
+    public class TaskConfiguration : ConfigurationModuleBuilder // configuration module for a Task, plus a parser for its serialized form
+    {
+        // Key fragment searched for in serialized binding keys to recover the task identifier (a hack for now)
+        public const string TaskIdentifier = "TaskConfigurationOptions+Identifier";
+
+        /// <summary>
+        /// The identifier of the task.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly RequiredParameter<string> Identifier = new RequiredParameter<string>();
+
+        /// <summary>
+        /// The task to instantiate.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly RequiredImpl<ITask> Task = new RequiredImpl<ITask>();
+
+        /// <summary>
+        /// Handler for task suspension. Defaults to task failure if not bound.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly OptionalImpl<IObserver<ISuspendEvent>> OnSuspend = new OptionalImpl<IObserver<ISuspendEvent>>();
+
+        /// <summary>
+        /// Handler for messages from the driver. Defaults to task failure if not bound.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly OptionalImpl<IDriverMessageHandler> OnMessage = new OptionalImpl<IDriverMessageHandler>();
+
+        /// <summary>
+        /// Handler for closure requests from the driver. Defaults to task failure if not bound.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly OptionalImpl<IObserver<ICloseEvent>> OnClose = new OptionalImpl<IObserver<ICloseEvent>>();
+
+        /// <summary>
+        /// Message source invoked upon each evaluator heartbeat.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly OptionalImpl<ITaskMessageSource> OnSendMessage = new OptionalImpl<ITaskMessageSource>();
+
+        /// <summary>
+        /// Handler that receives ITaskStart after the ITask.Call() method was called.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly OptionalImpl<IObserver<ITaskStart>> OnTaskStart = new OptionalImpl<IObserver<ITaskStart>>();
+
+        /// <summary>
+        /// Handler that receives ITaskStop after the ITask.Call() method returned.
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly OptionalImpl<IObserver<ITaskStop>> OnTaskStop = new OptionalImpl<IObserver<ITaskStop>>();
+
+        /// <summary>
+        /// The memento to be passed to ITask.Call().
+        /// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+        public static readonly OptionalParameter<string> Memento = new OptionalParameter<string>();
+
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskConfiguration));
+
+        // Creates an empty builder; TaskId, Configurations and TangConfig all stay null on this path.
+        public TaskConfiguration()
+        {
+        }
+
+        public TaskConfiguration(string configString) // parses a serialized configuration and extracts the task id from it
+        {
+            TangConfig = new AvroConfigurationSerializer().FromString(configString);
+            AvroConfiguration avroConfiguration = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString);
+            foreach (ConfigurationEntry config in avroConfiguration.Bindings)
+            {
+                if (config.key.Contains(TaskIdentifier))
+                {
+                    TaskId = config.value; // no break: if several keys match, the last one wins
+                }
+            }
+            if (string.IsNullOrWhiteSpace(TaskId))
+            {
+                string msg = "Required parameter TaskId not provided.";
+                LOGGER.Log(Level.Error, msg);
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(msg), LOGGER);
+            }
+        }
+
+        public static ConfigurationModule ConfigurationModule // builds a fresh module on every access
+        {
+            get
+            {
+                return new TaskConfiguration()
+                    .BindImplementation(GenericType<ITask>.Class, Task)
+                    .BindImplementation(GenericType<ITaskMessageSource>.Class, OnSendMessage)
+                    .BindImplementation(GenericType<IDriverMessageHandler>.Class, OnMessage)
+                    .BindNamedParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, Identifier)
+                    .BindNamedParameter(GenericType<TaskConfigurationOptions.Memento>.Class, Memento)
+                    .BindNamedParameter(GenericType<TaskConfigurationOptions.CloseHandler>.Class, OnClose)
+                    .BindNamedParameter(GenericType<TaskConfigurationOptions.SuspendHandler>.Class, OnSuspend)
+                    .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStart)
+                    .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop)
+                    .Build();
+            }
+        }
+
+        public string TaskId { get; private set; } // assigned only by the string constructor
+
+        public IList<KeyValuePair<string, string>> Configurations { get; private set; } // NOTE(review): never assigned in this class, so it is always null
+
+        public IConfiguration TangConfig { get; private set; } // assigned only by the string constructor; null otherwise
+
+        public override string ToString() // null-safe: TangConfig is null for instances created via the parameterless constructor
+        {
+            return string.Format(CultureInfo.InvariantCulture, "TaskConfiguration - configurations: {0}", TangConfig == null ? string.Empty : TangConfig.ToString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfigurationOptions.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfigurationOptions.cs
new file mode 100644
index 0000000..888faf0
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfigurationOptions.cs
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.Reef.Tasks
+{
+    public class TaskConfigurationOptions // groups the named parameters that describe a Task
+    {
+        [NamedParameter("The Identifier of the Task", "taskid", "Task")]
+        public class Identifier : Name<string> // bound to TaskConfiguration.Identifier
+        {
+        }
+
+        [NamedParameter(documentation: "The memento to be used for the Task")]
+        public class Memento : Name<string> // bound to TaskConfiguration.Memento
+        {
+        }
+
+        [NamedParameter("TaskMessageSource", "messagesource", null)]
+        public class TaskMessageSources : Name<ISet<ITaskMessageSource>> // NOTE(review): not bound by TaskConfiguration.ConfigurationModule -- confirm where it is used
+        {
+        }
+
+        [NamedParameter(documentation: "The set of event handlers for the TaskStart event.")]
+        public class StartHandlers : Name<ISet<IObserver<ITaskStart>>> // set entries come from TaskConfiguration.OnTaskStart
+        {
+        }
+
+        [NamedParameter(documentation: "The set of event handlers for the TaskStop event.")]
+        public class StopHandlers : Name<ISet<IObserver<ITaskStop>>> // set entries come from TaskConfiguration.OnTaskStop
+        {
+        }
+
+        [NamedParameter(documentation: "The event handler that receives the close event.")]
+        public class CloseHandler : Name<IObserver<ICloseEvent>> // bound to TaskConfiguration.OnClose
+        {
+        }
+
+        [NamedParameter(documentation: "The event handler that receives the suspend event.")]
+        public class SuspendHandler : Name<IObserver<ISuspendEvent>> // bound to TaskConfiguration.OnSuspend
+        {
+        }
+
+        [NamedParameter(documentation: "The event handler that receives messages from the driver.")]
+        public class MessageHandler : Name<IObserver<IDriverMessage>> // NOTE(review): not bound by TaskConfiguration.ConfigurationModule, which binds IDriverMessageHandler instead -- confirm
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskMessage.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskMessage.cs
new file mode 100644
index 0000000..20defce
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskMessage.cs
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities;
+using System;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Tasks
+{
+    public class TaskMessage : IMessage // pairs a message source id with a byte[] payload; instances are created through From
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskMessage));
+        private readonly string _messageSourceId;
+        private readonly byte[] _bytes;
+
+        private TaskMessage(string messageSourceId, byte[] bytes)
+        {
+            _messageSourceId = messageSourceId;
+            _bytes = bytes;
+        }
+
+        public string MessageSourceId
+        {
+            get { return _messageSourceId; }
+        }
+
+        public byte[] Message
+        {
+            get { return _bytes; }
+            set { } // no-op: assigned values are discarded, the payload stays as constructed
+        }
+
+        /// <summary>
+        /// Creates a TaskMessage from a source id and a byte[] payload.
+        /// </summary>
+        /// <param name="messageSourceId">The message's sourceID. This will be accessible in the Driver for routing</param>
+        /// <param name="message">The actual content of the message, serialized into a byte[]</param>
+        /// <returns>a new TaskMessage with the given content</returns>
+        public static TaskMessage From(string messageSourceId, byte[] message)
+        {
+            if (string.IsNullOrEmpty(messageSourceId)) // an empty id is reported the same way as a null one
+            {
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("messageSourceId"), LOGGER);
+            }
+            if (message == null)
+            {
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("message"), LOGGER); // paramName matches this method's parameter
+            }
+            return new TaskMessage(messageSourceId, message);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultDriverMessageHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultDriverMessageHandler.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultDriverMessageHandler.cs
new file mode 100644
index 0000000..f77e18d
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultDriverMessageHandler.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+
+namespace Org.Apache.Reef.Tasks.Defaults
+{
+    public class DefaultDriverMessageHandler : IDriverMessageHandler // handler that reports every driver message as an error
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultDriverMessageHandler));
+
+        [Inject]
+        public DefaultDriverMessageHandler()
+        {
+        }
+
+        public void Handle(IDriverMessage message) // hands an InvalidOperationException describing the message to Exceptions.Throw
+        {
+            Exceptions.Throw(new InvalidOperationException("No DriverMessage handler bound. Message received: " + message), LOGGER);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultTaskMessageSource.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultTaskMessageSource.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultTaskMessageSource.cs
new file mode 100644
index 0000000..97b52db
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultTaskMessageSource.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Tasks.Defaults
+{
+    public class DefaultTaskMessageSource : ITaskMessageSource
+    {
+        [Inject]
+        public DefaultTaskMessageSource()
+        {
+        }
+
+        public Optional<TaskMessage> Message
+        {
+            get
+            {
+                // Every read builds a new placeholder message stamped with the current local time.
+                string generatedAt = DateTime.Now.ToString(CultureInfo.InvariantCulture);
+                byte[] payload = ByteUtilities.StringToByteArrays("default message generated at " + generatedAt);
+                return Optional<TaskMessage>.Of(TaskMessage.From("defaultSourceId", payload));
+            }
+
+            // Assigned values are discarded: the setter does nothing and
+            // the getter always produces its own message.
+            set { }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ICloseEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ICloseEvent.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ICloseEvent.cs
new file mode 100644
index 0000000..3ff9ccf
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ICloseEvent.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities;
+
+namespace Org.Apache.Reef.Tasks.Events
+{
+    public interface ICloseEvent // event carrying an optional byte[] value
+    {
+        Optional<byte[]> Value { get; set; } // optional payload; CloseEventImpl leaves it empty or wraps the supplied bytes
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/IDriverMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/IDriverMessage.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/IDriverMessage.cs
new file mode 100644
index 0000000..9ac120d
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/IDriverMessage.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities;
+
+namespace Org.Apache.Reef.Tasks.Events
+{
+    public interface IDriverMessage // message handed to an IDriverMessageHandler
+    {
+        Optional<byte[]> Message { get; } // read-only optional payload bytes
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ISuspendEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ISuspendEvent.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ISuspendEvent.cs
new file mode 100644
index 0000000..218a28f
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ISuspendEvent.cs
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Tasks.Events
+{
+    public interface ISuspendEvent // marker interface: declares no members
+    {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStart.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStart.cs
new file mode 100644
index 0000000..bcde0b3
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStart.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Tasks.Events
+{
+    public interface ITaskStart // event type observed by the TaskStart handlers
+    {
+        string Id { get; set; } // NOTE(review): settable here, whereas ITaskStop.Id is get-only -- confirm the asymmetry is intended
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStop.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStop.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStop.cs
new file mode 100644
index 0000000..5e8ebc8
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStop.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Tasks.Events
+{
+    public interface ITaskStop // event type observed by the TaskStop handlers
+    {
+        string Id { get; } // read-only identifier; presumably the task id -- TODO confirm
+    }
+}