You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:06:06 UTC
[45/51] [partial] incubator-reef git commit: [REEF-131] Towards the
new .Net project structure This is to change .Net project structure for Tang,
Wake, REEF utilities, Common and Driver:
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs b/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs
new file mode 100644
index 0000000..5c3c19c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/DriverManager.cs
@@ -0,0 +1,541 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common;
+using Org.Apache.REEF.Common.Api;
+using Org.Apache.REEF.Common.Catalog;
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Common.Exceptions;
+using Org.Apache.REEF.Common.ProtoBuf.DriverRuntimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Implementations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Time;
+using Org.Apache.REEF.Wake.Time.Runtime.Event;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
+
+namespace Org.Apache.REEF.Driver
+{
+ public class DriverManager :
+ IEvaluatorRequestor,
+ IObserver<RuntimeStatusProto>,
+ IObserver<ResourceStatusProto>,
+ IObserver<ResourceAllocationProto>,
+ IObserver<NodeDescriptorProto>,
+ IObserver<RuntimeStart>,
+ IObserver<RuntimeStop>,
+ IObserver<IdleClock>
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(DriverManager));
+
+ private IInjector _injector;
+
+ private IInjectionFuture<IClock> _clockFuture;
+
+ private ResourceCatalogImpl _resourceCatalog;
+
+ private IInjectionFuture<IResourceRequestHandler> _futureResourceRequestHandler;
+
+ private Dictionary<string, EvaluatorManager> _evaluators = new Dictionary<string, EvaluatorManager>();
+
+ private EvaluatorHeartBeatSanityChecker _sanityChecker = new EvaluatorHeartBeatSanityChecker();
+
+ private ClientJobStatusHandler _clientJobStatusHandler;
+
+ private IDisposable _heartbeatConnectionChannel;
+
+ private IDisposable _errorChannel;
+
+ private IObserver<RuntimeErrorProto> _runtimeErrorHandler;
+
+ public DriverManager(
+ IInjector injector,
+ ResourceCatalogImpl resourceCatalog,
+ IRemoteManager<REEFMessage> remoteManager,
+ IInjectionFuture<IClock> clockFuture,
+ IInjectionFuture<IResourceRequestHandler> futureResourceRequestHandler,
+ ClientJobStatusHandler clientJobStatusHandler,
+ string clientRId)
+ {
+ _injector = injector;
+ _clockFuture = clockFuture;
+ _resourceCatalog = resourceCatalog;
+ _futureResourceRequestHandler = futureResourceRequestHandler;
+ _clientJobStatusHandler = clientJobStatusHandler;
+
+ _heartbeatConnectionChannel = null;
+ _errorChannel = null;
+ _runtimeErrorHandler = null;
+ LOGGER.Log(Level.Info, "DriverManager instantiated");
+ }
+
+ public IResourceCatalog ResourceCatalog
+ {
+ get
+ {
+ return _resourceCatalog;
+ }
+
+ set
+ {
+ }
+ }
+
+ private RuntimeStatusProto _runtimeStatusProto
+ {
+ get
+ {
+ RuntimeStatusProto proto = new RuntimeStatusProto();
+ proto.state = State.INIT;
+ proto.name = "REEF";
+ proto.outstanding_container_requests = 0;
+ return proto;
+ }
+
+ set
+ {
+ _runtimeStatusProto = value;
+ }
+ }
+
+ public void Submit(IEvaluatorRequest request)
+ {
+ LOGGER.Log(Level.Info, "Got an EvaluatorRequest");
+ ResourceRequestProto proto = new ResourceRequestProto();
+ //TODO: request.size deprecated should use megabytes instead
+ //switch (request.Size)
+ //{
+ // case EvaluatorRequest.EvaluatorSize.SMALL:
+ // proto.resource_size = SIZE.SMALL;
+ // break;
+ // case EvaluatorRequest.EvaluatorSize.MEDIUM:
+ // proto.resource_size = SIZE.MEDIUM;
+ // break;
+ // case EvaluatorRequest.EvaluatorSize.LARGE:
+ // proto.resource_size = SIZE.LARGE;
+ // break;
+ // case EvaluatorRequest.EvaluatorSize.XLARGE:
+ // proto.resource_size = SIZE.XLARGE;
+ // break;
+ // default:
+ // throw new InvalidOperationException("invalid request size" + request.Size);
+ //}
+ proto.resource_count = request.Number;
+ if (request.MemoryMegaBytes > 0)
+ {
+ proto.memory_size = request.MemoryMegaBytes;
+ }
+
+ //final ResourceCatalog.Descriptor descriptor = req.getDescriptor();
+ //if (descriptor != null) {
+ // if (descriptor instanceof RackDescriptor) {
+ // request.addRackName(descriptor.getName());
+ // } else if (descriptor instanceof NodeDescriptor) {
+ // request.addNodeName(descriptor.getName());
+ // }
+ //}
+
+ //_futureResourceRequestHandler.Get().OnNext(proto);
+ }
+
+ public void Release(EvaluatorManager evaluatorManager)
+ {
+ lock (this)
+ {
+ string evaluatorManagerId = evaluatorManager.Id;
+ if (_evaluators.ContainsKey(evaluatorManagerId))
+ {
+ _evaluators.Remove(evaluatorManagerId);
+ }
+ else
+ {
+ var e = new InvalidOperationException("Trying to remove an unknown evaluator manager with id " + evaluatorManagerId);
+ Exceptions.Throw(e, LOGGER);
+ }
+ }
+ }
+
+ /// <summary>
+ /// This handles runtime error occurs on the evaluator
+ /// </summary>
+ /// <param name="runtimeErrorProto"></param>
+ public void Handle(RuntimeErrorProto runtimeErrorProto)
+ {
+ FailedRuntime error = new FailedRuntime(runtimeErrorProto);
+ LOGGER.Log(Level.Warning, "Runtime error:" + error);
+
+ EvaluatorException evaluatorException = error.Cause != null
+ ? new EvaluatorException(error.Id, error.Cause.Value)
+ : new EvaluatorException(error.Id, "Runtime error");
+ EvaluatorManager evaluatorManager = null;
+ lock (_evaluators)
+ {
+ if (_evaluators.ContainsKey(error.Id))
+ {
+ evaluatorManager = _evaluators[error.Id];
+ }
+ else
+ {
+ LOGGER.Log(Level.Warning, "Unknown evaluator runtime error: " + error.Cause);
+ }
+ }
+ if (null != evaluatorManager)
+ {
+ evaluatorManager.Handle(evaluatorException);
+ }
+ }
+
+ /// <summary>
+ /// A RuntimeStatusProto comes from the ResourceManager layer indicating its current status
+ /// </summary>
+ /// <param name="runtimeStatusProto"></param>
+ public void OnNext(RuntimeStatusProto runtimeStatusProto)
+ {
+ Handle(runtimeStatusProto);
+ }
+
+ /// <summary>
+ /// A ResourceStatusProto message comes from the ResourceManager layer to indicate what it thinks
+ /// about the current state of a given resource. Ideally, we should think the same thing.
+ /// </summary>
+ /// <param name="resourceStatusProto"></param>
+ public void OnNext(ResourceStatusProto resourceStatusProto)
+ {
+ Handle(resourceStatusProto);
+ }
+
+ /// <summary>
+ /// A ResourceAllocationProto indicates a resource allocation given by the ResourceManager layer.
+ /// </summary>
+ /// <param name="resourceAllocationProto"></param>
+ public void OnNext(ResourceAllocationProto resourceAllocationProto)
+ {
+ Handle(resourceAllocationProto);
+ }
+
+ /// <summary>
+ /// A NodeDescriptorProto defines a new node in the cluster. We should add this to the resource catalog
+ /// so that clients can make resource requests against it.
+ /// </summary>
+ /// <param name="nodeDescriptorProto"></param>
+ public void OnNext(NodeDescriptorProto nodeDescriptorProto)
+ {
+ _resourceCatalog.Handle(nodeDescriptorProto);
+ }
+
+ /// <summary>
+ /// This EventHandler is subscribed to the StartTime event of the Clock statically. It therefore provides the entrance
+ /// point to REEF.
+ /// </summary>
+ /// <param name="runtimeStart"></param>
+ public void OnNext(RuntimeStart runtimeStart)
+ {
+ LOGGER.Log(Level.Info, "RuntimeStart: " + runtimeStart);
+ _runtimeStatusProto = new RuntimeStatusProto();
+ _runtimeStatusProto.state = State.RUNNING;
+ _runtimeStatusProto.name = "REEF";
+ _runtimeStatusProto.outstanding_container_requests = 0;
+ }
+
+ /// <summary>
+ /// Handles RuntimeStop
+ /// </summary>
+ /// <param name="runtimeStop"></param>
+ public void OnNext(RuntimeStop runtimeStop)
+ {
+ LOGGER.Log(Level.Info, "RuntimeStop: " + runtimeStop);
+ if (runtimeStop.Exception != null)
+ {
+ string exceptionMessage = runtimeStop.Exception.Message;
+ LOGGER.Log(Level.Warning, "Sending runtime error:" + exceptionMessage);
+ RuntimeErrorProto runtimeErrorProto = new RuntimeErrorProto();
+ runtimeErrorProto.message = exceptionMessage;
+ runtimeErrorProto.exception = ByteUtilities.StringToByteArrays(exceptionMessage);
+ runtimeErrorProto.name = "REEF";
+ _runtimeErrorHandler.OnNext(runtimeErrorProto);
+
+ LOGGER.Log(Level.Warning, "DONE Sending runtime error: " + exceptionMessage);
+ }
+
+ lock (_evaluators)
+ {
+ foreach (EvaluatorManager evaluatorManager in _evaluators.Values)
+ {
+ LOGGER.Log(Level.Warning, "Unclean shutdown of evaluator: " + evaluatorManager.Id);
+ evaluatorManager.Dispose();
+ }
+ }
+
+ try
+ {
+ _heartbeatConnectionChannel.Dispose();
+ _errorChannel.Dispose();
+ Optional<Exception> e = runtimeStop.Exception != null ?
+ Optional<Exception>.Of(runtimeStop.Exception) : Optional<Exception>.Empty();
+ _clientJobStatusHandler.Dispose(e);
+
+ LOGGER.Log(Level.Info, "driver manager closed");
+ }
+ catch (Exception e)
+ {
+ Exceptions.Caught(e, Level.Error, "Error disposing Driver manager", LOGGER);
+ Exceptions.Throw(new InvalidOperationException("Cannot dispose driver manager"), LOGGER);
+ }
+ }
+
+ public void OnNext(IdleClock value)
+ {
+ string message = string.Format(
+ CultureInfo.InvariantCulture,
+ "IdleClock: [{0}], RuntimeState [{1}], Outstanding container requests [{2}], Container allocation count[{3}]",
+ value + Environment.NewLine,
+ _runtimeStatusProto.state + Environment.NewLine,
+ _runtimeStatusProto.outstanding_container_requests + Environment.NewLine,
+ _runtimeStatusProto.container_allocation.Count);
+ LOGGER.Log(Level.Info, message);
+
+ lock (_evaluators)
+ {
+ if (_runtimeStatusProto.state == State.RUNNING
+ && _runtimeStatusProto.outstanding_container_requests == 0
+ && _runtimeStatusProto.container_allocation.Count == 0)
+ {
+ LOGGER.Log(Level.Info, "Idle runtime shutdown");
+ _clockFuture.Get().Dispose();
+ }
+ }
+ }
+
+ void IObserver<IdleClock>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<IdleClock>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<RuntimeStop>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<RuntimeStop>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<RuntimeStart>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<RuntimeStart>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<NodeDescriptorProto>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<NodeDescriptorProto>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<ResourceAllocationProto>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<ResourceAllocationProto>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<ResourceStatusProto>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<ResourceStatusProto>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<RuntimeStatusProto>.OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ void IObserver<RuntimeStatusProto>.OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>
+ /// Something went wrong at the runtime layer (either driver or evaluator). This
+ /// method simply forwards the RuntimeErrorProto to the client via the RuntimeErrorHandler.
+ /// </summary>
+ /// <param name="runtimeErrorProto"></param>
+ private void Fail(RuntimeErrorProto runtimeErrorProto)
+ {
+ _runtimeErrorHandler.OnNext(runtimeErrorProto);
+ _clockFuture.Get().Dispose();
+ }
+
+ /// <summary>
+ /// Helper method to create a new EvaluatorManager instance
+ /// </summary>
+ /// <param name="id">identifier of the Evaluator</param>
+ /// <param name="descriptor"> NodeDescriptor on which the Evaluator executes.</param>
+ /// <returns>new EvaluatorManager instance.</returns>
+ private EvaluatorManager GetNewEvaluatorManagerInstance(string id, EvaluatorDescriptorImpl descriptor)
+ {
+ LOGGER.Log(Level.Info, "Creating Evaluator Manager: " + id);
+ //TODO bindVolatieParameter
+ return (EvaluatorManager)_injector.GetInstance(typeof(EvaluatorManager));
+ }
+
+ /// <summary>
+ /// Receives and routes heartbeats from Evaluators.
+ /// </summary>
+ /// <param name="evaluatorHearBeatProto"></param>
+ private void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProto)
+ {
+ EvaluatorHeartbeatProto heartbeat = evaluatorHearBeatProto.Message;
+ EvaluatorStatusProto status = heartbeat.evaluator_status;
+ string evaluatorId = status.evaluator_id;
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Heartbeat from Evaluator {0} with state {1} timestamp {2}", evaluatorId, status.state, heartbeat.timestamp));
+ _sanityChecker.check(evaluatorId, heartbeat.timestamp);
+
+ lock (_evaluators)
+ {
+ if (_evaluators.ContainsKey(evaluatorId))
+ {
+ EvaluatorManager evaluatorManager = _evaluators[evaluatorId];
+ evaluatorManager.Handle(evaluatorHearBeatProto);
+ }
+ else
+ {
+ string msg = "Contact from unkonwn evaluator with id: " + evaluatorId;
+ if (heartbeat.evaluator_status != null)
+ {
+ msg += " with state" + status.state;
+ }
+ LOGGER.Log(Level.Error, msg);
+ Exceptions.Throw(new InvalidOperationException(msg), LOGGER);
+ }
+ }
+ }
+
+ /// <summary>
+ /// This resource status message comes from the ResourceManager layer; telling me what it thinks
+ /// about the state of the resource executing an Evaluator; This method simply passes the message
+ /// off to the referenced EvaluatorManager
+ /// </summary>
+ /// <param name="resourceStatusProto"></param>
+ private void Handle(ResourceStatusProto resourceStatusProto)
+ {
+ lock (_evaluators)
+ {
+ if (_evaluators.ContainsKey(resourceStatusProto.identifier))
+ {
+ EvaluatorManager evaluatorManager = _evaluators[resourceStatusProto.identifier];
+ evaluatorManager.Handle(resourceStatusProto);
+ }
+ else
+ {
+ var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown resource status from evaluator {0} with state {1}", resourceStatusProto.identifier, resourceStatusProto.state));
+ Exceptions.Throw(e, LOGGER);
+ }
+ }
+ }
+
+ /// <summary>
+ /// This method handles resource allocations by creating a new EvaluatorManager instance.
+ /// </summary>
+ /// <param name="resourceAllocationProto"></param>
+ private void Handle(ResourceAllocationProto resourceAllocationProto)
+ {
+ lock (_evaluators)
+ {
+ try
+ {
+ INodeDescriptor nodeDescriptor = _resourceCatalog.GetNode(resourceAllocationProto.node_id);
+ if (nodeDescriptor == null)
+ {
+ Exceptions.Throw(new InvalidOperationException("Unknown resurce: " + resourceAllocationProto.node_id), LOGGER);
+ }
+ EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor, EvaluatorType.UNDECIDED, resourceAllocationProto.resource_memory, resourceAllocationProto.virtual_cores);
+ LOGGER.Log(Level.Info, "Resource allocation: new evaluator id: " + resourceAllocationProto.identifier);
+ EvaluatorManager evaluatorManager = GetNewEvaluatorManagerInstance(resourceAllocationProto.identifier, evaluatorDescriptor);
+ _evaluators.Add(resourceAllocationProto.identifier, evaluatorManager);
+ }
+ catch (Exception e)
+ {
+ Exceptions.Caught(e, Level.Error, LOGGER);
+ Exceptions.Throw(new InvalidOperationException("Error handling resourceAllocationProto."), LOGGER);
+ }
+ }
+ }
+
+ private void Handle(RuntimeStatusProto runtimeStatusProto)
+ {
+ State runtimeState = runtimeStatusProto.state;
+ LOGGER.Log(Level.Info, "Runtime status: " + runtimeStatusProto.state);
+
+ switch (runtimeState)
+ {
+ case State.FAILED:
+ Fail(runtimeStatusProto.error);
+ break;
+ case State.DONE:
+ _clockFuture.Get().Dispose();
+ break;
+ case State.RUNNING:
+ lock (_evaluators)
+ {
+ _runtimeStatusProto = runtimeStatusProto;
+ if (_clockFuture.Get().IsIdle()
+ && runtimeStatusProto.outstanding_container_requests == 0
+ && runtimeStatusProto.container_allocation.Count == 0)
+ {
+ _clockFuture.Get().Dispose();
+ }
+ }
+ break;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs b/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs
new file mode 100644
index 0000000..d329ee6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfiguration.cs
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common;
+using Org.Apache.REEF.Common.Api;
+using Org.Apache.REEF.Common.Catalog;
+using Org.Apache.REEF.Common.Client;
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Driver
+{
+ public class DriverRuntimeConfiguration : ConfigurationModuleBuilder
+ {
+ public static ConfigurationModule ConfigurationModule
+ {
+ get
+ {
+ return new DriverRuntimeConfiguration()
+ // Resource Catalog
+ .BindImplementation(GenericType<IResourceCatalog>.Class, GenericType<ResourceCatalogImpl>.Class)
+
+ // JobMessageObserver
+ //.BindImplementation(GenericType<IEvaluatorRequestor>.Class, GenericType<DriverManager>.Class)
+ .BindImplementation(GenericType<IJobMessageObserver>.Class, GenericType<ClientJobStatusHandler>.Class)
+
+ // JobMessageObserver Wake event handler bindings
+ .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobMessageHandler>.Class, GenericType<ClientJobStatusHandler>.Class)
+ .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobExceptionHandler>.Class, GenericType<ClientJobStatusHandler>.Class)
+
+ // Client manager
+ .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobControlHandler>.Class, GenericType<ClientManager>.Class)
+
+ // Bind the runtime parameters
+ //.BindNamedParameter(GenericType<RuntimeParameters.NodeDescriptorHandler>.Class, GenericType<DriverManager>.Class)
+ //.BindNamedParameter(GenericType<RuntimeParameters.ResourceAllocationHandler>.Class, GenericType<DriverManager>.Class)
+ //.BindNamedParameter(GenericType<RuntimeParameters.ResourceStatusHandler>.Class, GenericType<DriverManager>.Class)
+ //.BindNamedParameter(GenericType<RuntimeParameters.RuntimeStatusHandler>.Class, GenericType<DriverManager>.Class)
+
+ // Bind to the Clock
+ //.BindSetEntry(GenericType<IClock.RuntimeStopHandler>.Class, GenericType<DriverManager>.Class)
+ .Build();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs
new file mode 100644
index 0000000..085e573
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/DriverRuntimeConfigurationOptions.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Driver
+{
+ public class DriverRuntimeConfigurationOptions
+ {
+ [NamedParameter(documentation: "Job message handler (see ClientJobStatusHandler)")]
+ public class JobMessageHandler : Name<ClientJobStatusHandler>
+ {
+ }
+
+ [NamedParameter(documentation: "Job exception handler (see ClientJobStatusHandler)")]
+ public class JobExceptionHandler : Name<ClientJobStatusHandler>
+ {
+ }
+
+ [NamedParameter(documentation: "Called when a job control message is received by the client.")]
+ public class JobControlHandler : Name<ClientManager>
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/DriverSubmissionSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverSubmissionSettings.cs b/lang/cs/Org.Apache.REEF.Driver/DriverSubmissionSettings.cs
new file mode 100644
index 0000000..15cb8ca
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/DriverSubmissionSettings.cs
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Driver
+{
+ // TODO: merge with EvaluatorConfigurations class
+ public class DriverSubmissionSettings
+ {
+ // default to "ReefDevClrBridge"
+ private string _driverIdentifier;
+
+ // default to _defaultSubmissionDirectory is not provided
+ private string _submissionDirectory;
+
+ // deault to 512MB if no value is provided
+ private int _driverMemory = 0;
+
+ // default value, client wait till driver exit
+ private int _clientWaitTime = -1;
+
+ // default to submit to driver with driver config
+ private bool _submit = true;
+
+ // default to always update jar before submission
+ private bool _updateJar = true;
+
+ // default to run on local
+ private bool _runOnYarn;
+
+ // default to set to info logging
+ private JavaLoggingSetting _javaLogLevel = JavaLoggingSetting.INFO;
+
+ /// <summary>
+ /// Whether to update jar file with needed dlls before submission
+ /// User can choose to reduce startup time by skipping the update, if jar file already contains all necessary dll
+ /// Note this settig is .NET only, it does not propagate to java side.
+ /// </summary>
+ public bool UpdateJarBeforeSubmission
+ {
+ get { return _updateJar; }
+ set { _updateJar = value; }
+ }
+
+ /// <summary>
+ /// Determine the vebosity of Java driver's log.
+ /// Note this parameter is used when launching java process only, it does not propagate to java side.
+ /// </summary>
+ public JavaLoggingSetting JavaLogLevel
+ {
+ get { return _javaLogLevel; }
+ set { _javaLogLevel = value; }
+ }
+
+ /// <summary>
+ /// Memory allocated for driver, default to 512 MB
+ /// </summary>
+ public int DriverMemory
+ {
+ get
+ {
+ return _driverMemory;
+ }
+
+ set
+ {
+ if (value < 0)
+ {
+ throw new ArgumentException("driver memory cannot be negatvie value.");
+ }
+ _driverMemory = value;
+ }
+ }
+
+ /// <summary>
+ /// Driver Identifier, default to "ReefDevClrBridge"
+ /// </summary>
+ public string DriverIdentifier
+ {
+ get
+ {
+ return _driverIdentifier;
+ }
+
+ set
+ {
+ _driverIdentifier = value;
+ }
+ }
+
+ /// <summary>
+ /// Whether to submit driver with config after driver configuration is construted, default to True
+ /// </summary>
+ public bool Submit
+ {
+ get
+ {
+ return _submit;
+ }
+
+ set
+ {
+ _submit = value;
+ }
+ }
+
+ /// <summary>
+ /// How long client would wait for Driver, default to wait till driver is done
+ /// </summary>
+ public int ClientWaitTime
+ {
+ get
+ {
+ return _clientWaitTime;
+ }
+
+ set
+ {
+ _clientWaitTime = value;
+ }
+ }
+
+ /// <summary>
+ /// Driver job submission directory in (H)DFS where jar file shall be uploaded, default to a tmp directory with GUID name
+ /// If set by CLR user, the user must guarantee the uniquness of the directory across multiple jobs
+ /// </summary>
+ public string SubmissionDirectory
+ {
+ get
+ {
+ return _submissionDirectory;
+ }
+
+ set
+ {
+ _submissionDirectory = value;
+ }
+ }
+
+ /// <summary>
+ /// Whether to Run on YARN runtime, default to false
+ /// </summary>
+ public bool RunOnYarn
+ {
+ get
+ {
+ return _runOnYarn;
+ }
+
+ set
+ {
+ _runOnYarn = value;
+ }
+ }
+
+ public string ToComamndLineArguments()
+ {
+ return
+ (RunOnYarn ? " -local false" : string.Empty) +
+ (!Submit ? " -submit false" : string.Empty) +
+ (DriverMemory > 0 ? " -driver_memory " + DriverMemory : string.Empty) +
+ (!string.IsNullOrWhiteSpace(DriverIdentifier) ? " -drive_id " + DriverIdentifier : string.Empty) +
+ (ClientWaitTime > 0 ? " -wait_time " + ClientWaitTime : string.Empty) +
+ (!string.IsNullOrWhiteSpace(SubmissionDirectory) ? " -submission_directory " + SubmissionDirectory : string.Empty);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs b/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs
new file mode 100644
index 0000000..b8ebef9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/EvaluatorManager.cs
@@ -0,0 +1,655 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Api;
+using Org.Apache.REEF.Common.Catalog;
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Common.Exceptions;
+using Org.Apache.REEF.Common.ProtoBuf.DriverRuntimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Time;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using System.Text;
+
+using TaskMessage = Org.Apache.REEF.Tasks.TaskMessage;
+
+namespace Org.Apache.REEF.Driver
+{
+ /// <summary>
+ /// Manages a single Evaluator instance including all lifecycle instances:
+ /// (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
+ /// A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager.
+ /// The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this heartbeat channel.
+ /// A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime.
+ /// The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
+ /// control information (e.g., shutdown, suspend).* Manages a single Evaluator instance including all lifecycle instances:
+ /// (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
+ /// A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager.
+ /// The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this heartbeat channel.
+ /// A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime.
+ /// The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate control information (e.g., shutdown, suspend).
+ /// </summary>
+ public class EvaluatorManager : IDisposable, IIdentifiable
+ {
+ private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorManager));
+
+ private STATE _state = STATE.ALLOCATED;
+
+ private IClock _clock;
+
+ // TODO
+ // private final RemoteManager remoteManager;
+ private DriverManager _driverManager;
+
+ private IResourceReleaseHandler _resourceReleaseHandler;
+
+ private IResourceLaunchHandler _resourceLaunchHandler;
+
+ private EvaluatorDescriptorImpl _evaluatorDescriptor;
+
+ private string _evaluatorId;
+
+ private IList<EvaluatorContext> _activeContexts = new List<EvaluatorContext>();
+
+ private HashSet<string> _activeContextIds = new HashSet<string>();
+
+ private IRunningTask _runningTask = null;
+
+ private IObserver<EvaluatorControlProto> _evaluatorControlHandler = null;
+
+ private bool _isResourceReleased = false;
+
+ //TODO
+ //private final DispatchingEStage dispatcher;
+ private EvaluatorType _type = EvaluatorType.CLR;
+
+ public EvaluatorManager(
+ IClock clock,
+ //RemoteManager remoteManager,
+ DriverManager driverManager,
+ IResourceReleaseHandler resourceReleaseHandler,
+ IResourceLaunchHandler resourceLaunchHandler,
+ //REEFErrorHandler errorHandler,
+ string evaluatorId,
+ EvaluatorDescriptorImpl evaluatorDescriptor,
+ ISet<IObservable<IActiveContext>> activeContextEventHandler,
+ ISet<IObservable<IClosedContext>> closedContextEventHandlers,
+ ISet<IObservable<FailedContext>> failedContextEventHandlers,
+ ISet<IObservable<ContextMessage>> contextMessageHandlers,
+ ISet<IObservable<IRunningTask>> runningTaskEventHandlers,
+ ISet<IObservable<ICompletedTask>> completedTaskEventHandlers,
+ ISet<IObservable<ISuspendedTask>> suspendedTaskEventHandlers,
+ ISet<IObservable<TaskMessage>> taskMessageEventHandlers,
+ ISet<IObservable<FailedTask>> taskExceptionEventHandlers,
+ ISet<IObservable<IAllocatedEvaluator>> allocatedEvaluatorEventHandlers,
+ ISet<IObservable<IFailedEvaluator>> failedEvaluatorHandlers,
+ ISet<IObservable<ICompletedEvaluator>> completedEvaluatorHandlers)
+ {
+ _clock = clock;
+ //_remoteManager = remoteManager;
+ _driverManager = driverManager;
+ _resourceReleaseHandler = resourceReleaseHandler;
+ _resourceLaunchHandler = resourceLaunchHandler;
+ _evaluatorId = evaluatorId;
+ _evaluatorDescriptor = evaluatorDescriptor;
+
+ //this.dispatcher = new DispatchingEStage(errorHandler, 16); // 16 threads
+
+ //this.dispatcher.register(ActiveContext.class, activeContextEventHandlers);
+ //this.dispatcher.register(ClosedContext.class, closedContextEventHandlers);
+ //this.dispatcher.register(FailedContext.class, failedContextEventHandlers);
+ //this.dispatcher.register(ContextMessage.class, contextMessageHandlers);
+
+ //this.dispatcher.register(RunningTask.class, runningTaskEventHandlers);
+ //this.dispatcher.register(CompletedTask.class, completedTaskEventHandlers);
+ //this.dispatcher.register(SuspendedTask.class, suspendedTaskEventHandlers);
+ //this.dispatcher.register(TaskMessage.class, taskMessageEventHandlers);
+ //this.dispatcher.register(FailedTask.class, taskExceptionEventHandlers);
+
+ //this.dispatcher.register(FailedEvaluator.class, failedEvaluatorHandlers);
+ //this.dispatcher.register(CompletedEvaluator.class, completedEvaluatorHandlers);
+ //this.dispatcher.register(AllocatedEvaluator.class, allocatedEvaluatorEventHandlers);
+
+ //this.dispatcher.onNext(AllocatedEvaluator.class,
+ // new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier()));
+ }
+
+ /// <summary>
+ /// Various states that the EvaluatorManager could be in. The EvaluatorManager is created when a resource has been allocated by the ResourceManager.
+ /// </summary>
+ public enum STATE
+ {
+ ALLOCATED, // initial state
+ SUBMITTED, // client called AllocatedEvaluator.submitTask() and we're waiting for first contact
+ RUNNING, // first contact received, all communication channels established, Evaluator sent to client.
+ DONE, // clean shutdown
+ FAILED, // some failure occurred.
+ KILLED // unclean shutdown
+ }
+
+ public IEvaluatorDescriptor EvaluatorDescriptor
+ {
+ get
+ {
+ return _evaluatorDescriptor;
+ }
+ }
+
+ public INodeDescriptor NodeDescriptor
+ {
+ get
+ {
+ return EvaluatorDescriptor.NodeDescriptor;
+ }
+ }
+
+ public IRunningTask RunningTask
+ {
+ get
+ {
+ lock (_evaluatorDescriptor)
+ {
+ return _runningTask;
+ }
+ }
+ }
+
+ public string Id
+ {
+ get
+ {
+ return _evaluatorId;
+ }
+
+ set
+ {
+ }
+ }
+
+ public EvaluatorType Type
+ {
+ get
+ {
+ return _type;
+ }
+
+ set
+ {
+ _type = value;
+ _evaluatorDescriptor.EvaluatorType = value;
+ }
+ }
+
+ public void Dispose()
+ {
+ lock (_evaluatorDescriptor)
+ {
+ if (_state == STATE.RUNNING)
+ {
+ LOGGER.Log(Level.Warning, "Dirty shutdown of running evaluator :" + Id);
+ try
+ {
+ // Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
+ EvaluatorControlProto proto = new EvaluatorControlProto();
+ proto.timestamp = DateTime.Now.Ticks;
+ proto.identifier = Id;
+ proto.kill_evaluator = new KillEvaluatorProto();
+ Handle(proto);
+ }
+ finally
+ {
+ _state = STATE.KILLED;
+ }
+ }
+ }
+
+ if (!_isResourceReleased)
+ {
+ try
+ {
+ // We need to wait awhile before returning the container to the RM in order to
+ // give the EvaluatorRuntime (and Launcher) time to cleanly exit.
+
+ // this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
+ //@Override
+ //public void onNext(final Alarm alarm) {
+ // EvaluatorManager.this.resourceReleaseHandler.onNext(
+ // DriverRuntimeProtocol.ResourceReleaseProto.newBuilder()
+ // .setIdentifier(EvaluatorManager.this.evaluatorId).build());
+ }
+ catch (Exception e)
+ {
+ Exceptions.Caught(e, Level.Warning, "Force resource release because the client closed the clock.", LOGGER);
+ ResourceReleaseProto proto = new ResourceReleaseProto();
+ proto.identifier = _evaluatorId;
+ _resourceReleaseHandler.OnNext(proto);
+ }
+ finally
+ {
+ _isResourceReleased = true;
+ _driverManager.Release(this);
+ }
+ }
+ }
+
+ /// <summary>
+ /// EvaluatorException will trigger is FailedEvaluator and state transition to FAILED
+ /// </summary>
+ /// <param name="exception"></param>
+ public void Handle(EvaluatorException exception)
+ {
+ lock (_evaluatorDescriptor)
+ {
+ if (_state >= STATE.DONE)
+ {
+ return;
+ }
+ LOGGER.Log(Level.Warning, "Failed Evaluator: " + Id + exception.Message);
+ try
+ {
+ IList<FailedContext> failedContexts = new List<FailedContext>();
+ IList<EvaluatorContext> activeContexts = new List<EvaluatorContext>(_activeContexts);
+ activeContexts = activeContexts.Reverse().ToList();
+ foreach (EvaluatorContext context in activeContexts)
+ {
+ Optional<IActiveContext> parentContext = context.ParentId.IsPresent()
+ ? Optional<IActiveContext>.Of(GetEvaluatorContext(context.ParentId.Value))
+ : Optional<IActiveContext>.Empty();
+ failedContexts.Add(context.GetFailedContext(parentContext, exception));
+ }
+
+ //Optional<FailedTask> failedTask = _runningTask != null ?
+ // Optional<FailedTask>.Of(new FailedTask(_runningTask.Id, exception)) : Optional<FailedTask>.Empty();
+ //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString());
+ //this.dispatcher.onNext(FailedEvaluator.class, new FailedEvaluatorImpl(
+ //exception, failedContextList, failedTaskOptional, this.evaluatorId));
+ }
+ catch (Exception e)
+ {
+ Exceptions.CaughtAndThrow(e, Level.Error, "Exception while handling FailedEvaluator.", LOGGER);
+ }
+ finally
+ {
+ _state = STATE.FAILED;
+ Dispose();
+ }
+ }
+ }
+
+ public void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProtoMessage)
+ {
+ lock (_evaluatorDescriptor)
+ {
+ EvaluatorHeartbeatProto heartbeatProto = evaluatorHearBeatProtoMessage.Message;
+ if (heartbeatProto.evaluator_status != null)
+ {
+ EvaluatorStatusProto status = heartbeatProto.evaluator_status;
+ if (status.error != null)
+ {
+ Handle(new EvaluatorException(Id, ByteUtilities.ByteArrarysToString(status.error)));
+ return;
+ }
+ else if (_state == STATE.SUBMITTED)
+ {
+ string evaluatorRId = evaluatorHearBeatProtoMessage.Identifier.ToString();
+ LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorRId);
+ // TODO
+ // _evaluatorControlHandler = _remoteManager.getHandler(evaluatorRID, EvaluatorRuntimeProtocol.EvaluatorControlProto.class);
+ _state = STATE.RUNNING;
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} is running", _evaluatorId));
+ }
+ }
+
+ LOGGER.Log(Level.Info, "Evaluator heartbeat: " + heartbeatProto);
+
+ EvaluatorStatusProto evaluatorStatusProto = heartbeatProto.evaluator_status;
+ foreach (ContextStatusProto contextStatusProto in heartbeatProto.context_status)
+ {
+ Handle(contextStatusProto, heartbeatProto.task_status != null);
+ }
+
+ if (heartbeatProto.task_status != null)
+ {
+ Handle(heartbeatProto.task_status);
+ }
+
+ if (evaluatorStatusProto.state == State.FAILED)
+ {
+ _state = STATE.FAILED;
+ EvaluatorException e = evaluatorStatusProto.error != null ?
+ new EvaluatorException(_evaluatorId, ByteUtilities.ByteArrarysToString(evaluatorStatusProto.error)) :
+ new EvaluatorException(_evaluatorId, "unknown cause");
+ LOGGER.Log(Level.Warning, "Failed evaluator: " + Id + e.Message);
+ Handle(e);
+ }
+ else if (evaluatorStatusProto.state == State.DONE)
+ {
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} done", Id));
+ _state = STATE.DONE;
+
+ // TODO
+ // dispatcher.onNext(CompletedEvaluator.class, new CompletedEvaluator() {
+ //@Override
+ //public String getId() {
+ // return EvaluatorManager.this.evaluatorId;
+ Dispose();
+ }
+ }
+ LOGGER.Log(Level.Info, "DONE with evaluator heartbeat");
+ }
+
+ public void Handle(ResourceLaunchProto resourceLaunchProto)
+ {
+ lock (_evaluatorDescriptor)
+ {
+ if (_state == STATE.ALLOCATED)
+ {
+ _state = STATE.SUBMITTED;
+ _resourceLaunchHandler.OnNext(resourceLaunchProto);
+ }
+ else
+ {
+ var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Evaluator manager expected {0} state, but instead is in state {1}", STATE.ALLOCATED, _state));
+ Exceptions.Throw(e, LOGGER);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Packages the TaskControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime
+ /// </summary>
+ /// <param name="contextControlProto"></param>
+ public void Handle(ContextControlProto contextControlProto)
+ {
+ lock (_evaluatorDescriptor)
+ {
+ LOGGER.Log(Level.Info, "Task control message from " + _evaluatorId);
+ EvaluatorControlProto evaluatorControlProto = new EvaluatorControlProto();
+ evaluatorControlProto.timestamp = DateTime.Now.Ticks;
+ evaluatorControlProto.identifier = Id;
+ evaluatorControlProto.context_control = contextControlProto;
+
+ Handle(evaluatorControlProto);
+ }
+ }
+
+ /// <summary>
+ /// Forward the EvaluatorControlProto to the EvaluatorRuntime
+ /// </summary>
+ /// <param name="proto"></param>
+ public void Handle(EvaluatorControlProto proto)
+ {
+ lock (_evaluatorDescriptor)
+ {
+ if (_state == STATE.RUNNING)
+ {
+ _evaluatorControlHandler.OnNext(proto);
+ }
+ else
+ {
+ var e = new InvalidOperationException(
+ string.Format(
+ CultureInfo.InvariantCulture,
+ "Evaluator manager expects to be in {0} state, but instead is in state {1}",
+ STATE.RUNNING,
+ _state));
+ Exceptions.Throw(e, LOGGER);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Resource status information from the (actual) resource manager.
+ /// </summary>
+ /// <param name="resourceStatusProto"></param>
+ public void Handle(ResourceStatusProto resourceStatusProto)
+ {
+ lock (_evaluatorDescriptor)
+ {
+ State resourceState = resourceStatusProto.state;
+ LOGGER.Log(Level.Info, "Resource manager state update: " + resourceState);
+
+ if (resourceState == State.DONE || resourceState == State.FAILED)
+ {
+ if (_state < STATE.DONE)
+ {
+ // something is wrong, I think I'm alive but the resource manager runtime says I'm dead
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.Append(
+ string.Format(
+ CultureInfo.InvariantCulture,
+ "The resource manager informed me that Evaluator {0} is in state {1} but I think I am in {2} state",
+ _evaluatorId,
+ resourceState,
+ _state));
+ if (resourceStatusProto.diagnostics != null)
+ {
+ stringBuilder.Append("Cause: " + resourceStatusProto.diagnostics);
+ }
+ if (_runningTask != null)
+ {
+ stringBuilder.Append(
+ string.Format(
+ CultureInfo.InvariantCulture,
+ "Taskruntime {0} did not complete before this evaluator died.",
+ _runningTask.Id));
+ }
+
+ // RM is telling me its DONE/FAILED - assuming it has already released the resources
+ _isResourceReleased = true;
+ //Handle(new EvaluatorException(_evaluatorId, stringBuilder.ToString(), _runningTask));
+ _state = STATE.KILLED;
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Handle a context status update
+ /// </summary>
+ /// <param name="contextStatusProto"></param>
+ /// <param name="notifyClientOnNewActiveContext"></param>
+ private void Handle(ContextStatusProto contextStatusProto, bool notifyClientOnNewActiveContext)
+ {
+ string contextId = contextStatusProto.context_id;
+ Optional<string> parentId = contextStatusProto.parent_id != null ?
+ Optional<string>.Of(contextStatusProto.parent_id) : Optional<string>.Empty();
+ if (ContextStatusProto.State.READY == contextStatusProto.context_state)
+ {
+ if (!_activeContextIds.Contains(contextId))
+ {
+ EvaluatorContext evaluatorContext = new EvaluatorContext(this, contextId, parentId);
+ AddEvaluatorContext(evaluatorContext);
+ if (notifyClientOnNewActiveContext)
+ {
+ LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext.ToString());
+ //TODO
+ //dispatcher.onNext(ActiveContext.class, context);
+ }
+ }
+ foreach (ContextStatusProto.ContextMessageProto contextMessageProto in contextStatusProto.context_message)
+ {
+ byte[] message = contextMessageProto.message;
+ string sourceId = contextMessageProto.source_id;
+ LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + sourceId + message);
+ // this.dispatcher.onNext(ContextMessage.class,
+ //new ContextMessageImpl(theMessage, contextID, sourceID));
+ }
+ }
+ else
+ {
+ if (!_activeContextIds.Contains(contextId))
+ {
+ if (ContextStatusProto.State.FAIL == contextStatusProto.context_state)
+ {
+ AddEvaluatorContext(new EvaluatorContext(this, contextId, parentId));
+ }
+ else
+ {
+ var e = new InvalidOperationException("unknown context signaling state " + contextStatusProto.context_state);
+ Exceptions.Throw(e, LOGGER);
+ }
+ }
+ }
+
+ EvaluatorContext context = GetEvaluatorContext(contextId);
+ EvaluatorContext parentContext = context.ParentId.IsPresent() ?
+ GetEvaluatorContext(context.ParentId.Value) : null;
+ RemoveEvaluatorContext(context);
+
+ if (ContextStatusProto.State.FAIL == contextStatusProto.context_state)
+ {
+ // TODO
+ Exception reason = new InvalidOperationException(ByteUtilities.ByteArrarysToString(contextStatusProto.error));
+ Optional<IActiveContext> optionalParentContext = (null == parentContext) ?
+ Optional<IActiveContext>.Empty() : Optional<IActiveContext>.Of(parentContext);
+ LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + reason.ToString() + optionalParentContext);
+ // TODO
+ //this.dispatcher.onNext(FailedContext.class,
+ //context.getFailedContext(optionalParentContext, reason));
+ }
+ else if (ContextStatusProto.State.DONE == contextStatusProto.context_state)
+ {
+ if (null != parentContext)
+ {
+ // TODO
+ //this.dispatcher.onNext(ClosedContext.class, context.getClosedContext(parentContext));
+ }
+ else
+ {
+ LOGGER.Log(Level.Info, "Root context closed. Evaluator closed will trigger final shutdown.");
+ }
+ }
+ else
+ {
+ var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown context state {0} for context {1}", contextStatusProto.context_state, contextId));
+ Exceptions.Throw(e, LOGGER);
+ }
+ }
+
+ /// <summary>
+ /// Handle task status messages.
+ /// </summary>
+ /// <param name="taskStatusProto"></param>
+ private void Handle(TaskStatusProto taskStatusProto)
+ {
+ LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received task {0} status {1}", taskStatusProto.task_id, taskStatusProto.state));
+ string taskId = taskStatusProto.task_id;
+ string contextId = taskStatusProto.context_id;
+ State taskState = taskStatusProto.state;
+
+ if (taskState == State.INIT)
+ {
+ EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
+ _runningTask = new RunningTaskImpl(this, taskId, evaluatorContext);
+ // this.dispatcher.onNext(RunningTask.class, this.runningTask);
+ }
+ else if (taskState == State.SUSPEND)
+ {
+ EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
+ _runningTask = null;
+ byte[] message = taskStatusProto.result != null ? taskStatusProto.result : null;
+ LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message.ToString());
+ //this.dispatcher.onNext(SuspendedTask.class, new SuspendedTaskImpl(evaluatorContext, message, taskId));
+ }
+ else if (taskState == State.DONE)
+ {
+ EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
+ _runningTask = null;
+ byte[] message = taskStatusProto.result != null ? taskStatusProto.result : null;
+ LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message.ToString());
+ //this.dispatcher.onNext(CompletedTask.class, new CompletedTaskImpl(evaluatorContext, message, taskId));
+ }
+ else if (taskState == State.FAILED)
+ {
+ _runningTask = null;
+ //EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
+ //FailedTask failedTask = taskStatusProto.result != null ?
+ // new FailedTask(taskId, ByteUtilities.ByteArrarysToString(taskStatusProto.result), Optional<IActiveContext>.Of(evaluatorContext)) :
+ // new FailedTask(taskId, "Failed task: " + taskState, Optional<IActiveContext>.Of(evaluatorContext));
+ //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString());
+ //this.dispatcher.onNext(FailedTask.class, taskException);
+ }
+ else if (taskStatusProto.task_message.Count > 0)
+ {
+ if (_runningTask != null)
+ {
+ var e = new InvalidOperationException("runningTask must be null when there are multiple task messages");
+ Exceptions.Throw(e, LOGGER);
+ }
+ foreach (TaskStatusProto.TaskMessageProto taskMessageProto in taskStatusProto.task_message)
+ {
+ LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + taskMessageProto.ToString());
+ // this.dispatcher.onNext(TaskMessage.class,
+ //new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(),
+ // taskId, contextId, taskMessageProto.getSourceId()));
+ }
+ }
+ }
+
+ private EvaluatorContext GetEvaluatorContext(string id)
+ {
+ foreach (EvaluatorContext context in _activeContexts)
+ {
+ if (context.Id.Equals(id))
+ {
+ return context;
+ }
+ var e = new InvalidOperationException("Unknown evaluator context with id " + id);
+ Exceptions.Throw(e, LOGGER);
+ }
+ return null;
+ }
+
+ private void AddEvaluatorContext(EvaluatorContext context)
+ {
+ _activeContexts.Add(context);
+ _activeContextIds.Add(context.Id);
+ }
+
+ private void RemoveEvaluatorContext(EvaluatorContext context)
+ {
+ _activeContexts.Remove(context);
+ _activeContextIds.Remove(context.Id);
+ }
+
+ [NamedParameter(documentation: "The Evaluator Identifier.")]
+ public class EvaluatorIdentifier : Name<string>
+ {
+ }
+
+ [NamedParameter(documentation: "The Evaluator Host.")]
+ public class EvaluatorDescriptorName : Name<EvaluatorDescriptorImpl>
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/FailedJob.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/FailedJob.cs b/lang/cs/Org.Apache.REEF.Driver/FailedJob.cs
new file mode 100644
index 0000000..5f36437
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/FailedJob.cs
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Api;
+using Org.Apache.REEF.Utilities;
+using System;
+
+namespace Org.Apache.REEF.Driver
+{
+ /// <summary>
+ /// An error message that REEF Client receives when there is a user error in REEF job.
+ /// </summary>
+ public class FailedJob : AbstractFailure
+ {
+ /// <summary>
+ /// Create an error message given the entity ID and Java Exception. All accessor methods are provided by the base class.
+ /// </summary>
+ /// <param name="id"></param>
+ /// <param name="cause"></param>
+ public FailedJob(string id, Exception cause)
+ : base(id, cause)
+ {
+ }
+
+ public new string Id { get; set; }
+
+ public new string Message { get; set; }
+
+ public new Optional<string> Description { get; set; }
+
+ public new Optional<Exception> Cause { get; set; }
+
+ public new Optional<byte[]> Data { get; set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/IDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/IDriver.cs b/lang/cs/Org.Apache.REEF.Driver/IDriver.cs
new file mode 100644
index 0000000..e917ada
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/IDriver.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Driver
+{
+ /// <summary>
+ /// empty driver interface to facilitate referencing driver dll
+ /// </summary>
+ public interface IDriver
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/IStartHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/IStartHandler.cs b/lang/cs/Org.Apache.REEF.Driver/IStartHandler.cs
new file mode 100644
index 0000000..0f03295
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/IStartHandler.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Driver
+{
+ public interface IStartHandler
+ {
+ string Identifier { get; set; }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
new file mode 100644
index 0000000..ef874e8
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
@@ -0,0 +1,226 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProjectGuid>{A6BAA2A7-F52F-4329-884E-1BCF711D6805}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Org.Apache.REEF.Driver</RootNamespace>
+ <AssemblyName>Org.Apache.REEF.Driver</AssemblyName>
+ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\..\..</SolutionDir>
+ <RestorePackages>true</RestorePackages>
+ </PropertyGroup>
+ <Import Project="$(SolutionDir)\Source\build.props" />
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>$(BinDir)\$(Platform)\$(Configuration)\$(RootNamespace)</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Microsoft.Hadoop.Avro">
+ <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.1.4.0.0\lib\net40\Microsoft.Hadoop.Avro.dll</HintPath>
+ </Reference>
+ <Reference Include="Newtonsoft.Json">
+ <HintPath>$(PackagesDir)\Newtonsoft.Json.6.0.8\lib\net45\Newtonsoft.Json.dll</HintPath>
+ </Reference>
+ <Reference Include="protobuf-net">
+ <HintPath>$(PackagesDir)\protobuf-net.2.0.0.668\lib\net40\protobuf-net.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Runtime.Serialization" />
+ <Reference Include="System.Xml.Linq" />
+ <Reference Include="System.Data.DataSetExtensions" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="bridge\BridgeLogger.cs" />
+ <Compile Include="bridge\clr2java\IActiveContextClr2Java.cs" />
+ <Compile Include="bridge\clr2java\IAllocatedEvaluaotrClr2Java.cs" />
+ <Compile Include="bridge\clr2java\IClosedContextClr2Java.cs" />
+ <Compile Include="bridge\clr2java\IClr2Java.cs" />
+ <Compile Include="bridge\clr2java\ICompletedEvaluatorClr2Java.cs" />
+ <Compile Include="bridge\clr2java\ICompletedTaskClr2Java.cs" />
+ <Compile Include="bridge\clr2java\IContextMessageClr2Java.cs" />
+ <Compile Include="bridge\clr2java\IEvaluatorRequestorClr2Java.cs" />
+ <Compile Include="bridge\clr2java\IFailedContextClr2Java.cs" />
+ <Compile Include="bridge\clr2java\IFailedEvaluatorClr2Java.cs" />
+ <Compile Include="bridge\clr2java\IFailedTaskClr2Java.cs" />
+ <Compile Include="bridge\clr2java\IHttpServerBridgeClr2Java.cs" />
+ <Compile Include="bridge\clr2java\IRunningTaskClr2Java.cs" />
+ <Compile Include="bridge\clr2java\ISuspendedTaskClr2Java.cs" />
+ <Compile Include="bridge\clr2java\ITaskMessageClr2Java.cs" />
+ <Compile Include="bridge\ClrClientHelper.cs" />
+ <Compile Include="bridge\ClrHandlerHelper.cs" />
+ <Compile Include="bridge\ClrSystemHandler.cs" />
+ <Compile Include="bridge\ClrSystemHandlerWrapper.cs" />
+ <Compile Include="bridge\DriverBridge.cs" />
+ <Compile Include="bridge\DriverBridgeConfiguration.cs" />
+ <Compile Include="bridge\DriverBridgeConfigurationOptions.cs" />
+ <Compile Include="bridge\events\ActiveContext.cs" />
+ <Compile Include="bridge\events\AllocatedEvaluator.cs" />
+ <Compile Include="bridge\events\ClosedContext.cs" />
+ <Compile Include="bridge\events\CompletedEvaluator.cs" />
+ <Compile Include="bridge\events\CompletedTask.cs" />
+ <Compile Include="bridge\events\ContextMessage.cs" />
+ <Compile Include="bridge\events\EvaluatorRequstor.cs" />
+ <Compile Include="bridge\events\FailedContext.cs" />
+ <Compile Include="bridge\events\FailedEvaluator.cs" />
+ <Compile Include="bridge\events\FailedTask.cs" />
+ <Compile Include="bridge\events\RunningTask.cs" />
+ <Compile Include="bridge\events\SuspendedTask.cs" />
+ <Compile Include="bridge\events\TaskMessage.cs" />
+ <Compile Include="bridge\HttpMessage.cs" />
+ <Compile Include="bridge\HttpServerHandler.cs" />
+ <Compile Include="bridge\HttpServerPort.cs" />
+ <Compile Include="bridge\IHttpHandler.cs" />
+ <Compile Include="bridge\IHttpMessage.cs" />
+ <Compile Include="bridge\ReefHttpRequest.cs" />
+ <Compile Include="bridge\ReefHttpResponse.cs" />
+ <Compile Include="ClientManager.cs" />
+ <Compile Include="Constants.cs" />
+ <Compile Include="context\ContextConfiguration.cs" />
+ <Compile Include="context\ContextConfigurationOptions.cs" />
+ <Compile Include="context\defaults\DefaultContextMessageSource.cs" />
+ <Compile Include="context\defaults\DefaultContextStartHandler.cs" />
+ <Compile Include="context\defaults\DefaultContextStopHandler.cs" />
+ <Compile Include="context\EvaluatorContext.cs" />
+ <Compile Include="context\IActiveContext.cs" />
+ <Compile Include="context\IClosedContext.cs" />
+ <Compile Include="context\IContext.cs" />
+ <Compile Include="context\IFailedContext.cs" />
+ <Compile Include="contract\IBridgeContract.cs" />
+ <Compile Include="defaults\DefaultClientCloseHandler.cs" />
+ <Compile Include="defaults\DefaultClientCloseWithMessageHandler.cs" />
+ <Compile Include="defaults\DefaultClientMessageHandler.cs" />
+ <Compile Include="defaults\DefaultContextActiveHandler.cs" />
+ <Compile Include="defaults\DefaultContextClosureHandler.cs" />
+ <Compile Include="defaults\DefaultContextFailureHandler.cs" />
+ <Compile Include="defaults\DefaultContextMessageHandler.cs" />
+ <Compile Include="defaults\DefaultCustomTraceListener.cs" />
+ <Compile Include="defaults\DefaultDriverRestartContextActiveHandler.cs" />
+ <Compile Include="defaults\DefaultDriverRestartHandler.cs" />
+ <Compile Include="defaults\DefaultDriverRestartTaskRunningHandler.cs" />
+ <Compile Include="defaults\DefaultEvaluatorAllocationHandler.cs" />
+ <Compile Include="defaults\DefaultEvaluatorCompletionHandler.cs" />
+ <Compile Include="defaults\DefaultEvaluatorFailureHandler.cs" />
+ <Compile Include="defaults\DefaultEvaluatorRequestorHandler.cs" />
+ <Compile Include="defaults\DefaultHttpHandler.cs" />
+ <Compile Include="defaults\DefaultTaskCompletionHandler.cs" />
+ <Compile Include="defaults\DefaultTaskFailureHandler.cs" />
+ <Compile Include="defaults\DefaultTaskMessageHandler.cs" />
+ <Compile Include="defaults\DefaultTaskRunningHandler.cs" />
+ <Compile Include="defaults\DefaultTaskSuspensionHandler.cs" />
+ <Compile Include="DriverConfigGenerator.cs" />
+ <Compile Include="DriverConfigurationSettings.cs" />
+ <Compile Include="DriverManager.cs" />
+ <Compile Include="DriverRuntimeConfiguration.cs" />
+ <Compile Include="DriverRuntimeConfigurationOptions.cs" />
+ <Compile Include="DriverSubmissionSettings.cs" />
+ <Compile Include="EvaluatorManager.cs" />
+ <Compile Include="evaluator\EvaluatorDescriptorImpl.cs" />
+ <Compile Include="evaluator\EvaluatorRequest.cs" />
+ <Compile Include="evaluator\EvaluatorRequestBuilder.cs" />
+ <Compile Include="evaluator\IAllocatedEvaluator.cs" />
+ <Compile Include="evaluator\ICompletedEvaluator.cs" />
+ <Compile Include="evaluator\IEvaluatorDescriptor.cs" />
+ <Compile Include="evaluator\IEvaluatorRequest .cs" />
+ <Compile Include="evaluator\IEvaluatorRequestor.cs" />
+ <Compile Include="evaluator\IFailedEvaluator.cs" />
+ <Compile Include="FailedJob.cs" />
+ <Compile Include="IDriver.cs" />
+ <Compile Include="IStartHandler.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="task\ICompletedTask.cs" />
+ <Compile Include="task\IFailedTask.cs" />
+ <Compile Include="task\IRunningTask.cs" />
+ <Compile Include="task\ISuspendedTask.cs" />
+ <Compile Include="task\ITaskMessage.cs" />
+ <Compile Include="task\RunningTaskImpl.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj">
+ <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project>
+ <Name>ReefCommon</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+ <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+ <Name>Org.Apache.REEF.Tang</Name>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj">
+ <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
+ <Name>Org.Apache.REEF.Utilities</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj">
+ <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
+ <Name>Org.Apache.REEF.Wake</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Driver/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..79fba06
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Org.Apache.REEF.Driver")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Driver")]
+[assembly: AssemblyCopyright("Copyright © 2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("81ea2648-b341-4852-93b0-806da615c6b8")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Driver/bridge/BridgeLogger.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/bridge/BridgeLogger.cs b/lang/cs/Org.Apache.REEF.Driver/bridge/BridgeLogger.cs
new file mode 100644
index 0000000..3e2dada
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/bridge/BridgeLogger.cs
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Driver.Bridge
+{
+ /// <summary>
+ /// A wrapper around the general Logger class used specifically for
+ /// logging in CPP bridge code.
+ /// This is enabled when trace leve is above Level.Info (included)
+ /// </summary>
+ public class BridgeLogger
+ {
+ private Logger _logger;
+
+ public BridgeLogger(string name)
+ {
+ _logger = Logger.GetLogger(name);
+ }
+
+ public static BridgeLogger GetLogger(string className)
+ {
+ return new BridgeLogger(className);
+ }
+
+ public void Log(string message)
+ {
+ _logger.Log(Level.Info, message);
+ }
+
+ public void LogStart(string message)
+ {
+ _logger.Log(Level.Start, message);
+ }
+
+ public void LogStop(string message)
+ {
+ _logger.Log(Level.Stop, message);
+ }
+
+ public void LogError(string message, Exception e)
+ {
+ _logger.Log(Level.Error, message, e);
+ }
+ }
+}