You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/02/05 22:05:27 UTC

[06/51] [partial] incubator-reef git commit: [REEF-131] Towards the new .Net project structure This is to change .Net project structure for Tang, Wake, REEF utilities, Common and Driver:

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs
deleted file mode 100644
index 184da8a..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/Channel.cs
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.IO;
-using System.Linq;
-using System.Net.Sockets;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Performs low level network IO operations between hosts.
-    /// </summary>
-    /// <remarks>
-    /// Each message is framed on the wire as an 8-byte header followed by the payload:
-    /// 1. four bytes holding (payload length + 4), most-significant byte first;
-    /// 2. four bytes holding the payload length, in the byte order BitConverter
-    ///    produces on the sending host (this field is not byte-swapped);
-    /// 3. the payload bytes themselves.
-    /// The read side decodes the header with the mirror-image logic.
-    /// </remarks>
-    public class Channel
-    {
-        private readonly NetworkStream _stream;
-
-        /// <summary>
-        /// Constructs a new Channel with the connected NetworkStream.
-        /// </summary>
-        /// <param name="stream">The connected stream</param>
-        /// <exception cref="ArgumentNullException">Thrown when stream is null</exception>
-        public Channel(NetworkStream stream)
-        {
-            if (stream == null)
-            {
-                throw new ArgumentNullException("stream");
-            }
-
-            _stream = stream;
-        }
-
-        /// <summary>
-        /// Sends a message to the connected client synchronously
-        /// </summary>
-        /// <param name="message">The message to send</param>
-        /// <exception cref="ArgumentNullException">Thrown when message is null</exception>
-        public void Write(byte[] message)
-        {
-            if (message == null)
-            {
-                throw new ArgumentNullException("message");
-            }
-
-            byte[] messageBuffer = GenerateMessageBuffer(message);
-            _stream.Write(messageBuffer, 0, messageBuffer.Length);
-        }
-
-        /// <summary>
-        /// Sends a message to the connected client asynchronously
-        /// </summary>
-        /// <param name="message">The message to send</param>
-        /// <param name="token">The cancellation token</param>
-        /// <returns>The awaitable write task</returns>
-        /// <exception cref="ArgumentNullException">Thrown (through the returned
-        /// task) when message is null</exception>
-        public async Task WriteAsync(byte[] message, CancellationToken token)
-        {
-            // Same guard as Write(); without it a null message fails inside
-            // GenerateMessageBuffer with a NullReferenceException.
-            if (message == null)
-            {
-                throw new ArgumentNullException("message");
-            }
-
-            byte[] messageBuffer = GenerateMessageBuffer(message);
-            await _stream.WriteAsync(messageBuffer, 0, messageBuffer.Length, token);
-        }
-
-        /// <summary>
-        /// Reads an incoming message as a byte array synchronously.
-        /// The 8-byte header is consumed first to learn the payload length.
-        /// </summary>
-        /// <returns>The byte array message, or null when the header could not be
-        /// read, the decoded length is zero, or the stream ended before the whole
-        /// payload arrived</returns>
-        public byte[] Read()
-        {
-            int payloadLength = ReadMessageLength();
-            if (payloadLength == 0)
-            {
-                return null;
-            }
-
-            return ReadBytes(payloadLength);
-        }
-
-        /// <summary>
-        /// Reads an incoming message as a byte array asynchronously.
-        /// The 8-byte header is consumed first to learn the payload length.
-        /// </summary>
-        /// <param name="token">The cancellation token</param>
-        /// <returns>The byte array message, or null when the header could not be
-        /// read, the decoded length is zero, or the stream ended before the whole
-        /// payload arrived</returns>
-        public async Task<byte[]> ReadAsync(CancellationToken token)
-        {
-            int payloadLength = await GetMessageLengthAsync(token);
-            if (payloadLength == 0)
-            {
-                return null;
-            }
-
-            return await ReadBytesAsync(payloadLength, token);
-        }
-
-        /// <summary>
-        /// Helper method to read the specified number of bytes from the network stream.
-        /// Keeps reading until the buffer is full because a single stream read may
-        /// return fewer bytes than requested.
-        /// </summary>
-        /// <param name="bytesToRead">The number of bytes to read</param>
-        /// <returns>The byte[] read from the network stream with the requested 
-        /// number of bytes, otherwise null if the stream reported zero bytes
-        /// before the buffer was filled.
-        /// </returns>
-        private byte[] ReadBytes(int bytesToRead)
-        {
-            int totalBytesRead = 0;
-            byte[] buffer = new byte[bytesToRead];
-
-            while (totalBytesRead < bytesToRead)
-            {
-                int bytesRead = _stream.Read(buffer, totalBytesRead, bytesToRead - totalBytesRead);
-                if (bytesRead == 0)
-                {
-                    // A zero-length read means no more data will arrive
-                    // (the connection was closed)
-                    return null;
-                }
-
-                totalBytesRead += bytesRead;
-            }
-
-            return buffer;
-        }
-
-        /// <summary>
-        /// Asynchronous counterpart of ReadBytes: reads the specified number of
-        /// bytes from the network stream, looping until the buffer is full.
-        /// </summary>
-        /// <param name="bytesToRead">The number of bytes to read</param>
-        /// <param name="token">The cancellation token</param>
-        /// <returns>The byte[] read from the network stream with the requested 
-        /// number of bytes, otherwise null if the stream reported zero bytes
-        /// before the buffer was filled.
-        /// </returns>
-        private async Task<byte[]> ReadBytesAsync(int bytesToRead, CancellationToken token)
-        {
-            int bytesRead = 0;
-            byte[] buffer = new byte[bytesToRead];
-
-            while (bytesRead < bytesToRead)
-            {
-                int amountRead = await _stream.ReadAsync(buffer, bytesRead, bytesToRead - bytesRead, token);
-                if (amountRead == 0)
-                {
-                    // A zero-length read means no more data will arrive
-                    // (the connection was closed)
-                    return null;
-                }
-
-                bytesRead += amountRead;
-            }
-
-            return buffer;
-        }
-
-        /// <summary>
-        /// Generates the payload buffer containing the message along
-        /// with the 8-byte header described on the class.
-        /// </summary>
-        /// <param name="message">The message to send</param>
-        /// <returns>The payload buffer: header followed by the message bytes</returns>
-        private byte[] GenerateMessageBuffer(byte[] message)
-        {
-            // First field: payload length plus the 4 bytes of the second field.
-            byte[] lengthBuffer1 = BitConverter.GetBytes(message.Length + 4);
-            // Second field: the payload length itself, left in host byte order.
-            byte[] lengthBuffer2 = BitConverter.GetBytes(message.Length);
-            if (BitConverter.IsLittleEndian)
-            {
-                // Only the first field is swapped to most-significant-byte-first.
-                Array.Reverse(lengthBuffer1);
-            }
-
-            int len = lengthBuffer1.Length + lengthBuffer2.Length + message.Length;
-            byte[] messageBuffer = new byte[len];
-
-            int bytesCopied = 0;
-            bytesCopied += CopyBytes(lengthBuffer1, messageBuffer, 0);
-            bytesCopied += CopyBytes(lengthBuffer2, messageBuffer, bytesCopied);
-            CopyBytes(message, messageBuffer, bytesCopied);
-
-            return messageBuffer;
-        }
-
-        /// <summary>
-        /// Reads the 8-byte header from the stream and decodes
-        /// it to get the message length in bytes
-        /// </summary>
-        /// <returns>The incoming message's length in bytes, or 0 when either
-        /// header field could not be read or the first field decodes to 0</returns>
-        private int ReadMessageLength()
-        {
-            byte[] lenBytes = ReadBytes(sizeof(int));
-            if (lenBytes == null)
-            {
-                return 0;
-            }
-            if (BitConverter.IsLittleEndian)
-            {
-                // Undo the swap applied to the first field by the sender.
-                Array.Reverse(lenBytes);
-            }
-            if (BitConverter.ToInt32(lenBytes, 0) == 0)
-            {
-                return 0;
-            }
-
-            // The second field is sent unswapped, so it is decoded as-is.
-            byte[] msgLength = ReadBytes(sizeof(int));
-            return (msgLength == null) ? 0 : BitConverter.ToInt32(msgLength, 0);
-        }
-
-        /// <summary>
-        /// Asynchronously reads the 8-byte header from the stream and decodes
-        /// it to get the message length in bytes
-        /// </summary>
-        /// <param name="token">The cancellation token</param>
-        /// <returns>The incoming message's length in bytes, or 0 when either
-        /// header field could not be read or the first field decodes to 0</returns>
-        private async Task<int> GetMessageLengthAsync(CancellationToken token)
-        {
-            byte[] lenBytes = await ReadBytesAsync(sizeof(int), token);
-            if (lenBytes == null)
-            {
-                return 0;
-            }
-            if (BitConverter.IsLittleEndian)
-            {
-                // Undo the swap applied to the first field by the sender.
-                Array.Reverse(lenBytes);
-            }
-            if (BitConverter.ToInt32(lenBytes, 0) == 0)
-            {
-                return 0;
-            }
-
-            // Read the second field asynchronously as well, so this path never
-            // blocks the calling thread and the cancellation token is honored.
-            byte[] msgLength = await ReadBytesAsync(sizeof(int), token);
-            return (msgLength == null) ? 0 : BitConverter.ToInt32(msgLength, 0);
-        }
-
-        /// <summary>
-        /// Copies the entire source buffer into the destination buffer at the specified
-        /// destination offset.
-        /// </summary>
-        /// <param name="source">The source buffer to be copied</param>
-        /// <param name="dest">The destination buffer to copy to</param>
-        /// <param name="destOffset">The offset at the destination buffer to begin
-        /// copying.</param>
-        /// <returns>The number of bytes copied</returns>
-        private int CopyBytes(byte[] source, byte[] dest, int destOffset)
-        {
-            Buffer.BlockCopy(source, 0, dest, destOffset, source.Length);
-            return source.Length;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs
deleted file mode 100644
index 2bba3c8..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteManager.cs
+++ /dev/null
@@ -1,338 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Utilities.Diagnostics;
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Wake.Util;
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Net;
-using System.Net.Sockets;
-using System.Reactive;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Manages incoming and outgoing messages between remote hosts: it hands out
-    /// observers for sending to a remote endpoint and lets callers register
-    /// observers that receive incoming messages.
-    /// </summary>
-    public class DefaultRemoteManager<T> : IRemoteManager<T>
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultRemoteManager<T>));
-
-        private ObserverContainer<T> _observerContainer;
-        private TransportServer<IRemoteEvent<T>> _server;
-        private Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
-        private ICodec<IRemoteEvent<T>> _codec;
-
-        /// <summary>
-        /// Constructs a DefaultRemoteManager listening on the specified address and any
-        /// available port (port 0 is passed to the address/port constructor).
-        /// </summary>
-        /// <param name="localAddress">The address to listen on</param>
-        /// <param name="codec">The codec used for serializing messages</param>
-        public DefaultRemoteManager(IPAddress localAddress, ICodec<T> codec) : this(localAddress, 0, codec)
-        {
-        }
-
-        /// <summary>
-        /// Constructs a DefaultRemoteManager listening on the specified IPEndPoint.
-        /// </summary>
-        /// <param name="localEndpoint">The endpoint to listen on</param>
-        /// <param name="codec">The codec used for serializing messages</param>
-        public DefaultRemoteManager(IPEndPoint localEndpoint, ICodec<T> codec)
-        {
-            if (localEndpoint == null)
-            {
-                throw new ArgumentNullException("localEndpoint");
-            }
-            if (localEndpoint.Port < 0)
-            {
-                throw new ArgumentException("Listening port must be greater than or equal to zero");
-            }
-            if (codec == null)
-            {
-                throw new ArgumentNullException("codec");
-            }
-
-            _codec = new RemoteEventCodec<T>(codec);
-            _observerContainer = new ObserverContainer<T>();
-            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
-
-            StartServer(localEndpoint);
-        }
-
-        /// <summary>
-        /// Constructs a DefaultRemoteManager listening on the specified address
-        /// and port.
-        /// </summary>
-        /// <param name="localAddress">The address to listen on</param>
-        /// <param name="port">The port to listen on</param>
-        /// <param name="codec">The codec used for serializing messages</param>
-        public DefaultRemoteManager(IPAddress localAddress, int port, ICodec<T> codec)
-        {
-            if (localAddress == null)
-            {
-                throw new ArgumentNullException("localAddress");
-            }
-            if (port < 0)
-            {
-                throw new ArgumentException("Listening port must be greater than or equal to zero");
-            }
-            if (codec == null)
-            {
-                throw new ArgumentNullException("codec");
-            }
-
-            _observerContainer = new ObserverContainer<T>();
-            _codec = new RemoteEventCodec<T>(codec);
-            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
-
-            StartServer(new IPEndPoint(localAddress, port));
-        }
-
-        /// <summary>
-        /// Constructs a DefaultRemoteManager. Does not listen for incoming messages:
-        /// no transport server is created, and the local endpoint is reported as
-        /// NetworkUtils.LocalIPAddress with port 0.
-        /// </summary>
-        /// <param name="codec">The codec used for serializing messages</param>
-        public DefaultRemoteManager(ICodec<T> codec)
-        {
-            using (LOGGER.LogFunction("DefaultRemoteManager::DefaultRemoteManager"))
-            {
-                if (codec == null)
-                {
-                    throw new ArgumentNullException("codec");
-                }
-
-                _observerContainer = new ObserverContainer<T>();
-                _codec = new RemoteEventCodec<T>(codec);
-                _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
-
-                LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
-                Identifier = new SocketRemoteIdentifier(LocalEndpoint);
-            }
-        }
-
-        /// <summary>
-        /// Gets the RemoteIdentifier for the DefaultRemoteManager
-        /// </summary>
-        public IRemoteIdentifier Identifier { get; private set; }
-
-        /// <summary>
-        /// Gets the local IPEndPoint for the DefaultRemoteManager
-        /// </summary>
-        public IPEndPoint LocalEndpoint { get; private set; }
-
-        /// <summary>
-        /// Returns an IObserver used to send messages to the remote host
-        /// identified by the given RemoteEventEndPoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The endpoint of the remote host; its Id
-        /// must be a SocketRemoteIdentifier</param>
-        /// <returns>An IObserver used to send messages to the remote host</returns>
-        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
-        {
-            SocketRemoteIdentifier socketId = RequireSocketIdentifier(remoteEndpoint);
-            return GetRemoteObserver(socketId.Addr);
-        }
-
-        /// <summary>
-        /// Returns an IObserver used to send messages to the remote host at
-        /// the specified IPEndpoint. The observer is created together with its
-        /// transport client on the first request for an endpoint and served
-        /// from the cache afterwards.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
-        /// <returns>An IObserver used to send messages to the remote host</returns>
-        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-
-            ProxyObserver cached;
-            if (_cachedClients.TryGetValue(remoteEndpoint, out cached))
-            {
-                return cached;
-            }
-
-            TransportClient<IRemoteEvent<T>> transport =
-                new TransportClient<IRemoteEvent<T>>(remoteEndpoint, _codec, _observerContainer);
-            ProxyObserver created = new ProxyObserver(transport);
-            _cachedClients[remoteEndpoint] = created;
-
-            return created;
-        }
-
-        /// <summary>
-        /// Registers an IObserver used to handle incoming messages from the remote host
-        /// identified by the given RemoteEventEndPoint.
-        /// The IDisposable that is returned can be used to unregister the IObserver.
-        /// </summary>
-        /// <param name="remoteEndpoint">The endpoint of the remote host; its Id
-        /// must be a SocketRemoteIdentifier</param>
-        /// <param name="observer">The IObserver to handle incoming messages</param>
-        /// <returns>An IDisposable used to unregister the observer with</returns>
-        public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
-        {
-            SocketRemoteIdentifier socketId = RequireSocketIdentifier(remoteEndpoint);
-            return RegisterObserver(socketId.Addr, observer);
-        }
-
-        /// <summary>
-        /// Registers an IObserver used to handle incoming messages from the remote host
-        /// at the specified IPEndPoint.
-        /// The IDisposable that is returned can be used to unregister the IObserver.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
-        /// <param name="observer">The IObserver to handle incoming messages</param>
-        /// <returns>An IDisposable used to unregister the observer with</returns>
-        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-            if (observer == null)
-            {
-                throw new ArgumentNullException("observer");
-            }
-
-            return _observerContainer.RegisterObserver(remoteEndpoint, observer);
-        }
-
-        /// <summary>
-        /// Registers an IObserver of IRemoteMessage with the observer container.
-        /// Unlike the other overloads, no remote endpoint is given.
-        /// The IDisposable that is returned can be used to unregister the IObserver.
-        /// </summary>
-        /// <param name="observer">The IObserver to handle incoming messages</param>
-        /// <returns>An IDisposable used to unregister the observer with</returns>
-        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
-        {
-            if (observer == null)
-            {
-                throw new ArgumentNullException("observer");
-            }
-
-            return _observerContainer.RegisterObserver(observer);
-        }
-
-        /// <summary>
-        /// Release all resources for the DefaultRemoteManager: every cached
-        /// outgoing client first, then the transport server if one was started.
-        /// </summary>
-        public void Dispose()
-        {
-            foreach (ProxyObserver outgoing in _cachedClients.Values)
-            {
-                outgoing.Dispose();
-            }
-
-            if (_server == null)
-            {
-                return;
-            }
-
-            _server.Dispose();
-        }
-
-        /// <summary>
-        /// Creates and runs the transport server on the given endpoint, then
-        /// publishes the server's endpoint through LocalEndpoint and Identifier.
-        /// Requires _observerContainer and _codec to be set already.
-        /// </summary>
-        /// <param name="endpoint">The endpoint to listen on</param>
-        private void StartServer(IPEndPoint endpoint)
-        {
-            // Begin to listen for incoming messages
-            _server = new TransportServer<IRemoteEvent<T>>(endpoint, _observerContainer, _codec);
-            _server.Run();
-
-            LocalEndpoint = _server.LocalEndpoint;
-            Identifier = new SocketRemoteIdentifier(LocalEndpoint);
-        }
-
-        /// <summary>
-        /// Validates a RemoteEventEndPoint and returns its Id as a
-        /// SocketRemoteIdentifier.
-        /// </summary>
-        /// <param name="remoteEndpoint">The endpoint to validate</param>
-        /// <returns>The endpoint's socket identifier</returns>
-        private static SocketRemoteIdentifier RequireSocketIdentifier(RemoteEventEndPoint<T> remoteEndpoint)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-
-            SocketRemoteIdentifier socketId = remoteEndpoint.Id as SocketRemoteIdentifier;
-            if (socketId == null)
-            {
-                throw new ArgumentException("ID not supported");
-            }
-
-            return socketId;
-        }
-
-        /// <summary>
-        /// Observer to send messages to connected remote host
-        /// </summary>
-        private class ProxyObserver : IObserver<T>, IDisposable
-        {
-            private TransportClient<IRemoteEvent<T>> _client;
-            private int _messageCount;
-
-            /// <summary>
-            /// Create new ProxyObserver
-            /// </summary>
-            /// <param name="client">The connected transport client used to send
-            /// messages to remote host</param>
-            public ProxyObserver(TransportClient<IRemoteEvent<T>> client)
-            {
-                _messageCount = 0;
-                _client = client;
-            }
-
-            /// <summary>
-            /// Wraps the message in a RemoteEvent stamped with the link's endpoints,
-            /// the "default" sink and the next sequence number, then sends it to
-            /// the remote host.
-            /// </summary>
-            /// <param name="message">The message to send</param>
-            public void OnNext(T message)
-            {
-                RemoteEvent<T> outgoing =
-                    new RemoteEvent<T>(_client.Link.LocalEndpoint, _client.Link.RemoteEndpoint, message);
-                outgoing.Sink = "default";
-                outgoing.Sequence = _messageCount;
-
-                // The counter advances before the send, whether or not it succeeds.
-                _messageCount = _messageCount + 1;
-
-                IRemoteEvent<T> remoteEvent = outgoing;
-                _client.Send(remoteEvent);
-            }
-
-            /// <summary>
-            /// Close underlying transport client
-            /// </summary>
-            public void Dispose()
-            {
-                _client.Dispose();
-            }
-
-            /// <summary>
-            /// Not supported: always throws NotImplementedException.
-            /// </summary>
-            /// <param name="error">The error being signalled</param>
-            public void OnError(Exception error)
-            {
-                throw new NotImplementedException();
-            }
-
-            /// <summary>
-            /// Not supported: always throws NotImplementedException.
-            /// </summary>
-            public void OnCompleted()
-            {
-                throw new NotImplementedException();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs
deleted file mode 100644
index 5b24276..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/DefaultRemoteMessage.cs
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Simple IRemoteMessage implementation that pairs a message with the
-    /// identifier supplied at construction time.
-    /// </summary>
-    internal class DefaultRemoteMessage<T> : IRemoteMessage<T>
-    {
-        /// <summary>
-        /// Gets the identifier this message was created with
-        /// </summary>
-        public IRemoteIdentifier Identifier { get; private set; }
-
-        /// <summary>
-        /// Gets the message payload
-        /// </summary>
-        public T Message { get; private set; }
-
-        /// <summary>
-        /// Creates a message carrying the given identifier and payload.
-        /// </summary>
-        /// <param name="id">The identifier to expose through Identifier</param>
-        /// <param name="message">The payload to expose through Message</param>
-        public DefaultRemoteMessage(IRemoteIdentifier id, T message)
-        {
-            this.Identifier = id;
-            this.Message = message;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs
deleted file mode 100644
index 8d4b47d..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/IPEndpointComparer.cs
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Net;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Class to compare two IPEndPoint objects.
-    /// A port of 0 acts as a wildcard: when either endpoint has port 0, only
-    /// the addresses are compared.
-    /// </summary>
-    internal class IPEndPointComparer : IEqualityComparer<IPEndPoint>
-    {
-        /// <summary>
-        /// Determines whether two endpoints are considered equal.
-        /// </summary>
-        /// <param name="x">The first endpoint, may be null</param>
-        /// <param name="y">The second endpoint, may be null</param>
-        /// <returns>True when both are the same reference (including both null),
-        /// when either port is 0 and the addresses match, or when the endpoints
-        /// are equal per IPEndPoint.Equals; otherwise false</returns>
-        public bool Equals(IPEndPoint x, IPEndPoint y)
-        {
-            if (ReferenceEquals(x, y))
-            {
-                return true;
-            }
-            if (x == null || y == null)
-            {
-                return false;
-            }
-
-            // If either port is 0, don't check port
-            if (x.Port == 0 || y.Port == 0)
-            {
-                return x.Address.Equals(y.Address);
-            }
-
-            return x.Equals(y);
-        }
-
-        /// <summary>
-        /// Returns a hash code derived from the address only. The port is left
-        /// out on purpose so that endpoints treated as equal through the port-0
-        /// wildcard also share a hash code.
-        /// </summary>
-        /// <param name="obj">The endpoint to hash</param>
-        /// <returns>The hash code of the endpoint's address</returns>
-        /// <exception cref="ArgumentNullException">Thrown when obj is null</exception>
-        public int GetHashCode(IPEndPoint obj)
-        {
-            if (obj == null)
-            {
-                throw new ArgumentNullException("obj");
-            }
-
-            return obj.Address.GetHashCode();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs
deleted file mode 100644
index e413023..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/IntCodec.cs
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Tang.Annotations;
-using System;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Codec that converts a 32-bit integer to and from the byte array
-    /// representation produced by BitConverter.
-    /// </summary>
-    public class IntCodec : ICodec<int>
-    {
-        /// <summary>
-        /// Creates the codec. The constructor carries the Inject attribute
-        /// (presumably so Tang can instantiate it; confirm against Tang usage).
-        /// </summary>
-        [Inject]
-        public IntCodec()
-        {
-        }
-
-        /// <summary>
-        /// Encodes the integer as bytes via BitConverter.GetBytes.
-        /// </summary>
-        /// <param name="obj">The integer to encode</param>
-        /// <returns>The encoded bytes</returns>
-        public byte[] Encode(int obj)
-        {
-            byte[] encoded = BitConverter.GetBytes(obj);
-            return encoded;
-        }
-
-        /// <summary>
-        /// Decodes an integer from the start of the given byte array via
-        /// BitConverter.ToInt32.
-        /// </summary>
-        /// <param name="data">The bytes to decode, read from index 0</param>
-        /// <returns>The decoded integer</returns>
-        public int Decode(byte[] data)
-        {
-            const int startIndex = 0;
-            int decoded = BitConverter.ToInt32(data, startIndex);
-            return decoded;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs
deleted file mode 100644
index d5b987a..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/Link.cs
+++ /dev/null
@@ -1,256 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Net;
-using System.Net.Sockets;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.Reef.Utilities.Diagnostics;
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Tang.Exceptions;
-using Org.Apache.Reef.Wake.Util;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Represents an open connection between remote hosts.
-    /// Messages are serialized with the supplied codec and exchanged over a
-    /// Channel wrapping the TcpClient's network stream.
-    /// </summary>
-    public class Link<T> : ILink<T>
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(Link<T>));
-
-        private readonly IPEndPoint _localEndpoint;
-        private readonly ICodec<T> _codec;
-        private readonly Channel _channel;
-        private bool _disposed;
-
-        /// <summary>
-        /// Constructs a Link object.
-        /// Connects to the specified remote endpoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
-        /// <param name="codec">The codec for serializing messages</param>
-        /// <exception cref="ArgumentNullException">If remoteEndpoint or codec is null</exception>
-        public Link(IPEndPoint remoteEndpoint, ICodec<T> codec)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-            if (codec == null)
-            {
-                throw new ArgumentNullException("codec");
-            }
-
-            // This constructor owns the TcpClient it creates. If connecting or
-            // setting up the channel fails, no Link is handed to the caller, so
-            // the client has to be closed here or its socket would leak.
-            TcpClient client = new TcpClient();
-            try
-            {
-                client.Connect(remoteEndpoint);
-
-                // GetLocalEndpoint() reads the Client property, so assign it first.
-                Client = client;
-                _codec = codec;
-                _channel = new Channel(client.GetStream());
-                _localEndpoint = GetLocalEndpoint();
-            }
-            catch
-            {
-                client.Close();
-                throw;
-            }
-
-            _disposed = false;
-        }
-
-        /// <summary>
-        /// Constructs a Link object.
-        /// Uses the already connected TcpClient.
-        /// </summary>
-        /// <param name="client">The already connected client</param>
-        /// <param name="codec">The encoder and decoder</param>
-        /// <exception cref="ArgumentNullException">If client or codec is null</exception>
-        public Link(TcpClient client, ICodec<T> codec)
-        {
-            if (client == null)
-            {
-                throw new ArgumentNullException("client");
-            }
-            if (codec == null)
-            {
-                throw new ArgumentNullException("codec");
-            }
-
-            // The client was supplied by the caller, so it is not closed here if
-            // setting up the channel fails.
-            Client = client;
-            _codec = codec;
-            _channel = new Channel(Client.GetStream());
-            _localEndpoint = GetLocalEndpoint();
-            _disposed = false;
-        }
-
-        /// <summary>
-        /// Returns the local socket address
-        /// </summary>
-        public IPEndPoint LocalEndpoint
-        {
-            get { return _localEndpoint; }
-        }
-
-        /// <summary>
-        /// Returns the remote socket address
-        /// </summary>
-        public IPEndPoint RemoteEndpoint
-        {
-            get { return (IPEndPoint) Client.Client.RemoteEndPoint; }
-        }
-
-        /// <summary>
-        /// Gets the underlying TcpClient
-        /// </summary>
-        public TcpClient Client { get; private set; }
-
-        /// <summary>
-        /// Writes the message to the remote host
-        /// </summary>
-        /// <param name="value">The data to write</param>
-        /// <exception cref="ArgumentNullException">If value is null</exception>
-        public void Write(T value)
-        {
-            if (value == null)
-            {
-                throw new ArgumentNullException("value");
-            }
-            if (_disposed)
-            {
-                Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER);
-            }
-
-            byte[] message = _codec.Encode(value);
-            _channel.Write(message);
-        }
-
-        /// <summary>
-        /// Writes the value to this link asynchronously
-        /// </summary>
-        /// <param name="value">The data to write</param>
-        /// <param name="token">The cancellation token</param>
-        /// <exception cref="ArgumentNullException">If value is null</exception>
-        public async Task WriteAsync(T value, CancellationToken token)
-        {
-            // Same argument validation as the synchronous Write.
-            if (value == null)
-            {
-                throw new ArgumentNullException("value");
-            }
-            if (_disposed)
-            {
-                Exceptions.Throw(new IllegalStateException("Link has been closed."), LOGGER); 
-            }
-
-            byte[] message = _codec.Encode(value);
-            await _channel.WriteAsync(message, token);
-        }
-
-        /// <summary>
-        /// Reads the value from the link synchronously
-        /// </summary>
-        /// <returns>The decoded value, or default(T) if the channel returned null</returns>
-        public T Read()
-        {
-            if (_disposed)
-            {
-                Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER);
-            }
-
-            byte[] message = _channel.Read();
-            return (message == null) ? default(T) : _codec.Decode(message);
-        }
-
-        /// <summary>
-        /// Reads the value from the link asynchronously
-        /// </summary>
-        /// <param name="token">The cancellation token</param>
-        /// <returns>The decoded value, or default(T) if the channel returned null</returns>
-        public async Task<T> ReadAsync(CancellationToken token)
-        {
-            if (_disposed)
-            {
-                Exceptions.Throw(new IllegalStateException("Link has been disposed."), LOGGER);
-            }
-
-            byte[] message = await _channel.ReadAsync(token);
-            return (message == null) ? default(T) : _codec.Decode(message);
-        }
-
-        /// <summary>
-        /// Close the client connection
-        /// </summary>
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        /// <summary>
-        /// Subclasses of Links should overwrite this to handle disposing
-        /// of the link
-        /// </summary>
-        /// <param name="disposing">To dispose or not</param>
-        public virtual void Dispose(bool disposing)
-        {
-            if (_disposed)
-            {
-                return;
-            }
-
-            if (disposing)
-            {
-                try
-                {
-                    Client.GetStream().Close();
-                }
-                catch (InvalidOperationException)
-                {
-                    // GetStream() throws when the socket is not connected; the
-                    // client is still closed below.
-                    LOGGER.Log(Level.Warning, "failed to close stream on a non-connected socket.");
-                }
-
-                Client.Close();
-            }
-            _disposed = true;
-        }
-
-        /// <summary>
-        /// Overrides Equals. Two Link objects are equal if they are connected
-        /// to the same remote endpoint.
-        /// </summary>
-        /// <param name="obj">The object to compare</param>
-        /// <returns>True if the object is equal to this Link, otherwise false</returns>
-        public override bool Equals(object obj)
-        {
-            Link<T> other = obj as Link<T>;
-            if (other == null)
-            {
-                return false;
-            }
-
-            return other.RemoteEndpoint.Equals(RemoteEndpoint);
-        }
-
-        /// <summary>
-        /// Gets the hash code for the Link object.
-        /// </summary>
-        /// <returns>The hash code of the remote endpoint</returns>
-        public override int GetHashCode()
-        {
-            return RemoteEndpoint.GetHashCode();
-        }
-
-        /// <summary>
-        /// Discovers the IPEndpoint for the current machine by combining
-        /// NetworkUtils.LocalIPAddress with the local port of the client socket.
-        /// </summary>
-        /// <returns>The local IPEndpoint</returns>
-        private IPEndPoint GetLocalEndpoint()
-        {
-            IPAddress address = NetworkUtils.LocalIPAddress;
-            int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port;
-            return new IPEndPoint(address, port);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs
deleted file mode 100644
index 2791eb3..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiCodec.cs
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-using System;
-using System.Collections.Generic;
-using Org.Apache.Reef.Wake.Remote;
-using Org.Apache.Reef.Wake.Remote.Impl;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Codec that picks the encoder/decoder to use based on the type of the
-    /// object, by delegating to a MultiEncoder and a MultiDecoder.
-    /// </summary>
-    public class MultiCodec<T> : ICodec<T>
-    {
-        private readonly MultiEncoder<T> _encoder = new MultiEncoder<T>();
-
-        private readonly MultiDecoder<T> _decoder = new MultiDecoder<T>();
-
-        /// <summary>
-        /// Constructs a new MultiCodec object with an empty encoder and decoder.
-        /// </summary>
-        public MultiCodec()
-        {
-        }
-
-        /// <summary>
-        /// Registers the codec with both the encoder and the decoder for
-        /// objects of type U.
-        /// </summary>
-        /// <typeparam name="U">The type handled by the codec</typeparam>
-        /// <param name="codec">The codec to use when encoding/decoding
-        /// objects of this type</param>
-        public void Register<U>(ICodec<U> codec) where U : T
-        {
-            _encoder.Register<U>(codec);
-            _decoder.Register<U>(codec);
-        }
-
-        /// <summary>
-        /// Registers the codec with both the encoder and the decoder for
-        /// objects of type U, under the given name.
-        /// </summary>
-        /// <typeparam name="U">The type handled by the codec</typeparam>
-        /// <param name="codec">The codec to use when encoding/decoding
-        /// objects of this type</param>
-        /// <param name="name">The name of the class to encode/decode</param>
-        public void Register<U>(ICodec<U> codec, string name) where U : T
-        {
-            _encoder.Register<U>(codec, name);
-            _decoder.Register<U>(codec, name);
-        }
-
-        /// <summary>
-        /// Encodes an object with the appropriate encoding or null if it cannot
-        /// be encoded.
-        /// </summary>
-        /// <param name="obj">Data to encode</param>
-        public byte[] Encode(T obj)
-        {
-            byte[] encoded = _encoder.Encode(obj);
-            return encoded;
-        }
-
-        /// <summary>
-        /// Decodes byte array into the appropriate object type.
-        /// </summary>
-        /// <param name="data">Data to be decoded</param>
-        public T Decode(byte[] data)
-        {
-            T decoded = _decoder.Decode(data);
-            return decoded;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs
deleted file mode 100644
index 789e226..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiDecoder.cs
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Reflection;
-using Org.Apache.Reef.Utilities.Diagnostics;
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Wake.Remote;
-using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Decoder for messages wrapped in the WakeTuple protocol buffer
-    /// (class name and bytes). The class name selects the registered decoder.
-    /// </summary>
-    public class MultiDecoder<T> : IDecoder<T>
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(MultiDecoder<T>));
-        private Dictionary<Type, object> _decoderMap;
-        private Dictionary<string, Type> _nameMap;
-
-        /// <summary>
-        /// Constructs a decoder with no registered types.
-        /// </summary>
-        public MultiDecoder()
-        {
-            _decoderMap = new Dictionary<Type, object>();
-            _nameMap = new Dictionary<string, Type>();
-        }
-
-        /// <summary>
-        /// Registers the decoder for objects of type U, keyed by the string
-        /// form of the type.
-        /// </summary>
-        /// <typeparam name="U">The type the decoder produces</typeparam>
-        /// <param name="decoder">The decoder to use when decoding
-        /// objects of this type</param>
-        public void Register<U>(IDecoder<U> decoder) where U : T
-        {
-            _decoderMap[typeof(U)] = decoder;
-            _nameMap[typeof(U).ToString()] = typeof(U);
-        }
-
-        /// <summary>
-        /// Registers the decoder for objects of type U, keyed by the given name.
-        /// </summary>
-        /// <typeparam name="U">The type the decoder produces</typeparam>
-        /// <param name="decoder">The decoder to use when decoding
-        /// objects of this type</param>
-        /// <param name="name">The name of the class to decode</param>
-        public void Register<U>(IDecoder<U> decoder, string name) where U : T
-        {
-            _decoderMap[typeof(U)] = decoder;
-            _nameMap[name] = typeof(U);
-        }
-
-        /// <summary>
-        /// Decodes the byte array with the decoder registered for the class
-        /// name carried in the tuple.
-        /// </summary>
-        /// <param name="data">The data to decode</param>
-        /// <returns>The decoded object, or default(T) if the tuple could not be
-        /// deserialized or its class name is not registered</returns>
-        public T Decode(byte[] data)
-        {
-            WakeTuplePBuf tuple = WakeTuplePBuf.Deserialize(data);
-
-            // Nothing to decode, or no type registered under the tuple's class name
-            Type targetType;
-            if (tuple == null || !_nameMap.TryGetValue(tuple.className, out targetType))
-            {
-                return default(T);
-            }
-
-            // Look up the decoder instance registered for that type
-            object registered;
-            bool known = _decoderMap.TryGetValue(targetType, out registered);
-            if (!known)
-            {
-                Exceptions.Throw(new RemoteRuntimeException("Decoder for " + targetType + " not known."), LOGGER);
-            }
-
-            // The decoder is stored as object, so call IDecoder<targetType>.Decode reflectively
-            MethodInfo decodeMethod = typeof(IDecoder<>).MakeGenericType(targetType).GetMethod("Decode");
-            object[] arguments = { tuple.data };
-            return (T) decodeMethod.Invoke(registered, arguments);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs
deleted file mode 100644
index cccf52f..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/MultiEncoder.cs
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Reflection;
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Encoder using the WakeTuple protocol buffer
-    /// (class name and bytes)
-    /// </summary>
-    public class MultiEncoder<T> : IEncoder<T>
-    {
-        private static readonly Logger _logger = Logger.GetLogger(typeof(MultiEncoder<>));
-        private readonly Dictionary<Type, object> _encoderMap;
-        private readonly Dictionary<Type, string> _nameMap;
-
-        /// <summary>
-        /// Constructs an encoder that encodes an object to bytes based on the class name
-        /// </summary>
-        public MultiEncoder()
-        {
-            _encoderMap = new Dictionary<Type, object>();
-            _nameMap = new Dictionary<Type, string>();
-        }
-
-        /// <summary>
-        /// Registers the encoder for objects of type U, using the string form
-        /// of the type as the class name written into the tuple.
-        /// </summary>
-        /// <typeparam name="U">The type the encoder accepts</typeparam>
-        /// <param name="encoder">The encoder to use for objects of this type</param>
-        public void Register<U>(IEncoder<U> encoder) where U : T
-        {
-            _encoderMap[typeof(U)] = encoder;
-            _nameMap[typeof(U)] = typeof(U).ToString();
-        }
-
-        /// <summary>
-        /// Registers the encoder for objects of type U, using the given name
-        /// as the class name written into the tuple.
-        /// </summary>
-        /// <typeparam name="U">The type the encoder accepts</typeparam>
-        /// <param name="encoder">The encoder to use for objects of this type</param>
-        /// <param name="name">The class name to write for objects of this type</param>
-        public void Register<U>(IEncoder<U> encoder, string name) where U : T
-        {
-            _encoderMap[typeof(U)] = encoder;
-            _nameMap[typeof(U)] = name;
-            _logger.Log(Level.Verbose, "Registering name for " + name);
-        }
-
-        /// <summary>Encodes an object to a byte array</summary>
-        /// <param name="obj">The object to encode</param>
-        /// <returns>The serialized tuple, or null if no encoder is registered
-        /// for the object's runtime type</returns>
-        /// <exception cref="ArgumentNullException">If obj is null</exception>
-        public byte[] Encode(T obj)
-        {
-            // The encoder is selected by runtime type, so a null object cannot be encoded.
-            if (obj == null)
-            {
-                throw new ArgumentNullException("obj");
-            }
-
-            // Find encoder for object type
-            Type type = obj.GetType();
-            object encoder;
-            if (!_encoderMap.TryGetValue(type, out encoder))
-            {
-                return null;
-            }
-
-            // Invoke encoder for this type
-            Type handlerType = typeof(IEncoder<>).MakeGenericType(new[] { type });
-            MethodInfo info = handlerType.GetMethod("Encode");
-            byte[] data = (byte[]) info.Invoke(encoder, new[] { (object) obj });
-
-            // Serialize object type and object data into well known tuple
-            // To decode, deserialize the tuple, get object type, and look up the
-            // decoder for that type
-            string name = _nameMap[type];
-            _logger.Log(Level.Verbose, "Encoding name for " + name);
-            WakeTuplePBuf pbuf = new WakeTuplePBuf { className = name, data = data };
-            return pbuf.Serialize();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs
deleted file mode 100644
index 577cd95..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/ObserverContainer.cs
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Linq;
-using System.Net;
-using System.Text;
-using System.Threading.Tasks;
-using Org.Apache.Reef.Utilities.Diagnostics;
-using Org.Apache.Reef.Utilities.Logging;
-using Org.Apache.Reef.Wake.RX.Impl;
-using Org.Apache.Reef.Wake.Util;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Stores registered IObservers for DefaultRemoteManager.
-    /// Can register and look up IObservers by remote IPEndPoint.
-    /// </summary>
-    internal class ObserverContainer<T> : IObserver<TransportEvent<IRemoteEvent<T>>>
-    {
-        private ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap;
-        private ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap;
-        private IObserver<T> _universalObserver;
-
-        /// <summary>
-        /// Constructs a new ObserverContainer used to manage remote IObservers.
-        /// </summary>
-        public ObserverContainer()
-        {
-            _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer());
-            _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>();
-        }
-
-        /// <summary>
-        /// Registers an IObserver used to handle incoming messages from the remote host
-        /// at the specified IPEndPoint. An endpoint whose address is IPAddress.Any
-        /// registers the observer as the universal observer, which receives every message.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
-        /// <param name="observer">The IObserver to handle incoming messages</param>
-        /// <returns>An IDisposable used to unregister the observer with</returns>
-        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer) 
-        {
-            if (remoteEndpoint.Address.Equals(IPAddress.Any))
-            {
-                _universalObserver = observer;
-                return Disposable.Create(() => { _universalObserver = null; });
-            }
-
-            _endpointMap[remoteEndpoint] = observer;
-            return Disposable.Create(() => _endpointMap.TryRemove(remoteEndpoint, out observer));
-        }
-
-        /// <summary>
-        /// Registers an IObserver to handle incoming messages from a remote host.
-        /// The observer is stored under typeof(T).
-        /// </summary>
-        /// <param name="observer">The IObserver to handle incoming messages</param>
-        /// <returns>An IDisposable used to unregister the observer with</returns>
-        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
-        {
-            _typeMap[typeof(T)] = observer;
-            return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer));
-        }
-
-        /// <summary>
-        /// Look up the IObserver for the registered IPEndPoint or event type 
-        /// and execute the IObserver.
-        /// </summary>
-        /// <param name="transportEvent">The incoming remote event</param>
-        /// <exception cref="WakeRuntimeException">If no observer handled the event</exception>
-        public void OnNext(TransportEvent<IRemoteEvent<T>> transportEvent)
-        {
-            IRemoteEvent<T> remoteEvent = transportEvent.Data;
-            remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint;
-            remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint;
-            T value = remoteEvent.Value;
-            bool handled = false;
-
-            // Read the field once: the unregister callback sets it to null, and
-            // a second read after the null check could observe that null.
-            IObserver<T> universalObserver = _universalObserver;
-            if (universalObserver != null)
-            {
-                universalObserver.OnNext(value);
-                handled = true;
-            }
-
-            IObserver<T> observer1;
-            IObserver<IRemoteMessage<T>> observer2;
-            if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer1))
-            {
-                // IObserver was registered by IPEndpoint
-                observer1.OnNext(value);
-                handled = true;
-            } 
-            else if (value != null && _typeMap.TryGetValue(value.GetType(), out observer2))
-            {
-                // IObserver was registered by event type.
-                // A null value has no runtime type to look up, so it is skipped here
-                // and falls through to the "unhandled" check below.
-                IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
-                IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value);
-                observer2.OnNext(remoteMessage);
-                handled = true;
-            }
-
-            if (!handled)
-            {
-                throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message");
-            }
-        }
-
-        /// <summary>
-        /// Does nothing; errors are not forwarded to the registered observers.
-        /// </summary>
-        /// <param name="error">The reported error (ignored)</param>
-        public void OnError(Exception error)
-        {
-        }
-
-        /// <summary>
-        /// Does nothing; completion is not forwarded to the registered observers.
-        /// </summary>
-        public void OnCompleted()
-        {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs
deleted file mode 100644
index 9e2fe2a..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEvent.cs
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System.Net;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Mutable holder for a value exchanged between hosts together with its
-    /// endpoints, source, sink and sequence number.
-    /// </summary>
-    public class RemoteEvent<T> : IRemoteEvent<T>
-    {
-        /// <summary>
-        /// Constructs a RemoteEvent with every property set.
-        /// </summary>
-        public RemoteEvent(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, string source, string sink, long seq, T value)
-        {
-            LocalEndPoint = localEndPoint;
-            RemoteEndPoint = remoteEndPoint;
-            Source = source;
-            Sink = sink;
-            Sequence = seq;
-            Value = value;
-        }
-
-        /// <summary>
-        /// Constructs a RemoteEvent with only the endpoints and the value set;
-        /// Source and Sink stay null and Sequence stays 0.
-        /// </summary>
-        public RemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value)
-            : this(localEndpoint, remoteEndpoint, null, null, 0L, value)
-        {
-        }
-
-        /// <summary>
-        /// Constructs a RemoteEvent with all properties at their default values.
-        /// </summary>
-        public RemoteEvent()
-        {
-        }
-
-        /// <summary>The local socket address</summary>
-        public IPEndPoint LocalEndPoint { get; set; }
-
-        /// <summary>The remote socket address</summary>
-        public IPEndPoint RemoteEndPoint { get; set; }
-
-        /// <summary>The source string carried with the event</summary>
-        public string Source { get; set; }
-
-        /// <summary>The sink string carried with the event</summary>
-        public string Sink { get; set; }
-
-        /// <summary>The value carried by the event</summary>
-        public T Value { get; set; }
-
-        /// <summary>The sequence number carried with the event</summary>
-        public long Sequence { get; set; }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs
deleted file mode 100644
index 2c5b16d..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventCodec.cs
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Codec for IRemoteEvent objects that delegates to a RemoteEventEncoder
-    /// and a RemoteEventDecoder built from the same value codec.
-    /// </summary>
-    internal class RemoteEventCodec<T> : ICodec<IRemoteEvent<T>>
-    {
-        private readonly RemoteEventEncoder<T> _encoder;
-        private readonly RemoteEventDecoder<T> _decoder;
-
-        /// <summary>
-        /// Constructs a RemoteEventCodec around the given value codec.
-        /// </summary>
-        /// <param name="codec">The codec passed to the encoder and the decoder</param>
-        public RemoteEventCodec(ICodec<T> codec) 
-        {
-            _encoder = new RemoteEventEncoder<T>(codec);
-            _decoder = new RemoteEventDecoder<T>(codec);
-        }
-
-        /// <summary>
-        /// Encodes the remote event using the RemoteEventEncoder.
-        /// </summary>
-        /// <param name="obj">The remote event to encode</param>
-        public byte[] Encode(IRemoteEvent<T> obj)
-        {
-            byte[] encoded = _encoder.Encode(obj);
-            return encoded;
-        }
-
-        /// <summary>
-        /// Decodes the bytes into a remote event using the RemoteEventDecoder.
-        /// </summary>
-        /// <param name="data">The bytes to decode</param>
-        public IRemoteEvent<T> Decode(byte[] data)
-        {
-            IRemoteEvent<T> decoded = _decoder.Decode(data);
-            return decoded;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs
deleted file mode 100644
index f9cfbc1..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventDecoder.cs
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Decodes the byte representation of a remote event.
-    /// </summary>
-    /// <typeparam name="T">Type of the payload carried by the remote event</typeparam>
-    public class RemoteEventDecoder<T> : IDecoder<IRemoteEvent<T>>
-    {
-        // Set once in the constructor; readonly to match RemoteEventEncoder.
-        private readonly IDecoder<T> _decoder;
-
-        /// <summary>
-        /// Creates a decoder that uses the given decoder for the event payload.
-        /// </summary>
-        /// <param name="decoder">Decoder applied to the payload bytes of each message</param>
-        public RemoteEventDecoder(IDecoder<T> decoder)
-        {
-            _decoder = decoder;
-        }
-
-        /// <summary>
-        /// Deserializes a WakeMessagePBuf from the given bytes and turns it into a remote event.
-        /// </summary>
-        /// <param name="data">Serialized WakeMessagePBuf</param>
-        /// <returns>
-        /// A RemoteEvent carrying the message's source, sink and sequence number together
-        /// with the decoded payload. The first two constructor arguments are passed as null.
-        /// </returns>
-        public IRemoteEvent<T> Decode(byte[] data)
-        {
-            WakeMessagePBuf pbuf = WakeMessagePBuf.Deserialize(data);
-            return new RemoteEvent<T>(null, null, pbuf.source, pbuf.sink, pbuf.seq, _decoder.Decode(pbuf.data));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs
deleted file mode 100644
index 432e688..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEncoder.cs
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Wake.Remote.Proto.WakeRemoteProtos;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Encodes a remote event into its byte representation.
-    /// </summary>
-    /// <typeparam name="T">Type of the payload carried by the remote event</typeparam>
-    public class RemoteEventEncoder<T> : IEncoder<IRemoteEvent<T>>
-    {
-        private readonly IEncoder<T> _encoder;
-
-        /// <summary>
-        /// Creates an encoder that uses the given encoder for the event payload.
-        /// </summary>
-        /// <param name="encoder">Encoder applied to the value of each event</param>
-        public RemoteEventEncoder(IEncoder<T> encoder)
-        {
-            _encoder = encoder;
-        }
-
-        /// <summary>
-        /// Copies sink, source, encoded value and sequence number of the event into a
-        /// WakeMessagePBuf and serializes it.
-        /// </summary>
-        /// <param name="obj">Remote event to encode</param>
-        /// <returns>The serialized WakeMessagePBuf</returns>
-        public byte[] Encode(IRemoteEvent<T> obj)
-        {
-            // Members are assigned in the order they are listed.
-            WakeMessagePBuf message = new WakeMessagePBuf
-            {
-                sink = obj.Sink,
-                source = obj.Source,
-                data = _encoder.Encode(obj.Value),
-                seq = obj.Sequence
-            };
-
-            return message.Serialize();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs
deleted file mode 100644
index 10a048e..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/RemoteEventEndpoint.cs
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Holds the remote identifier of an endpoint.
-    /// </summary>
-    /// <typeparam name="T">Type parameter of the endpoint; not used by the members of this class</typeparam>
-    public class RemoteEventEndPoint<T>
-    {
-        /// <summary>
-        /// Stores the given identifier without validating it.
-        /// </summary>
-        /// <param name="id">Identifier exposed through <see cref="Id"/></param>
-        public RemoteEventEndPoint(IRemoteIdentifier id)
-        {
-            Id = id;
-        }
-
-        /// <summary>
-        /// The identifier supplied at construction time.
-        /// </summary>
-        public IRemoteIdentifier Id { get; private set; }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs
deleted file mode 100644
index 2f402a8..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/SocketRemoteIdentifier.cs
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Utilities.Diagnostics;
-using Org.Apache.Reef.Utilities.Logging;
-using System.Globalization;
-using System.Net;
-using System.Text;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Remote identifier based on a socket address
-    /// </summary>
-    public class SocketRemoteIdentifier : IRemoteIdentifier
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(SocketRemoteIdentifier));
-        private readonly IPEndPoint _addr;
-
-        /// <summary>
-        /// Creates an identifier for the given socket address.
-        /// </summary>
-        /// <param name="addr">The socket address this identifier represents</param>
-        public SocketRemoteIdentifier(IPEndPoint addr)
-        {
-            _addr = addr;
-        }
-
-        /// <summary>
-        /// Creates an identifier from a string of the form "address:port".
-        /// The string is split at the first ':'; the part before it is parsed with
-        /// IPAddress.Parse and the part after it as an invariant-culture integer.
-        /// </summary>
-        /// <param name="str">The "address:port" string to parse</param>
-        public SocketRemoteIdentifier(string str)
-        {
-            int index = str.IndexOf(":", System.StringComparison.Ordinal);
-            if (index <= 0)
-            {
-                // No ':' at all, or nothing in front of it.
-                Exceptions.Throw(new RemoteRuntimeException("Invalid name " + str), LOGGER); 
-            }
-            string host = str.Substring(0, index);
-            int port = int.Parse(str.Substring(index + 1), CultureInfo.InvariantCulture);
-            _addr = new IPEndPoint(IPAddress.Parse(host), port);
-        }
-
-        /// <summary>
-        /// The socket address this identifier represents.
-        /// </summary>
-        public IPEndPoint Addr
-        {
-            get { return _addr; }
-        }
-
-        /// <summary>
-        /// Returns the hash code of the underlying socket address.
-        /// </summary>
-        public override int GetHashCode()
-        {
-            return _addr.GetHashCode();
-        }
-
-        /// <summary>
-        /// Two identifiers are equal when their socket addresses are equal.
-        /// </summary>
-        /// <param name="obj">The object to compare with</param>
-        /// <returns>
-        /// true if obj is a SocketRemoteIdentifier with an equal address;
-        /// false for null and for objects of any other type
-        /// </returns>
-        public override bool Equals(object obj)
-        {
-            // Equals must not throw for null or foreign types, so use a safe cast
-            // rather than a hard cast.
-            SocketRemoteIdentifier other = obj as SocketRemoteIdentifier;
-            return other != null && _addr.Equals(other._addr);
-        }
-
-        /// <summary>
-        /// Returns the address prefixed with "socket://".
-        /// </summary>
-        public override string ToString()
-        {
-            StringBuilder builder = new StringBuilder();
-            builder.Append("socket://");
-            builder.Append(_addr);
-            return builder.ToString();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs
deleted file mode 100644
index 4e25b18..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringCodec.cs
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Org.Apache.Reef.Tang.Annotations;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Codec that converts strings to and from bytes using UTF-8.
-    /// </summary>
-    public class StringCodec : ICodec<string>
-    {
-        [Inject]
-        public StringCodec()
-        {
-        }
-
-        /// <summary>
-        /// Encodes the string as UTF-8 bytes.
-        /// </summary>
-        /// <param name="obj">The string to encode</param>
-        /// <returns>The UTF-8 bytes of the string</returns>
-        public byte[] Encode(string obj)
-        {
-            // UTF-8 yields the same bytes as ASCII for pure ASCII text, but does not
-            // replace other characters with '?' the way Encoding.ASCII does.
-            return Encoding.UTF8.GetBytes(obj);
-        }
-
-        /// <summary>
-        /// Decodes UTF-8 bytes into a string.
-        /// </summary>
-        /// <param name="data">The bytes to decode</param>
-        /// <returns>The decoded string</returns>
-        public string Decode(byte[] data)
-        {
-            return Encoding.UTF8.GetString(data);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs
deleted file mode 100644
index 31829cd..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifier.cs
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Identifier backed by a plain string.
-    /// </summary>
-    public class StringIdentifier : IIdentifier
-    {
-        private readonly string _str;
-
-        /// <summary>
-        /// Wraps the given string; it is stored as is.
-        /// </summary>
-        /// <param name="s">The string that identifies this instance</param>
-        public StringIdentifier(string s)
-        {
-            _str = s;
-        }
-
-        /// <summary>
-        /// Returns the wrapped string.
-        /// </summary>
-        public override string ToString()
-        {
-            return _str;
-        }
-
-        /// <summary>
-        /// Compares by wrapped string; anything that is not a StringIdentifier is unequal.
-        /// </summary>
-        public override bool Equals(object o)
-        {
-            StringIdentifier that = o as StringIdentifier;
-            if (that == null)
-            {
-                return false;
-            }
-
-            return _str.Equals(that._str);
-        }
-
-        /// <summary>
-        /// Hash code of the wrapped string.
-        /// </summary>
-        public override int GetHashCode()
-        {
-            return _str.GetHashCode();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs
deleted file mode 100644
index b9bcb50..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/StringIdentifierFactory.cs
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Tang.Annotations;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Identifier factory that produces StringIdentifier instances.
-    /// </summary>
-    public class StringIdentifierFactory : IIdentifierFactory
-    {
-        [Inject]
-        public StringIdentifierFactory()
-        {
-        }
-
-        /// <summary>
-        /// Wraps the given string in a new StringIdentifier.
-        /// </summary>
-        /// <param name="s">The string to wrap</param>
-        /// <returns>A new identifier for the string</returns>
-        public IIdentifier Create(string s)
-        {
-            IIdentifier identifier = new StringIdentifier(s);
-            return identifier;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs
deleted file mode 100644
index 848a915..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportClient.cs
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Establish connections to TransportServer for remote message passing
-    /// </summary>
-    /// <typeparam name="T">Type of the messages written to and read from the link</typeparam>
-    public class TransportClient<T> : IDisposable
-    {
-        private readonly ILink<T> _link;
-        private readonly IObserver<TransportEvent<T>> _observer;
-        private readonly CancellationTokenSource _cancellationSource;
-        private bool _disposed;
-
-        /// <summary>
-        /// Construct a TransportClient.
-        /// Used to send messages to the specified remote endpoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
-        /// <param name="codec">Codec used to encode and decode messages</param>
-        /// <exception cref="ArgumentNullException">remoteEndpoint or codec is null</exception>
-        public TransportClient(IPEndPoint remoteEndpoint, ICodec<T> codec) 
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-            if (codec == null)
-            {
-                throw new ArgumentNullException("codec");
-            }
-
-            _link = new Link<T>(remoteEndpoint, codec);
-            _cancellationSource = new CancellationTokenSource();
-            _disposed = false;
-        }
-
-        /// <summary>
-        /// Construct a TransportClient.
-        /// Used to send messages to the specified remote endpoint.
-        /// A background task is started that forwards every message read from
-        /// the link to the observer.
-        /// </summary>
-        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
-        /// <param name="codec">Codec used to encode and decode messages</param>
-        /// <param name="observer">Callback used when receiving responses from remote host</param>
-        /// <exception cref="ArgumentNullException">remoteEndpoint or codec is null</exception>
-        public TransportClient(IPEndPoint remoteEndpoint, 
-                               ICodec<T> codec, 
-                               IObserver<TransportEvent<T>> observer) 
-                                   : this(remoteEndpoint, codec)
-        {
-            _observer = observer;
-            Task.Run(() => ResponseLoop());
-        }
-
-        /// <summary>
-        /// Gets the underlying transport link.
-        /// </summary>
-        public ILink<T> Link
-        {
-            get { return _link; }
-        }
-
-        /// <summary>
-        /// Send the remote message.
-        /// </summary>
-        /// <param name="message">The message to send</param>
-        /// <exception cref="ArgumentNullException">message is null</exception>
-        public void Send(T message)
-        {
-            if (message == null)
-            {
-                throw new ArgumentNullException("message");    
-            }
-
-            _link.Write(message);
-        }
-
-        /// <summary>
-        /// Close all opened connections
-        /// </summary>
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        /// <summary>
-        /// Stops the response loop and disposes the link. Only the first call
-        /// with disposing set to true has an effect.
-        /// </summary>
-        /// <param name="disposing">true when called from <see cref="Dispose()"/></param>
-        protected void Dispose(bool disposing)
-        {
-            if (!_disposed && disposing)
-            {
-                try
-                {
-                    // Tell the response loop to stop before the link goes away;
-                    // without this the loop has no way to terminate and keeps
-                    // reading from a disposed link.
-                    _cancellationSource.Cancel();
-                }
-                finally
-                {
-                    _link.Dispose();
-                    _disposed = true;
-                }
-
-                // The cancellation source itself is deliberately not disposed:
-                // the response loop may still touch its token after this point.
-            }
-        }
-
-        /// <summary>
-        /// Continually read responses from remote host and hand them to the observer.
-        /// The loop ends when the client is disposed or when a read yields null.
-        /// </summary>
-        private async Task ResponseLoop()
-        {
-            try
-            {
-                while (!_cancellationSource.IsCancellationRequested)
-                {
-                    T message = await _link.ReadAsync(_cancellationSource.Token);
-                    if (message == null)
-                    {
-                        break;
-                    }
-
-                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
-                    _observer.OnNext(transportEvent);
-                }
-            }
-            catch (OperationCanceledException)
-            {
-                // A read interrupted by Dispose is the normal way out of the loop.
-                // Anything else that surfaces as a cancellation is not ours to hide.
-                if (!_cancellationSource.IsCancellationRequested)
-                {
-                    throw;
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs
deleted file mode 100644
index 6c4644c..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportEvent.cs
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Net;
-using System.Net.Sockets;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Pairs a piece of data with the link it is associated with.
-    /// </summary>
-    /// <typeparam name="T">Type of the data</typeparam>
-    public class TransportEvent<T>
-    {
-        private readonly T _data;
-        private readonly ILink<T> _link;
-
-        /// <summary>
-        /// Creates an event for the given data and link.
-        /// </summary>
-        /// <param name="data">The data of the event</param>
-        /// <param name="link">The link of the event</param>
-        public TransportEvent(T data, ILink<T> link)
-        {
-            _data = data;
-            _link = link;
-        }
-
-        /// <summary>
-        /// The data passed to the constructor.
-        /// </summary>
-        public T Data
-        {
-            get { return _data; }
-        }
-
-        /// <summary>
-        /// The link passed to the constructor.
-        /// </summary>
-        public ILink<T> Link
-        {
-            get { return _link; }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs b/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs
deleted file mode 100644
index 20cdc8a..0000000
--- a/lang/cs/Source/WAKE/Wake/Remote/Impl/TransportServer.cs
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.Reef.Utilities.Diagnostics;
-using Org.Apache.Reef.Utilities.Logging;
-using System;
-using System.Globalization;
-using System.Net;
-using System.Net.Sockets;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.Reef.Wake.Util;
-
-namespace Org.Apache.Reef.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Server to handle incoming remote messages.
-    /// </summary>
-    /// <typeparam name="T">Type of the messages read from connected clients</typeparam>
-    public class TransportServer<T> : IDisposable
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TransportServer<>));
-
-        // How long Dispose waits for the accept loop to finish.
-        private const int ShutdownTimeoutMs = 500;
-
-        private readonly TcpListener _listener;
-        private readonly CancellationTokenSource _cancellationSource;
-        private readonly IObserver<TransportEvent<T>> _remoteObserver;
-        private readonly ICodec<T> _codec; 
-        private bool _disposed;
-        private Task _serverTask;
-
-        /// <summary>
-        /// Constructs a TransportServer to listen for remote events.  
-        /// Listens on the given port of the local IP address.  When it receives a remote
-        /// event, it will invoke the specified remote handler.
-        /// </summary>
-        /// <param name="port">Port to listen on</param>
-        /// <param name="remoteHandler">The handler to invoke when receiving incoming
-        /// remote messages</param>
-        /// <param name="codec">The codec to encode/decode</param>
-        public TransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ICodec<T> codec)
-            : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, codec)
-        {
-        }
-
-        /// <summary>
-        /// Constructs a TransportServer to listen for remote events.  
-        /// Listens on the specified local endpoint.  When it receives a remote
-        /// event, it will invoke the specified remote handler.
-        /// </summary>
-        /// <param name="localEndpoint">Endpoint to listen on</param>
-        /// <param name="remoteHandler">The handler to invoke when receiving incoming
-        /// remote messages</param>
-        /// <param name="codec">The codec to encode/decode</param>
-        public TransportServer(IPEndPoint localEndpoint, 
-                               IObserver<TransportEvent<T>> remoteHandler, 
-                               ICodec<T> codec)
-        {
-            _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
-            _remoteObserver = remoteHandler;
-            _cancellationSource = new CancellationTokenSource();
-            _codec = codec;
-            _disposed = false;
-        }
-
-        /// <summary>
-        /// Returns the listening endpoint for the TransportServer 
-        /// </summary>
-        public IPEndPoint LocalEndpoint
-        {
-            get { return _listener.LocalEndpoint as IPEndPoint; }
-        }
-
-        /// <summary>
-        /// Starts listening for incoming remote messages.
-        /// </summary>
-        public void Run()
-        {
-            _listener.Start();
-            _serverTask = Task.Run(() => StartServer());
-        }
-
-        /// <summary>
-        /// Close the TransportServer and all open connections
-        /// </summary>
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        /// <summary>
-        /// Cancels pending work, stops the listener and gives the accept loop a
-        /// bounded amount of time to finish.
-        /// </summary>
-        /// <param name="disposing">true when called from <see cref="Dispose()"/></param>
-        public void Dispose(bool disposing)
-        {
-            if (!_disposed && disposing)
-            {
-                _cancellationSource.Cancel();
-                try
-                {
-                    _listener.Stop();
-                }
-                catch (SocketException)
-                {
-                    LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
-                }
-
-                if (_serverTask != null)
-                {
-                    // Give the TransportServer Task 500ms to shut down, ignore any timeout errors.
-                    // Wait(int) returns false on timeout instead of throwing, and there is no
-                    // unbounded wait in front of it that would make the timeout meaningless.
-                    try
-                    {
-                        _serverTask.Wait(ShutdownTimeoutMs);
-                    }
-                    catch (AggregateException e)
-                    {
-                        LOGGER.Log(Level.Info, "TransportServer task ended with an error during shutdown: " + e);
-                    }
-                    finally
-                    {
-                        // Task.Dispose throws if the task has not reached a final state,
-                        // so a task that is still running after the timeout is left alone.
-                        if (_serverTask.IsCompleted)
-                        {
-                            _serverTask.Dispose();
-                        }
-                    }
-                }
-            }
-
-            _disposed = true;
-        }
-
-        /// <summary>
-        /// Helper method to start TransportServer.  This will
-        /// be run in an asynchronous Task.
-        /// Accepts clients until cancellation is requested and starts a
-        /// processing task for each of them.
-        /// </summary>
-        /// <returns>An asynchronous Task for the running server.</returns>
-        private async Task StartServer()
-        {
-            try
-            {
-                while (!_cancellationSource.Token.IsCancellationRequested)
-                {
-                    TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
-                    ProcessClient(client).Forget();
-                }
-            }
-            catch (InvalidOperationException)
-            {
-                LOGGER.Log(Level.Info, "TransportServer has been closed.");
-            }
-            catch (OperationCanceledException)
-            {
-                LOGGER.Log(Level.Info, "TransportServer has been closed.");
-            }
-        }
-
-        /// <summary>
-        /// Receives events from the connected TcpClient and invokes the handler on each event.
-        /// </summary>
-        /// <param name="client">The connected client</param>
-        private async Task ProcessClient(TcpClient client)
-        {
-            // Keep reading messages from client until they disconnect or timeout
-            CancellationToken token = _cancellationSource.Token;
-            using (ILink<T> link = new Link<T>(client, _codec))
-            {
-                while (!token.IsCancellationRequested)
-                {
-                    T message = await link.ReadAsync(token);
-
-                    // A null message ends the loop. Check before notifying the handler so it
-                    // never sees an event without data, as TransportClient already does.
-                    if (message == null)
-                    {
-                        break;
-                    }
-
-                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
-                    _remoteObserver.OnNext(transportEvent);
-                }
-            }
-        }
-    }
-}