You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/29 21:43:09 UTC

[25/31] incubator-reef git commit: [REEF-97] Add the REEF.NET code base

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/ClientManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/ClientManager.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/ClientManager.cs
new file mode 100644
index 0000000..3cd3e15
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/ClientManager.cs
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.ProtoBuf.ClienRuntimeProto;
+using System;
+
+// TODO
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Observer of <see cref="JobControlProto"/> messages.
+    /// None of the callbacks is implemented yet; each one throws <see cref="NotImplementedException"/>.
+    /// </summary>
+    public class ClientManager : IObserver<JobControlProto>
+    {
+        /// <summary>
+        /// Not implemented.
+        /// </summary>
+        /// <exception cref="NotImplementedException">Always thrown.</exception>
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Not implemented.
+        /// </summary>
+        /// <param name="error">The reported error (currently ignored).</param>
+        /// <exception cref="NotImplementedException">Always thrown.</exception>
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Not implemented.
+        /// </summary>
+        /// <param name="value">The job control message (currently ignored).</param>
+        /// <exception cref="NotImplementedException">Always thrown.</exception>
+        public void OnNext(JobControlProto value)
+        {
+            throw new NotImplementedException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/Constants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/Constants.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/Constants.cs
new file mode 100644
index 0000000..4e8e68c
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/Constants.cs
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Constant values shared by the REEF driver code: handler names, file and directory names,
+    /// and the Java class names / JVM option strings used when launching the bridge.
+    /// </summary>
+    public class Constants
+    {
+        public const ulong NullHandler = 0;
+
+        public const string ClassHierarachyBin = "clrClassHierarchy.bin";
+
+        public const string GlobalUserSuppliedJavaLibraries = "userSuppliedGlobalLibraries.txt";
+
+        public const int DefaultMemoryGranularity = 1024;
+
+        // Number of entries in the Handlers map below.
+        public const int HandlersNumber = 17;
+
+        // Handler names; each of these is a key of the Handlers map.
+        public const string EvaluatorRequestorHandler = "EvaluatorRequestor";
+
+        public const string AllocatedEvaluatorHandler = "AllocatedEvaluator";
+
+        public const string CompletedEvaluatorHandler = "CompletedEvaluator";
+
+        public const string ActiveContextHandler = "ActiveContext";
+
+        public const string ClosedContextHandler = "ClosedContext";
+
+        public const string FailedContextHandler = "FailedContext";
+
+        public const string ContextMessageHandler = "ContextMessage";
+
+        public const string TaskMessageHandler = "TaskMessage";
+
+        public const string FailedTaskHandler = "FailedTask";
+
+        public const string RunningTaskHandler = "RunningTask";
+
+        public const string FailedEvaluatorHandler = "FailedEvaluator";
+
+        public const string CompletedTaskHandler = "CompletedTask";
+
+        public const string SuspendedTaskHandler = "SuspendedTask";
+
+        public const string HttpServerHandler = "HttpServerHandler";
+
+        public const string DriverRestartHandler = "DriverRestart";
+
+        public const string DriverRestartActiveContextHandler = "DriverRestartActiveContext";
+
+        public const string DriverRestartRunningTaskHandler = "DriverRestartRunningTask";
+
+        public const string DriverBridgeConfiguration = Common.Constants.ClrBridgeRuntimeConfiguration;
+
+        public const string DriverAppDirectory = "ReefDriverAppDlls";
+
+        public const string BridgeJarFileName = "reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar";
+
+        // Fully qualified Java class names.
+        public const string BridgeLaunchClass = "org.apache.reef.javabridge.generic.Launch";
+
+        public const string BridgeLaunchHeadlessClass = "org.apache.reef.javabridge.generic.LaunchHeadless";
+
+        public const string DirectLauncherClass = "org.apache.reef.runtime.common.Launcher";
+
+        // JVM system-property options selecting the java.util.logging configuration class.
+        public const string JavaToCLRLoggingConfig = "-Djava.util.logging.config.class=org.apache.reef.util.logging.CLRLoggingConfig";
+
+        public const string JavaVerboseLoggingConfig = "-Djava.util.logging.config.class=org.apache.reef.util.logging.Config";
+
+        /// <summary>
+        /// Gets a map from each handler name to its integer index (0 through 16).
+        /// A new dictionary instance is created on every access.
+        /// </summary>
+        public static Dictionary<string, int> Handlers
+        {
+            get
+            {
+                // The position of a name in this array is the index assigned to that handler.
+                string[] namesByIndex =
+                {
+                    EvaluatorRequestorHandler,
+                    AllocatedEvaluatorHandler,
+                    ActiveContextHandler,
+                    TaskMessageHandler,
+                    FailedTaskHandler,
+                    FailedEvaluatorHandler,
+                    HttpServerHandler,
+                    CompletedTaskHandler,
+                    RunningTaskHandler,
+                    SuspendedTaskHandler,
+                    CompletedEvaluatorHandler,
+                    ClosedContextHandler,
+                    FailedContextHandler,
+                    ContextMessageHandler,
+                    DriverRestartHandler,
+                    DriverRestartActiveContextHandler,
+                    DriverRestartRunningTaskHandler
+                };
+
+                var handlers = new Dictionary<string, int>();
+                for (int index = 0; index < namesByIndex.Length; index++)
+                {
+                    handlers.Add(namesByIndex[index], index);
+                }
+
+                return handlers;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigGenerator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigGenerator.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigGenerator.cs
new file mode 100644
index 0000000..46c56c5
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigGenerator.cs
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using Org.Apache.Reef.Driver.bridge;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Formats;
+using Org.Apache.Reef.Tang.Implementations.Configuration;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Tang.Protobuf;
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Builds the driver configuration file (<see cref="DriverConfigFile"/>) from the configuration
+    /// files and class hierarchy that are extracted from the REEF bridge jar into the current directory.
+    /// </summary>
+    public class DriverConfigGenerator
+    {
+        public const string DriverConfigFile = "driver.config";
+        public const string JobDriverConfigFile = "jobDriver.config";
+        public const string DriverChFile = "driverClassHierarchy.bin";
+        public const string HttpServerConfigFile = "httpServer.config";
+        public const string NameServerConfigFile = "nameServer.config";
+        public const string UserSuppliedGlobalLibraries = "userSuppliedGlobalLibraries.txt";
+
+        private static readonly Logger Log = Logger.GetLogger(typeof(DriverConfigGenerator));
+
+        /// <summary>
+        /// Extracts the job driver, HTTP server and name server configurations plus the driver class
+        /// hierarchy from the bridge jar, merges the configurations selected by the settings, binds the
+        /// driver identifier, memory, submission directory, global libraries and global files, and writes
+        /// the result to <see cref="DriverConfigFile"/> and to a readable "driver.config.txt" companion file.
+        /// If any of the extracted files is missing, a warning is logged and nothing is written.
+        /// </summary>
+        /// <param name="driverConfigurationSettings">Settings that control which configurations are merged and which values are bound.</param>
+        public static void DriverConfigurationBuilder(DriverConfigurationSettings driverConfigurationSettings)
+        {
+            ExtractConfigFromJar(driverConfigurationSettings.JarFileFolder);
+
+            // Checked in this order; the first missing file aborts the build with a warning.
+            string[] requiredFiles = { DriverChFile, HttpServerConfigFile, JobDriverConfigFile, NameServerConfigFile };
+            foreach (string requiredFile in requiredFiles)
+            {
+                if (!File.Exists(requiredFile))
+                {
+                    Log.Log(Level.Warning, string.Format(CultureInfo.CurrentCulture, "There is no file {0} extracted from the jar file at {1}.", requiredFile, driverConfigurationSettings.JarFileFolder));
+                    return;
+                }
+            }
+
+            AvroConfigurationSerializer serializer = new AvroConfigurationSerializer();
+
+            IClassHierarchy driverClassHierarchy = ProtocolBufferClassHierarchy.DeSerialize(DriverChFile);
+
+            // All three configurations are deserialized regardless of the Including* flags.
+            IConfiguration jobDriverConfiguration = ReadConfiguration(serializer, JobDriverConfigFile, driverClassHierarchy);
+            IConfiguration httpConfiguration = ReadConfiguration(serializer, HttpServerConfigFile, driverClassHierarchy);
+            IConfiguration nameConfiguration = ReadConfiguration(serializer, NameServerConfigFile, driverClassHierarchy);
+
+            IConfiguration merged;
+
+            if (driverConfigurationSettings.IncludingHttpServer && driverConfigurationSettings.IncludingNameServer)
+            {
+                merged = Configurations.MergeDeserializedConfs(jobDriverConfiguration, httpConfiguration, nameConfiguration);
+            }
+            else if (driverConfigurationSettings.IncludingHttpServer)
+            {
+                merged = Configurations.MergeDeserializedConfs(jobDriverConfiguration, httpConfiguration);
+            }
+            else if (driverConfigurationSettings.IncludingNameServer)
+            {
+                merged = Configurations.MergeDeserializedConfs(jobDriverConfiguration, nameConfiguration);
+            }
+            else
+            {
+                merged = jobDriverConfiguration;
+            }
+
+            var b = merged.newBuilder();
+
+            b.BindSetEntry("org.apache.reef.driver.parameters.DriverIdentifier", driverConfigurationSettings.DriverIdentifier);
+
+            // The value ends up in a machine-readable configuration, so format it culture-independently.
+            b.Bind("org.apache.reef.driver.parameters.DriverMemory", driverConfigurationSettings.DriverMemory.ToString(CultureInfo.InvariantCulture));
+            b.Bind("org.apache.reef.driver.parameters.DriverJobSubmissionDirectory", driverConfigurationSettings.SubmissionDirectory);
+
+            // Add every user supplied global library (comma separated list in a text file).
+            if (File.Exists(UserSuppliedGlobalLibraries))
+            {
+                var globalLibString = File.ReadAllText(UserSuppliedGlobalLibraries);
+                if (!string.IsNullOrEmpty(globalLibString))
+                {
+                    foreach (string entry in globalLibString.Split(','))
+                    {
+                        // Strip surrounding whitespace (e.g. a trailing newline in the file) and
+                        // skip blank entries so they are not bound as library names.
+                        string libraryName = entry.Trim();
+                        if (libraryName.Length == 0)
+                        {
+                            continue;
+                        }
+
+                        b.BindSetEntry("org.apache.reef.driver.parameters.JobGlobalLibraries", libraryName);
+                    }
+                }
+            }
+
+            foreach (string f in Directory.GetFiles(driverConfigurationSettings.ClrFolder))
+            {
+                b.BindSetEntry("org.apache.reef.driver.parameters.JobGlobalFiles", f);
+            }
+
+            IConfiguration c = b.Build();
+
+            serializer.ToFile(c, DriverConfigFile);
+
+            Log.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "driver.config is written to: {0} {1}.", Directory.GetCurrentDirectory(), DriverConfigFile));
+
+            // Additional copy of the configuration in an easy to read text form.
+            using (StreamWriter outfile = new StreamWriter(DriverConfigFile + ".txt"))
+            {
+                outfile.Write(serializer.ToString(c));
+            }
+        }
+
+        // Deserializes one Avro configuration file against the given class hierarchy.
+        private static IConfiguration ReadConfiguration(AvroConfigurationSerializer serializer, string configFile, IClassHierarchy classHierarchy)
+        {
+            AvroConfiguration avroConfiguration = serializer.AvroDeseriaizeFromFile(configFile);
+            return serializer.FromAvro(avroConfiguration, classHierarchy);
+        }
+
+        // Extracts the configuration files and the class hierarchy from the bridge jar into the current directory.
+        private static void ExtractConfigFromJar(string jarfileFolder)
+        {
+            // Path.Combine inserts the directory separator when the folder does not end with one
+            // (plain concatenation turned the default folder "." into ".reef-bridge-...jar").
+            string jarfile = Path.Combine(jarfileFolder ?? string.Empty, Constants.BridgeJarFileName);
+            List<string> files = new List<string>
+            {
+                HttpServerConfigFile,
+                JobDriverConfigFile,
+                NameServerConfigFile,
+                DriverChFile
+            };
+            ClrClientHelper.ExtractConfigfileFromJar(jarfile, files, ".");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigurationSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigurationSettings.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigurationSettings.cs
new file mode 100644
index 0000000..463e983
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverConfigurationSettings.cs
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Settings consumed when the driver configuration is generated.
+    /// Every setting has a default, so a freshly constructed instance is usable as is.
+    /// </summary>
+    public class DriverConfigurationSettings
+    {
+        // default to "ReefDevClrBridge"
+        private string _driverIdentifier = "ReefDevClrBridge";
+
+        // default to a temporary directory with a GUID based name; a GUID is used because a
+        // millisecond counter only has 1000 distinct values and collides across jobs
+        private string _submissionDirectory = "reefTmp/job_" + Guid.NewGuid().ToString("N");
+
+        // default to 512MB if no value is provided
+        private int _driverMemory = 512;
+
+        // folder path that contains clr dlls used by reef
+        private string _clrFolder = ".";
+
+        // folder that contains the jar file provided by REEF
+        private string _jarFileFolder = ".";
+
+        // default to true if no value is specified
+        private bool _includeHttpServer = true;
+
+        // default to true if no value is specified
+        private bool _includeNameServer = true;
+
+        /// <summary>
+        /// Memory allocated for driver, default to 512 MB
+        /// </summary>
+        /// <exception cref="ArgumentOutOfRangeException">The assigned value is negative.</exception>
+        public int DriverMemory
+        {
+            get
+            {
+                return _driverMemory;
+            }
+
+            set
+            {
+                if (value < 0)
+                {
+                    // ArgumentOutOfRangeException derives from ArgumentException, so callers
+                    // catching ArgumentException keep working.
+                    throw new ArgumentOutOfRangeException("value", value, "driver memory cannot be negative value.");
+                }
+                _driverMemory = value;
+            }
+        }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether including name server in the config file.
+        /// </summary>
+        /// <value>
+        ///   <c>true</c> if [including name server]; otherwise, <c>false</c>.
+        /// </value>
+        public bool IncludingNameServer
+        {
+            get { return _includeNameServer; }
+            set { _includeNameServer = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether including HTTP server in the config file.
+        /// </summary>
+        /// <value>
+        ///   <c>true</c> if [including HTTP server]; otherwise, <c>false</c>.
+        /// </value>
+        public bool IncludingHttpServer
+        {
+            get { return _includeHttpServer; }
+            set { _includeHttpServer = value; }
+        }
+
+        /// <summary>
+        /// Driver Identifier, default to "ReefDevClrBridge"
+        /// </summary>
+        public string DriverIdentifier
+        {
+            get { return _driverIdentifier; }
+            set { _driverIdentifier = value; }
+        }
+
+        /// <summary>
+        /// Driver job submission directory in (H)DFS where jar file shall be uploaded, default to a tmp directory with GUID name
+        /// If set by CLR user, the user must guarantee the uniqueness of the directory across multiple jobs
+        /// </summary>
+        public string SubmissionDirectory
+        {
+            get { return _submissionDirectory; }
+            set { _submissionDirectory = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the CLR folder.
+        /// </summary>
+        /// <value>
+        /// The CLR folder.
+        /// </value>
+        public string ClrFolder
+        {
+            get { return _clrFolder; }
+            set { _clrFolder = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the jar file folder.
+        /// </summary>
+        /// <value>
+        /// The jar file folder.
+        /// </value>
+        public string JarFileFolder
+        {
+            get { return _jarFileFolder; }
+            set { _jarFileFolder = value; }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/DriverManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/DriverManager.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverManager.cs
new file mode 100644
index 0000000..b0efc2a
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverManager.cs
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common;
+using Org.Apache.Reef.Common.Api;
+using Org.Apache.Reef.Common.Catalog;
+using Org.Apache.Reef.Common.Evaluator;
+using Org.Apache.Reef.Common.Exceptions;
+using Org.Apache.Reef.Common.ProtoBuf.DriverRuntimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Driver.Bridge;
+using Org.Apache.Reef.Driver.Evaluator;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Implementations;
+using Org.Apache.Reef.Tang.Interface;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Time;
+using Org.Apache.Reef.Wake.Time.Runtime.Event;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Driver
+{
+    public class DriverManager : 
+        IEvaluatorRequestor, 
+        IObserver<RuntimeStatusProto>, 
+        IObserver<ResourceStatusProto>,
+        IObserver<ResourceAllocationProto>,
+        IObserver<NodeDescriptorProto>,
+        IObserver<RuntimeStart>,
+        IObserver<RuntimeStop>,
+        IObserver<IdleClock>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(DriverManager));
+        
+        private IInjector _injector;
+
+        private IInjectionFuture<IClock> _clockFuture; 
+
+        private ResourceCatalogImpl _resourceCatalog;
+
+        private IInjectionFuture<IResourceRequestHandler> _futureResourceRequestHandler;
+        
+        private Dictionary<string, EvaluatorManager> _evaluators = new Dictionary<string, EvaluatorManager>();
+
+        private EvaluatorHeartBeatSanityChecker _sanityChecker = new EvaluatorHeartBeatSanityChecker();
+
+        private ClientJobStatusHandler _clientJobStatusHandler;
+
+        private IDisposable _heartbeatConnectionChannel;
+
+        private IDisposable _errorChannel;
+
+        private IObserver<RuntimeErrorProto> _runtimeErrorHandler;
+
+        /// <summary>
+        /// Creates the driver manager and stores its collaborators.
+        /// The heartbeat channel, error channel and runtime error handler start out as null.
+        /// </summary>
+        /// <param name="injector">Injector kept for later use.</param>
+        /// <param name="resourceCatalog">Catalog exposed through <see cref="ResourceCatalog"/> and fed with node descriptors.</param>
+        /// <param name="remoteManager">Not used by this constructor.</param>
+        /// <param name="clockFuture">Future of the clock, disposed on idle shutdown.</param>
+        /// <param name="futureResourceRequestHandler">Future of the resource request handler.</param>
+        /// <param name="clientJobStatusHandler">Handler disposed when the runtime stops.</param>
+        /// <param name="clientRId">Not used by this constructor.</param>
+        public DriverManager(
+            IInjector injector,
+            ResourceCatalogImpl resourceCatalog,
+            IRemoteManager<REEFMessage> remoteManager,
+            IInjectionFuture<IClock> clockFuture,
+            IInjectionFuture<IResourceRequestHandler> futureResourceRequestHandler,
+            ClientJobStatusHandler clientJobStatusHandler,
+            string clientRId)
+        {
+            // Channels and the error handler are not wired up here.
+            this._runtimeErrorHandler = null;
+            this._errorChannel = null;
+            this._heartbeatConnectionChannel = null;
+
+            this._clientJobStatusHandler = clientJobStatusHandler;
+            this._futureResourceRequestHandler = futureResourceRequestHandler;
+            this._resourceCatalog = resourceCatalog;
+            this._clockFuture = clockFuture;
+            this._injector = injector;
+
+            LOGGER.Log(Level.Info, "DriverManager instantiated");
+        }
+
+        /// <summary>
+        /// Gets the resource catalog supplied to the constructor.
+        /// The setter is intentionally empty: assigning to this property has no effect.
+        /// </summary>
+        public IResourceCatalog ResourceCatalog
+        {
+            get { return _resourceCatalog; }
+            set { }
+        }
+
+        // Backing store for _runtimeStatusProto; stays null until the property is first read or assigned.
+        private RuntimeStatusProto _runtimeStatusProtoBackingField;
+
+        /// <summary>
+        /// Gets or sets the current runtime status.
+        /// When no status has been assigned yet, the getter creates and stores an initial status
+        /// (state INIT, name "REEF", no outstanding container requests) and returns that same
+        /// instance on subsequent reads, so changes made through the returned object are kept.
+        /// The setter stores the value in a backing field; the previous implementation assigned the
+        /// property to itself, which recursed without end.
+        /// </summary>
+        private RuntimeStatusProto _runtimeStatusProto
+        {
+            get
+            {
+                // Not synchronized: concurrent first reads may each create an initial status.
+                if (_runtimeStatusProtoBackingField == null)
+                {
+                    RuntimeStatusProto proto = new RuntimeStatusProto();
+                    proto.state = State.INIT;
+                    proto.name = "REEF";
+                    proto.outstanding_container_requests = 0;
+                    _runtimeStatusProtoBackingField = proto;
+                }
+
+                return _runtimeStatusProtoBackingField;
+            }
+
+            set
+            {
+                _runtimeStatusProtoBackingField = value;
+            }
+        }
+
+        /// <summary>
+        /// Translates an evaluator request into a <see cref="ResourceRequestProto"/>.
+        /// The statement that would hand the proto to the resource request handler is commented out,
+        /// so at present the request is only logged and the proto is built but not sent.
+        /// </summary>
+        /// <param name="request">The evaluator request; its Number and MemoryMegaBytes are copied into the proto.</param>
+        public void Submit(IEvaluatorRequest request)
+        {
+            LOGGER.Log(Level.Info, "Got an EvaluatorRequest");
+
+            //TODO: request.size deprecated should use megabytes instead
+            //The former mapping was:
+            //    EvaluatorRequest.EvaluatorSize.SMALL  -> proto.resource_size = SIZE.SMALL
+            //    EvaluatorRequest.EvaluatorSize.MEDIUM -> proto.resource_size = SIZE.MEDIUM
+            //    EvaluatorRequest.EvaluatorSize.LARGE  -> proto.resource_size = SIZE.LARGE
+            //    EvaluatorRequest.EvaluatorSize.XLARGE -> proto.resource_size = SIZE.XLARGE
+            //    anything else -> throw new InvalidOperationException("invalid request size" + request.Size)
+            ResourceRequestProto resourceRequest = new ResourceRequestProto
+            {
+                resource_count = request.Number
+            };
+
+            // The memory size is only set when a positive amount was requested.
+            if (request.MemoryMegaBytes > 0)
+            {
+                resourceRequest.memory_size = request.MemoryMegaBytes;
+            }
+
+            //Descriptor handling that is not implemented here yet:
+            //final ResourceCatalog.Descriptor descriptor = req.getDescriptor();
+            //if (descriptor != null) {
+            //  if (descriptor instanceof RackDescriptor) {
+            //    request.addRackName(descriptor.getName());
+            //  } else if (descriptor instanceof NodeDescriptor) {
+            //    request.addNodeName(descriptor.getName());
+            //  }
+            //}
+
+            //_futureResourceRequestHandler.Get().OnNext(resourceRequest);
+        }
+
+        /// <summary>
+        /// Removes the given evaluator manager from the set of tracked evaluators.
+        /// </summary>
+        /// <param name="evaluatorManager">The evaluator manager to remove; it is looked up by its Id.</param>
+        /// <remarks>
+        /// When no evaluator with that id is tracked, an <see cref="InvalidOperationException"/> is handed
+        /// to Exceptions.Throw together with the logger.
+        /// </remarks>
+        public void Release(EvaluatorManager evaluatorManager)
+        {
+            // Guard the dictionary with the same monitor the other members use (lock (_evaluators));
+            // locking on 'this' gave no mutual exclusion with them.
+            lock (_evaluators)
+            {
+                string evaluatorManagerId = evaluatorManager.Id;
+
+                // Remove reports whether the key was present, so no separate ContainsKey lookup is needed.
+                if (!_evaluators.Remove(evaluatorManagerId))
+                {
+                    var e = new InvalidOperationException("Trying to remove an unknown evaluator manager with id " + evaluatorManagerId);
+                    Exceptions.Throw(e, LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Handles a runtime error reported for an evaluator: wraps it in an EvaluatorException and
+        /// passes that to the evaluator manager registered under the error's id, if there is one.
+        /// </summary>
+        /// <param name="runtimeErrorProto">The runtime error message.</param>
+        public void Handle(RuntimeErrorProto runtimeErrorProto)
+        {
+            FailedRuntime error = new FailedRuntime(runtimeErrorProto);
+            LOGGER.Log(Level.Warning, "Runtime error:" + error);
+
+            // Use the reported cause when there is one, otherwise a generic message.
+            EvaluatorException evaluatorException;
+            if (error.Cause != null)
+            {
+                evaluatorException = new EvaluatorException(error.Id, error.Cause.Value);
+            }
+            else
+            {
+                evaluatorException = new EvaluatorException(error.Id, "Runtime error");
+            }
+
+            EvaluatorManager target;
+            lock (_evaluators)
+            {
+                // TryGetValue leaves target null when the id is unknown.
+                if (!_evaluators.TryGetValue(error.Id, out target))
+                {
+                    LOGGER.Log(Level.Warning, "Unknown evaluator runtime error: " + error.Cause);
+                }
+            }
+
+            // The manager is notified outside the lock.
+            if (target != null)
+            {
+                target.Handle(evaluatorException);
+            }
+        }
+
+        /// <summary>
+        /// Receives a RuntimeStatusProto, which comes from the ResourceManager layer and indicates its current status, and passes it to Handle.
+        /// </summary>
+        /// <param name="runtimeStatusProto">The runtime status message to handle.</param>
+        public void OnNext(RuntimeStatusProto runtimeStatusProto)
+        {
+            Handle(runtimeStatusProto);
+        }
+
+        /// <summary>
+        /// Receives a ResourceStatusProto, which comes from the ResourceManager layer to indicate what it thinks
+        /// about the current state of a given resource (ideally, we should think the same thing), and passes it to Handle.
+        /// </summary>
+        /// <param name="resourceStatusProto">The resource status message to handle.</param>
+        public void OnNext(ResourceStatusProto resourceStatusProto)
+        {
+            Handle(resourceStatusProto);
+        }
+
+        /// <summary>
+        /// Receives a ResourceAllocationProto, which indicates a resource allocation given by the ResourceManager layer, and passes it to Handle.
+        /// </summary>
+        /// <param name="resourceAllocationProto">The resource allocation message to handle.</param>
+        public void OnNext(ResourceAllocationProto resourceAllocationProto)
+        {
+            Handle(resourceAllocationProto);
+        }
+
+        /// <summary>
+        /// Receives a NodeDescriptorProto, which defines a new node in the cluster, and hands it to the resource catalog
+        /// so that clients can make resource requests against it.
+        /// </summary>
+        /// <param name="nodeDescriptorProto">The node description passed to the resource catalog.</param>
+        public void OnNext(NodeDescriptorProto nodeDescriptorProto)
+        {
+            _resourceCatalog.Handle(nodeDescriptorProto);
+        }
+
+        /// <summary>
+        /// This EventHandler is subscribed to the StartTime event of the Clock statically. It therefore provides the entrance
+        /// point to REEF. It replaces the runtime status with a new one in state RUNNING, named "REEF",
+        /// with no outstanding container requests.
+        /// </summary>
+        /// <param name="runtimeStart">The runtime start event; it is only logged.</param>
+        public void OnNext(RuntimeStart runtimeStart)
+        {
+            LOGGER.Log(Level.Info, "RuntimeStart: " + runtimeStart);
+
+            // Fill in a local instance completely and assign it once. Setting the fields through
+            // the property after the assignment would modify whatever object the getter returns,
+            // which is only correct if the getter hands back the instance that was just assigned.
+            RuntimeStatusProto runningStatus = new RuntimeStatusProto();
+            runningStatus.state = State.RUNNING;
+            runningStatus.name = "REEF";
+            runningStatus.outstanding_container_requests = 0;
+            _runtimeStatusProto = runningStatus;
+        }
+
+        /// <summary>
+        /// Handles RuntimeStop: forwards the stop exception (if any) to the runtime error handler,
+        /// disposes every evaluator manager that is still registered, and then disposes the heartbeat
+        /// channel, the error channel and the client job status handler.
+        /// </summary>
+        /// <param name="runtimeStop">The stop event; its Exception, when not null, is reported as a runtime error.</param>
+        /// <remarks>
+        /// A failure while disposing is logged and then reported through Exceptions.Throw as an
+        /// <see cref="InvalidOperationException"/> that carries the original exception as its inner exception.
+        /// </remarks>
+        public void OnNext(RuntimeStop runtimeStop)
+        {
+            LOGGER.Log(Level.Info, "RuntimeStop: " + runtimeStop);
+            if (runtimeStop.Exception != null)
+            {
+                string exceptionMessage = runtimeStop.Exception.Message;
+
+                // The handler is null after construction; without this check a missing handler would
+                // abort the shutdown with a NullReferenceException before anything is disposed.
+                if (_runtimeErrorHandler != null)
+                {
+                    LOGGER.Log(Level.Warning, "Sending runtime error:" + exceptionMessage);
+                    RuntimeErrorProto runtimeErrorProto = new RuntimeErrorProto();
+                    runtimeErrorProto.message = exceptionMessage;
+                    runtimeErrorProto.exception = ByteUtilities.StringToByteArrays(exceptionMessage);
+                    runtimeErrorProto.name = "REEF";
+                    _runtimeErrorHandler.OnNext(runtimeErrorProto);
+
+                    LOGGER.Log(Level.Warning, "DONE Sending runtime error: " + exceptionMessage);
+                }
+                else
+                {
+                    LOGGER.Log(Level.Warning, "No runtime error handler available, runtime error not sent: " + exceptionMessage);
+                }
+            }
+
+            lock (_evaluators)
+            {
+                // NOTE(review): if EvaluatorManager.Dispose removes the manager from _evaluators,
+                // this enumeration would be invalidated - confirm and iterate over a copy if so.
+                foreach (EvaluatorManager evaluatorManager in _evaluators.Values)
+                {
+                    LOGGER.Log(Level.Warning, "Unclean shutdown of evaluator: " + evaluatorManager.Id);
+                    evaluatorManager.Dispose();
+                }
+            }
+
+            try
+            {
+                // Both channels are null after construction, so only dispose the ones that exist.
+                if (_heartbeatConnectionChannel != null)
+                {
+                    _heartbeatConnectionChannel.Dispose();
+                }
+
+                if (_errorChannel != null)
+                {
+                    _errorChannel.Dispose();
+                }
+
+                Optional<Exception> stopCause = runtimeStop.Exception != null ?
+                    Optional<Exception>.Of(runtimeStop.Exception) : Optional<Exception>.Empty();
+                _clientJobStatusHandler.Dispose(stopCause);
+
+                LOGGER.Log(Level.Info, "driver manager closed");
+            }
+            catch (Exception e)
+            {
+                Exceptions.Caught(e, Level.Error, "Error disposing Driver manager", LOGGER);
+
+                // Keep the original failure as the inner exception so it is not lost.
+                Exceptions.Throw(new InvalidOperationException("Cannot dispose driver manager", e), LOGGER);
+            }
+        }
+
+        /// <summary>
+        /// Handles IdleClock: logs the cached runtime status and disposes the clock when the runtime
+        /// is RUNNING with no outstanding container requests and no container allocations.
+        /// </summary>
+        /// <param name="value">the idle clock event; only used for logging</param>
+        public void OnNext(IdleClock value)
+        {
+            // A line break is appended to the first three values, so the closing bracket of each
+            // of those fields starts a new line in the logged text.
+            string clockText = value + Environment.NewLine;
+            string stateText = _runtimeStatusProto.state + Environment.NewLine;
+            string outstandingText = _runtimeStatusProto.outstanding_container_requests + Environment.NewLine;
+            var allocationCount = _runtimeStatusProto.container_allocation.Count;
+
+            LOGGER.Log(
+                Level.Info,
+                string.Format(
+                    CultureInfo.InvariantCulture,
+                    "IdleClock: [{0}], RuntimeState [{1}], Outstanding container requests [{2}], Container allocation count[{3}]",
+                    clockText,
+                    stateText,
+                    outstandingText,
+                    allocationCount));
+
+            lock (_evaluators)
+            {
+                bool stillBusy = _runtimeStatusProto.state != State.RUNNING
+                    || _runtimeStatusProto.outstanding_container_requests != 0
+                    || _runtimeStatusProto.container_allocation.Count != 0;
+
+                if (!stillBusy)
+                {
+                    LOGGER.Log(Level.Info, "Idle runtime shutdown");
+                    _clockFuture.Get().Dispose();
+                }
+            }
+        }
+
+        // Explicit IObserver<T> error/completion callbacks for each event type this class observes
+        // (IdleClock, RuntimeStop, RuntimeStart, NodeDescriptorProto, ResourceAllocationProto,
+        // ResourceStatusProto, RuntimeStatusProto). None of them is implemented yet: every one
+        // throws NotImplementedException when invoked.
+        void IObserver<IdleClock>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<IdleClock>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStop>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStop>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStart>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStart>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<NodeDescriptorProto>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<NodeDescriptorProto>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<ResourceAllocationProto>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<ResourceAllocationProto>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<ResourceStatusProto>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<ResourceStatusProto>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStatusProto>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStatusProto>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Reports a failure of the runtime layer (either driver or evaluator): the
+        /// RuntimeErrorProto is handed to the RuntimeErrorHandler first, and the clock is
+        /// disposed afterwards.
+        /// </summary>
+        /// <param name="runtimeErrorProto">the error to forward</param>
+        private void Fail(RuntimeErrorProto runtimeErrorProto)
+        {
+            _runtimeErrorHandler.OnNext(runtimeErrorProto);
+
+            var clock = _clockFuture.Get();
+            clock.Dispose();
+        }
+
+        /// <summary>
+        /// Helper that obtains a new EvaluatorManager from the injector.
+        /// </summary>
+        /// <param name="id">identifier of the Evaluator; currently only written to the log</param>
+        /// <param name="descriptor">descriptor of the Evaluator; currently unused (see TODO below)</param>
+        /// <returns>the EvaluatorManager instance returned by the injector</returns>
+        private EvaluatorManager GetNewEvaluatorManagerInstance(string id, EvaluatorDescriptorImpl descriptor)
+        {
+            LOGGER.Log(Level.Info, "Creating Evaluator Manager: " + id);
+
+            // TODO: bindVolatileParameter - until that is done neither id nor descriptor is
+            // passed on to the injector.
+            object instance = _injector.GetInstance(typeof(EvaluatorManager));
+            return (EvaluatorManager)instance;
+        }
+
+        /// <summary>
+        /// Receives a heartbeat from an Evaluator, runs the sanity check on its timestamp and routes
+        /// it to the EvaluatorManager registered under the evaluator id. A heartbeat from an
+        /// evaluator that is not registered is logged as an error and reported through
+        /// Exceptions.Throw as an InvalidOperationException.
+        /// </summary>
+        /// <param name="evaluatorHearBeatProto">the remote message carrying the heartbeat</param>
+        private void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProto)
+        {
+            EvaluatorHeartbeatProto heartbeat = evaluatorHearBeatProto.Message;
+            EvaluatorStatusProto status = heartbeat.evaluator_status;
+            string evaluatorId = status.evaluator_id;
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Heartbeat from Evaluator {0} with state {1} timestamp {2}", evaluatorId, status.state, heartbeat.timestamp));
+            _sanityChecker.check(evaluatorId, heartbeat.timestamp);
+
+            lock (_evaluators)
+            {
+                // Single lookup instead of ContainsKey followed by the indexer.
+                EvaluatorManager evaluatorManager;
+                if (_evaluators.TryGetValue(evaluatorId, out evaluatorManager))
+                {
+                    evaluatorManager.Handle(evaluatorHearBeatProto);
+                }
+                else
+                {
+                    // status has already been dereferenced above, so it cannot be null here and
+                    // its state can always be included in the message.
+                    string msg = "Contact from unknown evaluator with id: " + evaluatorId + " with state " + status.state;
+                    LOGGER.Log(Level.Error, msg);
+                    Exceptions.Throw(new InvalidOperationException(msg), LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Handles a resource status message. Per the original note it originates from the
+        /// ResourceManager layer and describes the state of the resource executing an Evaluator.
+        /// The message is passed on to the EvaluatorManager registered under the message's
+        /// identifier; if none is registered an InvalidOperationException is reported through
+        /// Exceptions.Throw.
+        /// </summary>
+        /// <param name="resourceStatusProto">the resource status to route</param>
+        private void Handle(ResourceStatusProto resourceStatusProto)
+        {
+            lock (_evaluators)
+            {
+                // Single lookup instead of ContainsKey followed by the indexer.
+                EvaluatorManager evaluatorManager;
+                if (_evaluators.TryGetValue(resourceStatusProto.identifier, out evaluatorManager))
+                {
+                    evaluatorManager.Handle(resourceStatusProto);
+                }
+                else
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown resource status from evaluator {0} with state {1}", resourceStatusProto.identifier, resourceStatusProto.state));
+                    Exceptions.Throw(e, LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Handles a resource allocation: looks up the node in the resource catalog, creates an
+        /// EvaluatorManager for the allocation and registers it under the allocation's identifier.
+        /// Any failure along the way is logged and reported through Exceptions.Throw as an
+        /// InvalidOperationException that carries the original exception as its inner exception.
+        /// </summary>
+        /// <param name="resourceAllocationProto">the allocation to register</param>
+        private void Handle(ResourceAllocationProto resourceAllocationProto)
+        {
+            lock (_evaluators)
+            {
+                try
+                {
+                    INodeDescriptor nodeDescriptor = _resourceCatalog.GetNode(resourceAllocationProto.node_id);
+                    if (nodeDescriptor == null)
+                    {
+                        Exceptions.Throw(new InvalidOperationException("Unknown resource: " + resourceAllocationProto.node_id), LOGGER);
+                    }
+                    EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor, EvaluatorType.UNDECIDED, resourceAllocationProto.resource_memory, resourceAllocationProto.virtual_cores);
+                    LOGGER.Log(Level.Info, "Resource allocation: new evaluator id: " + resourceAllocationProto.identifier);
+                    EvaluatorManager evaluatorManager = GetNewEvaluatorManagerInstance(resourceAllocationProto.identifier, evaluatorDescriptor);
+                    _evaluators.Add(resourceAllocationProto.identifier, evaluatorManager);
+                }
+                catch (Exception e)
+                {
+                    Exceptions.Caught(e, Level.Error, LOGGER);
+
+                    // The catch-all also wraps the "Unknown resource" failure raised above, so keep
+                    // the original exception attached as the cause instead of dropping it.
+                    Exceptions.Throw(new InvalidOperationException("Error handling resourceAllocationProto.", e), LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Reacts to a runtime status update. FAILED forwards the attached error via Fail, DONE
+        /// disposes the clock, and RUNNING caches the status and disposes the clock when the clock
+        /// is idle and there are neither outstanding container requests nor container allocations.
+        /// Any other state is only logged.
+        /// </summary>
+        /// <param name="runtimeStatusProto">the status reported by the runtime</param>
+        private void Handle(RuntimeStatusProto runtimeStatusProto)
+        {
+            State runtimeState = runtimeStatusProto.state;
+            LOGGER.Log(Level.Info, "Runtime status: " + runtimeStatusProto.state);
+
+            if (runtimeState == State.FAILED)
+            {
+                Fail(runtimeStatusProto.error);
+            }
+            else if (runtimeState == State.DONE)
+            {
+                _clockFuture.Get().Dispose();
+            }
+            else if (runtimeState == State.RUNNING)
+            {
+                lock (_evaluators)
+                {
+                    _runtimeStatusProto = runtimeStatusProto;
+
+                    bool nothingLeftToDo = _clockFuture.Get().IsIdle()
+                        && runtimeStatusProto.outstanding_container_requests == 0
+                        && runtimeStatusProto.container_allocation.Count == 0;
+                    if (nothingLeftToDo)
+                    {
+                        _clockFuture.Get().Dispose();
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfiguration.cs
new file mode 100644
index 0000000..a3dff0e
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfiguration.cs
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common;
+using Org.Apache.Reef.Common.Api;
+using Org.Apache.Reef.Common.Catalog;
+using Org.Apache.Reef.Common.Client;
+using Org.Apache.Reef.Common.Evaluator;
+using Org.Apache.Reef.Driver.Evaluator;
+using Org.Apache.Reef.Tang.Formats;
+using Org.Apache.Reef.Tang.Util;
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Configuration module builder holding the bindings of the driver runtime.
+    /// </summary>
+    public class DriverRuntimeConfiguration : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// Builds the driver runtime ConfigurationModule; a new builder is created and built on
+        /// every access. Active bindings: IResourceCatalog to ResourceCatalogImpl,
+        /// IJobMessageObserver to ClientJobStatusHandler, the JobMessageHandler and
+        /// JobExceptionHandler named parameters to ClientJobStatusHandler, and the
+        /// JobControlHandler named parameter to ClientManager. The DriverManager bindings are
+        /// commented out and therefore not part of the resulting module.
+        /// </summary>
+        public static ConfigurationModule ConfigurationModule
+        {
+            get
+            {
+                return new DriverRuntimeConfiguration()
+                // Resource Catalog
+                .BindImplementation(GenericType<IResourceCatalog>.Class, GenericType<ResourceCatalogImpl>.Class)
+
+                // JobMessageObserver
+                //.BindImplementation(GenericType<IEvaluatorRequestor>.Class, GenericType<DriverManager>.Class)
+                .BindImplementation(GenericType<IJobMessageObserver>.Class, GenericType<ClientJobStatusHandler>.Class)
+
+                // JobMessageObserver Wake event handler bindings
+                .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobMessageHandler>.Class, GenericType<ClientJobStatusHandler>.Class)
+                .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobExceptionHandler>.Class, GenericType<ClientJobStatusHandler>.Class)
+
+                // Client manager
+                .BindNamedParameter(GenericType<DriverRuntimeConfigurationOptions.JobControlHandler>.Class, GenericType<ClientManager>.Class)
+
+                // Bind the runtime parameters
+                //.BindNamedParameter(GenericType<RuntimeParameters.NodeDescriptorHandler>.Class, GenericType<DriverManager>.Class)
+                //.BindNamedParameter(GenericType<RuntimeParameters.ResourceAllocationHandler>.Class, GenericType<DriverManager>.Class)
+                //.BindNamedParameter(GenericType<RuntimeParameters.ResourceStatusHandler>.Class, GenericType<DriverManager>.Class)
+                //.BindNamedParameter(GenericType<RuntimeParameters.RuntimeStatusHandler>.Class, GenericType<DriverManager>.Class)
+
+                // Bind to the Clock
+                //.BindSetEntry(GenericType<IClock.RuntimeStopHandler>.Class, GenericType<DriverManager>.Class)
+                .Build();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfigurationOptions.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfigurationOptions.cs
new file mode 100644
index 0000000..5de7856
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverRuntimeConfigurationOptions.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common;
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Container for the named parameters of the driver runtime configuration. Each nested class
+    /// is an empty marker type; the type argument of Name names the handler type the parameter
+    /// stands for.
+    /// </summary>
+    public class DriverRuntimeConfigurationOptions
+    {
+        // Named parameter of type ClientJobStatusHandler for job messages.
+        [NamedParameter(documentation: "Job message handler (see ClientJobStatusHandler)")]
+        public class JobMessageHandler : Name<ClientJobStatusHandler>
+        {
+        }
+
+        // Named parameter of type ClientJobStatusHandler for job exceptions.
+        [NamedParameter(documentation: "Job exception handler (see ClientJobStatusHandler)")]
+        public class JobExceptionHandler : Name<ClientJobStatusHandler>
+        {
+        }
+
+        // Named parameter of type ClientManager for job control messages.
+        [NamedParameter(documentation: "Called when a job control message is received by the client.")]
+        public class JobControlHandler : Name<ClientManager>
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/DriverSubmissionSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/DriverSubmissionSettings.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverSubmissionSettings.cs
new file mode 100644
index 0000000..3bdaf6b
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/DriverSubmissionSettings.cs
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.Reef.Utilities.Logging;
+
+namespace Org.Apache.Reef.Driver
+{
+    // TODO: merge with EvaluatorConfigurations class
+    /// <summary>
+    /// Options a .NET client can set when submitting a driver. ToCommandLineArguments() renders
+    /// the options that differ from their "unset" values as command line switches.
+    /// </summary>
+    public class DriverSubmissionSettings
+    {
+        // 0 means "not set": no -driver_memory switch is emitted for it.
+        private int _driverMemory;
+
+        /// <summary>
+        /// Creates settings with the defaults: submit the driver, update the jar before
+        /// submission, wait until the driver exits (ClientWaitTime = -1), run locally and log at INFO level.
+        /// </summary>
+        public DriverSubmissionSettings()
+        {
+            ClientWaitTime = -1;
+            Submit = true;
+            UpdateJarBeforeSubmission = true;
+            JavaLogLevel = JavaLoggingSetting.INFO;
+        }
+
+        /// <summary>
+        /// Whether to update the jar file with the needed dlls before submission, default to true.
+        /// Users can reduce startup time by skipping the update if the jar file already contains all necessary dlls.
+        /// Note this setting is .NET only; it is not part of the generated command line.
+        /// </summary>
+        public bool UpdateJarBeforeSubmission { get; set; }
+
+        /// <summary>
+        /// Determines the verbosity of the Java driver's log, default to INFO.
+        /// Per the original note this is used only when launching the Java process; it is not part
+        /// of the generated command line.
+        /// </summary>
+        public JavaLoggingSetting JavaLogLevel { get; set; }
+
+        /// <summary>
+        /// Memory allocated for the driver. The value stays 0 until set, in which case no
+        /// -driver_memory switch is generated. The original comments state that 512 MB is used
+        /// when no value is provided; that default is not applied by this class.
+        /// </summary>
+        /// <exception cref="ArgumentException">thrown when a negative value is assigned</exception>
+        public int DriverMemory
+        {
+            get
+            {
+                return _driverMemory;
+            }
+
+            set
+            {
+                if (value < 0)
+                {
+                    throw new ArgumentException("driver memory cannot be negative value.");
+                }
+                _driverMemory = value;
+            }
+        }
+
+        /// <summary>
+        /// Driver identifier. Stays null until set, in which case no identifier switch is
+        /// generated. The original comments name "ReefDevClrBridge" as the default; that default
+        /// is not applied by this class.
+        /// </summary>
+        public string DriverIdentifier { get; set; }
+
+        /// <summary>
+        /// Whether to submit the driver with its config after the driver configuration is constructed, default to true.
+        /// </summary>
+        public bool Submit { get; set; }
+
+        /// <summary>
+        /// How long the client waits for the driver. Defaults to -1, documented as waiting until
+        /// the driver is done; only positive values are put on the command line.
+        /// </summary>
+        public int ClientWaitTime { get; set; }
+
+        /// <summary>
+        /// Driver job submission directory in (H)DFS where the jar file shall be uploaded. Stays null until set;
+        /// per the original note a tmp directory with a GUID name is used in that case (not done by this class).
+        /// If set by the CLR user, the user must guarantee the uniqueness of the directory across multiple jobs.
+        /// </summary>
+        public string SubmissionDirectory { get; set; }
+
+        /// <summary>
+        /// Whether to run on the YARN runtime, default to false (run locally).
+        /// </summary>
+        public bool RunOnYarn { get; set; }
+
+        /// <summary>
+        /// Renders the settings as command line switches. A switch is only produced for a value
+        /// that has been changed from its unset state: RunOnYarn == true, Submit == false,
+        /// DriverMemory > 0, a non-blank DriverIdentifier, ClientWaitTime > 0 and a non-blank
+        /// SubmissionDirectory. Every switch is preceded by a single space; values are not quoted.
+        /// </summary>
+        /// <returns>the concatenated switches, or an empty string if nothing was changed</returns>
+        public string ToCommandLineArguments()
+        {
+            // NOTE(review): the identifier switch is spelled "-drive_id" (not "-driver_id"). It is
+            // kept as is because the consumer of these arguments is not visible here - confirm
+            // against the receiving side before changing it.
+            return
+                (RunOnYarn ? " -local false" : string.Empty) +
+                (!Submit ? " -submit false" : string.Empty) +
+                (DriverMemory > 0 ? " -driver_memory " + DriverMemory : string.Empty) +
+                (!string.IsNullOrWhiteSpace(DriverIdentifier) ? " -drive_id " + DriverIdentifier : string.Empty) +
+                (ClientWaitTime > 0 ? " -wait_time " + ClientWaitTime : string.Empty) +
+                (!string.IsNullOrWhiteSpace(SubmissionDirectory) ? " -submission_directory " + SubmissionDirectory : string.Empty);
+        }
+
+        /// <summary>
+        /// Misspelled original name of ToCommandLineArguments(), kept so that existing callers
+        /// continue to compile. Returns exactly the same string.
+        /// </summary>
+        /// <returns>the result of ToCommandLineArguments()</returns>
+        public string ToComamndLineArguments()
+        {
+            return ToCommandLineArguments();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/EvaluatorManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/EvaluatorManager.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/EvaluatorManager.cs
new file mode 100644
index 0000000..5dc1ce7
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/EvaluatorManager.cs
@@ -0,0 +1,655 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Api;
+using Org.Apache.Reef.Common.Catalog;
+using Org.Apache.Reef.Common.Evaluator;
+using Org.Apache.Reef.Common.Exceptions;
+using Org.Apache.Reef.Common.ProtoBuf.DriverRuntimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Driver.Bridge;
+using Org.Apache.Reef.Driver.Context;
+using Org.Apache.Reef.Driver.Evaluator;
+using Org.Apache.Reef.Driver.Task;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Time;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using System.Text;
+
+using TaskMessage = Org.Apache.Reef.Tasks.TaskMessage;
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Manages a single Evaluator instance including all lifecycle instances:
+    /// (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
+    /// A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager.
+    /// The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this heartbeat channel.
+    /// A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime.
+    /// The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
+    /// control information (e.g., shutdown, suspend).
+    /// </summary>
+    public class EvaluatorManager : IDisposable, IIdentifiable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorManager));
+        
+        private STATE _state = STATE.ALLOCATED;
+
+        private IClock _clock;
+
+        // TODO
+        //  private final RemoteManager remoteManager;
+        private DriverManager _driverManager;
+
+        private IResourceReleaseHandler _resourceReleaseHandler;
+
+        private IResourceLaunchHandler _resourceLaunchHandler;
+
+        private EvaluatorDescriptorImpl _evaluatorDescriptor;
+
+        private string _evaluatorId;
+
+        private IList<EvaluatorContext> _activeContexts = new List<EvaluatorContext>();
+
+        private HashSet<string> _activeContextIds = new HashSet<string>();
+
+        private IRunningTask _runningTask = null;
+
+        private IObserver<EvaluatorControlProto> _evaluatorControlHandler = null;
+
+        private bool _isResourceReleased = false;
+
+        //TODO
+        //private final DispatchingEStage dispatcher;
+        private EvaluatorType _type = EvaluatorType.CLR;
+
+        // Stores the collaborators and the evaluator identity. The event handler sets are
+        // accepted but not used yet: the dispatcher registration below is still commented-out
+        // Java code that has not been ported.
+        public EvaluatorManager(
+            IClock clock,
+            //RemoteManager remoteManager,
+            DriverManager driverManager,
+            IResourceReleaseHandler resourceReleaseHandler,
+            IResourceLaunchHandler resourceLaunchHandler,
+            //REEFErrorHandler errorHandler,
+            string evaluatorId,
+            EvaluatorDescriptorImpl evaluatorDescriptor,
+            ISet<IObservable<IActiveContext>> activeContextEventHandler,
+            ISet<IObservable<IClosedContext>> closedContextEventHandlers,
+            ISet<IObservable<FailedContext>> failedContextEventHandlers,
+            ISet<IObservable<ContextMessage>> contextMessageHandlers,
+            ISet<IObservable<IRunningTask>> runningTaskEventHandlers,
+            ISet<IObservable<ICompletedTask>> completedTaskEventHandlers,
+            ISet<IObservable<ISuspendedTask>> suspendedTaskEventHandlers,
+            ISet<IObservable<TaskMessage>> taskMessageEventHandlers,
+            ISet<IObservable<FailedTask>> taskExceptionEventHandlers,
+            ISet<IObservable<IAllocatedEvaluator>> allocatedEvaluatorEventHandlers,
+            ISet<IObservable<IFailedEvaluator>> failedEvaluatorHandlers,
+            ISet<IObservable<ICompletedEvaluator>> completedEvaluatorHandlers)
+        {
+            // identity of the evaluator managed by this instance
+            _evaluatorId = evaluatorId;
+            _evaluatorDescriptor = evaluatorDescriptor;
+
+            // collaborators
+            _clock = clock;
+            _driverManager = driverManager;
+            _resourceLaunchHandler = resourceLaunchHandler;
+            _resourceReleaseHandler = resourceReleaseHandler;
+            //_remoteManager = remoteManager;
+
+            // TODO: port the dispatcher set-up from the Java implementation:
+            //this.dispatcher = new DispatchingEStage(errorHandler, 16); // 16 threads
+
+            //this.dispatcher.register(ActiveContext.class, activeContextEventHandlers);
+            //this.dispatcher.register(ClosedContext.class, closedContextEventHandlers);
+            //this.dispatcher.register(FailedContext.class, failedContextEventHandlers);
+            //this.dispatcher.register(ContextMessage.class, contextMessageHandlers);
+
+            //this.dispatcher.register(RunningTask.class, runningTaskEventHandlers);
+            //this.dispatcher.register(CompletedTask.class, completedTaskEventHandlers);
+            //this.dispatcher.register(SuspendedTask.class, suspendedTaskEventHandlers);
+            //this.dispatcher.register(TaskMessage.class, taskMessageEventHandlers);
+            //this.dispatcher.register(FailedTask.class, taskExceptionEventHandlers);
+
+            //this.dispatcher.register(FailedEvaluator.class, failedEvaluatorHandlers);
+            //this.dispatcher.register(CompletedEvaluator.class, completedEvaluatorHandlers);
+            //this.dispatcher.register(AllocatedEvaluator.class, allocatedEvaluatorEventHandlers);
+
+            //this.dispatcher.onNext(AllocatedEvaluator.class,
+            //    new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier()));
+        }
+
+        /// <summary>
+        /// Various states that the EvaluatorManager could be in. The EvaluatorManager is created when a resource has been allocated by the ResourceManager.
+        /// </summary>
+        // Lifecycle states of an EvaluatorManager. A new manager starts in ALLOCATED (see the
+        // initializer of the _state field).
+        public enum STATE
+        {
+            ALLOCATED,  // initial state
+            SUBMITTED,  // client called AllocatedEvaluator.submitTask() and we're waiting for first contact
+            RUNNING,    // first contact received, all communication channels established, Evaluator sent to client.
+            DONE,       // clean shutdown
+            FAILED,     // some failure occurred.
+            KILLED      // unclean shutdown
+        }
+
+        // Descriptor of the evaluator handled by this manager, as passed to the constructor.
+        public IEvaluatorDescriptor EvaluatorDescriptor
+        {
+            get { return _evaluatorDescriptor; }
+        }
+
+        // Node descriptor taken from the evaluator descriptor.
+        public INodeDescriptor NodeDescriptor
+        {
+            get { return EvaluatorDescriptor.NodeDescriptor; }
+        }
+
+        // The task currently tracked as running (null until one is set). The field is read while
+        // holding the lock on _evaluatorDescriptor, the same lock object Dispose() uses.
+        public IRunningTask RunningTask
+        {
+            get
+            {
+                IRunningTask current;
+                lock (_evaluatorDescriptor)
+                {
+                    current = _runningTask;
+                }
+                return current;
+            }
+        }
+
+        // Identifier of the evaluator, fixed at construction time.
+        public string Id
+        {
+            get { return _evaluatorId; }
+
+            // Intentionally empty: an assigned value is ignored and the identifier does not change.
+            set { }
+        }
+
+        // Evaluator type of this manager (initially CLR). Setting it also updates the
+        // EvaluatorType of the evaluator descriptor.
+        public EvaluatorType Type
+        {
+            get { return _type; }
+
+            set
+            {
+                EvaluatorType newType = value;
+                _type = newType;
+                _evaluatorDescriptor.EvaluatorType = newType;
+            }
+        }
+
+        // Shuts this evaluator manager down.
+        // 1. Under the lock on _evaluatorDescriptor: if the state is RUNNING, a kill_evaluator
+        //    control message is passed to Handle(...) and the state becomes KILLED, even when
+        //    Handle throws.
+        // 2. Unless the resource is already marked as released: mark it released and ask the
+        //    DriverManager to release this manager.
+        // NOTE(review): the try block in step 2 contains no executable statement (the delayed
+        // release via a clock alarm is still commented-out Java), so its catch block cannot be
+        // reached and no ResourceReleaseProto is sent from this method as written.
+        // NOTE(review): _isResourceReleased is read and written outside the lock.
+        public void Dispose()
+        {
+            lock (_evaluatorDescriptor)
+            {
+                if (_state == STATE.RUNNING)
+                {
+                    LOGGER.Log(Level.Warning, "Dirty shutdown of running evaluator :" + Id);
+                    try
+                    {
+                        // Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
+                        EvaluatorControlProto proto = new EvaluatorControlProto();
+                        proto.timestamp = DateTime.Now.Ticks;
+                        proto.identifier = Id;
+                        proto.kill_evaluator = new KillEvaluatorProto();
+                        Handle(proto);
+                    }
+                    finally
+                    {
+                        // The state is switched to KILLED whether or not the kill message could be handled.
+                        _state = STATE.KILLED;
+                    }
+                }
+            }
+
+            if (!_isResourceReleased)
+            {
+                try
+                {
+                    // We need to wait awhile before returning the container to the RM in order to
+                    // give the EvaluatorRuntime (and Launcher) time to cleanly exit. 
+
+                    // this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
+                    //@Override
+                    //public void onNext(final Alarm alarm) {
+                    //  EvaluatorManager.this.resourceReleaseHandler.onNext(
+                    //      DriverRuntimeProtocol.ResourceReleaseProto.newBuilder()
+                    //          .setIdentifier(EvaluatorManager.this.evaluatorId).build());
+                }
+                catch (Exception e)
+                {
+                    // Fallback meant for the case where scheduling the alarm fails: release the resource right away.
+                    Exceptions.Caught(e, Level.Warning, "Force resource release because the client closed the clock.", LOGGER);
+                    ResourceReleaseProto proto = new ResourceReleaseProto();
+                    proto.identifier = _evaluatorId;
+                    _resourceReleaseHandler.OnNext(proto);
+                }
+                finally
+                {
+                    // Always mark the resource as released and notify the driver manager.
+                    _isResourceReleased = true;
+                    _driverManager.Release(this);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Handles an evaluator failure. Unless the manager is already in DONE, FAILED or KILLED,
+        /// a FailedContext is built for every active context (walking <c>_activeContexts</c> in
+        /// reverse order); finally the state becomes FAILED and the manager is disposed.
+        /// </summary>
+        /// <param name="exception">The failure reported for this evaluator.</param>
+        public void Handle(EvaluatorException exception)
+        {
+            lock (_evaluatorDescriptor)
+            {
+                if (_state < STATE.DONE)
+                {
+                    LOGGER.Log(Level.Warning, "Failed Evaluator: " + Id + exception.Message);
+                    try
+                    {
+                        List<FailedContext> failures = new List<FailedContext>();
+
+                        // Work on a reversed copy so the live collection is left untouched.
+                        List<EvaluatorContext> reversedContexts = new List<EvaluatorContext>(_activeContexts);
+                        reversedContexts.Reverse();
+
+                        foreach (EvaluatorContext active in reversedContexts)
+                        {
+                            Optional<IActiveContext> parent;
+                            if (active.ParentId.IsPresent())
+                            {
+                                parent = Optional<IActiveContext>.Of(GetEvaluatorContext(active.ParentId.Value));
+                            }
+                            else
+                            {
+                                parent = Optional<IActiveContext>.Empty();
+                            }
+
+                            failures.Add(active.GetFailedContext(parent, exception));
+                        }
+
+                        // Not ported yet: dispatching the FailedEvaluator event to the client.
+                        //Optional<FailedTask> failedTask = _runningTask != null ? 
+                        //    Optional<FailedTask>.Of(new FailedTask(_runningTask.Id, exception)) : Optional<FailedTask>.Empty();
+                        //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString());
+                        //this.dispatcher.onNext(FailedEvaluator.class, new FailedEvaluatorImpl(
+                        //exception, failedContextList, failedTaskOptional, this.evaluatorId));
+                    }
+                    catch (Exception e)
+                    {
+                        Exceptions.CaughtAndThrow(e, Level.Error, "Exception while handling FailedEvaluator.", LOGGER);
+                    }
+                    finally
+                    {
+                        _state = STATE.FAILED;
+                        Dispose();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Processes a heartbeat sent by the evaluator. An error in the evaluator status is turned
+        /// into an EvaluatorException and handled immediately. Otherwise the first heartbeat
+        /// received in SUBMITTED moves the manager to RUNNING, every context status and the
+        /// optional task status are processed, and a FAILED or DONE evaluator status triggers the
+        /// corresponding state transition.
+        /// </summary>
+        /// <param name="evaluatorHearBeatProtoMessage">The remote message wrapping the heartbeat.</param>
+        public void Handle(IRemoteMessage<EvaluatorHeartbeatProto> evaluatorHearBeatProtoMessage)
+        {
+            lock (_evaluatorDescriptor)
+            {
+                EvaluatorHeartbeatProto heartbeatProto = evaluatorHearBeatProtoMessage.Message;
+
+                // The evaluator status may be absent. Read it once and guard every use, so that a
+                // heartbeat without it no longer ends in a NullReferenceException further down.
+                EvaluatorStatusProto evaluatorStatusProto = heartbeatProto.evaluator_status;
+                if (evaluatorStatusProto != null)
+                {
+                    if (evaluatorStatusProto.error != null)
+                    {
+                        Handle(new EvaluatorException(Id, ByteUtilities.ByteArrarysToString(evaluatorStatusProto.error)));
+                        return;
+                    }
+                    else if (_state == STATE.SUBMITTED)
+                    {
+                        string evaluatorRId = evaluatorHearBeatProtoMessage.Identifier.ToString();
+                        LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorRId);
+                        // TODO
+                        // _evaluatorControlHandler = _remoteManager.getHandler(evaluatorRID, EvaluatorRuntimeProtocol.EvaluatorControlProto.class);
+                        _state = STATE.RUNNING;
+                        LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} is running", _evaluatorId));
+                    }
+                }
+
+                LOGGER.Log(Level.Info, "Evaluator heartbeat: " + heartbeatProto);
+
+                // New-context notifications are only requested when the heartbeat also carries a task status.
+                foreach (ContextStatusProto contextStatusProto in heartbeatProto.context_status)
+                {
+                    Handle(contextStatusProto, heartbeatProto.task_status != null);
+                }
+
+                if (heartbeatProto.task_status != null)
+                {
+                    Handle(heartbeatProto.task_status);
+                }
+
+                if (evaluatorStatusProto != null)
+                {
+                    if (evaluatorStatusProto.state == State.FAILED)
+                    {
+                        // NOTE(review): the state is set to FAILED before Handle(EvaluatorException) runs,
+                        // and that overload returns early once the state is DONE or later - confirm
+                        // whether the failure handling (including Dispose) is meant to be skipped here.
+                        _state = STATE.FAILED;
+                        EvaluatorException e = evaluatorStatusProto.error != null ?
+                            new EvaluatorException(_evaluatorId, ByteUtilities.ByteArrarysToString(evaluatorStatusProto.error)) :
+                            new EvaluatorException(_evaluatorId, "unknown cause");
+                        LOGGER.Log(Level.Warning, "Failed evaluator: " + Id + e.Message);
+                        Handle(e);
+                    }
+                    else if (evaluatorStatusProto.state == State.DONE)
+                    {
+                        LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} done", Id));
+                        _state = STATE.DONE;
+
+                        // TODO
+                        // dispatcher.onNext(CompletedEvaluator.class, new CompletedEvaluator() {
+                        //@Override
+                        //public String getId() {
+                        //  return EvaluatorManager.this.evaluatorId;
+                        Dispose();
+                    }
+                }
+            }
+            LOGGER.Log(Level.Info, "DONE with evaluator heartbeat");
+        }
+
+        /// <summary>
+        /// Forwards a resource launch request to the resource launch handler and moves the manager
+        /// from ALLOCATED to SUBMITTED. In any other state an InvalidOperationException is raised
+        /// through <c>Exceptions.Throw</c>.
+        /// </summary>
+        /// <param name="resourceLaunchProto">The launch request to forward.</param>
+        public void Handle(ResourceLaunchProto resourceLaunchProto)
+        {
+            lock (_evaluatorDescriptor)
+            {
+                if (_state != STATE.ALLOCATED)
+                {
+                    string complaint = string.Format(
+                        CultureInfo.InvariantCulture,
+                        "Evaluator manager expected {0} state, but instead is in state {1}",
+                        STATE.ALLOCATED,
+                        _state);
+                    Exceptions.Throw(new InvalidOperationException(complaint), LOGGER);
+                }
+                else
+                {
+                    _state = STATE.SUBMITTED;
+                    _resourceLaunchHandler.OnNext(resourceLaunchProto);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Wraps the ContextControlProto in an EvaluatorControlProto (stamped with the current
+        /// time and this evaluator's id) and forwards it to the EvaluatorRuntime.
+        /// </summary>
+        /// <param name="contextControlProto">The context control message to wrap and forward.</param>
+        public void Handle(ContextControlProto contextControlProto)
+        {
+            lock (_evaluatorDescriptor)
+            {
+                LOGGER.Log(Level.Info, "Task control message from " + _evaluatorId);
+
+                EvaluatorControlProto envelope = new EvaluatorControlProto
+                {
+                    timestamp = DateTime.Now.Ticks,
+                    identifier = Id,
+                    context_control = contextControlProto
+                };
+
+                Handle(envelope);
+            }
+        }
+
+        /// <summary>
+        /// Forwards the EvaluatorControlProto to the EvaluatorRuntime. This is only done while the
+        /// manager is RUNNING; in any other state an InvalidOperationException is raised through
+        /// <c>Exceptions.Throw</c>.
+        /// </summary>
+        /// <param name="proto">The control message to forward.</param>
+        public void Handle(EvaluatorControlProto proto)
+        {
+            lock (_evaluatorDescriptor)
+            {
+                if (_state != STATE.RUNNING)
+                {
+                    string complaint = string.Format(
+                        CultureInfo.InvariantCulture,
+                        "Evaluator manager expects to be in {0} state, but instead is in state {1}",
+                        STATE.RUNNING,
+                        _state);
+                    Exceptions.Throw(new InvalidOperationException(complaint), LOGGER);
+                }
+                else
+                {
+                    _evaluatorControlHandler.OnNext(proto);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Resource status information from the (actual) resource manager. When the resource
+        /// manager reports DONE or FAILED while this manager still believes the evaluator is alive
+        /// (state before DONE), the mismatch is logged as a warning, the resource is treated as
+        /// already released and the state becomes KILLED. All other updates are only logged.
+        /// </summary>
+        /// <param name="resourceStatusProto">The status update received from the resource manager.</param>
+        public void Handle(ResourceStatusProto resourceStatusProto)
+        {
+            lock (_evaluatorDescriptor)
+            {
+                State resourceState = resourceStatusProto.state;
+                LOGGER.Log(Level.Info, "Resource manager state update: " + resourceState);
+
+                bool resourceIsGone = resourceState == State.DONE || resourceState == State.FAILED;
+                if (!resourceIsGone || _state >= STATE.DONE)
+                {
+                    return;
+                }
+
+                // something is wrong, I think I'm alive but the resource manager runtime says I'm dead
+                StringBuilder stringBuilder = new StringBuilder();
+                stringBuilder.AppendFormat(
+                    CultureInfo.InvariantCulture,
+                    "The resource manager informed me that Evaluator {0} is in state {1} but I think I am in {2} state.",
+                    _evaluatorId,
+                    resourceState,
+                    _state);
+                if (resourceStatusProto.diagnostics != null)
+                {
+                    stringBuilder.Append(" Cause: " + resourceStatusProto.diagnostics);
+                }
+                if (_runningTask != null)
+                {
+                    stringBuilder.AppendFormat(
+                        CultureInfo.InvariantCulture,
+                        " Taskruntime {0} did not complete before this evaluator died.",
+                        _runningTask.Id);
+                }
+
+                // The message used to be assembled and then dropped; log it so the mismatch is visible
+                // until the EvaluatorException hand-off below is ported.
+                LOGGER.Log(Level.Warning, stringBuilder.ToString());
+
+                // RM is telling me its DONE/FAILED - assuming it has already released the resources
+                _isResourceReleased = true;
+                //Handle(new EvaluatorException(_evaluatorId, stringBuilder.ToString(), _runningTask));
+                _state = STATE.KILLED;
+            }
+        }
+
+        /// <summary>
+        /// Handle a context status update. A READY status registers the context if it is not yet
+        /// known and processes its context messages. Any other status tears the context down:
+        /// FAIL and DONE are the accepted terminal states, everything else is reported as an
+        /// InvalidOperationException through <c>Exceptions.Throw</c>.
+        /// </summary>
+        /// <param name="contextStatusProto">The context status taken from the heartbeat.</param>
+        /// <param name="notifyClientOnNewActiveContext">Whether a newly registered context should be announced to the client.</param>
+        private void Handle(ContextStatusProto contextStatusProto, bool notifyClientOnNewActiveContext)
+        {
+            string contextId = contextStatusProto.context_id;
+            Optional<string> parentId = contextStatusProto.parent_id != null ?
+                Optional<string>.Of(contextStatusProto.parent_id) : Optional<string>.Empty();
+            if (ContextStatusProto.State.READY == contextStatusProto.context_state)
+            {
+                if (!_activeContextIds.Contains(contextId))
+                {
+                    EvaluatorContext evaluatorContext = new EvaluatorContext(this, contextId, parentId);
+                    AddEvaluatorContext(evaluatorContext);
+                    if (notifyClientOnNewActiveContext)
+                    {
+                        LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext.ToString());
+                        //TODO
+                        //dispatcher.onNext(ActiveContext.class, context);
+                    }
+                }
+                foreach (ContextStatusProto.ContextMessageProto contextMessageProto in contextStatusProto.context_message)
+                {
+                    byte[] message = contextMessageProto.message;
+                    string sourceId = contextMessageProto.source_id;
+                    LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + sourceId + message);
+                    //        this.dispatcher.onNext(ContextMessage.class,
+                    //new ContextMessageImpl(theMessage, contextID, sourceID));
+                }
+
+                // A READY context stays active. Previously control fell through to the teardown
+                // below, which removed the context again and then rejected READY as an unknown state.
+                return;
+            }
+
+            if (!_activeContextIds.Contains(contextId))
+            {
+                if (ContextStatusProto.State.FAIL == contextStatusProto.context_state)
+                {
+                    // Register the unknown failed context so the teardown below can look it up.
+                    AddEvaluatorContext(new EvaluatorContext(this, contextId, parentId));
+                }
+                else
+                {
+                    var e = new InvalidOperationException("unknown context signaling state " + contextStatusProto.context_state);
+                    Exceptions.Throw(e, LOGGER);
+                }
+            }
+
+            // Resolve the parent before the context is removed from the active set.
+            EvaluatorContext context = GetEvaluatorContext(contextId);
+            EvaluatorContext parentContext = context.ParentId.IsPresent() ?
+                GetEvaluatorContext(context.ParentId.Value) : null;
+            RemoveEvaluatorContext(context);
+
+            if (ContextStatusProto.State.FAIL == contextStatusProto.context_state)
+            {
+                // TODO
+                Exception reason = new InvalidOperationException(ByteUtilities.ByteArrarysToString(contextStatusProto.error));
+                Optional<IActiveContext> optionalParentContext = (null == parentContext) ?
+                    Optional<IActiveContext>.Empty() : Optional<IActiveContext>.Of(parentContext);
+                LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + reason.ToString() + optionalParentContext);
+                // TODO
+                //this.dispatcher.onNext(FailedContext.class,
+                //context.getFailedContext(optionalParentContext, reason));
+            }
+            else if (ContextStatusProto.State.DONE == contextStatusProto.context_state)
+            {
+                if (null != parentContext)
+                {
+                    // TODO
+                    //this.dispatcher.onNext(ClosedContext.class, context.getClosedContext(parentContext));
+                }
+                else
+                {
+                    LOGGER.Log(Level.Info, "Root context closed. Evaluator closed will trigger final shutdown.");
+                }
+            }
+            else
+            {
+                var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown context state {0} for context {1}", contextStatusProto.context_state, contextId));
+                Exceptions.Throw(e, LOGGER);
+            }
+        }
+
+        /// <summary>
+        /// Handle task status messages. INIT registers a new running task; SUSPEND, DONE and FAILED
+        /// clear the running task; for any other state the attached task messages (if any) are
+        /// processed.
+        /// </summary>
+        /// <param name="taskStatusProto">The task status taken from the heartbeat.</param>
+        private void Handle(TaskStatusProto taskStatusProto)
+        {
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Received task {0} status {1}", taskStatusProto.task_id, taskStatusProto.state));
+            string taskId = taskStatusProto.task_id;
+            string contextId = taskStatusProto.context_id;
+            State taskState = taskStatusProto.state;
+
+            if (taskState == State.INIT)
+            {
+                EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
+                _runningTask = new RunningTaskImpl(this, taskId, evaluatorContext);
+                // this.dispatcher.onNext(RunningTask.class, this.runningTask);
+            }
+            else if (taskState == State.SUSPEND)
+            {
+                EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
+                _runningTask = null;
+
+                // The result may be absent. Concatenation renders null as an empty string, whereas the
+                // former explicit ToString() call threw a NullReferenceException in that case.
+                byte[] message = taskStatusProto.result;
+                LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message);
+                //this.dispatcher.onNext(SuspendedTask.class, new SuspendedTaskImpl(evaluatorContext, message, taskId));
+            }
+            else if (taskState == State.DONE)
+            {
+                EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
+                _runningTask = null;
+
+                // Same null-safe logging as in the SUSPEND branch.
+                byte[] message = taskStatusProto.result;
+                LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + evaluatorContext + message);
+                //this.dispatcher.onNext(CompletedTask.class, new CompletedTaskImpl(evaluatorContext, message, taskId));
+            }
+            else if (taskState == State.FAILED)
+            {
+                _runningTask = null;
+                //EvaluatorContext evaluatorContext = GetEvaluatorContext(contextId);
+                //FailedTask failedTask = taskStatusProto.result != null ?
+                //    new FailedTask(taskId, ByteUtilities.ByteArrarysToString(taskStatusProto.result), Optional<IActiveContext>.Of(evaluatorContext)) :
+                //    new FailedTask(taskId, "Failed task: " + taskState, Optional<IActiveContext>.Of(evaluatorContext));
+                //LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + failedTask.ToString());
+                //this.dispatcher.onNext(FailedTask.class, taskException);
+            }
+            else if (taskStatusProto.task_message.Count > 0)
+            {
+                // NOTE(review): this rejects task messages while a running task is tracked; task
+                // messages would presumably come from a running task - confirm the condition is not inverted.
+                if (_runningTask != null)
+                {
+                    var e = new InvalidOperationException("runningTask must be null when there are multiple task messages");
+                    Exceptions.Throw(e, LOGGER);
+                }
+                foreach (TaskStatusProto.TaskMessageProto taskMessageProto in taskStatusProto.task_message)
+                {
+                    LOGGER.Log(Level.Info, "TODO: REPLACE THIS " + taskMessageProto.ToString());
+                    //        this.dispatcher.onNext(TaskMessage.class,
+                    //new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(),
+                    //    taskId, contextId, taskMessageProto.getSourceId()));
+                }
+            }
+        }
+
+        /// <summary>
+        /// Looks up an active context by its id.
+        /// </summary>
+        /// <param name="id">The id of the context to find.</param>
+        /// <returns>The matching active context.</returns>
+        /// <exception cref="InvalidOperationException">
+        /// Raised through <c>Exceptions.Throw</c> when no active context has the given id.
+        /// </exception>
+        private EvaluatorContext GetEvaluatorContext(string id)
+        {
+            foreach (EvaluatorContext context in _activeContexts)
+            {
+                if (context.Id.Equals(id))
+                {
+                    return context;
+                }
+            }
+
+            // Only report a miss after every active context has been examined. The check used to sit
+            // inside the loop, so any lookup that did not match the first element failed.
+            var e = new InvalidOperationException("Unknown evaluator context with id " + id);
+            Exceptions.Throw(e, LOGGER);
+            return null;
+        }
+
+        /// <summary>
+        /// Registers a context as active by adding it to <c>_activeContexts</c> and its id to
+        /// <c>_activeContextIds</c>, keeping the two collections in step.
+        /// </summary>
+        /// <param name="context">The context to register.</param>
+        private void AddEvaluatorContext(EvaluatorContext context)
+        {
+            _activeContexts.Add(context);
+            _activeContextIds.Add(context.Id);
+        }
+
+        /// <summary>
+        /// Unregisters a context by removing it from <c>_activeContexts</c> and its id from
+        /// <c>_activeContextIds</c>, keeping the two collections in step.
+        /// </summary>
+        /// <param name="context">The context to unregister.</param>
+        private void RemoveEvaluatorContext(EvaluatorContext context)
+        {
+            _activeContexts.Remove(context);
+            _activeContextIds.Remove(context.Id);
+        }
+
+        /// <summary>
+        /// Named parameter carrying the evaluator identifier as a string.
+        /// </summary>
+        [NamedParameter(documentation: "The Evaluator Identifier.")]
+        public class EvaluatorIdentifier : Name<string>
+        {
+        }
+
+        /// <summary>
+        /// Named parameter carrying an <see cref="EvaluatorDescriptorImpl"/>.
+        /// NOTE(review): the attribute text says "The Evaluator Host." although the parameter
+        /// type is the evaluator descriptor - confirm which wording is intended.
+        /// </summary>
+        [NamedParameter(documentation: "The Evaluator Host.")]
+        public class EvaluatorDescriptorName : Name<EvaluatorDescriptorImpl>
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/FailedJob.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/FailedJob.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/FailedJob.cs
new file mode 100644
index 0000000..b0cde11
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/FailedJob.cs
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Api;
+using Org.Apache.Reef.Utilities;
+using System;
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// An error message that the REEF Client receives when there is a user error in a REEF job.
+    /// </summary>
+    /// <remarks>
+    /// NOTE(review): the properties below are declared with <c>new</c>, so they hide the
+    /// same-named members of <see cref="AbstractFailure"/>. The constructor of this class does
+    /// not assign them, so they keep their default values until set explicitly - confirm whether
+    /// the values passed to the base constructor are meant to be visible through them.
+    /// </remarks>
+    public class FailedJob : AbstractFailure
+    {
+        /// <summary>
+        /// Creates an error message from the entity id and the exception that caused the failure.
+        /// Both arguments are passed straight to the base class constructor.
+        /// </summary>
+        /// <param name="id">The id of the failed entity.</param>
+        /// <param name="cause">The exception that caused the failure.</param>
+        public FailedJob(string id, Exception cause)
+            : base(id, cause)
+        {
+        }
+
+        /// <summary>Gets or sets the id; hides the inherited member.</summary>
+        public new string Id { get; set; }
+
+        /// <summary>Gets or sets the message; hides the inherited member.</summary>
+        public new string Message { get; set; }
+
+        /// <summary>Gets or sets the optional description; hides the inherited member.</summary>
+        public new Optional<string> Description { get; set; }
+
+        /// <summary>Gets or sets the optional cause; hides the inherited member.</summary>
+        public new Optional<Exception> Cause { get; set; }
+
+        /// <summary>Gets or sets the optional data payload; hides the inherited member.</summary>
+        public new Optional<byte[]> Data { get; set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/IDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/IDriver.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/IDriver.cs
new file mode 100644
index 0000000..b0a37dc
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/IDriver.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Empty marker interface for drivers; it declares no members and exists to make it easier
+    /// to reference the driver DLL.
+    /// </summary>
+    public interface IDriver
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefDriver/IStartHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefDriver/IStartHandler.cs b/lang/cs/Source/REEF/reef-common/ReefDriver/IStartHandler.cs
new file mode 100644
index 0000000..64ea1f4
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefDriver/IStartHandler.cs
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Driver
+{
+    /// <summary>
+    /// Contract for a start handler; the only member is a readable and writable identifier.
+    /// </summary>
+    public interface IStartHandler
+    {
+        /// <summary>Gets or sets the identifier of this start handler.</summary>
+        string Identifier { get; set; }
+    }
+}
\ No newline at end of file