You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/07/07 23:00:57 UTC

[1/2] incubator-reef git commit: [REEF-422] Convert Wake Layer from Writable to Streaming

Repository: incubator-reef
Updated Branches:
  refs/heads/master ec9b497d4 -> ba2653d6b


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
new file mode 100644
index 0000000..6eb0190
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManager.cs
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Manages incoming and outgoing messages between remote hosts.
+    /// </summary>
+    /// <typeparam name="T">Message type T.</typeparam>
+    internal sealed class StreamingRemoteManager<T> : IRemoteManager<T>
+    {
+        private readonly ObserverContainer<T> _observerContainer;
+        private readonly StreamingTransportServer<IRemoteEvent<T>> _server;
+        private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
+        private readonly IStreamingCodec<IRemoteEvent<T>> _remoteEventCodec;
+
+        /// <summary>
+        /// Constructs a DefaultRemoteManager listening on the specified address and
+        /// a specific port.
+        /// </summary>
+        /// <param name="localAddress">The address to listen on</param>
+        /// <param name="tcpPortProvider">Tcp port provider</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
+        public StreamingRemoteManager(IPAddress localAddress, ITcpPortProvider tcpPortProvider, IStreamingCodec<T> streamingCodec)
+        {
+            if (localAddress == null)
+            {
+                throw new ArgumentNullException("localAddress");
+            }
+
+            _observerContainer = new ObserverContainer<T>();
+            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
+            _remoteEventCodec = new RemoteEventStreamingCodec<T>(streamingCodec);
+
+            // Begin to listen for incoming messages
+            _server = new StreamingTransportServer<IRemoteEvent<T>>(localAddress, _observerContainer, tcpPortProvider, _remoteEventCodec);
+            _server.Run();
+
+            LocalEndpoint = _server.LocalEndpoint;  
+            Identifier = new SocketRemoteIdentifier(LocalEndpoint);
+        }
+
+        /// <summary>
+        /// Gets the RemoteIdentifier for the DefaultRemoteManager
+        /// </summary>
+        public IRemoteIdentifier Identifier { get; private set; }
+
+        /// <summary>
+        /// Gets the local IPEndPoint for the DefaultRemoteManager
+        /// </summary>
+        public IPEndPoint LocalEndpoint { get; private set; }
+
+        /// <summary>
+        /// Returns an IObserver used to send messages to the remote host at
+        /// the specified IPEndpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+        /// <returns>An IObserver used to send messages to the remote host</returns>
+        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+            if (id == null)
+            {
+                throw new ArgumentException("ID not supported");
+            }
+
+            return GetRemoteObserver(id.Addr);
+        }
+
+        /// <summary>
+        /// Returns an IObserver used to send messages to the remote host at
+        /// the specified IPEndpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
+        /// <returns>An IObserver used to send messages to the remote host</returns>
+        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            ProxyObserver remoteObserver;
+            if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver))
+            {
+                StreamingTransportClient<IRemoteEvent<T>> client =
+                    new StreamingTransportClient<IRemoteEvent<T>>(remoteEndpoint, _observerContainer, _remoteEventCodec);
+
+                remoteObserver = new ProxyObserver(client);
+                _cachedClients[remoteEndpoint] = remoteObserver;
+            }
+
+            return remoteObserver;
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
+            if (id == null)
+            {
+                throw new ArgumentException("ID not supported");
+            }
+
+            return RegisterObserver(id.Addr, observer);
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            return _observerContainer.RegisterObserver(remoteEndpoint, observer);
+        }
+
+        /// <summary>
+        /// Registers an IObserver used to handle incoming messages from the remote host
+        /// at the specified IPEndPoint.
+        /// The IDisposable that is returned can be used to unregister the IObserver.
+        /// </summary>
+        /// <param name="observer">The IObserver to handle incoming messages</param>
+        /// <returns>An IDisposable used to unregister the observer with</returns>
+        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
+        {
+            if (observer == null)
+            {
+                throw new ArgumentNullException("observer");
+            }
+
+            return _observerContainer.RegisterObserver(observer);
+        }
+        
+        /// <summary>
+        /// Release all resources for the DefaultRemoteManager.
+        /// </summary>
+        public void Dispose()
+        {
+            foreach (ProxyObserver cachedClient in _cachedClients.Values)
+            {
+                cachedClient.Dispose();
+            }
+
+            if (_server != null)
+            {
+                _server.Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Observer to send messages to connected remote host
+        /// </summary>
+        private class ProxyObserver : IObserver<T>, IDisposable
+        {
+            private readonly StreamingTransportClient<IRemoteEvent<T>> _client;
+
+            /// <summary>
+            /// Create new ProxyObserver
+            /// </summary>
+            /// <param name="client">The connected WritableTransport client used to send
+            /// messages to remote host</param>
+            public ProxyObserver(StreamingTransportClient<IRemoteEvent<T>> client)
+            {
+                _client = client;
+            }
+
+            /// <summary>
+            /// Send the message to the remote host
+            /// </summary>
+            /// <param name="message">The message to send</param>
+            public void OnNext(T message)
+            {
+                IRemoteEvent<T> remoteEvent = new RemoteEvent<T>(_client.Link.LocalEndpoint,
+                    _client.Link.RemoteEndpoint,
+                    message);
+
+                _client.Send(remoteEvent);
+            }
+
+            /// <summary>
+            /// Close underlying WritableTransport client
+            /// </summary>
+            public void Dispose()
+            {
+                _client.Dispose();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw error;
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
new file mode 100644
index 0000000..90e3aca
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Net;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// StreamingRemoteManagerFactory for StreamingRemoteManager.
+    /// </summary>
+    public sealed class StreamingRemoteManagerFactory
+    {
+        private readonly ITcpPortProvider _tcpPortProvider;
+        private readonly IInjector _injector;
+
+        [Inject]  
+        private StreamingRemoteManagerFactory(ITcpPortProvider tcpPortProvider, IInjector injector)
+        {
+            _tcpPortProvider = tcpPortProvider;
+            _injector = injector;
+        }
+
+        //ToDo: The port argument will be removed once changes are made in WritableNetworkService [REEF-447]
+        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
+        {
+#pragma warning disable 618
+// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
+            var codec = _injector.GetInstance<TemporaryWritableToStreamingCodec<T>>();
+            return new StreamingRemoteManager<T>(localAddress, _tcpPortProvider, codec);
+#pragma warning disable 618
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
new file mode 100644
index 0000000..111be5d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Establish connections to TransportServer for remote message passing
+    /// </summary>
+    /// <typeparam name="T">Generic Type of message.</typeparam>
+    internal sealed class StreamingTransportClient<T> : IDisposable
+    {
+        private readonly ILink<T> _link;
+        private readonly IObserver<TransportEvent<T>> _observer;
+        private readonly CancellationTokenSource _cancellationSource;
+        private bool _disposed;
+        private static readonly Logger Logger = Logger.GetLogger(typeof(StreamingTransportClient<T>));
+
+        /// <summary>
+        /// Construct a TransportClient.
+        /// Used to send messages to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        internal StreamingTransportClient(IPEndPoint remoteEndpoint, IStreamingCodec<T> streamingCodec)
+        {
+            Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", Logger);
+
+            _link = new StreamingLink<T>(remoteEndpoint, streamingCodec);
+            _cancellationSource = new CancellationTokenSource();
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Construct a TransportClient.
+        /// Used to send messages to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
+        /// <param name="observer">Callback used when receiving responses from remote host</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        internal StreamingTransportClient(IPEndPoint remoteEndpoint,
+            IObserver<TransportEvent<T>> observer,
+            IStreamingCodec<T> streamingCodec)
+            : this(remoteEndpoint, streamingCodec)
+        {
+            _observer = observer;
+            Task.Run(() => ResponseLoop());
+        }
+
+        /// <summary>
+        /// Gets the underlying transport link.
+        /// </summary>
+        public ILink<T> Link
+        {
+            get { return _link; }
+        }
+
+        /// <summary>
+        /// Send the remote message.
+        /// </summary>
+        /// <param name="message">The message to send</param>
+        public void Send(T message)
+        {
+            if (message == null)
+            {
+                throw new ArgumentNullException("message");
+            }
+
+            _link.Write(message);
+        }
+
+        /// <summary>
+        /// Close all opened connections
+        /// </summary>
+        public void Dispose()
+        {
+            if (!_disposed)
+            {
+                _link.Dispose();
+                _disposed = true;
+            }
+        }
+
+        /// <summary>
+        /// Continually read responses from remote host
+        /// </summary>
+        private async Task ResponseLoop()
+        {
+            while (!_cancellationSource.IsCancellationRequested)
+            {
+                T message = await _link.ReadAsync(_cancellationSource.Token);
+                if (message == null)
+                {
+                    break;
+                }
+
+                TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
+                _observer.OnNext(transportEvent);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
new file mode 100644
index 0000000..3f05c17
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Server to handle incoming remote messages.
+    /// </summary>
+    /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam>
+    internal sealed class StreamingTransportServer<T> : IDisposable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof (TransportServer<>));
+
+        private TcpListener _listener;
+        private readonly CancellationTokenSource _cancellationSource;
+        private readonly IObserver<TransportEvent<T>> _remoteObserver;
+        private readonly ITcpPortProvider _tcpPortProvider;
+        private readonly IStreamingCodec<T> _streamingCodec;
+        private bool _disposed;
+        private Task _serverTask;
+
+        /// <summary>
+        /// Constructs a TransportServer to listen for remote events.  
+        /// Listens on the specified remote endpoint.  When it recieves a remote
+        /// event, it will envoke the specified remote handler.
+        /// </summary>
+        /// <param name="address">Endpoint addres to listen on</param>
+        /// <param name="remoteHandler">The handler to invoke when receiving incoming
+        /// remote messages</param>
+        /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        internal StreamingTransportServer(
+            IPAddress address,
+            IObserver<TransportEvent<T>> remoteHandler,
+            ITcpPortProvider tcpPortProvider,
+            IStreamingCodec<T> streamingCodec)
+        {
+            _listener = new TcpListener(address, 0);
+            _remoteObserver = remoteHandler;
+            _tcpPortProvider = tcpPortProvider;
+            _cancellationSource = new CancellationTokenSource();
+            _cancellationSource.Token.ThrowIfCancellationRequested();
+            _streamingCodec = streamingCodec;
+            _disposed = false;
+        }
+
+        /// <summary>
+        /// Returns the listening endpoint for the TransportServer 
+        /// </summary>
+        public IPEndPoint LocalEndpoint
+        {
+            get { return _listener.LocalEndpoint as IPEndPoint; }
+        }
+
+        /// <summary>
+        /// Starts listening for incoming remote messages.
+        /// </summary>
+        public void Run()
+        {
+            FindAPortAndStartListener();
+            _serverTask = Task.Run(() => StartServer());
+        }
+
+        private void FindAPortAndStartListener()
+        {
+            var foundAPort = false;
+            var exception = new SocketException((int)SocketError.AddressAlreadyInUse);
+            for (var enumerator = _tcpPortProvider.GetEnumerator();
+                !foundAPort && enumerator.MoveNext();
+                )
+            {
+                _listener = new TcpListener(LocalEndpoint.Address, enumerator.Current);
+                try
+                {
+                    _listener.Start();
+                    foundAPort = true;
+                }
+                catch (SocketException e)
+                {
+                    exception = e;
+                }
+            }
+            if (!foundAPort)
+            {
+                Exceptions.Throw(exception, "Could not find a port to listen on", LOGGER);
+            }
+            LOGGER.Log(Level.Info,
+                String.Format("Listening on {0}", _listener.LocalEndpoint.ToString()));
+        }
+
+
+        /// <summary>
+        /// Close the TransportServer and all open connections
+        /// </summary>
+        public void Dispose()
+        {
+            if (!_disposed)
+            {
+                _cancellationSource.Cancel();
+
+                try
+                {
+                    _listener.Stop();
+                }
+                catch (SocketException)
+                {
+                    LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
+                }
+
+                if (_serverTask != null)
+                {
+                    _serverTask.Wait();
+
+                    // Give the TransportServer Task 500ms to shut down, ignore any timeout errors
+                    try
+                    {
+                        CancellationTokenSource serverDisposeTimeout = new CancellationTokenSource(500);
+                        _serverTask.Wait(serverDisposeTimeout.Token);
+                    }
+                    catch (Exception e)
+                    {
+                        Console.Error.WriteLine(e);
+                    }
+                    finally
+                    {
+                        _serverTask.Dispose();
+                    }
+                }
+            }
+
+            _disposed = true;
+        }
+
+        /// <summary>
+        /// Helper method to start TransportServer.  This will
+        /// be run in an asynchronous Task.
+        /// </summary>
+        /// <returns>An asynchronous Task for the running server.</returns>
+        private async Task StartServer()
+        {
+            try
+            {
+                while (!_cancellationSource.Token.IsCancellationRequested)
+                {
+                    TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
+                    ProcessClient(client).Forget();
+                }
+            }
+            catch (InvalidOperationException)
+            {
+                LOGGER.Log(Level.Info, "TransportServer has been closed.");
+            }
+            catch (OperationCanceledException)
+            {
+                LOGGER.Log(Level.Info, "TransportServer has been closed.");
+            }
+        }
+
+        /// <summary>
+        /// Recieves event from connected TcpClient and invokes handler on the event.
+        /// </summary>
+        /// <param name="client">The connected client</param>
+        private async Task ProcessClient(TcpClient client)
+        {
+            // Keep reading messages from client until they disconnect or timeout
+            CancellationToken token = _cancellationSource.Token;
+            using (ILink<T> link = new StreamingLink<T>(client, _streamingCodec))
+            {
+                while (!token.IsCancellationRequested)
+                {
+                    T message = await link.ReadAsync(token);
+
+                    if (message == null)
+                    {
+                        break;
+                    }
+
+                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
+                    _remoteObserver.OnNext(transportEvent);
+                }
+                LOGGER.Log(Level.Error,
+                    "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
new file mode 100644
index 0000000..1af9b86
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    // TODO: This class will be removed shortly and is used to reduce the size of pull request.
+    internal sealed class TemporaryWritableToStreamingCodec<T> : IStreamingCodec<T> where T:IWritable
+    {
+        private readonly IInjector _injector;
+
+        [Inject]
+        public TemporaryWritableToStreamingCodec(IInjector injector)
+        {
+            _injector = injector;
+        }
+
+        public T Read(IDataReader reader)
+        {
+            string type = reader.ReadString();
+            var value = (T) _injector.ForkInjector().GetInstance(type);
+            value.Read(reader);
+            return value;
+        }
+
+        public void Write(T obj, IDataWriter writer)
+        {
+            writer.WriteString(obj.GetType().AssemblyQualifiedName);
+            obj.Write(writer);
+        }
+
+        public async Task<T> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            string type = await reader.ReadStringAsync(token);
+            var value = (T) _injector.ForkInjector().GetInstance(type);
+            await value.ReadAsync(reader, token);
+            return value;
+        }
+
+        public async Task WriteAsync(T obj, IDataWriter writer, CancellationToken token)
+        {
+            await writer.WriteStringAsync(obj.GetType().AssemblyQualifiedName, token);
+            await obj.WriteAsync(writer, token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
deleted file mode 100644
index 9859338..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableLink.cs
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using System.Net.Sockets;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Represents an open connection between remote hosts. This class is not thread safe
-    /// </summary>
-    /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableLink<T> : ILink<T> where T : IWritable
-    {
-        private static readonly Logger Logger = Logger.GetLogger(typeof (WritableLink<T>));
-
-        private readonly IPEndPoint _localEndpoint;
-        private bool _disposed;
-        private readonly NetworkStream _stream;
-        private readonly IInjector _injector;
-       
-
-        /// <summary>
-        /// Stream reader to be passed to IWritable
-        /// </summary>
-        private readonly StreamDataReader _reader;
-
-        /// <summary>
-        /// Stream writer from which to read from IWritable
-        /// </summary>
-        private readonly StreamDataWriter _writer;
-
-        /// <summary>
-        /// Constructs a Link object.
-        /// Connects to the specified remote endpoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
-        /// <param name="injector">The injector to pass arguments to incoming messages</param>
-        public WritableLink(IPEndPoint remoteEndpoint, IInjector injector)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-
-            Client = new TcpClient();
-            Client.Connect(remoteEndpoint);
-
-            _stream = Client.GetStream();
-            _localEndpoint = GetLocalEndpoint();
-            _disposed = false;
-            _reader = new StreamDataReader(_stream);
-            _writer = new StreamDataWriter(_stream);
-            _injector = injector;
-        }
-
-        /// <summary>
-        /// Constructs a Link object.
-        /// Uses the already connected TcpClient.
-        /// </summary>
-        /// <param name="client">The already connected client</param>
-        /// <param name="injector">The injector to pass arguments to incoming messages</param>
-        public WritableLink(TcpClient client, IInjector injector)
-        {
-            if (client == null)
-            {
-                throw new ArgumentNullException("client");
-            }
-
-            Client = client;
-            _stream = Client.GetStream();
-            _localEndpoint = GetLocalEndpoint();
-            _disposed = false;
-            _reader = new StreamDataReader(_stream);
-            _writer = new StreamDataWriter(_stream);
-            _injector = injector;
-        }
-
-        /// <summary>
-        /// Returns the local socket address
-        /// </summary>
-        public IPEndPoint LocalEndpoint
-        {
-            get { return _localEndpoint; }
-        }
-
-        /// <summary>
-        /// Returns the remote socket address
-        /// </summary>
-        public IPEndPoint RemoteEndpoint
-        {
-            get { return (IPEndPoint) Client.Client.RemoteEndPoint; }
-        }
-
-        /// <summary>
-        /// Gets the underlying TcpClient
-        /// </summary>
-        public TcpClient Client { get; private set; }
-
-        /// <summary>
-        /// Writes the message to the remote host
-        /// </summary>
-        /// <param name="value">The data to write</param>
-        public void Write(T value)
-        {
-            if (value == null)
-            {
-                throw new ArgumentNullException("value");
-            }
-            if (_disposed)
-            {
-                Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
-            }
-
-            _writer.WriteString(value.GetType().AssemblyQualifiedName);
-            value.Write(_writer);
-        }
-
-        /// <summary>
-        /// Writes the value to this link asynchronously
-        /// </summary>
-        /// <param name="value">The data to write</param>
-        /// <param name="token">The cancellation token</param>
-        public async Task WriteAsync(T value, CancellationToken token)
-        {
-            if (_disposed)
-            {
-                Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
-            }
-
-            await _writer.WriteStringAsync(value.GetType().AssemblyQualifiedName, token);
-            await value.WriteAsync(_writer, token);
-        }
-
-        /// <summary>
-        /// Reads the value from the link synchronously
-        /// </summary>
-        public T Read()
-        {
-            if (_disposed)
-            {
-                Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger);
-            }
-
-            string dataType = _reader.ReadString();
-
-            if (dataType == null)
-            {
-                return default(T);
-            }
-
-            try
-            {
-                T value = (T) _injector.ForkInjector().GetInstance(dataType);
-                value.Read(_reader);
-                return value;
-            }
-            catch (InjectionException)
-            {
-                return default(T);
-            }
-        }
-
-        /// <summary>
-        /// Reads the value from the link asynchronously
-        /// </summary>
-        /// <param name="token">The cancellation token</param>
-        public async Task<T> ReadAsync(CancellationToken token)
-        {
-            if (_disposed)
-            {
-                Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger);
-            }
-
-            string dataType = "";
-
-            dataType = await _reader.ReadStringAsync(token);
-
-            if (dataType == null)
-            {
-                return default(T);
-            }
-
-            try
-            {
-                T value = (T) _injector.ForkInjector().GetInstance(dataType);
-                await value.ReadAsync(_reader, token);
-                return value;
-            }
-            catch (InjectionException)
-            {
-                return default(T);
-            }
-        }
-
-        /// <summary>
-        /// Close the client connection
-        /// </summary>
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        /// <summary>
-        /// Subclasses of Links should overwrite this to handle disposing
-        /// of the link
-        /// </summary>
-        /// <param name="disposing">To dispose or not</param>
-        public virtual void Dispose(bool disposing)
-        {
-            if (_disposed)
-            {
-                return;
-            }
-
-            if (disposing)
-            {
-                try
-                {
-                    Client.GetStream().Close();
-                }
-                catch (InvalidOperationException)
-                {
-                    Logger.Log(Level.Warning, "failed to close stream on a non-connected socket.");
-                }
-
-                Client.Close();
-            }
-            _disposed = true;
-        }
-
-        /// <summary>
-        /// Overrides Equals. Two Link objects are equal if they are connected
-        /// to the same remote endpoint.
-        /// </summary>
-        /// <param name="obj">The object to compare</param>
-        /// <returns>True if the object is equal to this Link, otherwise false</returns>
-        public override bool Equals(object obj)
-        {
-            Link<T> other = obj as Link<T>;
-            if (other == null)
-            {
-                return false;
-            }
-
-            return other.RemoteEndpoint.Equals(RemoteEndpoint);
-        }
-
-        /// <summary>
-        /// Gets the hash code for the Link object.
-        /// </summary>
-        /// <returns>The object's hash code</returns>
-        public override int GetHashCode()
-        {
-            return RemoteEndpoint.GetHashCode();
-        }
-
-        /// <summary>
-        /// Discovers the IPEndpoint for the current machine.
-        /// </summary>
-        /// <returns>The local IPEndpoint</returns>
-        private IPEndPoint GetLocalEndpoint()
-        {
-            IPAddress address = NetworkUtils.LocalIPAddress;
-            int port = ((IPEndPoint) Client.Client.LocalEndPoint).Port;
-            return new IPEndPoint(address, port);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
deleted file mode 100644
index 9790e3e..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableObserverContainer.cs
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Net;
-using Org.Apache.REEF.Wake.Util;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Stores registered IObservers for DefaultRemoteManager.
-    /// Can register and look up IObservers by remote IPEndPoint.
-    /// </summary>
-    /// <typeparam name="T">Message type T. It is assumed to be IWritable</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    internal class WritableObserverContainer<T> : IObserver<TransportEvent<IWritableRemoteEvent<T>>> where T : IWritable
-    {
-        private static readonly Logger Logger = Logger.GetLogger(typeof(WritableObserverContainer<>));
-        private readonly ConcurrentDictionary<IPEndPoint, IObserver<T>> _endpointMap;
-        private readonly ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>> _typeMap;
-        private IObserver<T> _universalObserver;
-
-        /// <summary>
-        /// Constructs a new ObserverContainer used to manage remote IObservers.
-        /// </summary>
-        public WritableObserverContainer()
-        {
-            _endpointMap = new ConcurrentDictionary<IPEndPoint, IObserver<T>>(new IPEndPointComparer());
-            _typeMap = new ConcurrentDictionary<Type, IObserver<IRemoteMessage<T>>>();
-        }
-
-        /// <summary>
-        /// Registers an IObserver used to handle incoming messages from the remote host
-        /// at the specified IPEndPoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
-        /// <param name="observer">The IObserver to handle incoming messages</param>
-        /// <returns>An IDisposable used to unregister the observer with</returns>
-        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer) 
-        {
-            if (remoteEndpoint.Address.Equals(IPAddress.Any))
-            {
-                _universalObserver = observer;
-                return Disposable.Create(() => { _universalObserver = null; });
-            }
-
-            _endpointMap[remoteEndpoint] = observer;
-            return Disposable.Create(() => _endpointMap.TryRemove(remoteEndpoint, out observer));
-        }
-
-        /// <summary>
-        /// Registers an IObserver to handle incoming messages from a remote host
-        /// </summary>
-        /// <param name="observer">The IObserver to handle incoming messages</param>
-        /// <returns>An IDisposable used to unregister the observer with</returns>
-        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
-        {
-            _typeMap[typeof(T)] = observer;
-            return Disposable.Create(() => _typeMap.TryRemove(typeof(T), out observer));
-        }
-
-        /// <summary>
-        /// Look up the IObserver for the registered IPEndPoint or event type 
-        /// and execute the IObserver.
-        /// </summary>
-        /// <param name="transportEvent">The incoming remote event</param>
-        public void OnNext(TransportEvent<IWritableRemoteEvent<T>> transportEvent)
-        {
-            IWritableRemoteEvent<T> remoteEvent = transportEvent.Data;
-            remoteEvent.LocalEndPoint = transportEvent.Link.LocalEndpoint;
-            remoteEvent.RemoteEndPoint = transportEvent.Link.RemoteEndpoint;
-            T value = remoteEvent.Value;
-            bool handled = false;
-
-            IObserver<T> observer1;
-            IObserver<IRemoteMessage<T>> observer2;
-            if (_universalObserver != null)
-            {
-                _universalObserver.OnNext(value);
-                handled = true;
-            }
-            if (_endpointMap.TryGetValue(remoteEvent.RemoteEndPoint, out observer1))
-            {
-                // IObserver was registered by IPEndpoint
-                observer1.OnNext(value);
-                handled = true;
-            } 
-            else if (_typeMap.TryGetValue(value.GetType(), out observer2))
-            {
-                // IObserver was registered by event type
-                IRemoteIdentifier id = new SocketRemoteIdentifier(remoteEvent.RemoteEndPoint);
-                IRemoteMessage<T> remoteMessage = new DefaultRemoteMessage<T>(id, value);
-                observer2.OnNext(remoteMessage);
-                handled = true;
-            }
-
-            if (!handled)
-            {
-                throw new WakeRuntimeException("Unrecognized Wake RemoteEvent message");
-            }
-        }
-
-        public void OnError(Exception error)
-        {
-            throw error;
-        }
-
-        public void OnCompleted()
-        {
-            Logger.Log(Level.Info, "Exiting the Writable Observer Container");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
deleted file mode 100644
index b3664d0..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteEvent.cs
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Interface;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Writable remote event class
-    /// </summary>
-    /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    internal sealed class WritableRemoteEvent<T> : IWritableRemoteEvent<T> where T : IWritable
-    {
-        private readonly IInjector _injector;
-
-        /// <summary>
-        /// Creates the Remote Event
-        /// </summary>
-        /// <param name="localEndpoint">Local Address</param>
-        /// <param name="remoteEndpoint">Remote Address</param>
-        /// <param name="value">Actual message</param>
-        public WritableRemoteEvent(IPEndPoint localEndpoint, IPEndPoint remoteEndpoint, T value)
-        {
-            LocalEndPoint = localEndpoint;
-            RemoteEndPoint = remoteEndpoint;
-            Value = value;
-        }
-
-        /// <summary>
-        /// Creates empty Remote Event
-        /// </summary>
-        [Inject]
-        public WritableRemoteEvent(IInjector injector)
-        {
-            _injector = injector;
-        }
-
-        /// <summary>
-        /// Local Address
-        /// </summary>
-        public IPEndPoint LocalEndPoint { get; set; }
-
-        /// <summary>
-        /// Remote Address
-        /// </summary>
-        public IPEndPoint RemoteEndPoint { get; set; }
-
-        /// <summary>
-        /// The actual message
-        /// </summary>
-        public T Value { get; set; }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        public void Read(IDataReader reader)
-        {
-            Value = (T)_injector.ForkInjector().GetInstance(typeof(T));
-            Value.Read(reader);         
-        }
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        public void Write(IDataWriter writer)
-        {
-            Value.Write(writer);           
-        }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        /// <param name="token">The cancellation token</param>
-        public async Task ReadAsync(IDataReader reader, CancellationToken token)
-        {
-            Value = (T)_injector.ForkInjector().GetInstance(typeof(T));
-            await Value.ReadAsync(reader, token);      
-        }
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        /// <param name="token">The cancellation token</param>
-        public async Task WriteAsync(IDataWriter writer, CancellationToken token)
-        {
-            await Value.WriteAsync(writer, token);    
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
deleted file mode 100644
index 73d8bb6..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Net;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Manages incoming and outgoing messages between remote hosts.
-    /// </summary>
-    /// <typeparam name="T">Message type T. It is assumed to be IWritable</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public sealed class WritableRemoteManager<T> : IRemoteManager<T> where T : IWritable
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof (WritableRemoteManager<T>));
-
-        private readonly WritableObserverContainer<T> _observerContainer;
-        private readonly WritableTransportServer<IWritableRemoteEvent<T>> _server;
-        private readonly Dictionary<IPEndPoint, ProxyObserver> _cachedClients;
-        private readonly IInjector _injector;
-
-        /// <summary>
-        /// Constructs a DefaultRemoteManager listening on the specified address and
-        /// a specific port.
-        /// </summary>
-        /// <param name="localAddress">The address to listen on</param>
-        /// <param name="port">The port to listen on</param>
-        /// <param name="tcpPortProvider">Tcp port provider</param>
-        /// <param name="injector">The injector to pass arguments to incoming messages</param>
-        [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
-        public WritableRemoteManager(IPAddress localAddress, int port, ITcpPortProvider tcpPortProvider, IInjector injector)
-        {
-            if (localAddress == null)
-            {
-                throw new ArgumentNullException("localAddress");
-            }
-            if (port < 0)
-            {
-                throw new ArgumentException("Listening port must be greater than or equal to zero");
-            }
-
-            _observerContainer = new WritableObserverContainer<T>();
-            _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
-            _injector = injector;
-
-            IPEndPoint localEndpoint = new IPEndPoint(localAddress, port);
-
-            // Begin to listen for incoming messages
-            _server = new WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, _observerContainer, tcpPortProvider, injector);
-            _server.Run();
-
-            LocalEndpoint = _server.LocalEndpoint;  
-            Identifier = new SocketRemoteIdentifier(LocalEndpoint);
-        }
-
-        /// <summary>
-        /// Constructs a DefaultRemoteManager. Does not listen for incoming messages.
-        /// </summary>
-        /// <param name="injector">The injector to pass arguments to incoming messages</param>
-        [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)]
-        public WritableRemoteManager(IInjector injector)
-        {
-            using (LOGGER.LogFunction("WritableRemoteManager::WritableRemoteManager"))
-            {
-                _observerContainer = new WritableObserverContainer<T>();
-                _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>();
-                _injector = injector;
-
-                LocalEndpoint = new IPEndPoint(NetworkUtils.LocalIPAddress, 0);
-                Identifier = new SocketRemoteIdentifier(LocalEndpoint);
-            }
-        }
-
-        /// <summary>
-        /// Gets the RemoteIdentifier for the DefaultRemoteManager
-        /// </summary>
-        public IRemoteIdentifier Identifier { get; private set; }
-
-        /// <summary>
-        /// Gets the local IPEndPoint for the DefaultRemoteManager
-        /// </summary>
-        public IPEndPoint LocalEndpoint { get; private set; }
-
-        /// <summary>
-        /// Returns an IObserver used to send messages to the remote host at
-        /// the specified IPEndpoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
-        /// <returns>An IObserver used to send messages to the remote host</returns>
-        public IObserver<T> GetRemoteObserver(RemoteEventEndPoint<T> remoteEndpoint)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-
-            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
-            if (id == null)
-            {
-                throw new ArgumentException("ID not supported");
-            }
-
-            return GetRemoteObserver(id.Addr);
-        }
-
-        /// <summary>
-        /// Returns an IObserver used to send messages to the remote host at
-        /// the specified IPEndpoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndpoint of the remote host</param>
-        /// <returns>An IObserver used to send messages to the remote host</returns>
-        public IObserver<T> GetRemoteObserver(IPEndPoint remoteEndpoint)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-
-            ProxyObserver remoteObserver;
-            if (!_cachedClients.TryGetValue(remoteEndpoint, out remoteObserver))
-            {
-                WritableTransportClient<IWritableRemoteEvent<T>> client =
-                    new WritableTransportClient<IWritableRemoteEvent<T>>(remoteEndpoint, _observerContainer, _injector);
-
-                remoteObserver = new ProxyObserver(client);
-                _cachedClients[remoteEndpoint] = remoteObserver;
-            }
-
-            return remoteObserver;
-        }
-
-        /// <summary>
-        /// Registers an IObserver used to handle incoming messages from the remote host
-        /// at the specified IPEndPoint.
-        /// The IDisposable that is returned can be used to unregister the IObserver.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
-        /// <param name="observer">The IObserver to handle incoming messages</param>
-        /// <returns>An IDisposable used to unregister the observer with</returns>
-        public IDisposable RegisterObserver(RemoteEventEndPoint<T> remoteEndpoint, IObserver<T> observer)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-
-            if (observer == null)
-            {
-                throw new ArgumentNullException("observer");
-            }
-
-            SocketRemoteIdentifier id = remoteEndpoint.Id as SocketRemoteIdentifier;
-            if (id == null)
-            {
-                throw new ArgumentException("ID not supported");
-            }
-
-            return RegisterObserver(id.Addr, observer);
-        }
-
-        /// <summary>
-        /// Registers an IObserver used to handle incoming messages from the remote host
-        /// at the specified IPEndPoint.
-        /// The IDisposable that is returned can be used to unregister the IObserver.
-        /// </summary>
-        /// <param name="remoteEndpoint">The IPEndPoint of the remote host</param>
-        /// <param name="observer">The IObserver to handle incoming messages</param>
-        /// <returns>An IDisposable used to unregister the observer with</returns>
-        public IDisposable RegisterObserver(IPEndPoint remoteEndpoint, IObserver<T> observer)
-        {
-            if (remoteEndpoint == null)
-            {
-                throw new ArgumentNullException("remoteEndpoint");
-            }
-            if (observer == null)
-            {
-                throw new ArgumentNullException("observer");
-            }
-
-            return _observerContainer.RegisterObserver(remoteEndpoint, observer);
-        }
-
-        /// <summary>
-        /// Registers an IObserver used to handle incoming messages from the remote host
-        /// at the specified IPEndPoint.
-        /// The IDisposable that is returned can be used to unregister the IObserver.
-        /// </summary>
-        /// <param name="observer">The IObserver to handle incoming messages</param>
-        /// <returns>An IDisposable used to unregister the observer with</returns>
-        public IDisposable RegisterObserver(IObserver<IRemoteMessage<T>> observer)
-        {
-            if (observer == null)
-            {
-                throw new ArgumentNullException("observer");
-            }
-
-            return _observerContainer.RegisterObserver(observer);
-        }
-        
-        /// <summary>
-        /// Release all resources for the DefaultRemoteManager.
-        /// </summary>
-        public void Dispose()
-        {
-            foreach (ProxyObserver cachedClient in _cachedClients.Values)
-            {
-                cachedClient.Dispose();
-            }
-
-            if (_server != null)
-            {
-                _server.Dispose();
-            }
-        }
-
-        /// <summary>
-        /// Observer to send messages to connected remote host
-        /// </summary>
-        private class ProxyObserver : IObserver<T>, IDisposable
-        {
-            private readonly WritableTransportClient<IWritableRemoteEvent<T>> _client;
-
-            /// <summary>
-            /// Create new ProxyObserver
-            /// </summary>
-            /// <param name="client">The connected WritableTransport client used to send
-            /// messages to remote host</param>
-            public ProxyObserver(WritableTransportClient<IWritableRemoteEvent<T>> client)
-            {
-                _client = client;
-            }
-
-            /// <summary>
-            /// Send the message to the remote host
-            /// </summary>
-            /// <param name="message">The message to send</param>
-            public void OnNext(T message)
-            {
-                IWritableRemoteEvent<T> remoteEvent = new WritableRemoteEvent<T>(_client.Link.LocalEndpoint,
-                    _client.Link.RemoteEndpoint,
-                    message);
-
-                _client.Send(remoteEvent);
-            }
-
-            /// <summary>
-            /// Close underlying WritableTransport client
-            /// </summary>
-            public void Dispose()
-            {
-                _client.Dispose();
-            }
-
-            public void OnError(Exception error)
-            {
-                throw error;
-            }
-
-            public void OnCompleted()
-            {
-                throw new NotImplementedException();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
deleted file mode 100644
index 52fef8d..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Interface;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// WritableRemoteManagerFactory for WritableRemoteManager.
-    /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public sealed class WritableRemoteManagerFactory
-    {
-        private readonly ITcpPortProvider _tcpPortProvider;
-        private readonly IInjector _injector;
-
-        [Inject]  
-        private WritableRemoteManagerFactory(ITcpPortProvider tcpPortProvider, IInjector injector)
-        {
-            _tcpPortProvider = tcpPortProvider;
-            _injector = injector;
-        }
-
-        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
-        {
-#pragma warning disable 618
-// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
-            return new WritableRemoteManager<T>(localAddress, port, _tcpPortProvider, _injector);
-#pragma warning disable 618
-        }
-
-        public IRemoteManager<T> GetInstance<T>() where T : IWritable
-        {
-#pragma warning disable 618
-// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
-            return new WritableRemoteManager<T>(_injector);
-#pragma warning disable 618
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
deleted file mode 100644
index b245f0f..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportClient.cs
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Net;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Establish connections to TransportServer for remote message passing
-    /// </summary>
-    /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableTransportClient<T> : IDisposable where T : IWritable
-    {
-        private readonly ILink<T> _link;
-        private readonly IObserver<TransportEvent<T>> _observer;
-        private readonly CancellationTokenSource _cancellationSource;
-        private bool _disposed;
-        private static readonly Logger Logger = Logger.GetLogger(typeof(WritableTransportClient<T>));
-
-        /// <summary>
-        /// Construct a TransportClient.
-        /// Used to send messages to the specified remote endpoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
-        /// <param name="injector">The injector to pass arguments to incoming messages</param>
-        public WritableTransportClient(IPEndPoint remoteEndpoint, IInjector injector)
-        {
-            Exceptions.ThrowIfArgumentNull(remoteEndpoint, "remoteEndpoint", Logger);
-
-            _link = new WritableLink<T>(remoteEndpoint, injector);
-            _cancellationSource = new CancellationTokenSource();
-            _disposed = false;
-        }
-
-        /// <summary>
-        /// Construct a TransportClient.
-        /// Used to send messages to the specified remote endpoint.
-        /// </summary>
-        /// <param name="remoteEndpoint">The endpoint of the remote server to connect to</param>
-        /// <param name="observer">Callback used when receiving responses from remote host</param>
-        /// <param name="injector">The injector to pass arguments to incoming messages</param>
-        public WritableTransportClient(IPEndPoint remoteEndpoint,
-            IObserver<TransportEvent<T>> observer,
-            IInjector injector)
-            : this(remoteEndpoint, injector)
-        {
-            _observer = observer;
-            Task.Run(() => ResponseLoop());
-        }
-
-        /// <summary>
-        /// Gets the underlying transport link.
-        /// </summary>
-        public ILink<T> Link
-        {
-            get { return _link; }
-        }
-
-        /// <summary>
-        /// Send the remote message.
-        /// </summary>
-        /// <param name="message">The message to send</param>
-        public void Send(T message)
-        {
-            if (message == null)
-            {
-                throw new ArgumentNullException("message");
-            }
-
-            _link.Write(message);
-        }
-
-        /// <summary>
-        /// Close all opened connections
-        /// </summary>
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        protected void Dispose(bool disposing)
-        {
-            if (!_disposed && disposing)
-            {
-                _link.Dispose();
-                _disposed = true;
-            }
-        }
-
-        /// <summary>
-        /// Continually read responses from remote host
-        /// </summary>
-        private async Task ResponseLoop()
-        {
-            while (!_cancellationSource.IsCancellationRequested)
-            {
-                T message = await _link.ReadAsync(_cancellationSource.Token);
-                if (message == null)
-                {
-                    break;
-                }
-
-                TransportEvent<T> transportEvent = new TransportEvent<T>(message, _link);
-                _observer.OnNext(transportEvent);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
deleted file mode 100644
index 6b5961f..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Net;
-using System.Net.Sockets;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    /// <summary>
-    /// Server to handle incoming remote messages.
-    /// </summary>
-    /// <typeparam name="T">Generic Type of message. It is constrained to have implemented IWritable and IType interface</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableTransportServer<T> : IDisposable where T : IWritable
-    {
-        private static readonly Logger LOGGER = Logger.GetLogger(typeof (TransportServer<>));
-
-        private TcpListener _listener;
-        private readonly CancellationTokenSource _cancellationSource;
-        private readonly IObserver<TransportEvent<T>> _remoteObserver;
-        private readonly ITcpPortProvider _tcpPortProvider;
-        private readonly IInjector _injector;
-        private bool _disposed;
-        private Task _serverTask;
-        /// <summary>
-        /// Constructs a TransportServer to listen for remote events.  
-        /// Listens on the specified remote endpoint.  When it recieves a remote
-        /// event, it will envoke the specified remote handler.
-        /// </summary>
-        /// <param name="port">Port to listen on</param>
-        /// <param name="remoteHandler">The handler to invoke when receiving incoming
-        /// remote messages</param>
-        /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param>
-        /// <param name="injector">The injector to pass arguments to incoming messages</param>
-        public WritableTransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ITcpPortProvider tcpPortProvider, IInjector injector)
-            : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, tcpPortProvider, injector)
-        {
-        }
-
-        /// <summary>
-        /// Constructs a TransportServer to listen for remote events.  
-        /// Listens on the specified remote endpoint.  When it recieves a remote
-        /// event, it will envoke the specified remote handler.
-        /// </summary>
-        /// <param name="localEndpoint">Endpoint to listen on</param>
-        /// <param name="remoteHandler">The handler to invoke when receiving incoming
-        /// remote messages</param>
-        /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param>
-        /// <param name="injector">The injector to pass arguments to incoming messages</param>
-        public WritableTransportServer(
-            IPEndPoint localEndpoint,
-            IObserver<TransportEvent<T>> remoteHandler,
-            ITcpPortProvider tcpPortProvider,
-            IInjector injector)
-        {
-            _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port);
-            _remoteObserver = remoteHandler;
-            _tcpPortProvider = tcpPortProvider;
-            _cancellationSource = new CancellationTokenSource();
-            _cancellationSource.Token.ThrowIfCancellationRequested();
-            _injector = injector;
-            _disposed = false;
-        }
-
-        /// <summary>
-        /// Returns the listening endpoint for the TransportServer 
-        /// </summary>
-        public IPEndPoint LocalEndpoint
-        {
-            get { return _listener.LocalEndpoint as IPEndPoint; }
-        }
-
-        /// <summary>
-        /// Starts listening for incoming remote messages.
-        /// </summary>
-        public void Run()
-        {
-            if (LocalEndpoint.Port == 0)
-            {
-                FindAPortAndStartListener();
-            }
-            else
-            {
-                _listener.Start();
-            }
-
-            _serverTask = Task.Run(() => StartServer());
-        }
-
-        private void FindAPortAndStartListener()
-        {
-            var foundAPort = false;
-            var exception = new SocketException((int)SocketError.AddressAlreadyInUse);
-            for (var enumerator = _tcpPortProvider.GetEnumerator();
-                !foundAPort && enumerator.MoveNext();
-                )
-            {
-                _listener = new TcpListener(LocalEndpoint.Address, enumerator.Current);
-                try
-                {
-                    _listener.Start();
-                    foundAPort = true;
-                }
-                catch (SocketException e)
-                {
-                    exception = e;
-                }
-            }
-            if (!foundAPort)
-            {
-                Exceptions.Throw(exception, "Could not find a port to listen on", LOGGER);
-            }
-            LOGGER.Log(Level.Info,
-                String.Format("Listening on {0}", _listener.LocalEndpoint.ToString()));
-        }
-
-
-        /// <summary>
-        /// Close the TransportServer and all open connections
-        /// </summary>
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        public void Dispose(bool disposing)
-        {
-            if (!_disposed && disposing)
-            {
-                _cancellationSource.Cancel();
-
-                try
-                {
-                    _listener.Stop();
-                }
-                catch (SocketException)
-                {
-                    LOGGER.Log(Level.Info, "Disposing of transport server before listener is created.");
-                }
-
-                if (_serverTask != null)
-                {
-                    _serverTask.Wait();
-
-                    // Give the TransportServer Task 500ms to shut down, ignore any timeout errors
-                    try
-                    {
-                        CancellationTokenSource serverDisposeTimeout = new CancellationTokenSource(500);
-                        _serverTask.Wait(serverDisposeTimeout.Token);
-                    }
-                    catch (Exception e)
-                    {
-                        Console.Error.WriteLine(e);
-                    }
-                    finally
-                    {
-                        _serverTask.Dispose();
-                    }
-                }
-            }
-
-            _disposed = true;
-        }
-
-        /// <summary>
-        /// Helper method to start TransportServer.  This will
-        /// be run in an asynchronous Task.
-        /// </summary>
-        /// <returns>An asynchronous Task for the running server.</returns>
-        private async Task StartServer()
-        {
-            try
-            {
-                while (!_cancellationSource.Token.IsCancellationRequested)
-                {
-                    TcpClient client = await _listener.AcceptTcpClientAsync().ConfigureAwait(false);
-                    ProcessClient(client).Forget();
-                }
-            }
-            catch (InvalidOperationException)
-            {
-                LOGGER.Log(Level.Info, "TransportServer has been closed.");
-            }
-            catch (OperationCanceledException)
-            {
-                LOGGER.Log(Level.Info, "TransportServer has been closed.");
-            }
-        }
-
-        /// <summary>
-        /// Recieves event from connected TcpClient and invokes handler on the event.
-        /// </summary>
-        /// <param name="client">The connected client</param>
-        private async Task ProcessClient(TcpClient client)
-        {
-            
-            // Keep reading messages from client until they disconnect or timeout
-            CancellationToken token = _cancellationSource.Token;
-            using (ILink<T> link = new WritableLink<T>(client, _injector))
-            {
-                while (!token.IsCancellationRequested)
-                {
-                    T message = await link.ReadAsync(token);
-
-                    if (message == null)
-                    {
-                        break;
-                    }
-
-                    TransportEvent<T> transportEvent = new TransportEvent<T>(message, link);
-                    _remoteObserver.OnNext(transportEvent);
-                }
-                LOGGER.Log(Level.Error,
-                    "ProcessClient close the Link. IsCancellationRequested: " + token.IsCancellationRequested);
-            }
-        }
-    }
-}
\ No newline at end of file


[2/2] incubator-reef git commit: [REEF-422] Convert Wake Layer from Writable to Streaming

Posted by we...@apache.org.
[REEF-422] Convert Wake Layer from Writable to Streaming

This addressed the issue by
  * using `StreamingCodec` in WritableLink instead of `Writable` by
    reflection.
  * Removing Writable constraint on generic in `WritableLink`,
    `WritableTransportServer`, `WritableTransportClient`,
    `WritableRemoteManager`
  * Introduce `TemporaryWritableToStreamingCodec` so that Network
    Service layer can still use `Writable` interface.

`StreamingCodec` is assumed to have at most static argument parameters.
That is, it cannot have parameters that change from one message to
another.

JIRA:
  [REEF-422](https://issues.apache.org/jira/browse/REEF-422)

Pull Request:
  This closes #261


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ba2653d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ba2653d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ba2653d6

Branch: refs/heads/master
Commit: ba2653d6bfac3aabaf640474efbdfcb18c87baf8
Parents: ec9b497
Author: Dhruv <dh...@gmail.com>
Authored: Tue Jun 30 11:01:06 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue Jul 7 13:56:04 2015 -0700

----------------------------------------------------------------------
 .../NetworkService/WritableNetworkService.cs    |   2 +-
 .../Org.Apache.REEF.Wake.Tests.csproj           |   5 +-
 .../PrefixedStringWritable.cs                   | 108 -----
 .../StreamingRemoteManagerTest.cs               | 340 +++++++++++++++
 .../StreamingTransportTest.cs                   | 177 ++++++++
 .../WritableRemoteManagerTest.cs                | 437 -------------------
 .../WritableTransportTest.cs                    | 223 ----------
 .../Org.Apache.REEF.Wake.csproj                 |  15 +-
 .../Remote/IWritableRemoteEvent.cs              |  45 --
 .../Remote/Impl/RemoteEvent.cs                  |   4 +
 .../Remote/Impl/RemoteEventStreamingCodec.cs    |  82 ++++
 .../Remote/Impl/StreamingLink.cs                | 258 +++++++++++
 .../Remote/Impl/StreamingRemoteManager.cs       | 256 +++++++++++
 .../Impl/StreamingRemoteManagerFactory.cs       |  51 +++
 .../Remote/Impl/StreamingTransportClient.cs     | 125 ++++++
 .../Remote/Impl/StreamingTransportServer.cs     | 212 +++++++++
 .../Impl/TemporaryWritableToStreamingCodec.cs   |  70 +++
 .../Remote/Impl/WritableLink.cs                 | 295 -------------
 .../Remote/Impl/WritableObserverContainer.cs    | 132 ------
 .../Remote/Impl/WritableRemoteEvent.cs          | 115 -----
 .../Remote/Impl/WritableRemoteManager.cs        | 286 ------------
 .../Remote/Impl/WritableRemoteManagerFactory.cs |  59 ---
 .../Remote/Impl/WritableTransportClient.cs      | 132 ------
 .../Remote/Impl/WritableTransportServer.cs      | 244 -----------
 24 files changed, 1585 insertions(+), 2088 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
index f383697..7d9d015 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
@@ -62,7 +62,7 @@ namespace Org.Apache.REEF.Network.NetworkService
             IObserver<WritableNsMessage<T>> messageHandler,
             IIdentifierFactory idFactory,
             INameClient nameClient,
-            WritableRemoteManagerFactory remoteManagerFactory)
+            StreamingRemoteManagerFactory remoteManagerFactory)
         {
  
             IPAddress localAddress = NetworkUtils.LocalIPAddress;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index 581508b..babc26d 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -46,12 +46,11 @@ under the License.
   <ItemGroup>
     <Compile Include="ClockTest.cs" />
     <Compile Include="MultiCodecTest.cs" />
-    <Compile Include="PrefixedStringWritable.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="PubSubSubjectTest.cs" />
     <Compile Include="RemoteManagerTest.cs" />
-    <Compile Include="WritableRemoteManagerTest.cs" />
-    <Compile Include="WritableTransportTest.cs" />
+    <Compile Include="StreamingRemoteManagerTest.cs" />
+    <Compile Include="StreamingTransportTest.cs" />
     <Compile Include="TransportTest.cs" />
     <Compile Include="WritableString.cs" />
   </ItemGroup>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
deleted file mode 100644
index dbb8af3..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/PrefixedStringWritable.cs
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
-    [NamedParameter("identifier in PrefixedWritable")]
-    public class StringId : Name<int>
-    {
-    }
-       
-    /// <summary>
-    /// Writable wrapper around the string class which takes integer prefix
-    /// This class is used to test non empty injector in TransportServer and Client
-    /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class PrefixedStringWritable : IWritable
-    {
-        private readonly int _id;
-        private string _data;
-
-        /// <summary>
-        /// Returns the actual string data
-        /// </summary>
-        public string Data
-        {
-            get { return _data + "_" + _id; }
-            set { _data = value; }
-        }
-
-        /// <summary>
-        /// Empty constructor for instantiation with reflection
-        /// </summary>
-        [Inject]
-        public PrefixedStringWritable([Parameter(typeof(StringId))] int id)
-        {
-            _id = id;
-        }
-
-        /// <summary>
-        /// Constructor
-        /// </summary>
-        /// <param name="data">The string data</param>
-        public PrefixedStringWritable(string data)
-        {
-            _data = data;
-        }
-
-        /// <summary>
-        /// Reads the string
-        /// </summary>
-        /// <param name="reader">reader to read from</param>
-        public void Read(IDataReader reader)
-        {
-            _data = reader.ReadString();
-        }
-
-        /// <summary>
-        /// Writes the string
-        /// </summary>
-        /// <param name="writer">Writer to write</param>
-        public void Write(IDataWriter writer)
-        {
-            writer.WriteString(_data);
-        }
-
-        /// <summary>
-        /// Reads the string
-        /// </summary>
-        /// <param name="reader">reader to read from</param>
-        /// <param name="token">the cancellation token</param>
-        public async Task ReadAsync(IDataReader reader, CancellationToken token)
-        {
-            _data = await reader.ReadStringAsync(token);
-        }
-
-        /// <summary>
-        /// Writes the string
-        /// </summary>
-        /// <param name="writer">Writer to write</param>
-        /// <param name="token">the cancellation token</param>
-        public async Task WriteAsync(IDataWriter writer, CancellationToken token)
-        {
-            await writer.WriteStringAsync(_data, token);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
new file mode 100644
index 0000000..20f75be
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    [TestClass]
+    public class StreamingRemoteManagerTest
+    {
+        private readonly StreamingRemoteManagerFactory _remoteManagerFactory1 =
+            TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
+
+        private readonly StreamingRemoteManagerFactory _remoteManagerFactory2 =
+        TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
+        
+        /// <summary>
+        /// Tests one way communication between Remote Managers 
+        /// Remote Manager listens on any available port
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingOneWayCommunication()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                var observer = Observer.Create<WritableString>(queue.Add);
+                IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+                remoteManager2.RegisterObserver(endpoint1, observer);
+
+                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext(new WritableString("abc"));
+                remoteObserver.OnNext(new WritableString("def"));
+                remoteObserver.OnNext(new WritableString("ghi"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        /// <summary>
+        /// Tests two way communications. Checks whether both sides are able to receive messages
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingTwoWayCommunication()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
+            BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
+            List<string> events1 = new List<string>();
+            List<string> events2 = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                // Register observers for remote manager 1 and remote manager 2
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer1 = Observer.Create<WritableString>(queue1.Add);
+                var observer2 = Observer.Create<WritableString>(queue2.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+                // Remote manager 1 sends 3 events to remote manager 2
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver1.OnNext(new WritableString("abc"));
+                remoteObserver1.OnNext(new WritableString("def"));
+                remoteObserver1.OnNext(new WritableString("ghi"));
+
+                // Remote manager 2 sends 4 events to remote manager 1
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+                remoteObserver2.OnNext(new WritableString("jkl"));
+                remoteObserver2.OnNext(new WritableString("mno"));
+                remoteObserver2.OnNext(new WritableString("pqr"));
+                remoteObserver2.OnNext(new WritableString("stu"));
+
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+            }
+
+            Assert.AreEqual(4, events1.Count);
+            Assert.AreEqual(3, events2.Count);
+        }
+
+        /// <summary>
+        /// Tests one way communication between 3 nodes.
+        /// nodes 1 and 2 send messages to node 3
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingCommunicationThreeNodesOneWay()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer = Observer.Create<WritableString>(queue.Add);
+                remoteManager3.RegisterObserver(remoteEndpoint, observer);
+
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+                remoteObserver2.OnNext(new WritableString("abc"));
+                remoteObserver1.OnNext(new WritableString("def"));
+                remoteObserver2.OnNext(new WritableString("ghi"));
+                remoteObserver1.OnNext(new WritableString("jkl"));
+                remoteObserver2.OnNext(new WritableString("mno"));
+
+                for (int i = 0; i < 5; i++)
+                {
+                    events.Add(queue.Take().Data);
+                }
+            }
+
+            Assert.AreEqual(5, events.Count);
+        }
+
+        /// <summary>
+        /// Tests one way communication between 3 nodes.
+        /// nodes 1 and 2 send messages to node 3 and node 3 sends message back
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingCommunicationThreeNodesBothWays()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
+            BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
+            BlockingCollection<WritableString> queue3 = new BlockingCollection<WritableString>();
+            List<string> events1 = new List<string>();
+            List<string> events2 = new List<string>();
+            List<string> events3 = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+
+                var observer = Observer.Create<WritableString>(queue1.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, observer);
+                var observer2 = Observer.Create<WritableString>(queue2.Add);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+                var observer3 = Observer.Create<WritableString>(queue3.Add);
+                remoteManager3.RegisterObserver(remoteEndpoint, observer3);
+
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
+
+                // Observer 1 and 2 send messages to observer 3
+                remoteObserver1.OnNext(new WritableString("abc"));
+                remoteObserver1.OnNext(new WritableString("abc"));
+                remoteObserver1.OnNext(new WritableString("abc"));
+                remoteObserver2.OnNext(new WritableString("def"));
+                remoteObserver2.OnNext(new WritableString("def"));
+
+                // Observer 3 sends messages back to observers 1 and 2
+                var remoteObserver3A = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
+                var remoteObserver3B = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
+
+                remoteObserver3A.OnNext(new WritableString("ghi"));
+                remoteObserver3A.OnNext(new WritableString("ghi"));
+                remoteObserver3B.OnNext(new WritableString("jkl"));
+                remoteObserver3B.OnNext(new WritableString("jkl"));
+                remoteObserver3B.OnNext(new WritableString("jkl"));
+
+                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take().Data);
+
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take().Data);
+
+                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take().Data);
+            }
+
+            Assert.AreEqual(2, events1.Count);
+            Assert.AreEqual(3, events2.Count);
+            Assert.AreEqual(5, events3.Count);
+        }
+
+        /// <summary>
+        /// Tests whether remote manager is able to send acknowledgement back
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingRemoteSenderCallback()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                // Register handler for when remote manager 2 receives events; respond
+                // with an ack
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+
+                var receiverObserver = Observer.Create<WritableString>(
+                    message => remoteObserver2.OnNext(new WritableString("received message: " + message.Data)));
+                remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver);
+
+                // Register handler for remote manager 1 to record the ack
+                var senderObserver = Observer.Create<WritableString>(queue.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, senderObserver);
+
+                // Begin to send messages
+                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver1.OnNext(new WritableString("hello"));
+                remoteObserver1.OnNext(new WritableString("there"));
+                remoteObserver1.OnNext(new WritableString("buddy"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(3, events.Count);
+            Assert.AreEqual("received message: hello", events[0]);
+            Assert.AreEqual("received message: there", events[1]);
+            Assert.AreEqual("received message: buddy", events[2]);
+        }
+        
+        /// <summary>
+        /// Test whether observer can be created with IRemoteMessage interface
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingRegisterObserverByType()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                // RemoteManager2 listens and records events of type IRemoteEvent<WritableString>
+                var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message));
+                remoteManager2.RegisterObserver(observer);
+
+                // Remote manager 1 sends 3 events to remote manager 2
+                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext(new WritableString("abc"));
+                remoteObserver.OnNext(new WritableString("def"));
+                remoteObserver.OnNext(new WritableString("ghi"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(3, events.Count);
+        }
+
+        /// <summary>
+        /// Tests whether we get the cached observer back for sending message without reinstantiating it
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingCachedConnection()
+        {
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+
+            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            List<string> events = new List<string>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            {
+                var observer = Observer.Create<WritableString>(queue.Add);
+                IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
+                remoteManager2.RegisterObserver(endpoint1, observer);
+
+                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver.OnNext(new WritableString("abc"));
+                remoteObserver.OnNext(new WritableString("def"));
+
+                var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                cachedObserver.OnNext(new WritableString("ghi"));
+                cachedObserver.OnNext(new WritableString("jkl"));
+
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+                events.Add(queue.Take().Data);
+            }
+
+            Assert.AreEqual(4, events.Count);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
new file mode 100644
index 0000000..268da70
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingTransportTest.cs
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Net;
+using System.Reactive;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    /// <summary>
+    /// Tests the StreamingTransportServer, StreamingTransportClient and StreamingLink.
+    /// Basically the Wake transport layer.
+    /// </summary>
+    [TestClass]
+    public class StreamingTransportTest
+    {
+        private readonly ITcpPortProvider _tcpPortProvider = GetTcpProvider(8900, 8940);
+        private readonly IInjector _injector = TangFactory.GetTang().NewInjector();
+
+        /// <summary>
+        /// Tests whether StreamingTransportServer receives 
+        /// string messages from StreamingTransportClient
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingTransportServer()
+        {
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            List<string> events = new List<string>();
+            IStreamingCodec<string> stringCodec = _injector.GetInstance<StringStreamingCodec>();
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+            var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+
+            using (var server = new StreamingTransportServer<string>(endpoint.Address, remoteHandler, _tcpPortProvider, stringCodec))
+            {
+                server.Run();
+
+                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+                using (var client = new StreamingTransportClient<string>(remoteEndpoint, stringCodec))
+                {
+                    client.Send("Hello");
+                    client.Send(", ");
+                    client.Send("World!");
+
+                    events.Add(queue.Take());
+                    events.Add(queue.Take());
+                    events.Add(queue.Take());
+                } 
+            }
+
+            Assert.AreEqual(3, events.Count);
+            Assert.AreEqual(events[0], "Hello");
+            Assert.AreEqual(events[1], ", ");
+            Assert.AreEqual(events[2], "World!");
+        }
+
+        /// <summary>
+        /// Checks whether StreamingTransportClient is able to receive messages from remote host
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingTransportSenderStage()
+        {
+            
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+
+            List<string> events = new List<string>();
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            IStreamingCodec<string> stringCodec = _injector.GetInstance<StringStreamingCodec>();
+
+            // Server echoes the message back to the client
+            var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => tEvent.Link.Write(tEvent.Data));
+
+            using (var server = new StreamingTransportServer<string>(endpoint.Address, remoteHandler, _tcpPortProvider, stringCodec))
+            {
+                server.Run();
+
+                var clientHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+                using (var client = new StreamingTransportClient<string>(remoteEndpoint, clientHandler, stringCodec))
+                {
+                    client.Send("Hello");
+                    client.Send(", ");
+                    client.Send(" World");
+
+                    events.Add(queue.Take());
+                    events.Add(queue.Take());
+                    events.Add(queue.Take());
+                } 
+            }
+
+            Assert.AreEqual(3, events.Count);
+            Assert.AreEqual(events[0], "Hello");
+            Assert.AreEqual(events[1], ", ");
+            Assert.AreEqual(events[2], " World");
+        }
+
+        /// <summary>
+        /// Checks whether StreamingTransportClient and StreamingTransportServer works 
+        /// in asynchronous condition while sending messages asynchronously from different 
+        /// threads
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingRaceCondition()
+        {
+            BlockingCollection<string> queue = new BlockingCollection<string>();
+            List<string> events = new List<string>();
+            IStreamingCodec<string> stringCodec = _injector.GetInstance<StringStreamingCodec>();
+            int numEventsExpected = 150;
+
+            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
+            var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data));
+
+            using (var server = new StreamingTransportServer<string>(endpoint.Address, remoteHandler, _tcpPortProvider, stringCodec))
+            {
+                server.Run();
+
+                for (int i = 0; i < numEventsExpected / 3; i++)
+                {
+                    Task.Run(() =>
+                    {
+                        IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
+                        using (var client = new StreamingTransportClient<string>(remoteEndpoint, stringCodec))
+                        {
+                            client.Send("Hello");
+                            client.Send(", ");
+                            client.Send("World!");
+                        }
+                    });
+                }
+
+                for (int i = 0; i < numEventsExpected; i++)
+                {
+                    events.Add(queue.Take());
+                }
+            }
+
+            Assert.AreEqual(numEventsExpected, events.Count);
+
+        }
+
+        private static ITcpPortProvider GetTcpProvider(int portRangeStart, int portRangeEnd)
+        {
+            var configuration = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindImplementation<ITcpPortProvider, TcpPortProvider>()
+                .BindIntNamedParam<TcpPortRangeStart>(portRangeStart.ToString())
+                .BindIntNamedParam<TcpPortRangeCount>((portRangeEnd - portRangeStart + 1).ToString())
+                .Build();
+            return TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
deleted file mode 100644
index 49c0f5b..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Globalization;
-using System.IO;
-using System.Net;
-using System.Reactive;
-using System.Threading.Tasks;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Impl;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
-    [TestClass]
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableRemoteManagerTest
-    {
-        private const int Id = 5;
-
-        private static IConfiguration _config = TangFactory.GetTang().NewConfigurationBuilder().BindNamedParameter<StringId, int>(
-               GenericType<StringId>.Class, Id.ToString(CultureInfo.InvariantCulture)).Build();
-
-        private readonly WritableRemoteManagerFactory _remoteManagerFactory1 =
-            TangFactory.GetTang().NewInjector().GetInstance<WritableRemoteManagerFactory>();
-
-        private readonly WritableRemoteManagerFactory _remoteManagerFactory2 =
-        TangFactory.GetTang().NewInjector(_config).GetInstance<WritableRemoteManagerFactory>();
-        
-        /// <summary>
-        /// Tests one way communication between Remote Managers 
-        /// Remote Manager listens on any available port
-        /// </summary>
-        [TestMethod]
-        public void TestWritableOneWayCommunication()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                var observer = Observer.Create<WritableString>(queue.Add);
-                IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
-                remoteManager2.RegisterObserver(endpoint1, observer);
-
-                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
-                remoteObserver.OnNext(new WritableString("ghi"));
-
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-            }
-
-            Assert.AreEqual(3, events.Count);
-        }
-
-        /// <summary>
-        /// Tests one way communication between Remote Managers 
-        /// Remote manager listens on a particular port
-        /// </summary>
-        [TestMethod]
-        public void TestWritableOneWayCommunicationClientOnly()
-        {
-            int listeningPort = NetworkUtils.GenerateRandomPort(8900, 8940);
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>())
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, listeningPort))
-            {
-                IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var observer = Observer.Create<WritableString>(queue.Add);
-                remoteManager2.RegisterObserver(remoteEndpoint, observer);
-
-                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
-                remoteObserver.OnNext(new WritableString("ghi"));
-
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-            }
-
-            Assert.AreEqual(3, events.Count);
-        }
-
-        /// <summary>
-        /// Tests two way communications. Checks whether both sides are able to receive messages
-        /// </summary>
-        [TestMethod]
-        public void TestWritableTwoWayCommunication()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
-            BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
-            List<string> events1 = new List<string>();
-            List<string> events2 = new List<string>();
-
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                // Register observers for remote manager 1 and remote manager 2
-                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var observer1 = Observer.Create<WritableString>(queue1.Add);
-                var observer2 = Observer.Create<WritableString>(queue2.Add);
-                remoteManager1.RegisterObserver(remoteEndpoint, observer1);
-                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
-
-                // Remote manager 1 sends 3 events to remote manager 2
-                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("def"));
-                remoteObserver1.OnNext(new WritableString("ghi"));
-
-                // Remote manager 2 sends 4 events to remote manager 1
-                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
-                remoteObserver2.OnNext(new WritableString("jkl"));
-                remoteObserver2.OnNext(new WritableString("mno"));
-                remoteObserver2.OnNext(new WritableString("pqr"));
-                remoteObserver2.OnNext(new WritableString("stu"));
-
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-            }
-
-            Assert.AreEqual(4, events1.Count);
-            Assert.AreEqual(3, events2.Count);
-        }
-
-        /// <summary>
-        /// Tests two way communications where message needs an injectable argument 
-        /// to be passed. Checks whether both sides are able to receive messages
-        /// </summary>
-        [TestMethod]
-        public void TestNonEmptyArgumentInjectionWritableTwoWayCommunication()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");            
-
-            BlockingCollection<PrefixedStringWritable> queue1 = new BlockingCollection<PrefixedStringWritable>();
-            BlockingCollection<PrefixedStringWritable> queue2 = new BlockingCollection<PrefixedStringWritable>();
-            List<string> events1 = new List<string>();
-            List<string> events2 = new List<string>();
-
-            using (var remoteManager1 = _remoteManagerFactory2.GetInstance<PrefixedStringWritable>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory2.GetInstance<PrefixedStringWritable>(listeningAddress, 0))
-            {
-                // Register observers for remote manager 1 and remote manager 2
-                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var observer1 = Observer.Create<PrefixedStringWritable>(queue1.Add);
-                var observer2 = Observer.Create<PrefixedStringWritable>(queue2.Add);
-                remoteManager1.RegisterObserver(remoteEndpoint, observer1);
-                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
-
-                // Remote manager 1 sends 3 events to remote manager 2
-                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver1.OnNext(new PrefixedStringWritable("abc"));
-                remoteObserver1.OnNext(new PrefixedStringWritable("def"));
-                remoteObserver1.OnNext(new PrefixedStringWritable("ghi"));
-
-                // Remote manager 2 sends 4 events to remote manager 1
-                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
-                remoteObserver2.OnNext(new PrefixedStringWritable("jkl"));
-                remoteObserver2.OnNext(new PrefixedStringWritable("mno"));
-                remoteObserver2.OnNext(new PrefixedStringWritable("pqr"));
-                remoteObserver2.OnNext(new PrefixedStringWritable("stu"));
-
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-            }
-
-            Assert.AreEqual(4, events1.Count);
-            Assert.AreEqual(3, events2.Count);
-        }
-
-        /// <summary>
-        /// Tests one way communication between 3 nodes.
-        /// nodes 1 and 2 send messages to node 3
-        /// </summary>
-        [TestMethod]
-        public void TestWritableCommunicationThreeNodesOneWay()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var observer = Observer.Create<WritableString>(queue.Add);
-                remoteManager3.RegisterObserver(remoteEndpoint, observer);
-
-                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
-                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
-
-                remoteObserver2.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("def"));
-                remoteObserver2.OnNext(new WritableString("ghi"));
-                remoteObserver1.OnNext(new WritableString("jkl"));
-                remoteObserver2.OnNext(new WritableString("mno"));
-
-                for (int i = 0; i < 5; i++)
-                {
-                    events.Add(queue.Take().Data);
-                }
-            }
-
-            Assert.AreEqual(5, events.Count);
-        }
-
-        /// <summary>
-        /// Tests one way communication between 3 nodes.
-        /// nodes 1 and 2 send messages to node 3 and node 3 sends message back
-        /// </summary>
-        [TestMethod]
-        public void TestWritableCommunicationThreeNodesBothWays()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
-            BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
-            BlockingCollection<WritableString> queue3 = new BlockingCollection<WritableString>();
-            List<string> events1 = new List<string>();
-            List<string> events2 = new List<string>();
-            List<string> events3 = new List<string>();
-
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-
-                var observer = Observer.Create<WritableString>(queue1.Add);
-                remoteManager1.RegisterObserver(remoteEndpoint, observer);
-                var observer2 = Observer.Create<WritableString>(queue2.Add);
-                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
-                var observer3 = Observer.Create<WritableString>(queue3.Add);
-                remoteManager3.RegisterObserver(remoteEndpoint, observer3);
-
-                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
-                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
-
-                // Observer 1 and 2 send messages to observer 3
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver2.OnNext(new WritableString("def"));
-                remoteObserver2.OnNext(new WritableString("def"));
-
-                // Observer 3 sends messages back to observers 1 and 2
-                var remoteObserver3A = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
-                var remoteObserver3B = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
-
-                remoteObserver3A.OnNext(new WritableString("ghi"));
-                remoteObserver3A.OnNext(new WritableString("ghi"));
-                remoteObserver3B.OnNext(new WritableString("jkl"));
-                remoteObserver3B.OnNext(new WritableString("jkl"));
-                remoteObserver3B.OnNext(new WritableString("jkl"));
-
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-            }
-
-            Assert.AreEqual(2, events1.Count);
-            Assert.AreEqual(3, events2.Count);
-            Assert.AreEqual(5, events3.Count);
-        }
-
-        /// <summary>
-        /// Tests whether remote manager is able to send acknowledgement back
-        /// </summary>
-        [TestMethod]
-        public void TestWritableRemoteSenderCallback()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                // Register handler for when remote manager 2 receives events; respond
-                // with an ack
-                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
-
-                var receiverObserver = Observer.Create<WritableString>(
-                    message => remoteObserver2.OnNext(new WritableString("received message: " + message.Data)));
-                remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver);
-
-                // Register handler for remote manager 1 to record the ack
-                var senderObserver = Observer.Create<WritableString>(queue.Add);
-                remoteManager1.RegisterObserver(remoteEndpoint, senderObserver);
-
-                // Begin to send messages
-                var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver1.OnNext(new WritableString("hello"));
-                remoteObserver1.OnNext(new WritableString("there"));
-                remoteObserver1.OnNext(new WritableString("buddy"));
-
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-            }
-
-            Assert.AreEqual(3, events.Count);
-            Assert.AreEqual("received message: hello", events[0]);
-            Assert.AreEqual("received message: there", events[1]);
-            Assert.AreEqual("received message: buddy", events[2]);
-        }
-        
-        /// <summary>
-        /// Test whether observer can be created with IRemoteMessage interface
-        /// </summary>
-        [TestMethod]
-        public void TestWritableRegisterObserverByType()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                // RemoteManager2 listens and records events of type IRemoteEvent<WritableString>
-                var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message));
-                remoteManager2.RegisterObserver(observer);
-
-                // Remote manager 1 sends 3 events to remote manager 2
-                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
-                remoteObserver.OnNext(new WritableString("ghi"));
-
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-            }
-
-            Assert.AreEqual(3, events.Count);
-        }
-
-        /// <summary>
-        /// Tests whether we get the cached observer back for sending message without reinstantiating it
-        /// </summary>
-        [TestMethod]
-        public void TestWritableCachedConnection()
-        {
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
-
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            {
-                var observer = Observer.Create<WritableString>(queue.Add);
-                IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
-                remoteManager2.RegisterObserver(endpoint1, observer);
-
-                var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
-
-                var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                cachedObserver.OnNext(new WritableString("ghi"));
-                cachedObserver.OnNext(new WritableString("jkl"));
-
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-            }
-
-            Assert.AreEqual(4, events.Count);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
deleted file mode 100644
index 2255cfa..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Globalization;
-using System.IO;
-using System.Net;
-using System.Reactive;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Remote.Parameters;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
-    /// <summary>
-    /// Tests the WritableTransportServer, WritableTransportClient and WritableLink.
-    /// Basically the Wake transport layer.
-    /// </summary>
-    [TestClass]
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableTransportTest
-    {
-        private readonly ITcpPortProvider _tcpPortProvider = GetTcpProvider(8900, 8940);
-        private readonly IInjector _injector = TangFactory.GetTang().NewInjector();
-
-        /// <summary>
-        /// Tests whether WritableTransportServer receives 
-        /// string messages from WritableTransportClient
-        /// </summary>
-        [TestMethod]
-        public void TestWritableTransportServer()
-        {
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-
-            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
-            var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data));
-
-            using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider, _injector))
-            {
-                server.Run();
-
-                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, _injector))
-                {
-                    client.Send(new WritableString("Hello"));
-                    client.Send(new WritableString(", "));
-                    client.Send(new WritableString("World!"));
-
-                    events.Add(queue.Take().Data);
-                    events.Add(queue.Take().Data);
-                    events.Add(queue.Take().Data);
-                } 
-            }
-
-            Assert.AreEqual(3, events.Count);
-            Assert.AreEqual(events[0], "Hello");
-            Assert.AreEqual(events[1], ", ");
-            Assert.AreEqual(events[2], "World!");
-        }
-
-
-        /// <summary>
-        /// Tests whether WritableTransportServer receives 
-        /// string messages from WritableTransportClient with non empty injector
-        /// </summary>
-        [TestMethod]
-        public void TestNonEmptyInjectionTransportServer()
-        {
-            int id = 5;
-            IConfiguration config = TangFactory.GetTang().NewConfigurationBuilder().BindNamedParameter<StringId, int>(
-                GenericType<StringId>.Class, id.ToString(CultureInfo.InvariantCulture)).Build();
-
-            IInjector injector = TangFactory.GetTang().NewInjector(config);
-
-            BlockingCollection<PrefixedStringWritable> queue = new BlockingCollection<PrefixedStringWritable>();
-            List<string> events = new List<string>();
-
-            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
-            var remoteHandler = Observer.Create<TransportEvent<PrefixedStringWritable>>(tEvent => queue.Add(tEvent.Data));
-
-            using (var server = new WritableTransportServer<PrefixedStringWritable>(endpoint, remoteHandler, _tcpPortProvider, injector.ForkInjector()))
-            {
-                server.Run();
-
-                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                using (var client = new WritableTransportClient<PrefixedStringWritable>(remoteEndpoint, injector.ForkInjector()))
-                {
-                    client.Send(new PrefixedStringWritable("Hello"));
-                    client.Send(new PrefixedStringWritable(", "));
-                    client.Send(new PrefixedStringWritable("World!"));
-
-                    events.Add(queue.Take().Data);
-                    events.Add(queue.Take().Data);
-                    events.Add(queue.Take().Data);
-                }
-            }
-
-            Assert.AreEqual(3, events.Count);
-            Assert.AreEqual(events[0], "Hello_" + id);
-            Assert.AreEqual(events[1], ", _" + id);
-            Assert.AreEqual(events[2], "World!_" + id);
-        }
-
-
-        /// <summary>
-        /// Checks whether WritableTransportClient is able to receive messages from remote host
-        /// </summary>
-        [TestMethod]
-        public void TestWritableTransportSenderStage()
-        {
-            
-            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
-
-            List<string> events = new List<string>();
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
-
-            // Server echoes the message back to the client
-            var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => tEvent.Link.Write(tEvent.Data));
-
-            using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider, _injector))
-            {
-                server.Run();
-
-                var clientHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data));
-                IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, clientHandler, _injector))
-                {
-                    client.Send(new WritableString("Hello"));
-                    client.Send(new WritableString(", "));
-                    client.Send(new WritableString(" World"));
-
-                    events.Add(queue.Take().Data);
-                    events.Add(queue.Take().Data);
-                    events.Add(queue.Take().Data);
-                } 
-            }
-
-            Assert.AreEqual(3, events.Count);
-            Assert.AreEqual(events[0], "Hello");
-            Assert.AreEqual(events[1], ", ");
-            Assert.AreEqual(events[2], " World");
-        }
-
-        /// <summary>
-        /// Checks whether WritableTransportClient and WritableTransportServer works 
-        /// in asynchronous condition while sending messages asynchronously from different 
-        /// threads
-        /// </summary>
-        [TestMethod]
-        public void TestWritableRaceCondition()
-        {
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
-            List<string> events = new List<string>();
-            int numEventsExpected = 150;
-
-            IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
-            var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data));
-
-            using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider, _injector))
-            {
-                server.Run();
-
-                for (int i = 0; i < numEventsExpected / 3; i++)
-                {
-                    Task.Run(() =>
-                    {
-                        IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port);
-                        using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, _injector))
-                        {
-                            client.Send(new WritableString("Hello"));
-                            client.Send(new WritableString(", "));
-                            client.Send(new WritableString("World!"));
-                        }
-                    });
-                }
-
-                for (int i = 0; i < numEventsExpected; i++)
-                {
-                    events.Add(queue.Take().Data);
-                }
-            }
-
-            Assert.AreEqual(numEventsExpected, events.Count);
-
-        }
-
-        private static ITcpPortProvider GetTcpProvider(int portRangeStart, int portRangeEnd)
-        {
-            var configuration = TangFactory.GetTang().NewConfigurationBuilder()
-                .BindImplementation<ITcpPortProvider, TcpPortProvider>()
-                .BindIntNamedParam<TcpPortRangeStart>(portRangeStart.ToString())
-                .BindIntNamedParam<TcpPortRangeCount>((portRangeEnd - portRangeStart + 1).ToString())
-                .Build();
-            return TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index d2b2970..4069d15 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -48,7 +48,9 @@ under the License.
     <Compile Include="IEventHandler.cs" />
     <Compile Include="IIdentifier.cs" />
     <Compile Include="IIdentifierFactory.cs" />
-    <Compile Include="Remote\Impl\WritableRemoteManagerFactory.cs" />
+    <Compile Include="Remote\Impl\RemoteEventStreamingCodec.cs" />
+    <Compile Include="Remote\Impl\TemporaryWritableToStreamingCodec.cs" />
+    <Compile Include="Remote\Impl\StreamingRemoteManagerFactory.cs" />
     <Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" />
     <Compile Include="Impl\LoggingEventHandler.cs" />
     <Compile Include="Impl\MissingStartHandlerHandler.cs" />
@@ -71,15 +73,12 @@ under the License.
     <Compile Include="Remote\IDataWriter.cs" />
     <Compile Include="Remote\Impl\StreamDataReader.cs" />
     <Compile Include="Remote\Impl\StreamDataWriter.cs" />
-    <Compile Include="Remote\Impl\WritableRemoteManager.cs" />
-    <Compile Include="Remote\Impl\WritableLink.cs" />
-    <Compile Include="Remote\Impl\WritableObserverContainer.cs" />
-    <Compile Include="Remote\Impl\WritableRemoteEvent.cs" />
-    <Compile Include="Remote\Impl\WritableTransportClient.cs" />
-    <Compile Include="Remote\Impl\WritableTransportServer.cs" />
+    <Compile Include="Remote\Impl\StreamingRemoteManager.cs" />
+    <Compile Include="Remote\Impl\StreamingLink.cs" />
+    <Compile Include="Remote\Impl\StreamingTransportClient.cs" />
+    <Compile Include="Remote\Impl\StreamingTransportServer.cs" />
     <Compile Include="Remote\IRemoteManagerFactory.cs" />
     <Compile Include="Remote\IWritable.cs" />
-    <Compile Include="Remote\IWritableRemoteEvent.cs" />
     <Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" />
     <Compile Include="Remote\ICodec.cs" />
     <Compile Include="Remote\ICodecFactory.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
deleted file mode 100644
index 40222aa..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritableRemoteEvent.cs
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Linq.Expressions;
-using System.Net;
-
-namespace Org.Apache.REEF.Wake.Remote
-{
-    /// <summary>
-    /// Interface for remote event
-    /// </summary>
-    /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    internal interface IWritableRemoteEvent<T> : IWritable where T : IWritable
-    {
-        /// <summary>
-        /// Local Endpoint
-        /// </summary>
-        IPEndPoint LocalEndPoint { get; set; }
-
-        /// <summary>
-        /// Remote Endpoint
-        /// </summary>
-        IPEndPoint RemoteEndPoint { get; set; }
-
-        T Value { get; }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
index b39b20f..bfed7f9 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEvent.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System;
 using System.Net;
 using Org.Apache.REEF.Tang.Annotations;
 
@@ -50,12 +51,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
 
         public IPEndPoint RemoteEndPoint { get; set; }
 
+        [Obsolete("This field is never used and will be removed as part of 0.13. See [REEF-445]", false)]
         public string Source { get; set; }
 
+        [Obsolete("This field is never used and will be removed as part of 0.13. See [REEF-445]", false)]
         public string Sink { get; set; }
 
         public T Value { get; set; }
 
+        [Obsolete("This field is never used and will be removed as part of 0.13. See [REEF-445]", false)]
         public long Sequence { get; set; }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
new file mode 100644
index 0000000..01acd73
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/RemoteEventStreamingCodec.cs
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Writable remote event class
+    /// </summary>
+    /// <typeparam name="T">Type of remote event message. It is assumed that T implements IWritable</typeparam>
+    internal sealed class RemoteEventStreamingCodec<T> : IStreamingCodec<IRemoteEvent<T>>
+    {
+        private readonly IStreamingCodec<T> _codec;
+
+        internal RemoteEventStreamingCodec(IStreamingCodec<T> codec)
+        {
+            _codec = codec;
+        } 
+        
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <returns>The remote event</returns>
+        public IRemoteEvent<T> Read(IDataReader reader)
+        {
+            return new RemoteEvent<T>(null, null, _codec.Read(reader));
+        }
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="value">The remote event</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(IRemoteEvent<T> value, IDataWriter writer)
+        {
+            _codec.Write(value.Value, writer);
+        }
+
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The remote event</returns>
+        public async Task<IRemoteEvent<T>>  ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            T message =  await _codec.ReadAsync(reader, token);
+            return new RemoteEvent<T>(null, null, message);     
+        }
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="value">The remote event</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">The cancellation token</param>
+        public async Task WriteAsync(IRemoteEvent<T> value, IDataWriter writer, CancellationToken token)
+        {
+            await _codec.WriteAsync(value.Value, writer, token);        
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ba2653d6/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
new file mode 100644
index 0000000..4396b56
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Wake.Remote.Impl
+{
+    /// <summary>
+    /// Represents an open connection between remote hosts. This class is not thread safe
+    /// </summary>
+    /// <typeparam name="T">Generic Type of message.</typeparam>
+    internal sealed class StreamingLink<T> : ILink<T>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof (StreamingLink<T>));
+
+        private readonly IPEndPoint _localEndpoint;
+        private bool _disposed;
+        private readonly IStreamingCodec<T> _streamingCodec;
+        private readonly TcpClient _client;
+
+        /// <summary>
+        /// Stream reader to be passed to codec
+        /// </summary>
+        private readonly StreamDataReader _reader;
+
+        /// <summary>
+        /// Stream writer from which to read from in codec
+        /// </summary>
+        private readonly StreamDataWriter _writer;
+
+        /// <summary>
+        /// Constructs a Link object.
+        /// Connects to the specified remote endpoint.
+        /// </summary>
+        /// <param name="remoteEndpoint">The remote endpoint to connect to</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        internal StreamingLink(IPEndPoint remoteEndpoint, IStreamingCodec<T> streamingCodec)
+        {
+            if (remoteEndpoint == null)
+            {
+                throw new ArgumentNullException("remoteEndpoint");
+            }
+
+            _client = new TcpClient();
+            _client.Connect(remoteEndpoint);
+
+            var stream = _client.GetStream();
+            _localEndpoint = GetLocalEndpoint();
+            _disposed = false;
+            _reader = new StreamDataReader(stream);
+            _writer = new StreamDataWriter(stream);
+            _streamingCodec = streamingCodec;
+        }
+
+        /// <summary>
+        /// Constructs a Link object.
+        /// Uses the already connected TcpClient.
+        /// </summary>
+        /// <param name="client">The already connected client</param>
+        /// <param name="streamingCodec">Streaming codec</param>
+        internal StreamingLink(TcpClient client, IStreamingCodec<T> streamingCodec)
+        {
+            if (client == null)
+            {
+                throw new ArgumentNullException("client");
+            }
+
+            _client = client;
+            var stream = _client.GetStream();
+            _localEndpoint = GetLocalEndpoint();
+            _disposed = false;
+            _reader = new StreamDataReader(stream);
+            _writer = new StreamDataWriter(stream);
+            _streamingCodec = streamingCodec;
+        }
+
+        /// <summary>
+        /// Returns the local socket address
+        /// </summary>
+        public IPEndPoint LocalEndpoint
+        {
+            get { return _localEndpoint; }
+        }
+
+        /// <summary>
+        /// Returns the remote socket address
+        /// </summary>
+        public IPEndPoint RemoteEndpoint
+        {
+            get { return (IPEndPoint) _client.Client.RemoteEndPoint; }
+        }
+
+        /// <summary>
+        /// Writes the message to the remote host
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        public void Write(T value)
+        {
+            if (value == null)
+            {
+                throw new ArgumentNullException("value");
+            }
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
+            }
+
+            _streamingCodec.Write(value, _writer);
+        }
+
+        /// <summary>
+        /// Writes the value to this link asynchronously
+        /// </summary>
+        /// <param name="value">The data to write</param>
+        /// <param name="token">The cancellation token</param>
+        public async Task WriteAsync(T value, CancellationToken token)
+        {
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been closed."), Logger);
+            }
+
+            await _streamingCodec.WriteAsync(value, _writer, token);
+        }
+
+        /// <summary>
+        /// Reads the value from the link synchronously
+        /// </summary>
+        public T Read()
+        {
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger);
+            }
+
+            try
+            {
+                T value = _streamingCodec.Read(_reader);
+                return value;
+            }
+            catch (Exception e)
+            {
+                Logger.Log(Level.Warning, "In Read function unable to read the message.");
+                Exceptions.CaughtAndThrow(e, Level.Error, Logger);
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Reads the value from the link asynchronously
+        /// </summary>
+        /// <param name="token">The cancellation token</param>
+        public async Task<T> ReadAsync(CancellationToken token)
+        {
+            if (_disposed)
+            {
+                Exceptions.Throw(new IllegalStateException("Link has been disposed."), Logger);
+            }
+
+            try
+            {
+                T value = await _streamingCodec.ReadAsync(_reader, token);
+                return value;
+            }
+            catch (Exception e)
+            {
+                Logger.Log(Level.Warning, "In ReadAsync function unable to read the message.");
+                Exceptions.CaughtAndThrow(e, Level.Error, Logger);
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Close the client connection
+        /// </summary>
+        public void Dispose()
+        {
+            if (_disposed)
+            {
+                return;
+            }
+
+            try
+            {
+                _client.GetStream().Close();
+            }
+            catch (InvalidOperationException)
+            {
+                Logger.Log(Level.Warning, "failed to close stream on a non-connected socket.");
+            }
+
+            _client.Close();
+            _disposed = true;
+        }
+
+        /// <summary>
+        /// Overrides Equals. Two Link objects are equal if they are connected
+        /// to the same remote endpoint.
+        /// </summary>
+        /// <param name="obj">The object to compare</param>
+        /// <returns>True if the object is equal to this Link, otherwise false</returns>
+        public override bool Equals(object obj)
+        {
+            Link<T> other = obj as Link<T>;
+            if (other == null)
+            {
+                return false;
+            }
+
+            return other.RemoteEndpoint.Equals(RemoteEndpoint);
+        }
+
+        /// <summary>
+        /// Gets the hash code for the Link object.
+        /// </summary>
+        /// <returns>The object's hash code</returns>
+        public override int GetHashCode()
+        {
+            return RemoteEndpoint.GetHashCode();
+        }
+
+        /// <summary>
+        /// Discovers the IPEndpoint for the current machine.
+        /// </summary>
+        /// <returns>The local IPEndpoint</returns>
+        private IPEndPoint GetLocalEndpoint()
+        {
+            IPAddress address = NetworkUtils.LocalIPAddress;
+            int port = ((IPEndPoint) _client.Client.LocalEndPoint).Port;
+            return new IPEndPoint(address, port);
+        }
+    }
+}
\ No newline at end of file