You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/29 21:42:53 UTC

[09/31] incubator-reef git commit: [REEF-97] Add the REEF.NET code base

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs
new file mode 100644
index 0000000..d5b987a
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Exceptions;
+using Org.Apache.Reef.Wake.Util;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Represents an open connection between remote hosts
+    /// </summary>
+    public class Link<T> : ILink<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(Link<T>));
+
+        private IPEndPoint _localEndpoint;
+        private ICodec<T> _codec;
+        private Channel _channel;
+        private bool _disposed;
+
+        /// <summary>
+        /// Constructs a Link object.
+        /// Connects to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
+        /// <param name="codec">The codec for serializing messages</param>
+        public Link(IPEndPoint remoteEndpoint, ICodec<T> codec)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+            if (codec == null)
+            {
+                throw new ArgumentNullException("codec");
+            }
+
+            Client = new TcpClient();
+            Client.Connect(remoteEndpoint);
+
+            _codec = codec;
+            _channel = new Channel(Client.GetStream());
+            _localEndpoint = GetLocalEndpoint();
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Constructs a Link object.
+        /// Uses the already connected TcpClient.
+        /// </summary>
+        /// <param name="client">The already connected client</param>
+        /// <param name="codec">The encoder and decoder</param>
+        public Link(TcpClient client, ICodec<T> codec)
+        {
+            if (client == null)
+            {
+                throw new ArgumentNullException("client");
+            }
+            if (codec == null)
+            {
+                throw new ArgumentNullException("codec");
+            }
+
+            Client = client;
+            _codec = codec;
+            _channel = new Channel(Client.GetStream());
+            _localEndpoint = GetLocalEndpoint();
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Returns the local socket address
+        /// </summary>
+        public IPEndPoint LocalEndpoint
+        {
+            get { return _localEndpoint; }
+        }
+
+        /// <summary>
+        /// Returns the remote socket address
+        /// </summary>
+        public IPEndPoint RemoteEndpoint
+        {
+            get { return (IPEndPoint) Client.Client.RemoteEndPoint; }
+        }
+
+        /// <summary>
+        /// Gets the underlying TcpClient
+        /// </summary>
+        public TcpClient Client { get; private set; }
+
+        /// <summary>
+        /// Writes the message to the remote host
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        public void Write(T value)
+        {
+            if (value == null)
+            {
+                throw new ArgumentNullException("value");    
+            }
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER);
+            }
+
+            byte[] message = _codec.Encode(value);
+            _channel.Write(message);
+        }
+
+        /// <summary>
+        /// Writes the value to this link asynchronously
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        /// <param name="token">The cancellation token</param>
+        public async Task WriteAsync(T value, CancellationToken token)
+        {
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER); 
+            }
+
+            byte[] message = _codec.Encode(value);
+            await _channel.WriteAsync(message, token);
+        }
+
+        /// <summary>
+        /// Reads the value from the link synchronously
+        /// </summary>
+        public T Read()
+        {
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER);
+            }
+
+            byte[] message = _channel.Read();
+            return (message == null) ? default(T) : _codec.Decode(message);
+        }
+
+        /// <summary>
+        /// Reads the value from the link asynchronously
+        /// </summary>
+        /// <param name="token">The cancellation token</param>
+        public async Task<T> ReadAsync(CancellationToken token)
+        {
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER);
+            }
+
+            byte[] message = await _channel.ReadAsync(token);
+            return (message == null) ? default(T) : _codec.Decode(message);
+        }
+
+        /// <summary>
+        /// Close the client connection
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Subclasses of Links should overwrite this to handle disposing
+        /// of the link
+        /// </summary>
+        /// <param name="disposing">To dispose or not</param>
+        public virtual void Dispose(bool disposing)
+        {
+            if (_disposed)
+            {
+                return;
+            }
+
+            if (disposing)
+            {
+                try
+                {
+                    Client.GetStream().Close();
+                }
+                catch (InvalidOperationException)
+                {
+                    LOGGER.Log(Level.Warning, "failed to close stream on a non-connected socket.");
+                }
+
+                Client.Close();
+            }
+            _disposed = true;
+        }
+
+        /// <summary>
+        /// Overrides Equals. Two Link objects are equal if they are connected
+        /// to the same remote endpoint.
+        /// </summary>
+        /// <param name="obj">The object to compare</param>
+        /// <returns>True if the object is equal to this Link, otherwise false</returns>
+        public override bool Equals(object obj)
+        {
+            Link<T> other = obj as Link<T>;
+            if (other == null)
+            {
+                return false;
+            }
+
+            return other.RemoteEndpoint.Equals(RemoteEndpoint);
+        }
+
+        /// <summary>
+        /// Gets the hash code for the Link object.
+        /// </summary>
+        /// <returns>The object's hash code</returns>
+        public override int GetHashCode()
+        {
+            return RemoteEndpoint.GetHashCode();
+        }
+
+        /// <summary>
+        /// Discovers the IPEndpoint for the current machine.
+        /// </summary>
+        /// <returns>The local IPEndpoint</returns>
+        private IPEndPoint GetLocalEndpoint()
+        {
+            IPAddress address = NetworkUtils.LocalIPAddress;
+            int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port;
+            return new IPEndPoint(address, port);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs
new file mode 100644
index 0000000..2791eb3
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+using System;
+using System.Collections.Generic;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Remote.Impl;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Codec that can encode and decode a class depending on the class type.
+    /// </summary>
+    public class MultiCodec<T> : ICodec<T>
+    {
+        private readonly MultiEncoder<T> _encoder;
+
+        private readonly MultiDecoder<T> _decoder;
+
+        /// <summary>
+        /// Constructs a new MultiCodec object.
+        /// </summary>
+        public MultiCodec()
+        {
+            _encoder = new MultiEncoder<T>();
+            _decoder = new MultiDecoder<T>();
+        }
+
+        /// <summary>
+        /// Register a codec to be used when encoding/decoding objects of this type.
+        /// </summary>
+        /// <typeparam name="U">The type of codec</typeparam>
+        /// <param name="codec">The codec to use when encoding/decoding
+        /// objects of this type</param>
+        public void Register<U>(ICodec<U> codec) where U : T
+        {
+            _encoder.Register(codec);
+            _decoder.Register(codec);
+        }
+
+        /// <summary>
+        /// Register a codec to be used when encoding/decoding objects of this type.
+        /// </summary>
+        /// <typeparam name="U">The type of codec</typeparam>
+        /// <param name="codec">The codec to use when encoding/decoding
+        /// objects of this type</param>
+        /// <param name="name">The name of the class to encode/decode</param>
+        public void Register<U>(ICodec<U> codec, string name) where U : T
+        {
+            _encoder.Register(codec, name);
+            _decoder.Register(codec, name);
+        }
+
+        /// <summary>
+        /// Encodes an object with the appropriate encoding or null if it cannot
+        /// be encoded.
+        /// </summary>
+        /// <param name="obj">Data to encode</param>
+        public byte[] Encode(T obj)
+        {
+            return _encoder.Encode(obj);
+        }
+
+        /// <summary>
+        /// Decodes byte array into the appripriate object type.
+        /// </summary>
+        /// <param name="data">Data to be decoded</param>
+        public T Decode(byte[] data)
+        {
+            return _decoder.Decode(data);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs
new file mode 100644
index 0000000..789e226
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Decoder using the WakeTuple protocol buffer
+    /// (class name and bytes)
+    /// </summary>
+    public class MultiDecoder<T> : IDecoder<T>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiDecoder<T>));
+        private Dictionary<Type, object> _decoderMap;
+        private Dictionary<string, Type> _nameMap;
+
+        /// <summary>
+        /// Constructs a decoder that decodes bytes based on the class type
+        /// </summary>
+        public MultiDecoder()
+        {
+            _decoderMap = new Dictionary<Type, object>();
+            _nameMap = new Dictionary<string, Type>();
+        }
+
+        /// <summary>
+        /// Register the decoder for objects of type U
+        /// </summary>
+        /// <typeparam name="U">The type of decoder to use when decoding
+        /// objects of this type</typeparam>
+        /// <param name="decoder">The decoder to use when decoding
+        /// objects of this type</param>
+        public void Register<U>(IDecoder<U> decoder) where U : T
+        {
+            Type type = typeof(U);
+            _decoderMap[type] = decoder;
+            _nameMap[type.ToString()] = type;
+        }
+
+        /// <summary>
+        /// Register the decoder for objects of type U
+        /// </summary>
+        /// <typeparam name="U">The type of decoder to use when decoding
+        /// objects of this type</typeparam>
+        /// <param name="decoder">The decoder to use when decoding
+        /// objects of this type</param>
+        /// <param name="name">The name of the class to decode</param>
+        public void Register<U>(IDecoder<U> decoder, string name) where U : T
+        {
+            Type type = typeof(U);
+            _decoderMap[type] = decoder;
+            _nameMap[name] = type;
+        }
+
+        /// <summary>
+        /// Decodes byte array according to the underlying object type.
+        /// </summary>
+        /// <param name="data">The data to decode</param>
+        public T Decode(byte[] data)
+        {
+            WakeTuplePBuf pbuf = WakeTuplePBuf.Deserialize(data);
+            if (pbuf == null)
+            {
+                return default(T);
+            }
+
+            // Get object's class Type
+            Type type;
+            if (!_nameMap.TryGetValue(pbuf.className, out type))
+            {
+                return default(T);
+            }
+
+            // Get decoder for that type
+            object decoder;
+            if (!_decoderMap.TryGetValue(type, out decoder))
+            {
+                Exceptions.Throw(new RemoteRuntimeException("Decoder for " + type + " not known."), LOGGER);
+            }
+
+            // Invoke the decoder to decode the byte array
+            Type handlerType = typeof(IDecoder<>).MakeGenericType(new[] { type });
+            MethodInfo info = handlerType.GetMethod("Decode");
+            return (T) info.Invoke(decoder, new[] { (object) pbuf.data });
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs
new file mode 100644
index 0000000..cccf52f
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Encoder using the WakeTuple protocol buffer
+    /// (class name and bytes)
+    /// </summary>
+    public class MultiEncoder<T> : IEncoder<T>
+    {
+        private static Logger _logger = Logger.GetLogger(typeof(MultiEncoder<>));
+        private Dictionary<Type, object> _encoderMap;
+        private Dictionary<Type, string> _nameMap;
+
+        /// <summary>
+        /// Constructs an encoder that encodes an object to bytes based on the class name
+        /// </summary>
+        public MultiEncoder()
+        {
+            _encoderMap = new Dictionary<Type, object>();
+            _nameMap = new Dictionary<Type, string>();
+        }
+
+        public void Register<U>(IEncoder<U> encoder) where U : T
+        {
+            _encoderMap[typeof(U)] = encoder;
+            _nameMap[typeof(U)] = typeof(U).ToString();
+        }
+
+        public void Register<U>(IEncoder<U> encoder, string name) where U : T
+        {
+            _encoderMap[typeof(U)] = encoder;
+            _nameMap[typeof(U)] = name;
+            _logger.Log(Level.Verbose, "Registering name for " + name);
+        }
+
+        /// <summary>Encodes an object to a byte array</summary>
+        /// <param name="obj"></param>
+        public byte[] Encode(T obj)
+        {
+            // Find encoder for object type
+            object encoder;
+            if (!_encoderMap.TryGetValue(obj.GetType(), out encoder))
+            {
+                return null;
+            }
+
+            // Invoke encoder for this type
+            Type handlerType = typeof(IEncoder<>).MakeGenericType(new[] { obj.GetType() });
+            MethodInfo info = handlerType.GetMethod("Encode");
+            byte[] data = (byte[]) info.Invoke(encoder, new[] { (object) obj });
+
+            // Serialize object type and object data into well known tuple
+            // To decode, deserialize the tuple, get object type, and look up the
+            // decoder for that type
+            string name = _nameMap[obj.GetType()];
+            _logger.Log(Level.Verbose, "Encoding name for " + name);
+            WakeTuplePBuf pbuf = new WakeTuplePBuf { className = name, data = data };
+            pbuf.className = name;
+            pbuf.data = data; 
+            return pbuf.Serialize();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs
new file mode 100644
index 0000000..577cd95
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.RX.Impl;
+using Org.Apache.Reef.Wake.Util;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Stores registered IObservers for DefaultRemoteManager.
+    /// Can register and look up IObservers by remote IPEndPoint.
+    /// </summary>
+    internal class ObserverContainer<T> : IObserver<TransportEvent<IRemoteEvent<T>>>
+    {
+        private ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap;
+        private ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap;
+        private IObserver<T> _universalObserver;
+
+        /// <summary>
+        /// Constructs a new ObserverContainer used to manage remote IObservers.
+        /// </summary>
+        public ObserverContainer()
+        {
+            _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer());
+            _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>();
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer) 
+        {
+            if (remoteEndpoint.Address.Equals(IPAddress.Any))
+            {
+                _universalObserver = observer;
+                return Disposable.Create(() => { _universalObserver = null; });
+            }
+
+            _endpointMap[remoteEndpoint] = observer;
+            return Disposable.Create(() => _endpointMap.TryRemove(remoteEndpoint, out observer));
+        }
+
+        /// <summary>
+        /// Registers an IObserver to handle incoming messages from a remote host
+        /// </summary>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+        {
+            _typeMap[typeof(T)] = observer;
+            return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer));
+        }
+
+        /// <summary>
+        /// Look up the IObserver for the registered IPEndPoint or event type 
+        /// and execute the IObserver.
+        /// </summary>
+        /// <param name="transportEvent">The incoming remote event</param>
+        public void OnNext(TransportEvent<IRemoteEvent<T>> transportEvent)
+        {
+            IRemoteEvent<T> remoteEvent = transportEvent.Data;
+            remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint;
+            remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint;
+            T value = remoteEvent.Value;
+            bool handled = false;
+
+            IObserver<T> observer1;
+            IObserver<IRemoteMessage<T>> observer2;
+            if (_universalObserver != null)
+            {
+                _universalObserver.OnNext(value);
+                handled = true;
+            }
+            if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer1))
+            {
+                // IObserver was registered by IPEndpoint
+                observer1.OnNext(value);
+                handled = true;
+            } 
+            else if (_typeMap.TryGetValue(value.GetType(), out observer2))
+            {
+                // IObserver was registered by event type
+                IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
+                IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value);
+                observer2.OnNext(remoteMessage);
+                handled = true;
+            }
+
+            if (!handled)
+            {
+                throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message");
+            }
+        }
+
+        public void OnError(Exception error)
+        {
+        }
+
+        public void OnCompleted()
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs
new file mode 100644
index 0000000..9e2fe2a
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Net;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    public class RemoteEvent<T> : IRemoteEvent<T>
+    {
+        public RemoteEvent(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, string source, string sink, long seq, T value)
+        {
+            LocalEndPoint = localEndPoint;
+            RemoteEndPoint = remoteEndPoint;
+            Source = source;
+            Sink = sink;
+            Value = value;
+            Sequence = seq;
+        }
+
+        public RemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value)
+        {
+            LocalEndPoint = localEndpoint;
+            RemoteEndPoint = remoteEndpoint;
+            Value = value;
+        }
+
+        public RemoteEvent()
+        {
+        }
+
+        public IPEndPoint LocalEndPoint { get; set; }
+
+        public IPEndPoint RemoteEndPoint { get; set; }
+
+        public string Source { get; set; }
+
+        public string Sink { get; set; }
+
+        public T Value { get; set; }
+
+        public long Sequence { get; set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs
new file mode 100644
index 0000000..2c5b16d
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    internal class RemoteEventCodec<T> : ICodec<IRemoteEvent<T>>
+    {
+        private readonly RemoteEventEncoder<T> _encoder;
+        private readonly RemoteEventDecoder<T> _decoder;
+
+        public RemoteEventCodec(ICodec<T> codec) 
+        {
+            _encoder = new RemoteEventEncoder<T>(codec);
+            _decoder = new RemoteEventDecoder<T>(codec);
+        }
+
+        public byte[] Encode(IRemoteEvent<T> obj)
+        {
+            return _encoder.Encode(obj);
+        }
+
+        public IRemoteEvent<T> Decode(byte[] data)
+        {
+            return _decoder.Decode(data);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs
new file mode 100644
index 0000000..f9cfbc1
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    public class RemoteEventDecoder<T> : IDecoder<IRemoteEvent<T>>
+    {
+        private IDecoder<T> _decoder;
+
+        public RemoteEventDecoder(IDecoder<T> decoder)
+        {
+            _decoder = decoder;
+        }
+
+        public IRemoteEvent<T> Decode(byte[] data)
+        {
+            WakeMessagePBuf pbuf = WakeMessagePBuf.Deserialize(data);
+            return new RemoteEvent<T>(null, null, pbuf.source, pbuf.sink, pbuf.seq, _decoder.Decode(pbuf.data));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs
new file mode 100644
index 0000000..432e688
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    public class RemoteEventEncoder<T> : IEncoder<IRemoteEvent<T>>
+    {
+        private readonly IEncoder<T> _encoder;
+
+        public RemoteEventEncoder(IEncoder<T> encoder)
+        {
+            _encoder = encoder;
+        }
+
+        public byte[] Encode(IRemoteEvent<T> obj)
+        {
+            WakeMessagePBuf pbuf = new WakeMessagePBuf();
+            pbuf.sink = obj.Sink;
+            pbuf.source = obj.Source;
+            pbuf.data = _encoder.Encode(obj.Value);
+            pbuf.seq = obj.Sequence;
+            return pbuf.Serialize();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs
new file mode 100644
index 0000000..10a048e
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    public class RemoteEventEndPoint<T>
+    {
+        private IRemoteIdentifier _id;
+
+        public RemoteEventEndPoint(IRemoteIdentifier id)
+        {
+            _id = id;
+        }
+
+        public IRemoteIdentifier Id
+        {
+            get { return _id; }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs
new file mode 100644
index 0000000..2f402a8
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using System.Globalization;
+using System.Net;
+using System.Text;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Remote identifier based on a socket address
+    /// </summary>
+    public class SocketRemoteIdentifier : IRemoteIdentifier
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(SocketRemoteIdentifier));
+        private IPEndPoint _addr;
+
+        public SocketRemoteIdentifier(IPEndPoint addr)
+        {
+            _addr = addr;
+        }
+
+        public SocketRemoteIdentifier(string str)
+        {
+            int index = str.IndexOf(":", System.StringComparison.Ordinal);
+            if (index <= 0)
+            {
+                Exceptions.Throw(new RemoteRuntimeException("Invalid name " + str), LOGGER); 
+            }
+            string host = str.Substring(0, index);
+            int port = int.Parse(str.Substring(index + 1), CultureInfo.InvariantCulture);
+            _addr = new IPEndPoint(IPAddress.Parse(host), port);
+        }
+
+        public IPEndPoint Addr
+        {
+            get { return _addr;  }
+        }
+
+        public override int GetHashCode()
+        {
+            return _addr.GetHashCode();
+        }
+
+        public override bool Equals(object obj)
+        {
+            return _addr.Equals(((SocketRemoteIdentifier)obj).Addr);
+        }
+
+        public override string ToString()
+        {
+            StringBuilder builder = new StringBuilder();
+            builder.Append("socket://");
+            builder.Append(_addr);
+            return builder.ToString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs
new file mode 100644
index 0000000..4e25b18
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    public class StringCodec : ICodec<string>
+    {
+        [Inject]
+        public StringCodec()
+        {
+        }
+
+        public byte[] Encode(string obj)
+        {
+            return Encoding.ASCII.GetBytes(obj);
+        }
+
+        public string Decode(byte[] data)
+        {
+            return Encoding.ASCII.GetString(data);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs
new file mode 100644
index 0000000..31829cd
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    public class StringIdentifier : IIdentifier
+    {
+        private readonly string _str;
+
+        public StringIdentifier(string s)
+        {
+            _str = s;
+        }
+
+        public override int GetHashCode()
+        {
+            return _str.GetHashCode();
+        }
+
+        public override bool Equals(object o)
+        {
+            StringIdentifier other = o as StringIdentifier;
+            return other != null && _str.Equals(other._str);
+        }
+
+        public override string ToString()
+        {
+            return _str;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs
new file mode 100644
index 0000000..b9bcb50
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    public class StringIdentifierFactory : IIdentifierFactory
+    {
+        [Inject]
+        public StringIdentifierFactory()
+        {
+        }
+
+        public IIdentifier Create(string s)
+        {
+            return new StringIdentifier(s);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs
new file mode 100644
index 0000000..848a915
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Establish connections to TransportServer for remote message passing
+    /// </summary>
+    public class TransportClient<T> : IDisposable
+    {
+        private ILink<T> _link;
+        private IObserver<TransportEvent<T>> _observer;
+        private CancellationTokenSource _cancellationSource;
+        private bool _disposed;
+
+        /// <summary>
+        /// Construct a TransportClient.
+        /// Used to send messages to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+        /// <param name="codec">Codec to decode/encodec</param>
+        public TransportClient(IPEndPoint remoteEndpoint, ICodec<T> codec) 
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+            if (codec == null)
+            {
+                throw new ArgumentNullException("codec");
+            }
+
+            _link = new Link<T>(remoteEndpoint, codec);
+            _cancellationSource = new CancellationTokenSource();
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Construct a TransportClient.
+        /// Used to send messages to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+        /// <param name="codec">Codec to decode/encodec</param>
+        /// <param name="observer">Callback used when receiving responses from remote host</param>
+        public TransportClient(IPEndPoint remoteEndpoint, 
+                               ICodec<T> codec, 
+                               IObserver<TransportEvent<T>> observer) 
+                                   : this(remoteEndpoint, codec)
+        {
+            _observer = observer;
+            Task.Run(() => ResponseLoop());
+        }
+
+        /// <summary>
+        /// Gets the underlying transport link.
+        /// </summary>
+        public ILink<T> Link
+        {
+            get { return _link; }
+        }
+
+        /// <summary>
+        /// Send the remote message.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        public void Send(T message)
+        {
+            if (message == null)
+            {
+                throw new ArgumentNullException("message");    
+            }
+
+            _link.Write(message);
+        }
+
+        /// <summary>
+        /// Close all opened connections
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        protected void Dispose(bool disposing)
+        {
+            if (!_disposed && disposing)
+            {
+                _link.Dispose();
+                _disposed = true;
+            }
+        }
+
+        /// <summary>
+        /// Continually read responses from remote host
+        /// </summary>
+        private async Task ResponseLoop()
+        {
+            while (!_cancellationSource.IsCancellationRequested)
+            {
+                T message = await _link.ReadAsync(_cancellationSource.Token);
+                if (message == null)
+                {
+                    break;
+                }
+
+                TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
+                _observer.OnNext(transportEvent);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs
new file mode 100644
index 0000000..6c4644c
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    public class TransportEvent<T>
+    {
+        public TransportEvent(T data, ILink<T> link)
+        {
+            Data = data;
+            Link = link;
+        }
+
+        public T Data { get; private set; }
+
+        public ILink<T> Link { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs
new file mode 100644
index 0000000..20cdc8a
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Diagnostics;
+using Org.Apache.Reef.Utilities.Logging;
+using System;
+using System.Globalization;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Wake.Util;
+
+namespace Org.Apache.Reef.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Server to handle incoming remote messages.
+    /// </summary>
+    public class TransportServer<T> : IDisposable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TransportServer<>));
+
+        private TcpListener _listener;
+        private CancellationTokenSource _cancellationSource;
+        private IObserver<TransportEvent<T>> _remoteObserver;
+        private ICodec<T> _codec; 
+        private bool _disposed;
+        private Task _serverTask;
+
+        /// <summary>
+        /// Constructs a TransportServer to listen for remote events.  
+        /// Listens on the specified remote endpoint.  When it recieves a remote
+        /// event, it will envoke the specified remote handler.
+        /// </summary>
+        /// <param name="port">Port to listen on</param>
+        /// <param name="remoteHandler">The handler to invoke when receiving incoming
+        /// remote messages</param>
+        /// <param name="codec">The codec to encode/decode"</param>
+        public TransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ICodec<T> codec)
+            : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, codec)
+        {
+        }
+
+        /// <summary>
+        /// Constructs a TransportServer to listen for remote events.  
+        /// Listens on the specified remote endpoint.  When it recieves a remote
+        /// event, it will envoke the specified remote handler.
+        /// </summary>
+        /// <param name="localEndpoint">Endpoint to listen on</param>
+        /// <param name="remoteHandler">The handler to invoke when receiving incoming
+        /// remote messages</param>
+        /// <param name="codec">The codec to encode/decode"</param>
+        public TransportServer(IPEndPoint localEndpoint, 
+                               IObserver<TransportEvent<T>> remoteHandler, 
+                               ICodec<T> codec)
+        {
+            _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
+            _remoteObserver = remoteHandler;
+            _cancellationSource = new CancellationTokenSource();
+            _cancellationSource.Token.ThrowIfCancellationRequested();
+            _codec = codec;
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Returns the listening endpoint for the TransportServer 
+        /// </summary>
+        public IPEndPoint LocalEndpoint
+        {
+            get { return _listener.LocalEndpoint as IPEndPoint; }
+        }
+
+        /// <summary>
+        /// Starts listening for incoming remote messages.
+        /// </summary>
+        public void Run()
+        {
+            _listener.Start();
+            _serverTask = Task.Run(() => StartServer());
+        }
+
+        /// <summary>
+        /// Close the TransportServer and all open connections
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        public void Dispose(bool disposing)
+        {
+            if (!_disposed && disposing)
+            {
+                _cancellationSource.Cancel();
+                try
+                {
+                    _listener.Stop();
+                }
+                catch (SocketException)
+                {
+                    LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
+                }
+
+                if (_serverTask != null)
+                {
+                    _serverTask.Wait();
+
+                    // Give the TransportServer Task 500ms to shut down, ignore any timeout errors
+                    try
+                    {
+                        CancellationTokenSource serverDisposeTimeout = new CancellationTokenSource(500);
+                        _serverTask.Wait(serverDisposeTimeout.Token);
+                    }
+                    catch (Exception e)
+                    {
+                        Console.Error.WriteLine(e); 
+                    }
+                    finally
+                    {
+                        _serverTask.Dispose();
+                    }
+                }
+            }
+
+            _disposed = true;
+        }
+
+        /// <summary>
+        /// Helper method to start TransportServer.  This will
+        /// be run in an asynchronous Task.
+        /// </summary>
+        /// <returns>An asynchronous Task for the running server.</returns>
+        private async Task StartServer()
+        {
+            try
+            {
+                while (!_cancellationSource.Token.IsCancellationRequested)
+                {
+                    TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
+                    ProcessClient(client).Forget();
+                }
+            }
+            catch (InvalidOperationException)
+            {
+                LOGGER.Log(Level.Info, "TransportServer has been closed.");
+            }
+            catch (OperationCanceledException)
+            {
+                LOGGER.Log(Level.Info, "TransportServer has been closed.");
+            }
+        }
+
+        /// <summary>
+        /// Recieves event from connected TcpClient and invokes handler on the event.
+        /// </summary>
+        /// <param name="client">The connected client</param>
+        private async Task ProcessClient(TcpClient client)
+        {
+            // Keep reading messages from client until they disconnect or timeout
+            CancellationToken token = _cancellationSource.Token;
+            using (ILink<T> link = new Link<T>(client, _codec))
+            {
+                while (!token.IsCancellationRequested)
+                {
+                    T message = await link.ReadAsync(token);
+                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
+
+                    _remoteObserver.OnNext(transportEvent);
+
+                    if (message == null)
+                    {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/Proto/WakeRemoteProtos.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Proto/WakeRemoteProtos.cs b/lang/cs/Source/WAKE/Wake/Remote/Proto/WakeRemoteProtos.cs
new file mode 100644
index 0000000..247fff9
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/Proto/WakeRemoteProtos.cs
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using ProtoBuf;
+using System;
+using System.IO;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos
+{
+    /// <summary>
+    /// Message p buff
+    /// </summary>
+    public partial class WakeMessagePBuf
+    {
+        public static WakeMessagePBuf Deserialize(byte[] bytes)
+        {
+            WakeMessagePBuf pbuf = null;
+            using (var s = new MemoryStream(bytes))
+            {
+                pbuf = Serializer.Deserialize<WakeMessagePBuf>(s);
+            }
+            return pbuf;
+        }
+
+        public byte[] Serialize()
+        {
+            using (var s = new MemoryStream())
+            {
+                Serializer.Serialize(s, this);
+                return s.ToArray();
+            }
+        }
+    }
+
+    /// <summary>
+    /// Wake tuple buf
+    /// </summary>
+    public partial class WakeTuplePBuf
+    {
+        public static WakeTuplePBuf Deserialize(byte[] bytes)
+        {
+            WakeTuplePBuf pbuf = null;
+            using (var s = new MemoryStream(bytes))
+            {
+                pbuf = Serializer.Deserialize<WakeTuplePBuf>(s);
+            }
+            return pbuf;
+        }
+
+        public byte[] Serialize()
+        {
+            using (var s = new MemoryStream())
+            {
+                Serializer.Serialize(s, this);
+                return s.ToArray();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/RemoteConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/RemoteConfiguration.cs b/lang/cs/Source/WAKE/Wake/Remote/RemoteConfiguration.cs
new file mode 100644
index 0000000..633cc79
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/RemoteConfiguration.cs
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Tang.Annotations;
+using System;
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    public class RemoteConfiguration
+    {
+        [NamedParameter(shortName: "rm_name", documentation: "The name of the remote manager.")]
+        public class ManagerName : Name<string>
+        {
+        }
+
+        [NamedParameter(shortName: "rm_host", documentation: "The host address to be used for messages.")]
+        public class HostAddress : Name<string>
+        {
+        }
+
+        [NamedParameter(shortName: "rm_port", documentation: "The port to be used for messages.")]
+        public class Port : Name<int>
+        {
+        }
+
+        [NamedParameter(documentation: "The codec to be used for messages.")]
+        public class MessageCodec : Name<ICodec<Type>>
+        {
+        }
+
+        [NamedParameter(documentation: "The event handler to be used for exception")]
+        public class ErrorHandler : Name<IObserver<Exception>>
+        {
+        }
+
+        [NamedParameter(shortName: "rm_order", documentation: "Whether or not to use the message ordering guarantee", defaultValue: "true")]
+        public class OrderingGuarantee : Name<bool>
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Remote/RemoteRuntimeException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/RemoteRuntimeException.cs b/lang/cs/Source/WAKE/Wake/Remote/RemoteRuntimeException.cs
new file mode 100644
index 0000000..061e532
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Remote/RemoteRuntimeException.cs
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.Remote
+{
+    /// <summary>Wake remote runtime exception</summary>
+    [System.Serializable]
+    public class RemoteRuntimeException : Exception
+    {
+        private const long serialVersionUID = 1L;
+
+        /// <summary>Constructs a new runtime remote exception with the specified detail message and cause
+        ///     </summary>
+        /// <param name="s">the detailed message</param>
+        /// <param name="e">the cause</param>
+        public RemoteRuntimeException(string s, Exception e)
+            : base(s, e)
+        {
+        }
+
+        /// <summary>Constructs a new runtime remote exception with the specified detail message
+        ///     </summary>
+        /// <param name="s">the detailed message</param>
+        public RemoteRuntimeException(string s)
+            : base(s)
+        {
+        }
+
+        /// <summary>Constructs a new runtime remote exception with the specified cause</summary>
+        /// <param name="e">the cause</param>
+        public RemoteRuntimeException(Exception e)
+            : base("Runtime Exception", e)
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Event/Alarm.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Event/Alarm.cs b/lang/cs/Source/WAKE/Wake/Time/Event/Alarm.cs
new file mode 100644
index 0000000..611b5a1
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Event/Alarm.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.Time
+{
+    /// <summary>
+    ///  Represents a timer event.
+    /// </summary>
+    public abstract class Alarm : Time
+    {
+        private IObserver<Alarm> _handler;
+
+        public Alarm(long timestamp, IObserver<Alarm> handler) : base(timestamp)
+        {
+            _handler = handler;
+        }
+
+        public void Handle()
+        {
+            _handler.OnNext(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Event/StartTime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Event/StartTime.cs b/lang/cs/Source/WAKE/Wake/Time/Event/StartTime.cs
new file mode 100644
index 0000000..4cea1bc
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Event/StartTime.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Time
+{
+    /// <summary>
+    /// Represents the Time at which a component started.
+    /// </summary>
+    public class StartTime : Time
+    {
+        public StartTime(long timeStamp) : base(timeStamp)
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Event/StopTime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Event/StopTime.cs b/lang/cs/Source/WAKE/Wake/Time/Event/StopTime.cs
new file mode 100644
index 0000000..8e3bf65
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Event/StopTime.cs
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Time
+{
+    /// <summary>
+    /// Represents the Time at which a component stops.
+    /// </summary>
+    public class StopTime : Time
+    {
+        public StopTime(long timeStamp) : base(timeStamp)
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/IClock.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/IClock.cs b/lang/cs/Source/WAKE/Wake/Time/IClock.cs
new file mode 100644
index 0000000..6d906b8
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/IClock.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake.Impl;
+using Org.Apache.Reef.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.Reef.Wake.Time
+{
+    public abstract class IClock : IDisposable
+    {
+        /// <summary>
+        /// Schedule a TimerEvent at the given future offset
+        /// </summary>
+        /// <param name="offset">The offset in the future to schedule the alarm</param>
+        /// <param name="handler">The IObserver to to be called</param>
+        public abstract void ScheduleAlarm(long offset, IObserver<Alarm> handler);
+
+        /// <summary>
+        /// Clock is idle if it has no future alarms set
+        /// </summary>
+        /// <returns>True if no future alarms are set, otherwise false</returns>
+        public abstract bool IsIdle();
+
+        /// <summary>
+        /// Dispose of the clock and all scheduled alarms
+        /// </summary>
+        public abstract void Dispose();
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the StartTime Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the start even", defaultClass: typeof(MissingStartHandlerHandler))]
+        public class StartHandler : Name<ISet<IObserver<StartTime>>>
+        {
+        }
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the StopTime Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the stop event", defaultClass: typeof(LoggingEventHandler<StopTime>))]
+        public class StopHandler : Name<ISet<IObserver<StopTime>>>
+        {
+        }
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the RuntimeStart Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the runtime start event", defaultClass: typeof(LoggingEventHandler<RuntimeStart>))]
+        public class RuntimeStartHandler : Name<ISet<IObserver<RuntimeStart>>>
+        {
+        }
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the RuntimeStop Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the runtime stop event", defaultClass: typeof(LoggingEventHandler<RuntimeStop>))]
+        public class RuntimeStopHandler : Name<ISet<IObserver<RuntimeStop>>>
+        {
+        }
+
+        /// <summary>
+        /// Bind this to an event handler to statically subscribe to the IdleClock Event
+        /// </summary>
+        [NamedParameter(documentation: "Will be called upon the Idle event", defaultClass: typeof(LoggingEventHandler<IdleClock>))]
+        public class IdleHandler : Name<ISet<IObserver<IdleClock>>>
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/ClientAlarm.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/ClientAlarm.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/ClientAlarm.cs
new file mode 100644
index 0000000..ac95896
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/ClientAlarm.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Time.Runtime.Event
+{
+    public class ClientAlarm : Alarm
+    {
+        public ClientAlarm(long timestamp, IObserver<Alarm> handler) : base(timestamp, handler)
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/IdleClock.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/IdleClock.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/IdleClock.cs
new file mode 100644
index 0000000..3ca1fd8
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/IdleClock.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Time.Runtime.Event
+{
+    public class IdleClock : Time
+    {
+        public IdleClock(long timestamp) : base(timestamp)
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeAlarm.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeAlarm.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeAlarm.cs
new file mode 100644
index 0000000..701bf88
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeAlarm.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Reef.Wake.Time.Runtime.Event
+{
+    public class RuntimeAlarm : Alarm
+    {
+        public RuntimeAlarm(long timestamp, IObserver<Alarm> handler) : base(timestamp, handler)
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStart.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStart.cs
new file mode 100644
index 0000000..d08cccc
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStart.cs
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.Reef.Wake.Time.Runtime.Event
+{
+    public class RuntimeStart : Time
+    {
+        public RuntimeStart(long timeStamp) : base(timeStamp)
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStop.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStop.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStop.cs
new file mode 100644
index 0000000..6324e20
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/Event/RuntimeStop.cs
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Reef.Wake.Time.Runtime.Event
+{
+    public class RuntimeStop : Time
+    {
+        public RuntimeStop(long timestamp) : this(timestamp, null)
+        {
+        }
+
+        public RuntimeStop(long timestamp, Exception e) : base(timestamp)
+        {
+            Exception = e;
+        }
+
+        public Exception Exception { get; private set; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/ITimer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/ITimer.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/ITimer.cs
new file mode 100644
index 0000000..c932aa4
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/ITimer.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Tang.Annotations;
+
+namespace Org.Apache.Reef.Wake.Time.Runtime
+{
+    [DefaultImplementation(typeof(RealTimer))]
+    public interface ITimer
+    {
+        /// <summary>
+        /// Gets the current time
+        /// </summary>
+        long CurrentTime { get; }
+
+        /// <summary>
+        /// Gets the difference between the given time and the current time
+        /// </summary>
+        /// <param name="time">The time to compare against the current time</param>
+        long GetDuration(long time);
+
+        /// <summary>
+        /// Checks if the given time has already passed.
+        /// </summary>
+        /// <param name="time">The time to check if it has passed or not</param>
+        bool IsReady(long time);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/WAKE/Wake/Time/Runtime/LogicalTimer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Time/Runtime/LogicalTimer.cs b/lang/cs/Source/WAKE/Wake/Time/Runtime/LogicalTimer.cs
new file mode 100644
index 0000000..f4b6c75
--- /dev/null
+++ b/lang/cs/Source/WAKE/Wake/Time/Runtime/LogicalTimer.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.Reef.Tang.Annotations;
+using Org.Apache.Reef.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.Reef.Wake.Time.Runtime
+{
+    /// <summary>
+    /// LogicalTimer class used for testing purposes.
+    /// </summary>
+    public class LogicalTimer : ITimer
+    {
+        [Inject]
+        public LogicalTimer()
+        {
+        }
+
+        public long CurrentTime
+        {
+            get { return 0; }
+        }
+
+        public long GetDuration(long time)
+        {
+            return 0;
+        }
+
+        public bool IsReady(long time)
+        {
+            return true;
+        }
+    }
+}