You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/29 21:43:10 UTC
[26/31] incubator-reef git commit: [REEF-97] Add the REEF.NET code
base
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/CloseEventImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/CloseEventImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/CloseEventImpl.cs
new file mode 100644
index 0000000..e3a13b8
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/CloseEventImpl.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Utilities;
+
+namespace Org.Apache.Reef.Common
+{
+ /// <summary>
+ /// Close event carrying an optional byte[] payload.
+ /// </summary>
+ public class CloseEventImpl : ICloseEvent
+ {
+     /// <summary>
+     /// Creates a close event without a payload.
+     /// </summary>
+     public CloseEventImpl()
+     {
+         Value = Optional<byte[]>.Empty();
+     }
+
+     /// <summary>
+     /// Creates a close event around the given payload.
+     /// </summary>
+     /// <param name="bytes">payload, wrapped with Optional.OfNullable; may be null.</param>
+     public CloseEventImpl(byte[] bytes)
+     {
+         Value = Optional<byte[]>.OfNullable(bytes);
+     }
+
+     /// <summary>
+     /// The payload of the event.
+     /// </summary>
+     /// <remarks>
+     /// Auto-implemented so that reads and writes go to a compiler-generated backing field.
+     /// Accessors that name the property itself (get { return Value; }) call themselves
+     /// until the stack overflows, and set { value = Value; } never stores anything.
+     /// </remarks>
+     public Optional<byte[]> Value { get; set; }
+
+     /// <summary>
+     /// Renders the event as "CloseEvent{value=...}" using the payload's ToString().
+     /// </summary>
+     public override string ToString()
+     {
+         return "CloseEvent{value=" + Value.ToString() + "}";
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/DriverMessageImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/DriverMessageImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/DriverMessageImpl.cs
new file mode 100644
index 0000000..034df50
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/DriverMessageImpl.cs
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Utilities;
+
+namespace Org.Apache.Reef.Common.Runtime.Evaluator
+{
+ /// <summary>
+ /// Driver message carrying an optional byte[] payload.
+ /// </summary>
+ public class DriverMessageImpl : IDriverMessage
+ {
+     // Assigned once in the constructors and never replaced afterwards.
+     private readonly Optional<byte[]> _value;
+
+     /// <summary>
+     /// Creates a message without a payload.
+     /// </summary>
+     public DriverMessageImpl()
+     {
+         _value = Optional<byte[]>.Empty();
+     }
+
+     /// <summary>
+     /// Creates a message around the given payload.
+     /// </summary>
+     /// <param name="bytes">payload, wrapped with Optional.OfNullable; may be null.</param>
+     public DriverMessageImpl(byte[] bytes)
+     {
+         _value = Optional<byte[]>.OfNullable(bytes);
+     }
+
+     /// <summary>
+     /// The payload of the message; empty when none was supplied.
+     /// </summary>
+     public Optional<byte[]> Message
+     {
+         get { return _value; }
+     }
+
+     /// <summary>
+     /// Renders the message as "DriverMessage [value=...]"; the value part is empty
+     /// when there is no payload.
+     /// </summary>
+     public override string ToString()
+     {
+         // Only decode when a payload exists. An empty Optional has nothing to convert,
+         // and ByteArrarysToString is presumably not null-tolerant (TODO confirm).
+         string text = _value.IsPresent()
+             ? ByteUtilities.ByteArrarysToString(_value.Value)
+             : string.Empty;
+         return "DriverMessage [value=" + text + "]";
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/SuspendEventImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/SuspendEventImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/SuspendEventImpl.cs
new file mode 100644
index 0000000..c75b09f
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/SuspendEventImpl.cs
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Utilities;
+
+namespace Org.Apache.Reef.Common
+{
+ /// <summary>
+ /// Suspend event carrying an optional byte[] payload.
+ /// </summary>
+ /// <remarks>
+ /// NOTE(review): the type is declared as ICloseEvent although it is named, and used by
+ /// TaskRuntime.Suspend, as a suspend event. Confirm whether ISuspendEvent was intended;
+ /// the declaration is left as it was because ISuspendEvent's members are not visible here.
+ /// </remarks>
+ public class SuspendEventImpl : ICloseEvent
+ {
+     /// <summary>
+     /// Creates a suspend event without a payload.
+     /// </summary>
+     public SuspendEventImpl()
+     {
+         Value = Optional<byte[]>.Empty();
+     }
+
+     /// <summary>
+     /// Creates a suspend event around the given payload.
+     /// </summary>
+     /// <param name="bytes">payload, wrapped with Optional.OfNullable; may be null.</param>
+     public SuspendEventImpl(byte[] bytes)
+     {
+         Value = Optional<byte[]>.OfNullable(bytes);
+     }
+
+     /// <summary>
+     /// The payload of the event.
+     /// </summary>
+     /// <remarks>
+     /// Auto-implemented so that reads and writes go to a compiler-generated backing field.
+     /// Accessors that name the property itself (get { return Value; }) call themselves
+     /// until the stack overflows, and set { value = Value; } never stores anything.
+     /// </remarks>
+     public Optional<byte[]> Value { get; set; }
+
+     /// <summary>
+     /// Renders the event as "SuspendEvent{value=...}" using the payload's ToString().
+     /// </summary>
+     public override string ToString()
+     {
+         return "SuspendEvent{value=" + Value.ToString() + "}";
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskClientCodeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskClientCodeException.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskClientCodeException.cs
new file mode 100644
index 0000000..65b8be9
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskClientCodeException.cs
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.Reef.Tang.Interface;
+
+namespace Org.Apache.Reef.Common.Task
+{
+ /// <summary>
+ /// Exception describing a Task failure, tagged with the ids of the task and of the
+ /// context it was executing in.
+ /// </summary>
+ public class TaskClientCodeException : Exception
+ {
+     /// <summary>
+     /// construct the exception that caused the Task to fail
+     /// </summary>
+     /// <param name="taskId"> the id of the failed task.</param>
+     /// <param name="contextId"> the id of the context the failed Task was executing in.</param>
+     /// <param name="message"> the error message </param>
+     /// <param name="cause"> the exception that caused the Task to fail.</param>
+     public TaskClientCodeException(string taskId, string contextId, string message, Exception cause)
+         : base(message, cause)
+     {
+         this.TaskId = taskId;
+         this.ContextId = contextId;
+     }
+
+     /// <summary>
+     /// Id of the failed task, as passed to the constructor.
+     /// </summary>
+     public string TaskId { get; private set; }
+
+     /// <summary>
+     /// Id of the context the failed task was executing in, as passed to the constructor.
+     /// </summary>
+     public string ContextId { get; private set; }
+
+     /// <summary>
+     /// Placeholder: currently ignores the configuration and returns an empty string.
+     /// </summary>
+     /// <param name="c"> the configuration; not read yet.</param>
+     /// <returns>always string.Empty for now.</returns>
+     public static string GetTaskIdentifier(IConfiguration c)
+     {
+         // TODO: update after TANG is available
+         return string.Empty;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskLifeCycle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskLifeCycle.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskLifeCycle.cs
new file mode 100644
index 0000000..30acc2e
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskLifeCycle.cs
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Wake;
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.Reef.Common
+{
+ /// <summary>
+ /// Holds the start and stop observers of a task and pushes the corresponding
+ /// event to each of them on Start() and Stop().
+ /// </summary>
+ public class TaskLifeCycle
+ {
+     private readonly HashSet<IObserver<ITaskStart>> _taskStartHandlers;
+     private readonly HashSet<IObserver<ITaskStop>> _taskStopHandlers;
+     private readonly ITaskStart _taskStart;
+     private readonly ITaskStop _taskStop;
+
+     // INJECT
+     public TaskLifeCycle(
+         HashSet<IObserver<ITaskStop>> taskStopHandlers,
+         HashSet<IObserver<ITaskStart>> taskStartHandlers,
+         TaskStartImpl taskStart,
+         TaskStopImpl taskStop)
+     {
+         _taskStart = taskStart;
+         _taskStop = taskStop;
+         _taskStartHandlers = taskStartHandlers;
+         _taskStopHandlers = taskStopHandlers;
+     }
+
+     /// <summary>
+     /// Creates a life cycle with no handlers; the start and stop events stay null.
+     /// </summary>
+     public TaskLifeCycle()
+     {
+         _taskStopHandlers = new HashSet<IObserver<ITaskStop>>();
+         _taskStartHandlers = new HashSet<IObserver<ITaskStart>>();
+     }
+
+     /// <summary>
+     /// Passes the start event to every start handler.
+     /// </summary>
+     public void Start()
+     {
+         Publish<ITaskStart>(_taskStartHandlers, _taskStart);
+     }
+
+     /// <summary>
+     /// Passes the stop event to every stop handler.
+     /// </summary>
+     public void Stop()
+     {
+         Publish<ITaskStop>(_taskStopHandlers, _taskStop);
+     }
+
+     // Calls OnNext(taskEvent) on each observer, in the set's enumeration order.
+     private static void Publish<TEvent>(IEnumerable<IObserver<TEvent>> observers, TEvent taskEvent)
+     {
+         foreach (var observer in observers)
+         {
+             observer.OnNext(taskEvent);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskRuntime.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskRuntime.cs
new file mode 100644
index 0000000..05d6eec
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskRuntime.cs
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.io;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Common.Runtime.Evaluator;
+using Org.Apache.Reef.Common.Task;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Exceptions;
+using Org.Apache.Reef.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Common
+{
+ public class TaskRuntime : IObserver<ICloseEvent>, IObserver<ISuspendEvent>, IObserver<IDriverMessage>
+ { // Runs one injected ITask, tracks its state in a TaskStatus, and receives close, suspend and driver-message events.
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskRuntime));
+ // The task obtained from the injector in the constructor.
+ private readonly ITask _task;
+ // Injector used for the task and, on each driver message, for the IDriverMessageHandler.
+ private readonly IInjector _injector;
+
+ // The memento given by the task configuration
+ private readonly Optional<byte[]> _memento;
+ // Handed to TaskStatus, which sends its heartbeats through it.
+ private readonly HeartBeatManager _heartBeatManager;
+ // State, result and last exception of the task.
+ private readonly TaskStatus _currentStatus;
+ // Name client from the task configuration; stays null when injection throws InjectionException.
+ private readonly INameClient _nameClient;
+ /// <summary>Injects the task, an optional message source and an optional name client, then creates the TaskStatus.</summary>
+ public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager, string memento = null)
+ {
+ _injector = taskInjector;
+ _heartBeatManager = heartBeatManager;
+ // Stays empty unless an ITaskMessageSource can be injected below.
+ Optional<ISet<ITaskMessageSource>> messageSources = Optional<ISet<ITaskMessageSource>>.Empty();
+ try
+ {
+ _task = _injector.GetInstance<ITask>();
+ }
+ catch (Exception e) // NOTE(review): CaughtAndThrow presumably rethrows, making a missing task fatal - confirm
+ {
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to inject task.", LOGGER);
+ }
+ try
+ {
+ ITaskMessageSource taskMessageSource = _injector.GetInstance<ITaskMessageSource>();
+ messageSources = Optional<ISet<ITaskMessageSource>>.Of(new HashSet<ITaskMessageSource>() { taskMessageSource });
+ }
+ catch (Exception e)
+ {
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot inject task message source with error: " + e.StackTrace, LOGGER);
+ // do not rethrow since this is benign
+ }
+ try
+ {
+ _nameClient = _injector.GetInstance<INameClient>();
+ _heartBeatManager.EvaluatorSettings.NameClient = _nameClient;
+ }
+ catch (InjectionException)
+ {
+ LOGGER.Log(Level.Warning, "Cannot inject name client from task configuration.");
+ // do not rethrow since user is not required to provide name client
+ }
+ // NOTE(review): the next message is logged even when the message source injection above failed.
+ LOGGER.Log(Level.Info, "task message source injected");
+ _currentStatus = new TaskStatus(_heartBeatManager, contextId, taskId, messageSources);
+ _memento = memento == null ?
+ Optional<byte[]>.Empty() : Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(memento));
+ }
+ /// <summary>Id of the task, as held by the TaskStatus.</summary>
+ public string TaskId
+ {
+ get { return _currentStatus.TaskId; }
+ }
+ /// <summary>Id of the context, as held by the TaskStatus.</summary>
+ public string ContextId
+ {
+ get { return _currentStatus.ContextId; }
+ }
+ /// <summary>Asks the TaskStatus to enter the Running state (TaskStatus.SetRunning).</summary>
+ public void Initialize()
+ {
+ _currentStatus.SetRunning();
+ }
+
+ /// <summary>
+ /// Runs the task on a System.Threading.Tasks.Task, blocks until it finishes, disposes the task, and records the result or the exception in the TaskStatus.
+ /// </summary>
+ public void Start()
+ {
+ try
+ {
+ LOGGER.Log(Level.Info, "Call Task");
+ if (_currentStatus.IsNotRunning())
+ {
+ var e = new InvalidOperationException("TaskRuntime not in Running state, instead it is in state " + _currentStatus.State);
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+ }
+ byte[] result;
+ byte[] taskMemento = _memento.IsPresent() ? _memento.Value : null;
+ System.Threading.Tasks.Task<byte[]> runTask = new System.Threading.Tasks.Task<byte[]>(() => RunTask(taskMemento));
+ try
+ {
+ runTask.Start();
+ runTask.Wait();
+ }
+ catch (Exception e)
+ {
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Exception thrown during task running.", LOGGER);
+ }
+ result = runTask.Result; // already waited on above, so this does not block
+
+ LOGGER.Log(Level.Info, "Task Call Finished");
+ if (_task != null)
+ {
+ _task.Dispose();
+ }
+ _currentStatus.SetResult(result);
+ if (result != null && result.Length > 0)
+ {
+ LOGGER.Log(Level.Info, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result));
+ }
+ }
+ catch (Exception e) // any exception raised inside the try block above is recorded on the status here
+ {
+ if (_task != null)
+ {
+ _task.Dispose();
+ }
+ LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Task failed caused by exception [{0}]", e));
+ _currentStatus.SetException(e);
+ }
+ }
+ /// <summary>Current TaskState, read from the TaskStatus.</summary>
+ public TaskState GetTaskState()
+ {
+ return _currentStatus.State;
+ }
+
+ /// <summary>
+ /// Called by heartbeat manager
+ /// </summary>
+ /// <returns> current TaskStatusProto </returns>
+ public TaskStatusProto GetStatusProto()
+ {
+ return _currentStatus.ToProto();
+ }
+ /// <summary>True once the task state is Done, Suspended, Failed or Killed (TaskStatus.HasEnded).</summary>
+ public bool HasEnded()
+ {
+ return _currentStatus.HasEnded();
+ }
+
+ /// <summary>
+ /// get ID of the task. Returns the same value as the TaskId property.
+ /// </summary>
+ /// <returns>ID of the task.</returns>
+ public string GetActicityId()
+ {
+ return _currentStatus.TaskId;
+ }
+ /// <summary>Sends a close event with the given payload to OnNext and marks the status CloseRequested; only a warning is logged unless the task is Running.</summary>
+ public void Close(byte[] message)
+ {
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId));
+ if (_currentStatus.IsNotRunning())
+ {
+ LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State));
+ }
+ else
+ {
+ try
+ {
+ OnNext(new CloseEventImpl(message));
+ _currentStatus.SetCloseRequested();
+ }
+ catch (Exception e)
+ {
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", LOGGER);
+ // Record the failure on the status, wrapped in a TaskClientCodeException.
+ _currentStatus.SetException(
+ new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e));
+ }
+ }
+ }
+ /// <summary>Sends a suspend event with the given payload to OnNext and marks the status SuspendRequested; only a warning is logged unless the task is Running.</summary>
+ public void Suspend(byte[] message)
+ {
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId));
+ // NOTE(review): SuspendEventImpl is declared as ICloseEvent, so the OnNext call below appears to bind to OnNext(ICloseEvent) - confirm.
+ if (_currentStatus.IsNotRunning())
+ {
+ LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State));
+ }
+ else
+ {
+ try
+ {
+ OnNext(new SuspendEventImpl(message));
+ _currentStatus.SetSuspendRequested();
+ }
+ catch (Exception e)
+ {
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", LOGGER);
+ _currentStatus.SetException(
+ new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e));
+ }
+ }
+ }
+ /// <summary>Wraps the payload in a DriverMessageImpl and passes it to OnNext(IDriverMessage); only a warning is logged unless the task is Running.</summary>
+ public void Deliver(byte[] message)
+ {
+ if (_currentStatus.IsNotRunning())
+ {
+ LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State));
+ }
+ else
+ {
+ try
+ {
+ OnNext(new DriverMessageImpl(message));
+ }
+ catch (Exception e)
+ {
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", LOGGER);
+ _currentStatus.SetException(
+ new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e));
+ }
+ }
+ }
+ /// <summary>Logs the close event; nothing else is done with it yet (see TODO).</summary>
+ public void OnNext(ICloseEvent value)
+ {
+ LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)");
+ // TODO: send a heartbeat
+ }
+ // OnError/OnCompleted of all three observer interfaces are explicit implementations that throw NotImplementedException.
+ void IObserver<ICloseEvent>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<IDriverMessage>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<IDriverMessage>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<ISuspendEvent>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<ISuspendEvent>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<ICloseEvent>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ /// <summary>Logs the suspend event; nothing else is done with it yet (see TODO).</summary>
+ public void OnNext(ISuspendEvent value)
+ {
+ LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)");
+ // TODO: send a heartbeat
+ }
+ /// <summary>Resolves an IDriverMessageHandler from the injector and hands it the message; a handler exception is recorded on the status without a heartbeat.</summary>
+ public void OnNext(IDriverMessage value)
+ {
+ IDriverMessageHandler messageHandler = null;
+ LOGGER.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)");
+ try
+ {
+ messageHandler = _injector.GetInstance<IDriverMessageHandler>();
+ }
+ catch (Exception e)
+ {
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Received Driver message, but unable to inject handler for driver message ", LOGGER);
+ }
+ if (messageHandler != null)
+ {
+ try
+ {
+ messageHandler.Handle(value);
+ }
+ catch (Exception e)
+ {
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Exception throw when handling driver message: " + e, LOGGER);
+ _currentStatus.RecordExecptionWithoutHeartbeat(e);
+ }
+ }
+ }
+ // Invokes the task's Call with the memento bytes (null when no memento was configured).
+ private byte[] RunTask(byte[] memento)
+ {
+ return _task.Call(memento);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStartImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStartImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStartImpl.cs
new file mode 100644
index 0000000..c4047b8
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStartImpl.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+
+namespace Org.Apache.Reef.Common
+{
+ /// <summary>
+ /// ITaskStart implementation that simply carries an id.
+ /// </summary>
+ public class TaskStartImpl : ITaskStart
+ {
+     /// <summary>
+     /// The id given to the constructor (presumably the task id - confirm against callers).
+     /// </summary>
+     public string Id { get; set; }
+
+     //INJECT
+     public TaskStartImpl(string id)
+     {
+         this.Id = id;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskState.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskState.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskState.cs
new file mode 100644
index 0000000..9e3bcb4
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskState.cs
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Common
+{
+ /// <summary>
+ /// States a task moves through, as tracked by TaskStatus.
+ /// </summary>
+ public enum TaskState
+ {
+     /// <summary>State assigned when a TaskStatus is created.</summary>
+     Init = 0,
+
+     /// <summary>Entered through TaskStatus.SetRunning.</summary>
+     Running = 1,
+
+     /// <summary>Entered through TaskStatus.SetCloseRequested.</summary>
+     CloseRequested = 2,
+
+     /// <summary>Entered through TaskStatus.SetSuspendRequested.</summary>
+     SuspendRequested = 3,
+
+     /// <summary>Entered when a result is set while SuspendRequested.</summary>
+     Suspended = 4,
+
+     /// <summary>Entered when an exception is recorded.</summary>
+     Failed = 5,
+
+     /// <summary>Entered when a result is set while Running or CloseRequested.</summary>
+     Done = 6,
+
+     /// <summary>Entered through TaskStatus.SetKilled.</summary>
+     Killed = 7
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStatus.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStatus.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStatus.cs
new file mode 100644
index 0000000..639a7d0
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStatus.cs
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Common
+{
+ public class TaskStatus
+ { // Holds the state, result and last exception of one task and reports them through the HeartBeatManager.
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStatus));
+ private readonly TaskLifeCycle _taskLifeCycle;
+ private readonly HeartBeatManager _heartBeatManager;
+ private readonly Optional<ISet<ITaskMessageSource>> _evaluatorMessageSources;
+ // Ids written into every status proto.
+ private string _taskId;
+ private string _contextId;
+ private Optional<Exception> _lastException = Optional<Exception>.Empty();
+ private Optional<byte[]> _result = Optional<byte[]>.Empty();
+ private TaskState _state;
+ /// <summary>Creates a status in the Init state with a new TaskLifeCycle that has no handlers.</summary>
+ public TaskStatus(HeartBeatManager heartBeatManager, string contextId, string taskId, Optional<ISet<ITaskMessageSource>> evaluatorMessageSources)
+ {
+ _contextId = contextId;
+ _taskId = taskId;
+ _heartBeatManager = heartBeatManager;
+ _taskLifeCycle = new TaskLifeCycle();
+ _evaluatorMessageSources = evaluatorMessageSources;
+ State = TaskState.Init;
+ }
+ /// <summary>Current state. The setter checks IsLegalStateTransition first; an illegal transition is logged and an InvalidOperationException is passed to Exceptions.Throw.</summary>
+ public TaskState State
+ {
+ get
+ {
+ return _state;
+ }
+
+ set
+ {
+ if (IsLegalStateTransition(_state, value))
+ {
+ _state = value;
+ }
+ else
+ {
+ string message = string.Format(CultureInfo.InvariantCulture, "Illegal state transition from [{0}] to [{1}]", _state, value);
+ LOGGER.Log(Level.Error, message);
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(message), LOGGER);
+ }
+ }
+ }
+ /// <summary>Id of the task, as given to the constructor.</summary>
+ public string TaskId
+ {
+ get { return _taskId; }
+ }
+ /// <summary>Id of the context, as given to the constructor.</summary>
+ public string ContextId
+ {
+ get { return _contextId; }
+ }
+ /// <summary>Records the exception (state becomes Failed), sends a heartbeat, then clears the stored exception.</summary>
+ public void SetException(Exception e)
+ {
+ RecordExecptionWithoutHeartbeat(e);
+ Heartbeat();
+ _lastException = Optional<Exception>.Empty();
+ }
+ /// <summary>Stores the result, moves Running or CloseRequested to Done and SuspendRequested to Suspended, runs the stop handlers and sends a heartbeat.</summary>
+ public void SetResult(byte[] result)
+ {
+ _result = Optional<byte[]>.OfNullable(result);
+ if (State == TaskState.Running)
+ {
+ State = TaskState.Done;
+ }
+ else if (State == TaskState.SuspendRequested)
+ {
+ State = TaskState.Suspended;
+ }
+ else if (State == TaskState.CloseRequested)
+ {
+ State = TaskState.Done;
+ }
+ _taskLifeCycle.Stop();
+ Heartbeat();
+ }
+ /// <summary>If still in Init: runs the start handlers, sends a heartbeat while the state is still Init, then switches to Running. Does nothing in any other state.</summary>
+ public void SetRunning()
+ {
+ LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning");
+ if (_state == TaskState.Init)
+ {
+ try
+ {
+ _taskLifeCycle.Start();
+ // Need to send an INIT heartbeat to the driver prompting it to create an RunningTask event.
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Sending task INIT heartbeat"));
+ Heartbeat();
+ State = TaskState.Running;
+ }
+ catch (Exception e)
+ {
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Cannot set task status to running.", LOGGER);
+ SetException(e);
+ }
+ }
+ }
+ /// <summary>Switches to CloseRequested; no heartbeat is sent.</summary>
+ public void SetCloseRequested()
+ {
+ State = TaskState.CloseRequested;
+ }
+ /// <summary>Switches to SuspendRequested; no heartbeat is sent.</summary>
+ public void SetSuspendRequested()
+ {
+ State = TaskState.SuspendRequested;
+ }
+ /// <summary>Switches to Killed and sends a heartbeat.</summary>
+ public void SetKilled()
+ {
+ State = TaskState.Killed;
+ Heartbeat();
+ }
+ /// <summary>True for every state other than Running, including CloseRequested and SuspendRequested.</summary>
+ public bool IsNotRunning()
+ {
+ return _state != TaskState.Running;
+ }
+ /// <summary>True when the state is Done, Suspended, Failed or Killed.</summary>
+ public bool HasEnded()
+ {
+ switch (_state)
+ {
+ case TaskState.Done:
+ case TaskState.Suspended:
+ case TaskState.Failed:
+ case TaskState.Killed:
+ return true;
+ default:
+ return false;
+ }
+ }
+ /// <summary>Builds the TaskStatusProto: carries the result if present, otherwise the last exception's ToString() bytes, otherwise (while Running) the pending task messages.</summary>
+ public TaskStatusProto ToProto()
+ {
+ Check();
+ TaskStatusProto taskStatusProto = new TaskStatusProto()
+ {
+ context_id = _contextId,
+ task_id = _taskId,
+ state = GetProtoState(),
+ };
+ if (_result.IsPresent())
+ {
+ taskStatusProto.result = ByteUtilities.CopyBytesFrom(_result.Value);
+ }
+ else if (_lastException.IsPresent())
+ {
+ //final Encoder<Throwable> codec = new ObjectSerializableCodec<>();
+ //final byte[] error = codec.encode(_lastException.get());
+ byte[] error = ByteUtilities.StringToByteArrays(_lastException.Value.ToString());
+ taskStatusProto.result = ByteUtilities.CopyBytesFrom(error);
+ }
+ else if (_state == TaskState.Running)
+ {
+ foreach (TaskMessage message in GetMessages())
+ {
+ TaskStatusProto.TaskMessageProto taskMessageProto = new TaskStatusProto.TaskMessageProto()
+ {
+ source_id = message.MessageSourceId,
+ message = ByteUtilities.CopyBytesFrom(message.Message),
+ };
+ taskStatusProto.task_message.Add(taskMessageProto);
+ }
+ }
+ return taskStatusProto;
+ }
+ /// <summary>Keeps the first exception seen, switches to Failed and runs the stop handlers; no heartbeat is sent.</summary>
+ internal void RecordExecptionWithoutHeartbeat(Exception e)
+ {
+ if (!_lastException.IsPresent())
+ {
+ _lastException = Optional<Exception>.Of(e);
+ }
+ State = TaskState.Failed;
+ _taskLifeCycle.Stop();
+ }
+ // Transition table used by the State setter.
+ private static bool IsLegalStateTransition(TaskState? from, TaskState to)
+ {
+ if (from == null) // the State setter passes the non-nullable _state, so it never supplies null
+ {
+ return to == TaskState.Init;
+ }
+ switch (from)
+ {
+ case TaskState.Init:
+ switch (to)
+ {
+ case TaskState.Init:
+ case TaskState.Running:
+ case TaskState.Failed:
+ case TaskState.Killed:
+ case TaskState.Done:
+ return true;
+ default:
+ return false;
+ }
+ case TaskState.Running:
+ switch (to)
+ {
+ case TaskState.CloseRequested:
+ case TaskState.SuspendRequested:
+ case TaskState.Failed:
+ case TaskState.Killed:
+ case TaskState.Done:
+ return true;
+ default:
+ return false;
+ }
+ case TaskState.CloseRequested:
+ switch (to)
+ {
+ case TaskState.Failed:
+ case TaskState.Killed:
+ case TaskState.Done:
+ return true;
+ default:
+ return false;
+ }
+ case TaskState.SuspendRequested:
+ switch (to)
+ {
+ case TaskState.Failed:
+ case TaskState.Killed:
+ case TaskState.Suspended:
+ return true;
+ default:
+ return false;
+ }
+ // Failed, Done, Killed and any state without its own case above (e.g. Suspended) accept every target state.
+ case TaskState.Failed:
+ case TaskState.Done:
+ case TaskState.Killed:
+ default:
+ return true;
+ }
+ }
+ // When both a result and an exception are present the exception wins: the state becomes Failed and the result is dropped.
+ private void Check()
+ {
+ if (_result.IsPresent() && _lastException.IsPresent())
+ {
+ LOGGER.Log(Level.Warning, "Both task result and exception are present, the expcetion will take over. Thrown away result:" + ByteUtilities.ByteArrarysToString(_result.Value));
+ State = TaskState.Failed;
+ _result = Optional<byte[]>.Empty();
+ }
+ }
+ // Sends the current status proto to the heartbeat manager.
+ private void Heartbeat()
+ {
+ _heartBeatManager.OnNext(ToProto());
+ }
+ // Maps TaskState to the protobuf State; CloseRequested and SuspendRequested are reported as RUNNING.
+ private State GetProtoState()
+ {
+ switch (_state)
+ {
+ case TaskState.Init:
+ return ProtoBuf.ReefServiceProto.State.INIT;
+ case TaskState.CloseRequested:
+ case TaskState.SuspendRequested:
+ case TaskState.Running:
+ return ProtoBuf.ReefServiceProto.State.RUNNING;
+ case TaskState.Done:
+ return ProtoBuf.ReefServiceProto.State.DONE;
+ case TaskState.Suspended:
+ return ProtoBuf.ReefServiceProto.State.SUSPEND;
+ case TaskState.Failed:
+ return ProtoBuf.ReefServiceProto.State.FAILED;
+ case TaskState.Killed:
+ return ProtoBuf.ReefServiceProto.State.KILLED;
+ default:
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Unknown state: " + _state), LOGGER);
+ break;
+ }
+ return ProtoBuf.ReefServiceProto.State.FAILED; //this line should not be reached as default case will throw exception
+ }
+ // Collects the message currently offered by each message source, skipping sources that offer none.
+ private ICollection<TaskMessage> GetMessages()
+ {
+ List<TaskMessage> result = new List<TaskMessage>();
+ if (_evaluatorMessageSources.IsPresent())
+ {
+ foreach (ITaskMessageSource source in _evaluatorMessageSources.Value)
+ {
+ Optional<TaskMessage> taskMessageOptional = source.Message;
+ if (taskMessageOptional.IsPresent())
+ {
+ result.Add(taskMessageOptional.Value);
+ }
+ }
+ }
+ return result;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStopImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStopImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStopImpl.cs
new file mode 100644
index 0000000..2c7e75e
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/task/TaskStopImpl.cs
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Task;
+using Org.Apache.Reef.Tasks.Events;
+
+namespace Org.Apache.Reef.Common
+{
+ /// <summary>
+ /// ITaskStop implementation that carries the identifier supplied at construction.
+ /// </summary>
+ public class TaskStopImpl : ITaskStop
+ {
+ /// <summary>
+ /// Gets or sets the identifier; initialized from the constructor argument.
+ /// </summary>
+ public string Id { get; set; }
+
+ //INJECT
+ /// <summary>
+ /// Creates the instance and stores the given identifier in Id.
+ /// </summary>
+ /// <param name="id">The identifier to expose through Id.</param>
+ public TaskStopImpl(string id)
+ {
+ this.Id = id;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/EvaluatorConfigurations.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/EvaluatorConfigurations.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/EvaluatorConfigurations.cs
new file mode 100644
index 0000000..3154541
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/EvaluatorConfigurations.cs
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Formats;
+using System;
+using System.IO;
+using System.Linq;
+
+namespace Org.Apache.Reef.Common
+{
+ /// <summary>
+ /// Loads an Avro-serialized configuration file and exposes individual settings
+ /// (task, evaluator id, application id, root context and root service configuration)
+ /// as string properties. Each property is looked up on first access and the
+ /// result is kept in a private field for later reads.
+ /// </summary>
+ public class EvaluatorConfigurations
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorConfigurations));
+
+ private AvroConfiguration _avroConfiguration;
+
+ private string _configFile;
+
+ private string _applicationId;
+
+ private string _evaluatorId;
+
+ private string _taskConfiguration;
+
+ private string _rootContextConfiguration;
+
+ private string _rootServiceConfiguration;
+
+ /// <summary>
+ /// Validates the path and deserializes the Avro configuration stored in the file.
+ /// A null/blank path is reported as ArgumentNullException and a non-existent file as
+ /// FileNotFoundException, both through Exceptions.Throw.
+ /// </summary>
+ /// <param name="configFile">Path of the configuration file to read.</param>
+ public EvaluatorConfigurations(string configFile)
+ {
+ using (LOGGER.LogFunction("EvaluatorConfigurations::EvaluatorConfigurations"))
+ {
+ if (string.IsNullOrWhiteSpace(configFile))
+ {
+ ArgumentNullException missingPath = new ArgumentNullException("configFile");
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(missingPath, LOGGER);
+ }
+ if (!File.Exists(configFile))
+ {
+ FileNotFoundException missingFile = new FileNotFoundException("cannot find file " + configFile);
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(missingFile, LOGGER);
+ }
+ _configFile = configFile;
+ _avroConfiguration = new AvroConfigurationSerializer().AvroDeseriaizeFromFile(_configFile);
+ }
+ }
+
+ /// <summary>
+ /// Gets the value bound to Reef.Evaluator.Constants.TaskConfiguration, or an empty string if absent.
+ /// </summary>
+ public string TaskConfiguration
+ {
+ get
+ {
+ if (_taskConfiguration == null)
+ {
+ _taskConfiguration = GetSettingValue(Reef.Evaluator.Constants.TaskConfiguration);
+ }
+ return _taskConfiguration;
+ }
+ }
+
+ /// <summary>
+ /// Gets the value bound to Reef.Evaluator.Constants.EvaluatorIdentifier, or an empty string if absent.
+ /// </summary>
+ public string EvaluatorId
+ {
+ get
+ {
+ if (_evaluatorId == null)
+ {
+ _evaluatorId = GetSettingValue(Reef.Evaluator.Constants.EvaluatorIdentifier);
+ }
+ return _evaluatorId;
+ }
+ }
+
+ /// <summary>
+ /// Gets the value bound to Reef.Evaluator.Constants.ApplicationIdentifier, or an empty string if absent.
+ /// </summary>
+ public string ApplicationId
+ {
+ get
+ {
+ if (_applicationId == null)
+ {
+ _applicationId = GetSettingValue(Reef.Evaluator.Constants.ApplicationIdentifier);
+ }
+ return _applicationId;
+ }
+ }
+
+ /// <summary>
+ /// Gets the value bound to Reef.Evaluator.Constants.RootContextConfiguration, or an empty string if absent.
+ /// </summary>
+ public string RootContextConfiguration
+ {
+ get
+ {
+ if (_rootContextConfiguration == null)
+ {
+ _rootContextConfiguration = GetSettingValue(Reef.Evaluator.Constants.RootContextConfiguration);
+ }
+ return _rootContextConfiguration;
+ }
+ }
+
+ /// <summary>
+ /// Gets the value bound to Reef.Evaluator.Constants.RootServiceConfiguration, or an empty string if absent.
+ /// </summary>
+ public string RootServiceConfiguration
+ {
+ get
+ {
+ if (_rootServiceConfiguration == null)
+ {
+ _rootServiceConfiguration = GetSettingValue(Reef.Evaluator.Constants.RootServiceConfiguration);
+ }
+ return _rootServiceConfiguration;
+ }
+ }
+
+ /// <summary>
+ /// Finds the single binding whose key ends with the given setting key (ordinal, case-insensitive).
+ /// SingleOrDefault throws InvalidOperationException when more than one binding matches.
+ /// </summary>
+ /// <param name="settingKey">Suffix of the binding key to look for.</param>
+ /// <returns>The bound value, or string.Empty when no binding matches.</returns>
+ private string GetSettingValue(string settingKey)
+ {
+ ConfigurationEntry match =
+ _avroConfiguration.Bindings.SingleOrDefault(binding => binding.key.EndsWith(settingKey, StringComparison.OrdinalIgnoreCase));
+ return match == null ? string.Empty : match.value;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/RemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/RemoteManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/RemoteManager.cs
new file mode 100644
index 0000000..4e68186
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/utils/RemoteManager.cs
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Common
+{
+ /// <summary>
+ /// Empty class: it declares no members in this file.
+ /// </summary>
+ public class RemoteManager
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/services/IService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/services/IService.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/services/IService.cs
new file mode 100644
index 0000000..cbcebf8
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/services/IService.cs
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Services
+{
+ /// <summary>
+ /// Interface without members. It serves as the element type of the services
+ /// bound through ServiceConfiguration.Services.
+ /// </summary>
+ public interface IService
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServiceConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServiceConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServiceConfiguration.cs
new file mode 100644
index 0000000..c51ef40
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServiceConfiguration.cs
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Tang.Formats;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Tang.Util;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static field, typical usage in configurations")]
+
+namespace Org.Apache.Reef.Services
+{
+ /// <summary>
+ /// Configuration module for services. The configuration created here can be passed alongside a ContextConfiguration
+ /// to form a context. Different from bindings made in the ContextConfiguration, those made here will be passed along
+ /// to child context.
+ /// </summary>
+ public class ServiceConfiguration : ConfigurationModuleBuilder
+ {
+ /// <summary>
+ /// The services to instantiate. Every class given here is instantiated in the context, and the
+ /// resulting references are made available to child contexts and tasks.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalParameter<IService> Services = new OptionalParameter<IService>();
+
+ /// <summary>
+ /// Creates an empty builder; TangConfig is left unassigned.
+ /// </summary>
+ public ServiceConfiguration()
+ {
+ }
+
+ /// <summary>
+ /// Creates the instance from a serialized configuration and stores the
+ /// deserialized result in TangConfig.
+ /// </summary>
+ /// <param name="config">Configuration string understood by AvroConfigurationSerializer.FromString.</param>
+ public ServiceConfiguration(string config)
+ {
+ AvroConfigurationSerializer serializer = new AvroConfigurationSerializer();
+ TangConfig = serializer.FromString(config);
+ }
+
+ /// <summary>
+ /// Gets a newly built module in which the Services parameter is bound as a set entry of ServicesSet.
+ /// A fresh module is built on every access.
+ /// </summary>
+ public static ConfigurationModule ConfigurationModule
+ {
+ get
+ {
+ var builder = new ServiceConfiguration()
+ .BindSetEntry(GenericType<ServicesSet>.Class, Services);
+ return builder.Build();
+ }
+ }
+
+ /// <summary>
+ /// Gets the configuration deserialized by the string constructor; not assigned by the default constructor.
+ /// </summary>
+ public IConfiguration TangConfig { get; private set; }
+ }
+
+ /// <summary>
+ /// Holder for the set of services bound to the ServicesSet named parameter.
+ /// </summary>
+ public class InjectedServices
+ {
+ /// <summary>
+ /// Gets or sets the service set; initialized from the constructor argument.
+ /// </summary>
+ public ISet<IService> Services { get; set; }
+
+ /// <summary>
+ /// Receives the service set for the ServicesSet parameter and stores it in Services.
+ /// </summary>
+ /// <param name="services">The set of services to expose.</param>
+ [Inject]
+ public InjectedServices([Parameter(typeof(ServicesSet))] ISet<IService> services)
+ {
+ this.Services = services;
+ }
+ }
+
+ /// <summary>
+ /// Named parameter for the set of IService instances. ServiceConfiguration binds set
+ /// entries to it and InjectedServices receives it as a constructor parameter.
+ /// </summary>
+ [NamedParameter("Set of services", "servicesSet", "")]
+ class ServicesSet : Name<ISet<IService>>
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServicesConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServicesConfigurationOptions.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServicesConfigurationOptions.cs
new file mode 100644
index 0000000..30a38de
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/services/ServicesConfigurationOptions.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Services
+{
+ /// <summary>
+ /// Container class for the named parameters related to services configuration.
+ /// </summary>
+ public class ServicesConfigurationOptions
+ {
+ /// <summary>
+ /// String-valued named parameter declared with the attribute arguments "Services", "services", "services".
+ /// </summary>
+ [NamedParameter("Services", "services", "services")]
+ public class Services : Name<string>
+ {
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IDriverMessageHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IDriverMessageHandler.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IDriverMessageHandler.cs
new file mode 100644
index 0000000..a0111b8
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IDriverMessageHandler.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Defaults;
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Tasks
+{
+ //[DefaultImplementation(typeof(DefaultTaskMessageSource))]
+ /// <summary>
+ /// Handler for IDriverMessage instances.
+ /// </summary>
+ public interface IDriverMessageHandler
+ {
+ /// <summary>
+ /// Handles the given driver message.
+ /// </summary>
+ /// <param name="message">The message to handle.</param>
+ void Handle(IDriverMessage message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IRunningTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IRunningTask.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IRunningTask.cs
new file mode 100644
index 0000000..92a6887
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/IRunningTask.cs
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities;
+using System;
+
+namespace Org.Apache.Reef.Common.Task
+{
+ /// <summary>
+ /// Represents a running Task
+ /// </summary>
+ public interface IRunningTask : IIdentifiable, IDisposable
+ {
+ /// <summary>
+ /// Sends the message to the running task.
+ /// </summary>
+ /// <param name="message">the message bytes that are sent to the Task.</param>
+ void OnNext(byte[] message);
+
+ /// <summary>
+ /// Signal the task to suspend.
+ /// </summary>
+ /// <param name="message">a message that is sent to the Task.</param>
+ void Suspend(byte[] message);
+
+ /// <summary>
+ /// Signal the task to suspend, without an accompanying message.
+ /// </summary>
+ void Suspend();
+
+ /// <summary>
+ /// Signal the task to shut down.
+ /// </summary>
+ /// <param name="message">a message that is sent to the Task.</param>
+ void Dispose(byte[] message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITask.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITask.cs
new file mode 100644
index 0000000..3655a4b
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITask.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Tasks
+{
+ /// <summary>
+ /// A disposable task with a single entry point, Call.
+ /// </summary>
+ public interface ITask : IDisposable
+ {
+ /// <summary>
+ /// Entry point of the task.
+ /// </summary>
+ /// <param name="memento">The memento bytes handed to the task.</param>
+ /// <returns>A byte array; NOTE(review): presumably the task's result - confirm against the task runtime.</returns>
+ byte[] Call(byte[] memento);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITaskMessageSource.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITaskMessageSource.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITaskMessageSource.cs
new file mode 100644
index 0000000..589a445
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/ITaskMessageSource.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Defaults;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Tasks
+{
+ /// <summary>
+ /// A source of task messages. DefaultTaskMessageSource is declared as the default implementation.
+ /// </summary>
+ [DefaultImplementation(typeof(DefaultTaskMessageSource))]
+ public interface ITaskMessageSource
+ {
+ /// <summary>
+ /// Gets or sets the optional message of this source; the Optional is empty-capable,
+ /// so consumers should check IsPresent() before reading Value.
+ /// </summary>
+ Optional<TaskMessage> Message { get; set; }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfiguration.cs
new file mode 100644
index 0000000..6fb386b
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfiguration.cs
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Formats;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Tang.Util;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static readonly field, typical usage in configurations")]
+
+namespace Org.Apache.Reef.Tasks
+{
+ /// <summary>
+ /// Configuration module builder for tasks. The static fields are the parameters a client
+ /// can set; ConfigurationModule binds them to the task interfaces and named parameters.
+ /// An instance created from a configuration string additionally exposes the deserialized
+ /// configuration (TangConfig) and the task identifier found in it (TaskId).
+ /// </summary>
+ public class TaskConfiguration : ConfigurationModuleBuilder
+ {
+ // this is a hack for getting the task identifier for now
+ public const string TaskIdentifier = "TaskConfigurationOptions+Identifier";
+
+ /// <summary>
+ /// The identifier of the task.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly RequiredParameter<string> Identifier = new RequiredParameter<string>();
+
+ /// <summary>
+ /// The task to instantiate.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly RequiredImpl<ITask> Task = new RequiredImpl<ITask>();
+
+ /// <summary>
+ /// for task suspension. Defaults to task failure if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<ISuspendEvent>> OnSuspend = new OptionalImpl<IObserver<ISuspendEvent>>();
+
+ /// <summary>
+ /// for messages from the driver. Defaults to task failure if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IDriverMessageHandler> OnMessage = new OptionalImpl<IDriverMessageHandler>();
+
+ /// <summary>
+ /// for closure requests from the driver. Defaults to task failure if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<ICloseEvent>> OnClose = new OptionalImpl<IObserver<ICloseEvent>>();
+
+ /// <summary>
+ /// Message source invoked upon each evaluator heartbeat.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<ITaskMessageSource> OnSendMessage = new OptionalImpl<ITaskMessageSource>();
+
+ /// <summary>
+ /// to receive TaskStart after the Task.call() method was called.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<ITaskStart>> OnTaskStart = new OptionalImpl<IObserver<ITaskStart>>();
+
+ /// <summary>
+ /// to receive TaskStop after the Task.call() method returned.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<ITaskStop>> OnTaskStop = new OptionalImpl<IObserver<ITaskStop>>();
+
+ /// <summary>
+ /// The memento to be passed to Task.call().
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalParameter<string> Memento = new OptionalParameter<string>();
+
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskConfiguration));
+
+ /// <summary>
+ /// Creates an empty builder. TangConfig, TaskId and Configurations stay null.
+ /// </summary>
+ public TaskConfiguration()
+ : base()
+ {
+ }
+
+ /// <summary>
+ /// Creates the instance from a serialized configuration: deserializes it into TangConfig
+ /// and takes TaskId from the binding whose key contains TaskIdentifier (the last such
+ /// binding wins if there are several). A missing or blank task id is logged and reported
+ /// as ArgumentException through Exceptions.Throw.
+ /// </summary>
+ /// <param name="configString">The serialized configuration.</param>
+ public TaskConfiguration(string configString)
+ {
+ TangConfig = new AvroConfigurationSerializer().FromString(configString);
+ AvroConfiguration avroConfiguration = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString);
+ foreach (ConfigurationEntry config in avroConfiguration.Bindings)
+ {
+ if (config.key.Contains(TaskIdentifier))
+ {
+ TaskId = config.value;
+ }
+ }
+ if (string.IsNullOrWhiteSpace(TaskId))
+ {
+ string msg = "Required parameter TaskId not provided.";
+ LOGGER.Log(Level.Error, msg);
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(msg), LOGGER);
+ }
+ }
+
+ /// <summary>
+ /// Gets a newly built module that binds the static parameters of this class to the task
+ /// interfaces and to the named parameters in TaskConfigurationOptions. A fresh module is
+ /// built on every access.
+ /// </summary>
+ public static ConfigurationModule ConfigurationModule
+ {
+ get
+ {
+ return new TaskConfiguration()
+ .BindImplementation(GenericType<ITask>.Class, Task)
+ .BindImplementation(GenericType<ITaskMessageSource>.Class, OnSendMessage)
+ .BindImplementation(GenericType<IDriverMessageHandler>.Class, OnMessage)
+ .BindNamedParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, Identifier)
+ .BindNamedParameter(GenericType<TaskConfigurationOptions.Memento>.Class, Memento)
+ .BindNamedParameter(GenericType<TaskConfigurationOptions.CloseHandler>.Class, OnClose)
+ .BindNamedParameter(GenericType<TaskConfigurationOptions.SuspendHandler>.Class, OnSuspend)
+ .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStart)
+ .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop)
+ .Build();
+ }
+ }
+
+ /// <summary>
+ /// Gets the task identifier read from the configuration string; null for a default-constructed instance.
+ /// </summary>
+ public string TaskId { get; private set; }
+
+ /// <summary>
+ /// Never assigned anywhere in this class (the setter is private), so it is always null.
+ /// </summary>
+ public IList<KeyValuePair<string, string>> Configurations { get; private set; }
+
+ /// <summary>
+ /// Gets the configuration deserialized by the string constructor; null for a default-constructed instance.
+ /// </summary>
+ public IConfiguration TangConfig { get; private set; }
+
+ /// <summary>
+ /// Returns a description containing the string form of TangConfig.
+ /// </summary>
+ /// <returns>"TaskConfiguration - configurations: " followed by TangConfig's string form, or by nothing when TangConfig is null.</returns>
+ public override string ToString()
+ {
+ // TangConfig is only assigned by the string constructor; guard against null so that
+ // ToString on a default-constructed instance does not throw NullReferenceException.
+ string configText = TangConfig == null ? string.Empty : TangConfig.ToString();
+ return string.Format(CultureInfo.InvariantCulture, "TaskConfiguration - configurations: {0}", configText);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfigurationOptions.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfigurationOptions.cs
new file mode 100644
index 0000000..888faf0
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskConfigurationOptions.cs
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.Reef.Tasks
+{
+ /// <summary>
+ /// Container class for the named parameters used to configure a task.
+ /// Each nested class only names a parameter and fixes its type; none declares members.
+ /// </summary>
+ public class TaskConfigurationOptions
+ {
+ [NamedParameter("The Identifier of the Task", "taskid", "Task")]
+ public class Identifier : Name<string>
+ {
+ }
+
+ [NamedParameter(documentation: "The memento to be used for the Task")]
+ public class Memento : Name<string>
+ {
+ }
+
+ // NOTE(review): TaskConfiguration.ConfigurationModule binds ITaskMessageSource as an
+ // implementation and does not reference this name - confirm where it is bound.
+ [NamedParameter("TaskMessageSource", "messagesource", null)]
+ public class TaskMessageSources : Name<ISet<ITaskMessageSource>>
+ {
+ }
+
+ [NamedParameter(documentation: "The set of event handlers for the TaskStart event.")]
+ public class StartHandlers : Name<ISet<IObserver<ITaskStart>>>
+ {
+ }
+
+ [NamedParameter(documentation: "The set of event handlers for the TaskStop event.")]
+ public class StopHandlers : Name<ISet<IObserver<ITaskStop>>>
+ {
+ }
+
+ [NamedParameter(documentation: "The event handler that receives the close event.")]
+ public class CloseHandler : Name<IObserver<ICloseEvent>>
+ {
+ }
+
+ [NamedParameter(documentation: "The event handler that receives the suspend event.")]
+ public class SuspendHandler : Name<IObserver<ISuspendEvent>>
+ {
+ }
+
+ // NOTE(review): TaskConfiguration.ConfigurationModule binds driver messages through
+ // IDriverMessageHandler and does not reference this name - confirm whether it is still used.
+ [NamedParameter(documentation: "The event handler that receives messages from the driver.")]
+ public class MessageHandler : Name<IObserver<IDriverMessage>>
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskMessage.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskMessage.cs
new file mode 100644
index 0000000..20defce
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/TaskMessage.cs
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities;
+using System;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Tasks
+{
+ /// <summary>
+ /// A message consisting of a source identifier and a byte[] payload.
+ /// Instances are created through the static From factory method.
+ /// </summary>
+ public class TaskMessage : IMessage
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskMessage));
+ private readonly string _messageSourceId;
+ private readonly byte[] _bytes;
+
+ private TaskMessage(string messageSourceId, byte[] bytes)
+ {
+ _messageSourceId = messageSourceId;
+ _bytes = bytes;
+ }
+
+ /// <summary>
+ /// Gets the identifier of the message source given at construction.
+ /// </summary>
+ public string MessageSourceId
+ {
+ get { return _messageSourceId; }
+ }
+
+ /// <summary>
+ /// Gets the payload given at construction.
+ /// </summary>
+ public byte[] Message
+ {
+ get { return _bytes; }
+
+ // No-op setter: the assigned value is discarded and the payload given at construction is kept.
+ // NOTE(review): presumably present only because IMessage declares a setter - confirm.
+ set { }
+ }
+
+ /// <summary>
+ /// From byte[] message to a TaskMessage
+ /// </summary>
+ /// <param name="messageSourceId">The message's sourceID. This will be accessible in the Driver for routing</param>
+ /// <param name="message">The actual content of the message, serialized into a byte[]</param>
+ /// <returns>a new TaskMessage with the given content</returns>
+ public static TaskMessage From(string messageSourceId, byte[] message)
+ {
+ if (string.IsNullOrEmpty(messageSourceId))
+ {
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("messageSourceId"), LOGGER);
+ }
+ if (message == null)
+ {
+ // The exception must name the actual parameter ("message"), not the private field it ends up in.
+ Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("message"), LOGGER);
+ }
+ return new TaskMessage(messageSourceId, message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultDriverMessageHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultDriverMessageHandler.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultDriverMessageHandler.cs
new file mode 100644
index 0000000..f77e18d
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultDriverMessageHandler.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tasks.Events;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+
+namespace Org.Apache.Reef.Tasks.Defaults
+{
+    /// <summary>
+    /// IDriverMessageHandler implementation that treats every incoming message as an error:
+    /// it never processes the message and instead reports that no handler was bound.
+    /// </summary>
+    public class DefaultDriverMessageHandler : IDriverMessageHandler
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultDriverMessageHandler));
+
+        /// <summary>
+        /// Parameterless constructor marked with [Inject].
+        /// </summary>
+        [Inject]
+        public DefaultDriverMessageHandler()
+        {
+        }
+
+        /// <summary>
+        /// Builds an <see cref="InvalidOperationException"/> whose text ends with the string form of
+        /// <paramref name="message"/> and hands it to Exceptions.Throw together with this class's logger.
+        /// </summary>
+        /// <param name="message">The driver message that arrived; only its string form is used.</param>
+        public void Handle(IDriverMessage message)
+        {
+            string reason = "No DriverMessage handler bound. Message received" + message;
+            InvalidOperationException unhandled = new InvalidOperationException(reason);
+            Exceptions.Throw(unhandled, LOGGER);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultTaskMessageSource.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultTaskMessageSource.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultTaskMessageSource.cs
new file mode 100644
index 0000000..97b52db
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/defaults/DefaultTaskMessageSource.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Tasks.Defaults
+{
+    /// <summary>
+    /// ITaskMessageSource implementation that produces a freshly built placeholder message
+    /// on every read of <see cref="Message"/>.
+    /// </summary>
+    public class DefaultTaskMessageSource : ITaskMessageSource
+    {
+        /// <summary>
+        /// Parameterless constructor marked with [Inject].
+        /// </summary>
+        [Inject]
+        public DefaultTaskMessageSource()
+        {
+        }
+
+        /// <summary>
+        /// Gets a new message with source id "defaultSourceId" whose payload is the text
+        /// "default message generated at " followed by the current local time formatted with
+        /// the invariant culture. The setter ignores the assigned value.
+        /// </summary>
+        public Optional<TaskMessage> Message
+        {
+            get
+            {
+                // The timestamp is taken on each read, so successive reads yield different payloads.
+                string timestamp = DateTime.Now.ToString(CultureInfo.InvariantCulture);
+                byte[] payload = ByteUtilities.StringToByteArrays("default message generated at " + timestamp);
+                TaskMessage placeholder = TaskMessage.From("defaultSourceId", payload);
+                return Optional<TaskMessage>.Of(placeholder);
+            }
+
+            set
+            {
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ICloseEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ICloseEvent.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ICloseEvent.cs
new file mode 100644
index 0000000..3ff9ccf
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ICloseEvent.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities;
+
+namespace Org.Apache.Reef.Tasks.Events
+{
+    /// <summary>
+    /// Contract for a close event that exposes an optional byte[] value.
+    /// </summary>
+    public interface ICloseEvent
+    {
+        /// <summary>
+        /// Gets or sets the optional byte[] value carried by the event.
+        /// </summary>
+        Optional<byte[]> Value
+        {
+            get;
+            set;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/IDriverMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/IDriverMessage.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/IDriverMessage.cs
new file mode 100644
index 0000000..9ac120d
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/IDriverMessage.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities;
+
+namespace Org.Apache.Reef.Tasks.Events
+{
+    /// <summary>
+    /// Contract for a driver message that exposes an optional, read-only byte[] payload.
+    /// </summary>
+    public interface IDriverMessage
+    {
+        /// <summary>
+        /// Gets the optional byte[] payload of the message.
+        /// </summary>
+        Optional<byte[]> Message
+        {
+            get;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ISuspendEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ISuspendEvent.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ISuspendEvent.cs
new file mode 100644
index 0000000..218a28f
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ISuspendEvent.cs
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Tasks.Events
+{
+    /// <summary>
+    /// Member-less interface: it declares no properties or methods, so it only identifies
+    /// a type as a suspend event.
+    /// </summary>
+    public interface ISuspendEvent
+    {
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStart.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStart.cs
new file mode 100644
index 0000000..bcde0b3
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStart.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Tasks.Events
+{
+    /// <summary>
+    /// Contract for a task-start event that exposes a readable and writable string identifier.
+    /// </summary>
+    public interface ITaskStart
+    {
+        /// <summary>
+        /// Gets or sets the identifier associated with the event.
+        /// </summary>
+        string Id
+        {
+            get;
+            set;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStop.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStop.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStop.cs
new file mode 100644
index 0000000..5e8ebc8
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/tasks/events/ITaskStop.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Tasks.Events
+{
+    /// <summary>
+    /// Contract for a task-stop event that exposes a read-only string identifier.
+    /// </summary>
+    public interface ITaskStop
+    {
+        /// <summary>
+        /// Gets the identifier associated with the event.
+        /// </summary>
+        string Id
+        {
+            get;
+        }
+    }
+}