You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by tm...@apache.org on 2015/02/10 21:10:46 UTC
[08/19] incubator-reef git commit: [REEF-136] Harmonize namespaces
and folder names in Org.Apache.REEF projects
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
new file mode 100644
index 0000000..531ebbf
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+using System.IO;
+using System.Runtime.InteropServices;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Driver.Bridge.Clr2java;
+using Org.Apache.REEF.Driver.Bridge.Events;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Time;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Wake.Time.Event;
+using ContextMessage = Org.Apache.REEF.Driver.Bridge.Events.ContextMessage;
+
+namespace Org.Apache.REEF.Driver.Bridge
+{
+ public class ClrSystemHandlerWrapper
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(ClrSystemHandlerWrapper));
+
+ private static DriverBridge _driverBridge;
+
+ public static void Call_ClrSystemAllocatedEvaluatorHandler_OnNext(ulong handle, IAllocatedEvaluaotrClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemAllocatedEvaluatorHandler_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IAllocatedEvaluator> obj = (ClrSystemHandler<IAllocatedEvaluator>)gc.Target;
+ obj.OnNext(new AllocatedEvaluator(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemActiveContextHandler_OnNext(ulong handle, IActiveContextClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemActiveContextHandler_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IActiveContext> obj = (ClrSystemHandler<IActiveContext>)gc.Target;
+ obj.OnNext(new ActiveContext(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemDriverRestartActiveContextHandler_OnNext(ulong handle, IActiveContextClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestartActiveContextHandler_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IActiveContext> obj = (ClrSystemHandler<IActiveContext>)gc.Target;
+ obj.OnNext(new ActiveContext(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemEvaluatorRequestor_OnNext(ulong handle, IEvaluatorRequestorClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemEvaluatorRequestor_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IEvaluatorRequestor> obj = (ClrSystemHandler<IEvaluatorRequestor>)gc.Target;
+ obj.OnNext(new EvaluatorRequestor(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemTaskMessage_OnNext(ulong handle, ITaskMessageClr2Java clr2Java, byte[] message)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemTaskMessage_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<ITaskMessage> obj = (ClrSystemHandler<ITaskMessage>)gc.Target;
+ obj.OnNext(new TaskMessage(clr2Java, message));
+ }
+ }
+
+ public static void Call_ClrSystemFailedTask_OnNext(ulong handle, IFailedTaskClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemFailedTask_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IFailedTask> obj = (ClrSystemHandler<IFailedTask>)gc.Target;
+ obj.OnNext(new FailedTask(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemRunningTask_OnNext(ulong handle, IRunningTaskClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemRunningTask_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IRunningTask> obj = (ClrSystemHandler<IRunningTask>)gc.Target;
+ obj.OnNext(new RunningTask(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemDriverRestartRunningTask_OnNext(ulong handle, IRunningTaskClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestartRunningTask_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IRunningTask> obj = (ClrSystemHandler<IRunningTask>)gc.Target;
+ obj.OnNext(new RunningTask(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemFailedEvaluator_OnNext(ulong handle, IFailedEvaluatorClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemFailedEvaluator_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IFailedEvaluator> obj = (ClrSystemHandler<IFailedEvaluator>)gc.Target;
+ obj.OnNext(new FailedEvaluator(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemCompletedTask_OnNext(ulong handle, ICompletedTaskClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemCompletedTask_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<ICompletedTask> obj = (ClrSystemHandler<ICompletedTask>)gc.Target;
+ obj.OnNext(new CompletedTask(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemSuspendedTask_OnNext(ulong handle, ISuspendedTaskClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemSuspendedTask_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<ISuspendedTask> obj = (ClrSystemHandler<ISuspendedTask>)gc.Target;
+ obj.OnNext(new SuspendedTask(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemCompletedEvaluator_OnNext(ulong handle, ICompletedEvaluatorClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemCompletedEvaluator_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<ICompletedEvaluator> obj = (ClrSystemHandler<ICompletedEvaluator>)gc.Target;
+ obj.OnNext(new CompletedEvaluator(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemHttpServer_OnNext(ulong handle, IHttpServerBridgeClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemHttpServer_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IHttpMessage> obj = (ClrSystemHandler<IHttpMessage>)gc.Target;
+ obj.OnNext(new HttpMessage(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemClosedContext_OnNext(ulong handle, IClosedContextClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemClosedContext_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IClosedContext> obj = (ClrSystemHandler<IClosedContext>)gc.Target;
+ obj.OnNext(new ClosedContext(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemFailedContext_OnNext(ulong handle, IFailedContextClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemFailedContext_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IFailedContext> obj = (ClrSystemHandler<IFailedContext>)gc.Target;
+ obj.OnNext(new FailedContext(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemContextMessage_OnNext(ulong handle, IContextMessageClr2Java clr2Java)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemContextMessage_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IContextMessage> obj = (ClrSystemHandler<IContextMessage>)gc.Target;
+ obj.OnNext(new ContextMessage(clr2Java));
+ }
+ }
+
+ public static void Call_ClrSystemDriverRestart_OnNext(ulong handle)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestart_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<StartTime> obj = (ClrSystemHandler<StartTime>)gc.Target;
+ obj.OnNext(new StartTime(DateTime.Now.Ticks));
+ }
+ }
+
+ //Deprecate, remove after both Java and C# code gets checked in
+ public static ulong[] Call_ClrSystemStartHandler_OnStart(DateTime startTime)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart"))
+ {
+ LOGGER.Log(Level.Info, "*** Start time is " + startTime);
+ return GetHandlers(null);
+ }
+ }
+
+ public static ulong[] Call_ClrSystemStartHandler_OnStart(DateTime startTime, string httpServerPort)
+ {
+ using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart"))
+ {
+ LOGGER.Log(Level.Info, "*** Start time is " + startTime);
+ LOGGER.Log(Level.Info, "*** httpServerPort: " + httpServerPort);
+ return GetHandlers(httpServerPort);
+ }
+ }
+
+ private static ulong[] GetHandlers(string httpServerPortNumber)
+ {
+ IStartHandler startHandler;
+ IInjector injector = null;
+ string errorMessage;
+ string bridgeConfiguration = Path.Combine(Directory.GetCurrentDirectory(), "reef", "global", Constants.DriverBridgeConfiguration);
+ if (!File.Exists(bridgeConfiguration))
+ {
+ errorMessage = "Cannot find CLR Driver bridge configuration file " + bridgeConfiguration;
+ Exceptions.Throw(new InvalidOperationException(errorMessage), LOGGER);
+ }
+ try
+ {
+ IConfiguration driverBridgeConfiguration = new AvroConfigurationSerializer().FromFile(bridgeConfiguration);
+ injector = TangFactory.GetTang().NewInjector(driverBridgeConfiguration);
+ }
+ catch (Exception e)
+ {
+ errorMessage = "Failed to get injector from driver bridge configuration.";
+ Exceptions.CaughtAndThrow(new InvalidOperationException(errorMessage, e), Level.Error, errorMessage, LOGGER);
+ }
+
+ try
+ {
+ HttpServerPort port = injector.GetInstance<HttpServerPort>();
+ port.PortNumber = httpServerPortNumber == null ? 0 : int.Parse(httpServerPortNumber, CultureInfo.InvariantCulture);
+
+ startHandler = injector.GetInstance<IStartHandler>();
+ LOGGER.Log(Level.Info, "Start handler set to be " + startHandler.Identifier);
+ _driverBridge = injector.GetInstance<DriverBridge>();
+ }
+ catch (Exception e)
+ {
+ Exceptions.CaughtAndThrow(e, Level.Error, "Cannot get instance.", LOGGER);
+ }
+
+ return _driverBridge.Subscribe();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
new file mode 100644
index 0000000..1ecda50
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Globalization;
+using System.Linq;
+
+using Org.Apache.REEF.Wake.Time;
+using Org.Apache.REEF.Wake.Time.Event;
+
+namespace Org.Apache.REEF.Driver.Bridge
+{
+ public class DriverBridge
+ {
+ private static Logger _logger;
+
+ private static ClrSystemHandler<IAllocatedEvaluator> _allocatedEvaluatorSubscriber;
+
+ private static ClrSystemHandler<IEvaluatorRequestor> _evaluatorRequestorSubscriber;
+
+ private static ClrSystemHandler<ITaskMessage> _taskMessageSubscriber;
+
+ private static ClrSystemHandler<IActiveContext> _activeContextSubscriber;
+
+ private static ClrSystemHandler<IActiveContext> _driverRestartActiveContextSubscriber;
+
+ private static ClrSystemHandler<IFailedTask> _failedTaskSubscriber;
+
+ private static ClrSystemHandler<IRunningTask> _runningTaskSubscriber;
+
+ private static ClrSystemHandler<IRunningTask> _driverRestartRunningTaskSubscriber;
+
+ private static ClrSystemHandler<ISuspendedTask> _suspendedTaskSubscriber;
+
+ private static ClrSystemHandler<IFailedEvaluator> _failedEvaluatorSubscriber;
+
+ private static ClrSystemHandler<ICompletedEvaluator> _completedEvaluatorSubscriber;
+
+ private static ClrSystemHandler<IHttpMessage> _httpServerEventSubscriber;
+
+ private static ClrSystemHandler<ICompletedTask> _completedTaskSubscriber;
+
+ private static ClrSystemHandler<IClosedContext> _closedContextSubscriber;
+
+ private static ClrSystemHandler<IFailedContext> _failedContextSubscriber;
+
+ private static ClrSystemHandler<IContextMessage> _contextMessageSubscriber;
+
+ private static ClrSystemHandler<StartTime> _driverRestartSubscriber;
+
+ private IObserver<StartTime> _driverRestartHandler;
+
+ private ISet<IObserver<IEvaluatorRequestor>> _evaluatorRequestHandlers;
+
+ private ISet<IObserver<IAllocatedEvaluator>> _allocatedEvaluatorHandlers;
+
+ private ISet<IObserver<IActiveContext>> _activeContextHandlers;
+
+ private ISet<IObserver<IActiveContext>> _driverRestartActiveContextHandlers;
+
+ private ISet<IObserver<ITaskMessage>> _taskMessageHandlers;
+
+ private ISet<IObserver<IFailedTask>> _failedTaskHandlers;
+
+ private ISet<IObserver<ISuspendedTask>> _suspendedTaskHandlers;
+
+ private ISet<IObserver<IRunningTask>> _runningTaskHandlers;
+
+ private ISet<IObserver<IRunningTask>> _driverRestartRunningTaskHandlers;
+
+ private ISet<IObserver<IFailedEvaluator>> _failedEvaluatorHandlers;
+
+ private ISet<IObserver<ICompletedEvaluator>> _completedEvaluatorHandlers;
+
+ private ISet<IObserver<IClosedContext>> _closedContextHandlers;
+
+ private ISet<IObserver<IFailedContext>> _failedContextHandlers;
+
+ private ISet<IObserver<IContextMessage>> _contextMessageHandlers;
+
+ private ISet<IObserver<ICompletedTask>> _completedTaskHandlers;
+
+ private HttpServerHandler _httpServerHandler;
+
+ [Inject]
+ public DriverBridge(
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverRestartHandler))] IObserver<StartTime> driverRestartHandler,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.EvaluatorRequestHandlers))] ISet<IObserver<IEvaluatorRequestor>> evaluatorRequestHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.AllocatedEvaluatorHandlers))] ISet<IObserver<IAllocatedEvaluator>> allocatedEvaluatorHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.ActiveContextHandlers))] ISet<IObserver<IActiveContext>> activeContextHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TaskMessageHandlers))] ISet<IObserver<ITaskMessage>> taskMessageHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.FailedTaskHandlers))] ISet<IObserver<IFailedTask>> failedTaskHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.FailedEvaluatorHandlers))] ISet<IObserver<IFailedEvaluator>> failedEvaluatorHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.CompletedEvaluatorHandlers))] ISet<IObserver<ICompletedEvaluator>> completedEvaluatorHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.RunningTaskHandlers))] ISet<IObserver<IRunningTask>> runningTaskHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.CompletedTaskHandlers))] ISet<IObserver<ICompletedTask>> completedTaskHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.SuspendedTaskHandlers))] ISet<IObserver<ISuspendedTask>> suspendedTaskHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.ClosedContextHandlers))] ISet<IObserver<IClosedContext>> closedContextHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.FailedContextHandlers))] ISet<IObserver<IFailedContext>> failedContextHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.ContextMessageHandlers))] ISet<IObserver<IContextMessage>> contextMessageHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverRestartActiveContextHandlers))] ISet<IObserver<IActiveContext>> driverRestartActiveContextHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers))] ISet<IObserver<IRunningTask>> driverRestartRunningTaskHandlers,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TraceListenersSet))] ISet<TraceListener> traceListeners,
+ [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TraceLevel))] string traceLevel,
+ HttpServerHandler httpServerHandler)
+ {
+ foreach (TraceListener listener in traceListeners)
+ {
+ Logger.AddTraceListner(listener);
+ }
+ _logger = Logger.GetLogger(typeof(DriverBridge));
+ _logger.Log(Level.Info, "Constructing DriverBridge");
+
+ Level level;
+ if (!Enum.TryParse(traceLevel.ToString(CultureInfo.InvariantCulture), out level))
+ {
+ _logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Invalid trace level {0} provided, will by default use verbose level", traceLevel));
+ }
+ else
+ {
+ Logger.SetCustomLevel(level);
+ }
+
+ _evaluatorRequestHandlers = evaluatorRequestHandlers;
+ _allocatedEvaluatorHandlers = allocatedEvaluatorHandlers;
+ _activeContextHandlers = activeContextHandlers;
+ _taskMessageHandlers = taskMessageHandlers;
+ _failedEvaluatorHandlers = failedEvaluatorHandlers;
+ _failedTaskHandlers = failedTaskHandlers;
+ _completedTaskHandlers = completedTaskHandlers;
+ _runningTaskHandlers = runningTaskHandlers;
+ _suspendedTaskHandlers = suspendedTaskHandlers;
+ _completedEvaluatorHandlers = completedEvaluatorHandlers;
+ _closedContextHandlers = closedContextHandlers;
+ _failedContextHandlers = failedContextHandlers;
+ _contextMessageHandlers = contextMessageHandlers;
+ _driverRestartHandler = driverRestartHandler;
+ _driverRestartActiveContextHandlers = driverRestartActiveContextHandlers;
+ _driverRestartRunningTaskHandlers = driverRestartRunningTaskHandlers;
+ _httpServerHandler = httpServerHandler;
+
+ _evaluatorRequestorSubscriber = new ClrSystemHandler<IEvaluatorRequestor>();
+ _allocatedEvaluatorSubscriber = new ClrSystemHandler<IAllocatedEvaluator>();
+ _completedEvaluatorSubscriber = new ClrSystemHandler<ICompletedEvaluator>();
+ _taskMessageSubscriber = new ClrSystemHandler<ITaskMessage>();
+ _activeContextSubscriber = new ClrSystemHandler<IActiveContext>();
+ _failedTaskSubscriber = new ClrSystemHandler<IFailedTask>();
+ _failedEvaluatorSubscriber = new ClrSystemHandler<IFailedEvaluator>();
+ _httpServerEventSubscriber = new ClrSystemHandler<IHttpMessage>();
+ _completedTaskSubscriber = new ClrSystemHandler<ICompletedTask>();
+ _runningTaskSubscriber = new ClrSystemHandler<IRunningTask>();
+ _suspendedTaskSubscriber = new ClrSystemHandler<ISuspendedTask>();
+ _closedContextSubscriber = new ClrSystemHandler<IClosedContext>();
+ _failedContextSubscriber = new ClrSystemHandler<IFailedContext>();
+ _contextMessageSubscriber = new ClrSystemHandler<IContextMessage>();
+ _driverRestartSubscriber = new ClrSystemHandler<StartTime>();
+ _driverRestartActiveContextSubscriber = new ClrSystemHandler<IActiveContext>();
+ _driverRestartRunningTaskSubscriber = new ClrSystemHandler<IRunningTask>();
+ }
+
+ public ulong[] Subscribe()
+ {
+ ulong[] handlers = Enumerable.Repeat(Constants.NullHandler, Constants.HandlersNumber).ToArray();
+
+ // subscribe to StartTime event for driver restart
+ _driverRestartSubscriber.Subscribe(_driverRestartHandler);
+ _logger.Log(Level.Info, "subscribed to Driver restart handler: " + _driverRestartHandler);
+ handlers[Constants.Handlers[Constants.DriverRestartHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartSubscriber);
+
+ // subscribe to Evaluator Requestor
+ foreach (var handler in _evaluatorRequestHandlers)
+ {
+ _evaluatorRequestorSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to IEvaluatorRequestor handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.EvaluatorRequestorHandler]] = ClrHandlerHelper.CreateHandler(_evaluatorRequestorSubscriber);
+
+ // subscribe to Allocated Evaluator
+ foreach (var handler in _allocatedEvaluatorHandlers)
+ {
+ _allocatedEvaluatorSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to IAllocatedEvaluator handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.AllocatedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_allocatedEvaluatorSubscriber);
+
+ // subscribe to TaskMessage
+ foreach (var handler in _taskMessageHandlers)
+ {
+ _taskMessageSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to ITaskMessage handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.TaskMessageHandler]] = ClrHandlerHelper.CreateHandler(_taskMessageSubscriber);
+
+ // subscribe to Active Context
+ foreach (var handler in _activeContextHandlers)
+ {
+ _activeContextSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to IActiveContext handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.ActiveContextHandler]] = ClrHandlerHelper.CreateHandler(_activeContextSubscriber);
+
+ // subscribe to Failed Task
+ foreach (var handler in _failedTaskHandlers)
+ {
+ _failedTaskSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to IFailedTask handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.FailedTaskHandler]] = ClrHandlerHelper.CreateHandler(_failedTaskSubscriber);
+
+ // subscribe to Running Task
+ foreach (var handler in _runningTaskHandlers)
+ {
+ _runningTaskSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to IRunningask handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.RunningTaskHandler]] = ClrHandlerHelper.CreateHandler(_runningTaskSubscriber);
+
+ // subscribe to Completed Task
+ foreach (var handler in _completedTaskHandlers)
+ {
+ _completedTaskSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to ICompletedTask handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.CompletedTaskHandler]] = ClrHandlerHelper.CreateHandler(_completedTaskSubscriber);
+
+ // subscribe to Suspended Task
+ foreach (var handler in _suspendedTaskHandlers)
+ {
+ _suspendedTaskSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to ISuspendedTask handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.SuspendedTaskHandler]] = ClrHandlerHelper.CreateHandler(_suspendedTaskSubscriber);
+
+ // subscribe to Failed Evaluator
+ foreach (var handler in _failedEvaluatorHandlers)
+ {
+ _failedEvaluatorSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to IFailedEvaluator handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.FailedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_failedEvaluatorSubscriber);
+
+ // subscribe to Completed Evaluator
+ foreach (var handler in _completedEvaluatorHandlers)
+ {
+ _completedEvaluatorSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to ICompletedEvaluator handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.CompletedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_completedEvaluatorSubscriber);
+
+ // subscribe to Closed Context
+ foreach (var handler in _closedContextHandlers)
+ {
+ _closedContextSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to IClosedContext handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.ClosedContextHandler]] = ClrHandlerHelper.CreateHandler(_closedContextSubscriber);
+
+ // subscribe to Failed Context
+ foreach (var handler in _failedContextHandlers)
+ {
+ _failedContextSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to IFailedContext handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.FailedContextHandler]] = ClrHandlerHelper.CreateHandler(_failedContextSubscriber);
+
+ // subscribe to Context Message
+ foreach (var handler in _contextMessageHandlers)
+ {
+ _contextMessageSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to IContextMesage handler: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.ContextMessageHandler]] = ClrHandlerHelper.CreateHandler(_contextMessageSubscriber);
+
+ // subscribe to Active Context received during driver restart
+ foreach (var handler in _driverRestartActiveContextHandlers)
+ {
+ _driverRestartActiveContextSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to handler for IActiveContext received during driver restart: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.DriverRestartActiveContextHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartActiveContextSubscriber);
+
+ // subscribe to Running Task received during driver restart
+ foreach (var handler in _driverRestartRunningTaskHandlers)
+ {
+ _driverRestartRunningTaskSubscriber.Subscribe(handler);
+ _logger.Log(Level.Info, "subscribed to handler for IRunningTask received during driver restart: " + handler);
+ }
+ handlers[Constants.Handlers[Constants.DriverRestartRunningTaskHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartRunningTaskSubscriber);
+
+ // subscribe to Http message
+ _httpServerEventSubscriber.Subscribe(_httpServerHandler);
+ _logger.Log(Level.Info, "subscribed to IHttpMessage handler :" + _httpServerHandler);
+ handlers[Constants.Handlers[Constants.HttpServerHandler]] = ClrHandlerHelper.CreateHandler(_httpServerEventSubscriber);
+
+ return handlers;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
new file mode 100644
index 0000000..7b7b280
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Time;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+using Org.Apache.REEF.Wake.Time.Event;
+
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static field, typical usage in configurations")]
+
+namespace Org.Apache.REEF.Driver.Bridge
+{
+ public class DriverBridgeConfiguration : ConfigurationModuleBuilder
+ {
+ /// <summary>
+ /// The event handler invoked right after the driver boots up.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly RequiredImpl<IStartHandler> OnDriverStarted = new RequiredImpl<IStartHandler>();
+
+ /// <summary>
+ /// The event handler invoked when driver restarts
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<StartTime>> OnDriverRestarted = new OptionalImpl<IObserver<StartTime>>();
+
+ /// <summary>
+ /// The event handler for requesting evaluator
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<IEvaluatorRequestor>> OnEvaluatorRequested = new OptionalImpl<IObserver<IEvaluatorRequestor>>();
+
+ /// <summary>
+ /// Event handler for allocated evaluators. Defaults to returning the evaluator if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<IAllocatedEvaluator>> OnEvaluatorAllocated = new OptionalImpl<IObserver<IAllocatedEvaluator>>();
+
+ /// <summary>
+ /// Event handler for completed evaluators. Defaults to logging if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<ICompletedEvaluator>> OnEvaluatorCompleted = new OptionalImpl<IObserver<ICompletedEvaluator>>();
+
+ /// <summary>
+ /// Event handler for failed evaluators. Defaults to job failure if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<IFailedEvaluator>> OnEvaluatorFailed = new OptionalImpl<IObserver<IFailedEvaluator>>();
+
+ /// <summary>
+ /// Event handler for failed evaluators. Defaults to job failure if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IHttpHandler> OnHttpEvent = new OptionalImpl<IHttpHandler>();
+
+ /// <summary>
+ /// Event handler for task messages. Defaults to logging if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<ITaskMessage>> OnTaskMessage = new OptionalImpl<IObserver<ITaskMessage>>();
+
+ /// <summary>
+ /// Event handler for completed tasks. Defaults to closing the context the task ran on if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<ICompletedTask>> OnTaskCompleted = new OptionalImpl<IObserver<ICompletedTask>>();
+
+ /// <summary>
+ /// Event handler for failed tasks. Defaults to job failure if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<IFailedTask>> OnTaskFailed = new OptionalImpl<IObserver<IFailedTask>>();
+
+ ///// <summary>
+ ///// Event handler for running tasks. Defaults to logging if not bound.
+ ///// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<IRunningTask>> OnTaskRunning = new OptionalImpl<IObserver<IRunningTask>>();
+
+ ///// <summary>
+ ///// Event handler for running task received during driver restart. Defaults to logging if not bound.
+ ///// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<IRunningTask>> OnDriverRestartTaskRunning = new OptionalImpl<IObserver<IRunningTask>>();
+
+ /// <summary>
+ /// Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
+ /// task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<ISuspendedTask>> OnTaskSuspended = new OptionalImpl<IObserver<ISuspendedTask>>();
+
+ /// <summary>
+ /// Event handler for active context. Defaults to closing the context if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<IActiveContext>> OnContextActive = new OptionalImpl<IObserver<IActiveContext>>();
+
+ /// <summary>
+ /// Event handler for active context received during driver restart. Defaults to closing the context if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<IActiveContext>> OnDirverRestartContextActive = new OptionalImpl<IObserver<IActiveContext>>();
+
+ /// <summary>
+ /// Event handler for closed context. Defaults to logging if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<IClosedContext>> OnContextClosed = new OptionalImpl<IObserver<IClosedContext>>();
+
+ /// <summary>
+ /// Event handler for closed context. Defaults to job failure if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<IFailedContext>> OnContextFailed = new OptionalImpl<IObserver<IFailedContext>>();
+
+ /// <summary>
+ /// Event handler for context messages. Defaults to logging if not bound.
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<IContextMessage>> OnContextMessage = new OptionalImpl<IObserver<IContextMessage>>();
+
+ /// <summary>
+ /// Additional set of string arguments that can be pssed to handlers through client
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalParameter<string> CommandLineArguments = new OptionalParameter<string>();
+
+ /// <summary>
+ /// The trace level of the TraceListner
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalParameter<string> CustomTraceLevel = new OptionalParameter<string>();
+
+ /// <summary>
+ /// Additional set of trace listners provided by client
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalParameter<TraceListener> CustomTraceListeners = new OptionalParameter<TraceListener>();
+
+ /// <summary>
+ /// The implemenation for (attempting to) re-establish connection to driver
+ /// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IDriverConnection> OnDriverReconnect = new OptionalImpl<IDriverConnection>();
+
+ // This is currently not needed in Bridge/Driver model
+ ///// <summary>
+ ///// The event handler invoked right before the driver shuts down. Defaults to ignore.
+ ///// </summary>
+ //public static readonly OptionalImpl<IObserver<StopTime>> OnDriverStop = new OptionalImpl<IObserver<StopTime>>();
+
+ // Client handlers only needed when client interactions are expeceted. Not enabled for now.
+ ///// <summary>
+ ///// Event handler for client messages. Defaults to logging if not bound.
+ ///// </summary>
+ //public static readonly OptionalImpl<IObserver<byte[]>> OnClientMessage = new OptionalImpl<IObserver<byte[]>>();
+
+ // Client handlers only needed when client interactions are expeceted. Not enabled for now.
+ ///// <summary>
+ ///// Event handler for close messages sent by the client. Defaults to job failure if not bound.
+ ///// Note: in java the type is void, but IObserver does not take void as a type
+ ///// </summary>
+ //public static readonly OptionalImpl<IObserver<byte[]>> OnClientClosed = new OptionalImpl<IObserver<byte[]>>();
+
+ // Client handlers only needed when client interactions are expeceted. Not enabled for now.
+ ///// <summary>
+ ///// Event handler for close messages sent by the client. Defaults to job failure if not bound.
+ ///// </summary>
+ //public static readonly OptionalImpl<IObserver<byte[]>> OnClientClosedMessage = new OptionalImpl<IObserver<byte[]>>();
+
+ public static ConfigurationModule ConfigurationModule
+ {
+ get
+ {
+ return new DriverBridgeConfiguration()
+ .BindImplementation(GenericType<IStartHandler>.Class, OnDriverStarted)
+ .BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.DriverRestartHandler>.Class, OnDriverRestarted)
+ .BindImplementation(GenericType<IDriverConnection>.Class, OnDriverReconnect)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.EvaluatorRequestHandlers>.Class, OnEvaluatorRequested)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.AllocatedEvaluatorHandlers>.Class, OnEvaluatorAllocated)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ActiveContextHandlers>.Class, OnContextActive)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.TaskMessageHandlers>.Class, OnTaskMessage)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.FailedTaskHandlers>.Class, OnTaskFailed)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.RunningTaskHandlers>.Class, OnTaskRunning)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.SuspendedTaskHandlers>.Class, OnTaskSuspended)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.FailedEvaluatorHandlers>.Class, OnEvaluatorFailed)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.CompletedEvaluatorHandlers>.Class, OnEvaluatorCompleted)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.CompletedTaskHandlers>.Class, OnTaskCompleted)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ClosedContextHandlers>.Class, OnContextClosed)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.FailedContextHandlers>.Class, OnContextFailed)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ContextMessageHandlers>.Class, OnContextMessage)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.ArgumentSets>.Class, CommandLineArguments)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.HttpEventHandlers>.Class, OnHttpEvent)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.TraceListenersSet>.Class, CustomTraceListeners)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartActiveContextHandlers>.Class, OnDirverRestartContextActive)
+ .BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class, OnDriverRestartTaskRunning)
+ .BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class, CustomTraceLevel)
+ .Build();
+ }
+ }
+ }
+
+ public class CommandLineArguments
+ {
+ [Inject]
+ public CommandLineArguments([Parameter(typeof(DriverBridgeConfigurationOptions.ArgumentSets))] ISet<string> arguments)
+ {
+ Arguments = arguments;
+ }
+
+ public ISet<string> Arguments { get; set; }
+ }
+
+ public class CustomTraceListeners
+ {
+ [Inject]
+ public CustomTraceListeners([Parameter(typeof(DriverBridgeConfigurationOptions.TraceListenersSet))] ISet<TraceListener> listeners)
+ {
+ Listeners = listeners;
+ }
+
+ public ISet<TraceListener> Listeners { get; set; }
+ }
+
+ public class CustomTraceLevel
+ {
+ [Inject]
+ public CustomTraceLevel([Parameter(typeof(DriverBridgeConfigurationOptions.TraceLevel))] string traceLevel)
+ {
+ Level level = Level.Verbose;
+ if (Enum.TryParse(traceLevel.ToString(CultureInfo.InvariantCulture), out level))
+ {
+ Logger.SetCustomLevel(level);
+ }
+ else
+ {
+ Console.WriteLine("Cannot parse trace level" + traceLevel);
+ }
+ TraceLevel = level;
+ }
+
+ public Level TraceLevel { get; set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
new file mode 100644
index 0000000..9bc2402
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Defaults;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Time;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using Org.Apache.REEF.Wake.Time.Event;
+
+[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:FileMayOnlyContainASingleClass", Justification = "allow name parameter class to be embedded")]
+
+namespace Org.Apache.REEF.Driver.Bridge
+{
+ /// <summary>
+ /// Hosts all named parameters for Drivers, including bridge handlers.
+ /// </summary>
+ public class DriverBridgeConfigurationOptions
+ {
+ // Level.Verbose (since enum is not suppoted for TANG, we use a string here)
+ private const string _verboseLevel = "Verbose";
+
+ [NamedParameter(documentation: "Called when driver is restarted, after CLR bridge is set up.", defaultClasses: new[] { typeof(DefaultDriverRestartHandler) })]
+ public class DriverRestartHandler : Name<IObserver<StartTime>>
+ {
+ }
+
+ [NamedParameter(documentation: "Called when evaluator is requested.", defaultClasses: new[] { typeof(DefaultEvaluatorRequestorHandler) })]
+ public class EvaluatorRequestHandlers : Name<ISet<IObserver<IEvaluatorRequestor>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Called when an exception occurs on a running evaluator.", defaultClasses: new[] { typeof(DefaultEvaluatorFailureHandler) })]
+ public class FailedEvaluatorHandlers : Name<ISet<IObserver<IFailedEvaluator>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Called when an evaluator completes.", defaultClasses: new[] { typeof(DefaultEvaluatorCompletionHandler) })]
+ public class CompletedEvaluatorHandlers : Name<ISet<IObserver<ICompletedEvaluator>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Called when an allocated evaluator is given to the client.", defaultClasses: new[] { typeof(DefaultEvaluatorAllocationHandler) })]
+ public class AllocatedEvaluatorHandlers : Name<ISet<IObserver<IAllocatedEvaluator>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Running task handler.", defaultClasses: new[] { typeof(DefaultTaskRunningHandler) })]
+ public class RunningTaskHandlers : Name<ISet<IObserver<IRunningTask>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Running task during driver restart handler.", defaultClasses: new[] { typeof(DefaultDriverRestartTaskRunningHandler) })]
+ public class DriverRestartRunningTaskHandlers : Name<ISet<IObserver<IRunningTask>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Task exception handler.", defaultClasses: new[] { typeof(DefaultTaskFailureHandler) })]
+ public class FailedTaskHandlers : Name<ISet<IObserver<IFailedTask>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Task message handler.", defaultClasses: new[] { typeof(DefaultTaskMessageHandler) })]
+ public class TaskMessageHandlers : Name<ISet<IObserver<ITaskMessage>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Http Event Handlers.", defaultClasses: new[] { typeof(DefaultHttpHandler) })]
+ public class HttpEventHandlers : Name<ISet<IHttpHandler>>
+ {
+ }
+
+ [NamedParameter(documentation: "Completed task handler.", defaultClasses: new[] { typeof(DefaultTaskCompletionHandler) })]
+ public class CompletedTaskHandlers : Name<ISet<IObserver<ICompletedTask>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Suspended task handler.", defaultClasses: new[] { typeof(DefaultTaskSuspensionHandler) })]
+ public class SuspendedTaskHandlers : Name<ISet<IObserver<ISuspendedTask>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Handler for IActiveContext.", defaultClasses: new[] { typeof(DefaultContextActiveHandler) })]
+ public class ActiveContextHandlers : Name<ISet<IObserver<IActiveContext>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Handler for IActiveContext received during driver restart.", defaultClasses: new[] { typeof(DefaultDriverRestartContextActiveHandler) })]
+ public class DriverRestartActiveContextHandlers : Name<ISet<IObserver<IActiveContext>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Handler for ClosedContext.", defaultClasses: new[] { typeof(DefaultContextClosureHandler) })]
+ public class ClosedContextHandlers : Name<ISet<IObserver<IClosedContext>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Handler for FailedContext.", defaultClasses: new[] { typeof(DefaultContextFailureHandler) })]
+ public class FailedContextHandlers : Name<ISet<IObserver<IFailedContext>>>
+ {
+ }
+
+ [NamedParameter(documentation: "Handler for ContextMessage.", defaultClasses: new[] { typeof(DefaultContextMessageHandler) })]
+ public class ContextMessageHandlers : Name<ISet<IObserver<IContextMessage>>>
+ {
+ }
+
+ [NamedParameter("Command Line Arguments supplied by client", "CommandLineArguments", null)]
+ public class ArgumentSets : Name<ISet<string>>
+ {
+ }
+
+ [NamedParameter("Additional trace listners supplied by client", "TraceListeners", null, defaultClasses: new[] { typeof(DefaultCustomTraceListener) })]
+ public class TraceListenersSet : Name<ISet<TraceListener>>
+ {
+ }
+
+ [NamedParameter("Custom Trace Level", "TraceLevel", defaultValue: _verboseLevel)]
+ public class TraceLevel : Name<string>
+ {
+ }
+
+ //[NamedParameter(documentation: "Job message handler.", defaultClasses: new[] { typeof(DefaultClientMessageHandler) })]
+ //public class ClientMessageHandlers : Name<ISet<IObserver<byte[]>>>
+ //{
+ //}
+
+ //[NamedParameter(documentation: "Client close handler.", defaultClasses: new[] { typeof(DefaultClientCloseHandler) })]
+ //public class ClientCloseHandlers : Name<ISet<IObserver<byte[]>>>
+ //{
+ //}
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs
new file mode 100644
index 0000000..989483b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ActiveContext.cs
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Driver.Bridge.Clr2java;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Driver.Bridge.Events
+{
+ [DataContract]
+ internal class ActiveContext : IActiveContext
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(ActiveContext));
+
+ private readonly AvroConfigurationSerializer _serializer;
+
+ public ActiveContext(IActiveContextClr2Java clr2Java)
+ {
+ InstanceId = Guid.NewGuid().ToString("N");
+ Clr2Java = clr2Java;
+ _serializer = new AvroConfigurationSerializer();
+ }
+
+ [DataMember]
+ public string InstanceId { get; set; }
+
+ public string Id
+ {
+ get
+ {
+ return Clr2Java.GetId();
+ }
+
+ set
+ {
+ }
+ }
+
+ public string EvaluatorId
+ {
+ get
+ {
+ return Clr2Java.GetEvaluatorId();
+ }
+
+ set
+ {
+ }
+ }
+
+ public Optional<string> ParentId { get; set; }
+
+ public IEvaluatorDescriptor EvaluatorDescriptor
+ {
+ get
+ {
+ return Clr2Java.GetEvaluatorDescriptor();
+ }
+
+ set
+ {
+ }
+ }
+
+ private IActiveContextClr2Java Clr2Java { get; set; }
+
+ public void SubmitTask(IConfiguration taskConfiguration)
+ {
+ LOGGER.Log(Level.Info, "ActiveContext::SubmitTask");
+ string task = _serializer.ToString(taskConfiguration);
+ LOGGER.Log(Level.Info, "serialized taskConfiguration: " + task);
+ Clr2Java.SubmitTask(task);
+ }
+
+ public void Dispose()
+ {
+ LOGGER.Log(Level.Info, "ActiveContext::Dispose");
+ Clr2Java.Close();
+ }
+
+ public void SubmitContext(IConfiguration contextConfiguration)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void SubmitContextAndService(IConfiguration contextConfiguration, IConfiguration serviceConfiguration)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void SendMessage(byte[] message)
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs
new file mode 100644
index 0000000..7db4be8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Common.Catalog;
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Driver.Bridge.Clr2java;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Driver.Bridge.Events
+{
+ [DataContract]
+ internal class AllocatedEvaluator : IAllocatedEvaluator
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(AllocatedEvaluator));
+
+ private readonly AvroConfigurationSerializer _serializer;
+
+ private IEvaluatorDescriptor _evaluatorDescriptor;
+
+ public AllocatedEvaluator(IAllocatedEvaluaotrClr2Java clr2Java)
+ {
+ InstanceId = Guid.NewGuid().ToString("N");
+ _serializer = new AvroConfigurationSerializer();
+ Clr2Java = clr2Java;
+ Id = Clr2Java.GetId();
+ ProcessNewEvaluator();
+
+ NameServerInfo = Clr2Java.GetNameServerInfo();
+ }
+
+ [DataMember]
+ public string InstanceId { get; set; }
+
+ public string Id { get; set; }
+
+ public string EvaluatorBatchId { get; set; }
+
+ public EvaluatorType Type { get; set; }
+
+ public string NameServerInfo { get; set; }
+
+ [DataMember]
+ private IAllocatedEvaluaotrClr2Java Clr2Java { get; set; }
+
+ public void SubmitContext(IConfiguration contextConfiguration)
+ {
+ LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContext");
+ string context = _serializer.ToString(contextConfiguration);
+ LOGGER.Log(Level.Info, "serialized contextConfiguration: " + context);
+ Clr2Java.SubmitContext(context);
+ }
+
+ public void SubmitContextAndTask(IConfiguration contextConfiguration, IConfiguration taskConfiguration)
+ {
+ LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndTask");
+
+ string context = _serializer.ToString(contextConfiguration);
+ string task = _serializer.ToString(taskConfiguration);
+
+ LOGGER.Log(Level.Info, "serialized contextConfiguration: " + context);
+ LOGGER.Log(Level.Info, "serialized taskConfiguration: " + task);
+
+ Clr2Java.SubmitContextAndTask(context, task);
+ }
+
+ public void SubmitContextAndService(IConfiguration contextConfiguration, IConfiguration serviceConfiguration)
+ {
+ LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndService");
+
+ string context = _serializer.ToString(contextConfiguration);
+ string service = _serializer.ToString(serviceConfiguration);
+
+ LOGGER.Log(Level.Info, "serialized contextConfiguration: " + context);
+ LOGGER.Log(Level.Info, "serialized serviceConfiguration: " + service);
+
+ Clr2Java.SubmitContextAndService(context, service);
+ }
+
+ public void SubmitContextAndServiceAndTask(IConfiguration contextConfiguration, IConfiguration serviceConfiguration, IConfiguration taskConfiguration)
+ {
+ LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndServiceAndTask");
+
+ string context = _serializer.ToString(contextConfiguration);
+ string service = _serializer.ToString(serviceConfiguration);
+ string task = _serializer.ToString(taskConfiguration);
+
+ LOGGER.Log(Level.Info, "serialized contextConfiguration: " + context);
+ LOGGER.Log(Level.Info, "serialized serviceConfiguration: " + service);
+ LOGGER.Log(Level.Info, "serialized taskConfiguration: " + task);
+
+ Clr2Java.SubmitContextAndServiceAndTask(context, service, task);
+ }
+
+ public IEvaluatorDescriptor GetEvaluatorDescriptor()
+ {
+ return _evaluatorDescriptor;
+ }
+
+ public void Dispose()
+ {
+ Clr2Java.Close();
+ }
+
+ public INodeDescriptor GetNodeDescriptor()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void AddFile(string file)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void AddLibrary(string file)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void AddFileResource(string file)
+ {
+ throw new NotImplementedException();
+ }
+
+ private void ProcessNewEvaluator()
+ {
+ _evaluatorDescriptor = Clr2Java.GetEvaluatorDescriptor();
+ lock (EvaluatorRequestor.Evaluators)
+ {
+ foreach (KeyValuePair<string, IEvaluatorDescriptor> pair in EvaluatorRequestor.Evaluators)
+ {
+ if (pair.Value.Equals(_evaluatorDescriptor))
+ {
+ string key = pair.Key;
+ EvaluatorRequestor.Evaluators.Remove(key);
+ string assignedId = key.Substring(0, key.LastIndexOf('_'));
+ string message = string.Format(
+ CultureInfo.InvariantCulture,
+ "Received evalautor [{0}] of memory {1}MB that matches request of {2}MB with batch id [{3}].",
+ Id,
+ _evaluatorDescriptor.Memory,
+ pair.Value.Memory,
+ assignedId);
+
+ LOGGER.Log(Level.Verbose, message);
+ EvaluatorBatchId = assignedId;
+ break;
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ClosedContext.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ClosedContext.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ClosedContext.cs
new file mode 100644
index 0000000..693d520
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ClosedContext.cs
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Driver.Bridge.Clr2java;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Driver.Bridge.Events
+{
+ public class ClosedContext : IClosedContext
+ {
+ private string _id;
+
+ private string _evaluatorId;
+
+ public ClosedContext(IClosedContextClr2Java clr2java)
+ {
+ InstanceId = Guid.NewGuid().ToString("N");
+ _id = clr2java.GetId();
+ _evaluatorId = clr2java.GetEvaluatorId();
+ }
+
+ [DataMember]
+ public string InstanceId { get; set; }
+
+ public string Id
+ {
+ get
+ {
+ return _id;
+ }
+
+ set
+ {
+ }
+ }
+
+ public string EvaluatorId
+ {
+ get
+ {
+ return _evaluatorId;
+ }
+
+ set
+ {
+ }
+ }
+
+ public Optional<string> ParentId { get; set; }
+
+ public IEvaluatorDescriptor EvaluatorDescriptor
+ {
+ get
+ {
+ return ClosedContextClr2JavaClr2Java.GetEvaluatorDescriptor();
+ }
+
+ set
+ {
+ }
+ }
+
+ public IActiveContext ParentContext
+ {
+ get
+ {
+ return new ActiveContext(ParentContextClr2Java);
+ }
+
+ set
+ {
+ }
+ }
+
+ private IActiveContextClr2Java ParentContextClr2Java { get; set; }
+
+ private IClosedContextClr2Java ClosedContextClr2JavaClr2Java { get; set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedEvaluator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedEvaluator.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedEvaluator.cs
new file mode 100644
index 0000000..096599a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedEvaluator.cs
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Driver.Bridge.Clr2java;
+using Org.Apache.REEF.Driver.Evaluator;
+
+namespace Org.Apache.REEF.Driver.Bridge.Events
+{
+ [DataContract]
+ internal class CompletedEvaluator : ICompletedEvaluator
+ {
+ private string _instanceId;
+
+ public CompletedEvaluator(ICompletedEvaluatorClr2Java clr2Java)
+ {
+ _instanceId = Guid.NewGuid().ToString("N");
+ CompletedEvaluatorClr2Java = clr2Java;
+ }
+
+ [DataMember]
+ public string InstanceId
+ {
+ get { return _instanceId; }
+ set { _instanceId = value; }
+ }
+
+ [DataMember]
+ public string Id
+ {
+ get
+ {
+ return CompletedEvaluatorClr2Java.GetId();
+ }
+
+ set
+ {
+ }
+ }
+
+ [DataMember]
+ public ICompletedEvaluatorClr2Java CompletedEvaluatorClr2Java { get; set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs
new file mode 100644
index 0000000..3fb76b2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/CompletedTask.cs
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Driver.Bridge.Clr2java;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Driver.Bridge.Events
+{
+ [DataContract]
+ internal class CompletedTask : ICompletedTask
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(CompletedTask));
+
+ internal CompletedTask(ICompletedTaskClr2Java completedTaskClr2Java)
+ {
+ InstanceId = Guid.NewGuid().ToString("N");
+ CompletedTaskClr2Java = completedTaskClr2Java;
+ ActiveContextClr2Java = completedTaskClr2Java.GetActiveContext();
+ }
+
+ [DataMember]
+ public string InstanceId { get; set; }
+
+ public byte[] Message { get; set; }
+
+ public string Id
+ {
+ get
+ {
+ return CompletedTaskClr2Java.GetId();
+ }
+
+ set
+ {
+ }
+ }
+
+ public IActiveContext ActiveContext
+ {
+ get
+ {
+ return new ActiveContext(ActiveContextClr2Java);
+ }
+
+ set
+ {
+ }
+ }
+
+ [DataMember]
+ private ICompletedTaskClr2Java CompletedTaskClr2Java { get; set; }
+
+ [DataMember]
+ private IActiveContextClr2Java ActiveContextClr2Java { get; set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ContextMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ContextMessage.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ContextMessage.cs
new file mode 100644
index 0000000..afebc59
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/ContextMessage.cs
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Driver.Bridge.Clr2java;
+
+namespace Org.Apache.REEF.Driver.Bridge.Events
+{
+ public class ContextMessage : IContextMessage
+ {
+ private readonly string _messageSourcId;
+ private readonly byte[] _bytes;
+ private readonly string _id;
+
+ public ContextMessage(IContextMessageClr2Java clr2Java)
+ {
+ _messageSourcId = clr2Java.GetMessageSourceId();
+ _bytes = clr2Java.Get();
+ _id = clr2Java.GetId();
+ }
+
+ public string Id
+ {
+ get { return _id; }
+ set { }
+ }
+
+ public string MessageSourceId
+ {
+ get { return _messageSourcId; }
+ }
+
+ public byte[] Message
+ {
+ get { return _bytes; }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequstor.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequstor.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequstor.cs
new file mode 100644
index 0000000..0be5c9b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequstor.cs
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Common.Catalog;
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Driver.Bridge.Clr2java;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Driver.Bridge.Events
+{
+ [DataContract]
+ internal class EvaluatorRequestor : IEvaluatorRequestor
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorRequestor));
+
+ private static Dictionary<string, IEvaluatorDescriptor> _evaluators;
+
+ public EvaluatorRequestor(IEvaluatorRequestorClr2Java clr2Java)
+ {
+ InstanceId = Guid.NewGuid().ToString("N");
+ Clr2Java = clr2Java;
+ }
+
+ public static Dictionary<string, IEvaluatorDescriptor> Evaluators
+ {
+ get
+ {
+ if (_evaluators == null)
+ {
+ _evaluators = new Dictionary<string, IEvaluatorDescriptor>();
+ }
+ return _evaluators;
+ }
+ }
+
+ public IResourceCatalog ResourceCatalog { get; set; }
+
+ [DataMember]
+ public string InstanceId { get; set; }
+
+ [DataMember]
+ private IEvaluatorRequestorClr2Java Clr2Java { get; set; }
+
+ public void Submit(IEvaluatorRequest request)
+ {
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Submitting request for {0} evaluators and {1} MB memory and {2} core to rack {3}.", request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack));
+
+ lock (Evaluators)
+ {
+ for (int i = 0; i < request.Number; i++)
+ {
+ EvaluatorDescriptorImpl descriptor = new EvaluatorDescriptorImpl(new NodeDescriptorImpl(), EvaluatorType.CLR, request.MemoryMegaBytes, request.VirtualCore);
+ descriptor.Rack = request.Rack;
+ string key = string.Format(CultureInfo.InvariantCulture, "{0}_{1}", request.EvaluatorBatchId, i);
+ try
+ {
+ _evaluators.Add(key, descriptor);
+ }
+ catch (ArgumentException e)
+ {
+ Exceptions.Caught(e, Level.Error, string.Format(CultureInfo.InvariantCulture, "EvaluatorBatchId [{0}] already exists.", key), LOGGER);
+ Exceptions.Throw(new InvalidOperationException("Cannot use evaluator id " + key, e), LOGGER);
+ }
+ }
+ }
+
+ Clr2Java.Submit(request);
+ }
+
+ public void Dispose()
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedContext.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedContext.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedContext.cs
new file mode 100644
index 0000000..eb982c6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedContext.cs
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Driver.Bridge.Clr2java;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Utilities;
+
+namespace Org.Apache.REEF.Driver.Bridge.Events
+{
+ public class FailedContext : IFailedContext
+ {
+ private string _id;
+
+ private string _evaluatorId;
+
+ private string _parentId;
+
+ public FailedContext(IFailedContextClr2Java clr2Java)
+ {
+ _id = clr2Java.GetId();
+ _evaluatorId = clr2Java.GetEvaluatorId();
+ _parentId = clr2Java.GetParentId();
+ FailedContextClr2Java = clr2Java;
+ }
+
+ public string Id
+ {
+ get
+ {
+ return _id;
+ }
+
+ set
+ {
+ }
+ }
+
+ public string EvaluatorId
+ {
+ get
+ {
+ return _evaluatorId;
+ }
+
+ set
+ {
+ }
+ }
+
+ public Optional<string> ParentId
+ {
+ get
+ {
+ return string.IsNullOrEmpty(_parentId) ?
+ Optional<string>.Empty() :
+ Optional<string>.Of(_parentId);
+ }
+
+ set
+ {
+ }
+ }
+
+ public IEvaluatorDescriptor EvaluatorDescriptor
+ {
+ get
+ {
+ return FailedContextClr2Java.GetEvaluatorDescriptor();
+ }
+
+ set
+ {
+ }
+ }
+
+ public Optional<IActiveContext> ParentContext
+ {
+ get
+ {
+ IActiveContextClr2Java context = FailedContextClr2Java.GetParentContext();
+ if (context != null)
+ {
+ return Optional<IActiveContext>.Of(new ActiveContext(context));
+ }
+ else
+ {
+ return Optional<IActiveContext>.Empty();
+ }
+ }
+ }
+
+ private IFailedContextClr2Java FailedContextClr2Java { get; set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs
new file mode 100644
index 0000000..a63f953
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedEvaluator.cs
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Common.Exceptions;
+using Org.Apache.REEF.Driver.Bridge.Clr2java;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Driver.Bridge.Events
+{
+ [DataContract]
+ internal class FailedEvaluator : IFailedEvaluator
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(FailedEvaluator));
+
+ public FailedEvaluator(IFailedEvaluatorClr2Java clr2Java)
+ {
+ InstanceId = Guid.NewGuid().ToString("N");
+ FailedEvaluatorClr2Java = clr2Java;
+ EvaluatorRequestorClr2Java = FailedEvaluatorClr2Java.GetEvaluatorRequestor();
+ Id = FailedEvaluatorClr2Java.GetId();
+ }
+
+ [DataMember]
+ public string InstanceId { get; set; }
+
+ public string Id { get; set; }
+
+ public EvaluatorException EvaluatorException { get; set; }
+
+ public List<FailedContext> FailedContexts { get; set; }
+
+ public Optional<IFailedTask> FailedTask { get; set; }
+
+ [DataMember]
+ private IFailedEvaluatorClr2Java FailedEvaluatorClr2Java { get; set; }
+
+ [DataMember]
+ private IEvaluatorRequestorClr2Java EvaluatorRequestorClr2Java { get; set; }
+
+ public IEvaluatorRequestor GetEvaluatorRequetor()
+ {
+ if (EvaluatorRequestorClr2Java == null)
+ {
+ Exceptions.Throw(new InvalidOperationException("EvaluatorRequestorClr2Java not initialized."), LOGGER);
+ }
+ return new EvaluatorRequestor(EvaluatorRequestorClr2Java);
+ }
+ }
+}