You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/07/15 23:48:57 UTC

[2/3] incubator-reef git commit: [REEF-447]Convert Network Service Layer from Writable to Streaming

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index b1b79d4..8fe19d6 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -32,7 +32,6 @@ using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.StreamingCodec;
 
 namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
@@ -43,18 +42,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Communication Group.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
     public sealed class OperatorTopology<T> : IOperatorTopology<T>, IObserver<GeneralGroupCommunicationMessage>
     {
-        private const int DefaultTimeout = 50000;
-        private const int RetryCount = 10;
-
         private static readonly Logger Logger = Logger.GetLogger(typeof(OperatorTopology<>));
 
         private readonly string _groupName;
         private readonly string _operatorName;
         private readonly string _selfId;
-        private string _driverId;
         private readonly int _timeout;
         private readonly int _retryCount;
         private readonly int _sleepTime;
@@ -66,7 +60,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         private readonly Sender _sender;
         private readonly BlockingCollection<NodeStruct<T>> _nodesWithData;
         private readonly Object _thisLock = new Object();
-        private readonly IStreamingCodec<T> _codec;
 
         /// <summary>
         /// Creates a new OperatorTopology object.
@@ -74,7 +67,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="operatorName">The name of the Group Communication Operator</param>
         /// <param name="groupName">The name of the operator's Communication Group</param>
         /// <param name="taskId">The operator's Task identifier</param>
-        /// <param name="driverId">The identifer for the driver</param>
         /// <param name="timeout">Timeout value for cancellation token</param>
         /// <param name="retryCount">Number of times to retry wating for registration</param>
         /// <param name="sleepTime">Sleep time between retry wating for registration</param>
@@ -82,26 +74,22 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="childIds">The set of child Task identifiers in the topology graph</param>
         /// <param name="networkService">The network service</param>
         /// <param name="sender">The Sender used to do point to point communication</param>
-        /// <param name="codec">Streaming codec to encode objects</param>
         [Inject]
         private OperatorTopology(
             [Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
             [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
             [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
-            [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId,
             [Parameter(typeof(GroupCommConfigurationOptions.Timeout))] int timeout,
             [Parameter(typeof(GroupCommConfigurationOptions.RetryCountWaitingForRegistration))] int retryCount,
             [Parameter(typeof(GroupCommConfigurationOptions.SleepTimeWaitingForRegistration))] int sleepTime,
             [Parameter(typeof(GroupCommConfigurationOptions.TopologyRootTaskId))] string rootId,
             [Parameter(typeof(GroupCommConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds,
-            WritableNetworkService<GeneralGroupCommunicationMessage> networkService,
-            Sender sender,
-            IStreamingCodec<T> codec)
+            StreamingNetworkService<GeneralGroupCommunicationMessage> networkService,
+            Sender sender)
         {
             _operatorName = operatorName;
             _groupName = groupName;
             _selfId = taskId;
-            _driverId = driverId;
             _timeout = timeout;
             _retryCount = retryCount;
             _sleepTime = sleepTime;
@@ -110,7 +98,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             _nodesWithData = new BlockingCollection<NodeStruct<T>>();
             _children = new List<NodeStruct<T>>();
             _idToNodeMap = new Dictionary<string, NodeStruct<T>>();
-            _codec = codec;
 
             if (_selfId.Equals(rootId))
             {
@@ -201,7 +188,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 throw new ArgumentException("No parent for node");
             }
 
-            SendToNode(message, MessageType.Data, _parent);
+            SendToNode(message, _parent);
         }
 
         /// <summary>
@@ -218,7 +205,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 
             foreach (var child in _children)
             {
-                SendToNode(message, MessageType.Data, child);
+                SendToNode(message, child);
             }
         }
 
@@ -444,10 +431,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="message">The message to send</param>
         /// <param name="msgType">The message type</param>
         /// <param name="node">The NodeStruct representing the Task to send to</param>
-        private void SendToNode(T message, MessageType msgType, NodeStruct<T> node)
+        private void SendToNode(T message, NodeStruct<T> node)
         {
             GeneralGroupCommunicationMessage gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName,
-                _selfId, node.Identifier, message, msgType, _codec);
+                _selfId, node.Identifier, message);
 
             _sender.Send(gcm);
         }
@@ -458,12 +445,12 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="messages">The list of messages to send</param>
         /// <param name="msgType">The message type</param>
         /// <param name="node">The NodeStruct representing the Task to send to</param>
-        private void SendToNode(IList<T> messages, MessageType msgType, NodeStruct<T> node)
+        private void SendToNode(IList<T> messages, NodeStruct<T> node)
         {
             T[] encodedMessages = messages.ToArray();
 
             GroupCommunicationMessage<T> gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName,
-                _selfId, node.Identifier, encodedMessages, msgType, _codec);
+                _selfId, node.Identifier, encodedMessages);
 
             _sender.Send(gcm);
         }
@@ -511,7 +498,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 }
 
                 IList<T> sublist = messages.ToList().GetRange(i, size);
-                SendToNode(sublist, MessageType.Data, nodeStruct);
+                SendToNode(sublist, nodeStruct);
 
                 i += size;
             }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs
new file mode 100644
index 0000000..c30c1bd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.NetworkService.Codec
+{
+    /// <summary>
+    /// Codec to serialize NsMessages for NetworkService.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    internal class NsMessageStreamingCodec<T> : IStreamingCodec<NsMessage<T>>
+    {
+        private readonly IIdentifierFactory _idFactory;
+        private readonly StreamingCodecFunctionCache<T> _codecFunctionsCache;
+
+        /// <summary>
+        /// Create new NsMessageStreamingCodec.
+        /// </summary>
+        /// <param name="idFactory">Used to create identifier from string.</param>
+        /// <param name="injector">Injector to instantiate codecs.</param>
+        [Inject]
+        private NsMessageStreamingCodec(IIdentifierFactory idFactory, IInjector injector)
+        {
+            _idFactory = idFactory;
+            _codecFunctionsCache = new StreamingCodecFunctionCache<T>(injector);
+        }
+
+        /// <summary>
+        /// Instantiate the class from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        /// <returns>The instance of type NsMessage{T} read from the reader</returns>
+        public NsMessage<T> Read(IDataReader reader)
+        {
+            int metadataSize = reader.ReadInt32();
+            byte[] metadata = new byte[metadataSize];
+            reader.Read(ref metadata, 0, metadataSize);
+            var res = GenerateMetaDataDecoding(metadata);
+
+            Type messageType = res.Item3;
+            NsMessage<T> message = res.Item1;
+
+            var codecReadFunc = _codecFunctionsCache.ReadFunction(messageType);
+            int messageCount = res.Item2;
+
+            for (int i = 0; i < messageCount; i++)
+            {
+                message.Data.Add(codecReadFunc(reader));
+            }
+
+            return message;
+        }
+
+        /// <summary>
+        /// Writes the class fields to the writer.
+        /// </summary>
+        /// <param name="obj">The object of type NsMessage{T} to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(NsMessage<T> obj, IDataWriter writer)
+        {
+            byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+            byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+            byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray();
+            writer.Write(totalEncoding, 0, totalEncoding.Length);
+
+            Type messageType = obj.Data[0].GetType();        
+            var codecWriteFunc = _codecFunctionsCache.WriteFunction(messageType);
+          
+            foreach (var data in obj.Data)
+            {
+                codecWriteFunc(data, writer);
+            }
+        }
+
+        ///  <summary>
+        ///  Instantiate the class from the reader.
+        ///  </summary>
+        ///  <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The instance of type NsMessage{T} read from the reader</returns>
+        public async Task<NsMessage<T>> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            int metadataSize = await reader.ReadInt32Async(token);
+            byte[] metadata = new byte[metadataSize];
+            await reader.ReadAsync(metadata, 0, metadataSize, token);
+            var res = GenerateMetaDataDecoding(metadata);
+            Type messageType = res.Item3;
+            NsMessage<T> message = res.Item1;
+            var codecReadFunc = _codecFunctionsCache.ReadAsyncFunction(messageType);
+            int messageCount = res.Item2;
+
+            for (int i = 0; i < messageCount; i++)
+            {
+                message.Data.Add(codecReadFunc(reader, token));
+            }
+
+            return message;
+        }
+
+        /// <summary>
+        /// Asynchronously writes the message metadata, then every payload item, to the writer.
+        /// </summary>
+        /// <param name="obj">The object of type NsMessage{T} to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        public async Task WriteAsync(NsMessage<T> obj, IDataWriter writer, CancellationToken token)
+        {
+            byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+            byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+            byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray();
+            await writer.WriteAsync(totalEncoding, 0, totalEncoding.Length, token);
+
+            // All items are written with the codec looked up for the first item's runtime type.
+            Type messageType = obj.Data[0].GetType();
+            var codecWriteFunc = _codecFunctionsCache.WriteAsyncFunction(messageType);
+
+            foreach (var data in obj.Data)
+            {
+                // Await each write so items are serialized in order and failures propagate to the caller.
+                await codecWriteFunc(data, writer, token);
+            }
+        }
+
+        private static byte[] GenerateMetaDataEncoding(NsMessage<T> obj )
+        {
+            List<byte[]> metadataBytes = new List<byte[]>();
+            byte[] sourceBytes = StringToBytes(obj.SourceId.ToString());
+            byte[] dstBytes = StringToBytes(obj.DestId.ToString());
+            byte[] messageTypeBytes = StringToBytes(obj.Data[0].GetType().AssemblyQualifiedName);
+            byte[] messageCount = BitConverter.GetBytes(obj.Data.Count);
+
+            metadataBytes.Add(BitConverter.GetBytes(sourceBytes.Length));
+            metadataBytes.Add(BitConverter.GetBytes(dstBytes.Length));
+            metadataBytes.Add(BitConverter.GetBytes(messageTypeBytes.Length));
+            metadataBytes.Add(sourceBytes);
+            metadataBytes.Add(dstBytes);
+            metadataBytes.Add(messageTypeBytes);
+            metadataBytes.Add(messageCount);
+
+            return metadataBytes.SelectMany(i => i).ToArray();
+        }
+
+        private Tuple<NsMessage<T>, int, Type> GenerateMetaDataDecoding(byte[] obj)
+        {
+            int srcCount = BitConverter.ToInt32(obj, 0);
+            int dstCount = BitConverter.ToInt32(obj, sizeof (int));
+            int msgTypeCount = BitConverter.ToInt32(obj, 2*sizeof (int));
+
+            int offset = 3*sizeof (int);
+            string srcString = BytesToString(obj.Skip(offset).Take(srcCount).ToArray());
+            offset += srcCount;
+            string dstString = BytesToString(obj.Skip(offset).Take(dstCount).ToArray());
+            offset += dstCount;
+            Type msgType = Type.GetType(BytesToString(obj.Skip(offset).Take(msgTypeCount).ToArray()));
+            offset += msgTypeCount;
+            int messageCount = BitConverter.ToInt32(obj, offset);
+
+            NsMessage<T> msg = new NsMessage<T>(_idFactory.Create(srcString), _idFactory.Create(dstString));
+            return new Tuple<NsMessage<T>, int, Type>(msg, messageCount, msgType);
+        }
+
+        private static byte[] StringToBytes(string str)
+        {
+            byte[] bytes = new byte[str.Length * sizeof(char)];
+            Buffer.BlockCopy(str.ToCharArray(), 0, bytes, 0, bytes.Length);
+            return bytes;
+        }
+
+        private static string BytesToString(byte[] bytes)
+        {
+            char[] chars = new char[bytes.Length / sizeof(char)];
+            Buffer.BlockCopy(bytes, 0, chars, 0, bytes.Length);
+            return new string(chars);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
new file mode 100644
index 0000000..6d91298
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.NetworkService.Codec
+{
+    /// <summary>
+    /// Cache of StreamingCodec functions used to store codec functions for messages
+    /// to avoid reflection cost. Each message type is assumed to have a unique 
+    /// associated codec
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    internal class StreamingCodecFunctionCache<T>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof (StreamingCodecFunctionCache<T>));
+        private readonly Dictionary<Type, Func<IDataReader, T>> _readFuncCache;
+        private readonly Dictionary<Type, Func<IDataReader, CancellationToken, T>> _readAsyncFuncCache;
+        private readonly Dictionary<Type, Action<T, IDataWriter>> _writeFuncCache;
+        private readonly Dictionary<Type, Func<T, IDataWriter, CancellationToken, Task>> _writeAsyncFuncCache;
+        private readonly IInjector _injector;
+        private readonly Type _streamingCodecType;
+
+        /// <summary>
+        /// Create new StreamingCodecFunctionCache.
+        /// </summary>
+        /// <param name="injector"> Injector</param>
+        internal StreamingCodecFunctionCache(IInjector injector)
+        {
+            _injector = injector;
+            _readFuncCache = new Dictionary<Type, Func<IDataReader, T>>();
+            _readAsyncFuncCache = new Dictionary<Type, Func<IDataReader, CancellationToken, T>>();
+            _writeFuncCache = new Dictionary<Type, Action<T, IDataWriter>>();
+            _writeAsyncFuncCache = new Dictionary<Type, Func<T, IDataWriter, CancellationToken, Task>>();
+            _streamingCodecType = typeof(IStreamingCodec<>);
+        }
+
+        /// <summary>
+        /// Creates the read delegate function of StreamingCodec from the message type
+        /// </summary>
+        /// <param name="messageType">Type of message</param>
+        /// <returns>The read delegate function</returns>
+        internal Func<IDataReader, T> ReadFunction(Type messageType)
+        {
+            Func<IDataReader, T> readFunc;
+
+            if (!_readFuncCache.TryGetValue(messageType, out readFunc))
+            {
+                AddCodecFunctions(messageType);
+                readFunc = _readFuncCache[messageType];
+            }
+
+            return readFunc;
+        }
+
+        /// <summary>
+        /// Creates the read async delegate function of StreamingCodec from the message type
+        /// </summary>
+        /// <param name="messageType">Type of message</param>
+        /// <returns>The read async delegate function</returns>
+        internal Func<IDataReader, CancellationToken, T> ReadAsyncFunction(Type messageType)
+        {
+            Func<IDataReader, CancellationToken, T> readFunc;
+
+            if (!_readAsyncFuncCache.TryGetValue(messageType, out readFunc))
+            {
+                AddCodecFunctions(messageType);
+                readFunc = _readAsyncFuncCache[messageType];
+            }
+
+            return readFunc;
+        }
+
+        /// <summary>
+        /// Creates the write delegate function of StreamingCodec from the message type
+        /// </summary>
+        /// <param name="messageType">Type of message</param>
+        /// <returns>The write delegate function</returns>
+        internal Action<T, IDataWriter> WriteFunction(Type messageType)
+        {
+            Action<T, IDataWriter> writeFunc;
+
+            if (!_writeFuncCache.TryGetValue(messageType, out writeFunc))
+            {
+                AddCodecFunctions(messageType);
+                writeFunc = _writeFuncCache[messageType];
+            }
+
+            return writeFunc;
+        }
+
+        /// <summary>
+        /// Creates the write async delegate function of StreamingCodec from the message type
+        /// </summary>
+        /// <param name="messageType">Type of message</param>
+        /// <returns>The write async delegate function</returns>
+        internal Func<T, IDataWriter, CancellationToken, Task> WriteAsyncFunction(Type messageType)
+        {
+            Func<T, IDataWriter, CancellationToken, Task> writeFunc;
+
+            if (!_writeAsyncFuncCache.TryGetValue(messageType, out writeFunc))
+            {
+                AddCodecFunctions(messageType);
+                writeFunc = _writeAsyncFuncCache[messageType];
+            }
+
+            return writeFunc;
+        }
+
+        private void AddCodecFunctions(Type messageType)
+        {
+            if (!typeof(T).IsAssignableFrom(messageType))
+            {
+                Exceptions.CaughtAndThrow(new Exception("Message type not assignable to base type"), Level.Error,
+                    Logger);
+            }
+
+            Type codecType = _streamingCodecType.MakeGenericType(messageType);
+            var codec = _injector.GetInstance(codecType);
+
+            MethodInfo readMethod = codec.GetType().GetMethod("Read");
+            _readFuncCache[messageType] = (Func<IDataReader, T>) Delegate.CreateDelegate
+                (typeof (Func<IDataReader, T>), codec, readMethod);
+
+            MethodInfo readAsyncMethod = codec.GetType().GetMethod("ReadAsync");
+            MethodInfo genericHelper = GetType()
+                .GetMethod("ReadAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance);
+            MethodInfo constructedHelper = genericHelper.MakeGenericMethod(messageType);
+            _readAsyncFuncCache[messageType] =
+                (Func<IDataReader, CancellationToken, T>)constructedHelper.Invoke(this, new[] { readAsyncMethod, codec });
+
+            MethodInfo writeMethod = codec.GetType().GetMethod("Write");
+            genericHelper = GetType().GetMethod("WriteHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance);
+            constructedHelper = genericHelper.MakeGenericMethod(messageType);
+            _writeFuncCache[messageType] =
+                (Action<T, IDataWriter>) constructedHelper.Invoke(this, new[] {writeMethod, codec});
+            
+            MethodInfo writeAsyncMethod = codec.GetType().GetMethod("WriteAsync");
+            genericHelper = GetType().GetMethod("WriteAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance);
+            constructedHelper = genericHelper.MakeGenericMethod(messageType);
+            _writeAsyncFuncCache[messageType] =
+                (Func<T, IDataWriter, CancellationToken, Task>)
+                    constructedHelper.Invoke(this, new[] {writeAsyncMethod, codec});
+        }
+
+        private Action<T, IDataWriter> WriteHelperFunc<T1>(MethodInfo method, object codec) where T1 : class
+        {
+            Action<T1, IDataWriter> func = (Action<T1, IDataWriter>) Delegate.CreateDelegate
+                (typeof (Action<T1, IDataWriter>), codec, method);
+
+            Action<T, IDataWriter> ret = (obj, writer) => func(obj as T1, writer);
+            return ret;
+        }
+
+        private Func<T, IDataWriter, CancellationToken, Task> WriteAsyncHelperFunc<T1>(MethodInfo method, object codec)
+            where T1 : class
+        {
+            Func<T1, IDataWriter, CancellationToken, Task> func =
+                (Func<T1, IDataWriter, CancellationToken, Task>) Delegate.CreateDelegate
+                    (typeof (Func<T1, IDataWriter, CancellationToken, Task>), codec, method);
+
+            Func<T, IDataWriter, CancellationToken, Task> ret = (obj, writer, token) => func(obj as T1, writer, token);
+            return ret;
+        }
+
+        private Func<IDataReader, CancellationToken, T> ReadAsyncHelperFunc<T1>(MethodInfo method, object codec)
+            where T1 : class
+        {
+            Func<IDataReader, CancellationToken, Task<T1>> func =
+                (Func<IDataReader, CancellationToken, Task<T1>>) Delegate.CreateDelegate
+                    (typeof (Func<IDataReader, CancellationToken, Task<T1>>), codec, method);
+
+            Func<IDataReader, CancellationToken, T1> func1 = (writer, token) => func(writer, token).Result;
+            Func<IDataReader, CancellationToken, T> func2 = (writer, token) => ((T)(object)func1(writer, token));
+            return func2;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
new file mode 100644
index 0000000..1ff2517
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+    /// <summary>
+    /// Streaming network service used for REEF Task communication.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class StreamingNetworkService<T> : INetworkService<T>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(StreamingNetworkService<>));
+
+        private readonly IRemoteManager<NsMessage<T>> _remoteManager;
+        private IIdentifier _localIdentifier;
+        private readonly IDisposable _messageHandlerDisposable;
+        private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
+        private readonly INameClient _nameClient;
+
+        /// <summary>
+        /// Create a new StreamingNetworkService.
+        /// </summary>
+        /// <param name="messageHandler">The observer to handle incoming messages</param>
+        /// <param name="idFactory">The factory used to create IIdentifiers</param>
+        /// <param name="nameClient">The name client used to register Ids</param>
+        /// <param name="remoteManagerFactory">StreamingRemoteManagerFactory used to create the 
+        /// streaming RemoteManager</param>
+        /// <param name="codec">Codec for Network Service message</param>
+        /// <param name="injector">Fork of the injector that created the Network service</param>
+        [Inject]
+        private StreamingNetworkService(
+            IObserver<NsMessage<T>> messageHandler,
+            IIdentifierFactory idFactory,
+            INameClient nameClient,
+            StreamingRemoteManagerFactory remoteManagerFactory,
+            NsMessageStreamingCodec<T> codec,
+            IInjector injector)
+        {
+            IPAddress localAddress = NetworkUtils.LocalIPAddress;
+            _remoteManager = remoteManagerFactory.GetInstance(localAddress, codec);
+
+            // Create and register incoming message handler
+            // TODO[REEF-419] This should use the TcpPortProvider mechanism
+            var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
+            _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, messageHandler);
+
+            _nameClient = nameClient;
+            _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
+
+            Logger.Log(Level.Info, "Started network service");
+        }
+
+        /// <summary>
+        /// Name client for registering ids
+        /// </summary>
+        public INameClient NamingClient
+        {
+            get { return _nameClient; }
+        }
+
+        /// <summary>
+        /// Open a new connection to the remote host registered to
+        /// the name service with the given identifier
+        /// </summary>
+        /// <param name="destinationId">The identifier of the remote host</param>
+        /// <returns>The IConnection used for communication</returns>
+        public IConnection<T> NewConnection(IIdentifier destinationId)
+        {
+            if (_localIdentifier == null)
+            {
+                throw new IllegalStateException("Cannot open connection without first registering an ID");
+            }
+
+            IConnection<T> connection;
+            if (_connectionMap.TryGetValue(destinationId, out connection))
+            {
+                return connection;
+            }
+            else
+            {
+                connection = new NsConnection<T>(_localIdentifier, destinationId,
+                    NamingClient, _remoteManager, _connectionMap);
+
+                _connectionMap[destinationId] = connection;
+                return connection;
+            }
+        }
+
+        /// <summary>
+        /// Register the identifier for the NetworkService with the NameService.
+        /// </summary>
+        /// <param name="id">The identifier to register</param>
+        public void Register(IIdentifier id)
+        {
+            Logger.Log(Level.Info, "Registering id {0} with network service.", id);
+
+            _localIdentifier = id;
+            NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
+
+            Logger.Log(Level.Info, "End of Registering id {0} with network service.", id);
+        }
+
+        /// <summary>
+        /// Unregister the identifier for the NetworkService with the NameService.
+        /// </summary>
+        public void Unregister()
+        {
+            if (_localIdentifier == null)
+            {
+                throw new IllegalStateException("Cannot unregister a non existant identifier");
+            }
+
+            NamingClient.Unregister(_localIdentifier.ToString());
+            _localIdentifier = null;
+            _messageHandlerDisposable.Dispose();
+        }
+
+        /// <summary>
+        /// Dispose of the NetworkService's resources
+        /// </summary>
+        public void Dispose()
+        {
+            NamingClient.Dispose();
+            _remoteManager.Dispose();
+
+            Logger.Log(Level.Info, "Disposed of network service");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
deleted file mode 100644
index 93da126..0000000
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Net;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Network.NetworkService
-{
-    /// <summary>
-    /// Writable Network service used for Reef Task communication.
-    /// </summary>
-    /// <typeparam name="T">The message type</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableNetworkService<T> : INetworkService<T> where T : IWritable
-    {
-        private static readonly Logger Logger = Logger.GetLogger(typeof(NetworkService<>));
-
-        private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager;
-        private readonly IObserver<WritableNsMessage<T>> _messageHandler;
-        private IIdentifier _localIdentifier;
-        private IDisposable _messageHandlerDisposable;
-        private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
-        private readonly INameClient _nameClient;
-
-        /// <summary>
-        /// Create a new Writable NetworkService.
-        /// </summary>
-        /// <param name="nsPort">The port that the NetworkService will listen on</param>
-        /// <param name="messageHandler">The observer to handle incoming messages</param>
-        /// <param name="idFactory">The factory used to create IIdentifiers</param>
-        /// <param name="nameClient">The name client used to register Ids</param>
-        /// <param name="remoteManagerFactory">Writable RemoteManagerFactory to create a 
-        /// Writable RemoteManager</param>
-        [Inject]
-        private WritableNetworkService(
-            [Parameter(typeof (NetworkServiceOptions.NetworkServicePort))] int nsPort,
-            IObserver<WritableNsMessage<T>> messageHandler,
-            IIdentifierFactory idFactory,
-            INameClient nameClient,
-            StreamingRemoteManagerFactory remoteManagerFactory)
-        {
- 
-            IPAddress localAddress = NetworkUtils.LocalIPAddress;
-            _remoteManager = remoteManagerFactory.GetInstance<WritableNsMessage<T>>(localAddress, nsPort);
-            _messageHandler = messageHandler;
-
-            // Create and register incoming message handler
-            // TODO[REEF-419] This should use the TcpPortProvider mechanism
-            var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
-            _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
-
-            _nameClient = nameClient;
-            _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
-
-            Logger.Log(Level.Info, "Started network service");
-        }
-
-        /// <summary>
-        /// Name client for registering ids
-        /// </summary>
-        public INameClient NamingClient
-        {
-            get { return _nameClient; }
-        }
-
-        /// <summary>
-        /// Open a new connection to the remote host registered to
-        /// the name service with the given identifier
-        /// </summary>
-        /// <param name="destinationId">The identifier of the remote host</param>
-        /// <returns>The IConnection used for communication</returns>
-        public IConnection<T> NewConnection(IIdentifier destinationId)
-        {
-            if (_localIdentifier == null)
-            {
-                throw new IllegalStateException("Cannot open connection without first registering an ID");
-            }
-
-            IConnection<T> connection;
-            if (_connectionMap.TryGetValue(destinationId, out connection))
-            {
-                return connection;
-            }
-            else
-            {
-                connection = new WritableNsConnection<T>(_localIdentifier, destinationId,
-                    NamingClient, _remoteManager, _connectionMap);
-
-                _connectionMap[destinationId] = connection;
-                return connection;
-            }
-        }
-
-        /// <summary>
-        /// Register the identifier for the NetworkService with the NameService.
-        /// </summary>
-        /// <param name="id">The identifier to register</param>
-        public void Register(IIdentifier id)
-        {
-            Logger.Log(Level.Info, "Registering id {0} with network service.", id);
-
-            _localIdentifier = id;
-            NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
-
-            Logger.Log(Level.Info, "End of Registering id {0} with network service.", id);
-        }
-
-        /// <summary>
-        /// Unregister the identifier for the NetworkService with the NameService.
-        /// </summary>
-        public void Unregister()
-        {
-            if (_localIdentifier == null)
-            {
-                throw new IllegalStateException("Cannot unregister a non existant identifier");
-            }
-
-            NamingClient.Unregister(_localIdentifier.ToString());
-            _localIdentifier = null;
-            _messageHandlerDisposable.Dispose();
-        }
-
-        /// <summary>
-        /// Dispose of the NetworkService's resources
-        /// </summary>
-        public void Dispose()
-        {
-            NamingClient.Dispose();
-            _remoteManager.Dispose();
-
-            Logger.Log(Level.Info, "Disposed of network service");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
deleted file mode 100644
index c20238c..0000000
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Net;
-using System.Net.Sockets;
-using System.Runtime.Remoting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.NetworkService
-{
-    /// <summary>
-    /// Represents a connection between two hosts using the Writable NetworkService.
-    /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableNsConnection<T> : IConnection<T> where T : IWritable
-    {
-        private static readonly Logger Logger = Logger.GetLogger(typeof (WritableNsConnection<T>));
-
-        private readonly IIdentifier _sourceId;
-        private readonly IIdentifier _destId;
-        private readonly INameClient _nameClient;
-        private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager;
-        private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
-        private IObserver<WritableNsMessage<T>> _remoteSender;
-
-        /// <summary>
-        /// Creates a new NsConnection between two hosts.
-        /// </summary>
-        /// <param name="sourceId">The identifier of the sender</param>
-        /// <param name="destId">The identifier of the receiver</param>
-        /// <param name="nameClient">The NameClient used for naming lookup</param>
-        /// <param name="remoteManager">The remote manager used for network communication</param>
-        /// <param name="connectionMap">A cache of opened connections.  Will remove itself from
-        /// the cache when the NsConnection is disposed.</param>
-        public WritableNsConnection(
-            IIdentifier sourceId,
-            IIdentifier destId,
-            INameClient nameClient,
-            IRemoteManager<WritableNsMessage<T>> remoteManager,
-            Dictionary<IIdentifier, IConnection<T>> connectionMap)
-        {
-            _sourceId = sourceId;
-            _destId = destId;
-            _nameClient = nameClient;
-            _remoteManager = remoteManager;
-            _connectionMap = connectionMap;
-        }
-
-        /// <summary>
-        /// Opens the connection to the remote host.
-        /// </summary>
-        public void Open()
-        {
-            string destStr = _destId.ToString();
-            Logger.Log(Level.Verbose, "Network service opening connection to {0}...", destStr);
-
-            IPEndPoint destAddr = _nameClient.Lookup(destStr);
-            if (null == destAddr)
-            {
-                throw new RemotingException("Destination Address identifier cannot be found");
-            }
-
-            try
-            {
-                _remoteSender = _remoteManager.GetRemoteObserver(destAddr);
-                Logger.Log(Level.Verbose, "Network service completed connection to {0}.", destStr);
-            }
-            catch (SocketException)
-            {
-                Logger.Log(Level.Error, "Network Service cannot open connection to " + destAddr);
-                throw;
-            }
-            catch (ObjectDisposedException)
-            {
-                Logger.Log(Level.Error, "Network Service cannot open connection to " + destAddr);
-                throw;
-            }
-        }
-
-        /// <summary>
-        /// Writes the object to the remote host.
-        /// </summary>
-        /// <param name="message">The message to send</param>
-        public void Write(T message)
-        {
-            if (_remoteSender == null)
-            {
-                throw new IllegalStateException("NsConnection has not been opened yet.");
-            }
-
-            try
-            {
-                _remoteSender.OnNext(new WritableNsMessage<T>(_sourceId, _destId, message));
-            }
-            catch (IOException)
-            {
-                Logger.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
-                throw;
-            }
-            catch (ObjectDisposedException)
-            {
-                Logger.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
-                throw;
-            }
-        }
-
-        /// <summary>
-        /// Closes the connection
-        /// </summary>
-        public void Dispose()
-        {
-            _connectionMap.Remove(_destId);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
deleted file mode 100644
index a9299bb..0000000
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Runtime.Serialization;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Hadoop.Avro;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-
-
-namespace Org.Apache.REEF.Network.NetworkService
-{
-    /// <summary>
-    /// Writable Message sent between NetworkServices.</summary>
-    /// <typeparam name="T">The type of data being sent. It is assumed to be Writable</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableNsMessage<T> : IWritable where T : IWritable
-    {
-        private IIdentifierFactory _factory;
-        private IInjector _injection;
-        /// <summary>
-        /// Constructor to allow instantiation by reflection
-        /// </summary>
-        [Inject]
-        public WritableNsMessage(IIdentifierFactory factory, IInjector injection)
-        {
-            _factory = factory;
-            _injection = injection;
-        }
-        
-        /// <summary>
-        /// Create a new Writable NsMessage with no data.
-        /// </summary>
-        /// <param name="sourceId">The identifier of the sender</param>
-        /// <param name="destId">The identifier of the receiver</param>
-        public WritableNsMessage(IIdentifier sourceId, IIdentifier destId)
-        {
-            SourceId = sourceId;
-            DestId = destId;
-            Data = new List<T>();
-        }
-
-        /// <summary>
-        /// Create a new Writable NsMessage with data.
-        /// </summary>
-        /// <param name="sourceId">The identifier of the sender</param>
-        /// <param name="destId">The identifier of the receiver</param>
-        /// <param name="message">The message to send</param>
-        public WritableNsMessage(IIdentifier sourceId, IIdentifier destId, T message)
-        {
-            SourceId = sourceId;
-            DestId = destId;
-            Data = new List<T> {message};
-        }
-
-        /// <summary>
-        /// The identifier of the sender of the message.
-        /// </summary>
-        internal IIdentifier SourceId { get; private set; }
-
-        /// <summary>
-        /// The identifier of the receiver of the message.
-        /// </summary>
-        internal IIdentifier DestId { get; private set; }
-
-        /// <summary>
-        /// A list of data being sent in the message.
-        /// </summary>
-        public IList<T> Data { get; set; }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        public void Read(IDataReader reader)
-        {
-            SourceId = _factory.Create(reader.ReadString());
-            DestId = _factory.Create(reader.ReadString());
-            int messageCount = reader.ReadInt32();
-            string dataType = reader.ReadString();
-
-            Data = new List<T>();
-
-            for (int index = 0; index < messageCount; index++)
-            {
-                var dataPoint = (T)_injection.ForkInjector().GetInstance(Type.GetType(dataType));
-
-                if (null == dataPoint)
-                {
-                    throw new Exception("T type instance cannot be created from the stream data in Network Service Message");
-                }
-
-                dataPoint.Read(reader);
-                Data.Add(dataPoint);
-            }
-        }
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        public void Write(IDataWriter writer)
-        {
-            writer.WriteString(SourceId.ToString());
-            writer.WriteString(DestId.ToString());
-            writer.WriteInt32(Data.Count);
-            writer.WriteString(Data[0].GetType().AssemblyQualifiedName);
-
-            foreach (var data in Data)
-            {
-                data.Write(writer);
-            }
-        }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        /// <param name="token">The cancellation token</param>
-        public async Task ReadAsync(IDataReader reader, CancellationToken token)
-        {
-            SourceId = _factory.Create(await reader.ReadStringAsync(token));
-            DestId = _factory.Create(await reader.ReadStringAsync(token));
-            int messageCount = await reader.ReadInt32Async(token);
-            string dataType = await reader.ReadStringAsync(token);
-
-            Data = new List<T>();
-
-            for (int index = 0; index < messageCount; index++)
-            {
-                var dataPoint = (T) _injection.ForkInjector().GetInstance(Type.GetType(dataType));
-
-                if (null == dataPoint)
-                {
-                    throw new Exception("T type instance cannot be created from the stream data in Network Service Message");
-                }
-
-                await dataPoint.ReadAsync(reader, token);
-                Data.Add(dataPoint);
-            }
-        }
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        /// <param name="token">The cancellation token</param>
-        public async Task WriteAsync(IDataWriter writer, CancellationToken token)
-        {
-            await writer.WriteStringAsync(SourceId.ToString(), token);
-            await writer.WriteStringAsync(DestId.ToString(), token);
-            await writer.WriteInt32Async(Data.Count, token);
-            await writer.WriteStringAsync(Data[0].GetType().AssemblyQualifiedName, token);
-
-            foreach (var data in Data)
-            {
-                data.Write(writer);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 73b6d9d..a0e6038 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -61,6 +61,7 @@ under the License.
     <Compile Include="Group\Driver\IGroupCommDriver.cs" />
     <Compile Include="Group\Driver\Impl\CommunicationGroupDriver.cs" />
     <Compile Include="Group\Driver\Impl\GroupCommunicationMessage.cs" />
+    <Compile Include="Group\Driver\Impl\GroupCommunicationMessageStreamingCodec.cs" />
     <Compile Include="Group\Driver\Impl\MessageType.cs" />
     <Compile Include="Group\Driver\Impl\GroupCommDriver.cs" />
     <Compile Include="Group\Driver\Impl\TaskStarter.cs" />
@@ -139,6 +140,8 @@ under the License.
     <Compile Include="NetworkService\Codec\ControlMessageCodec.cs" />
     <Compile Include="NetworkService\Codec\NsMessageCodec.cs" />
     <Compile Include="NetworkService\Codec\NsMessageProto.cs" />
+    <Compile Include="NetworkService\Codec\NsMessageStreamingCodec.cs" />
+    <Compile Include="NetworkService\Codec\StreamingCodecFunctionCache.cs" />
     <Compile Include="NetworkService\ControlMessage.cs" />
     <Compile Include="NetworkService\IConnection.cs" />
     <Compile Include="NetworkService\INetworkService.cs" />
@@ -147,9 +150,7 @@ under the License.
     <Compile Include="NetworkService\NetworkServiceOptions.cs" />
     <Compile Include="NetworkService\NsConnection.cs" />
     <Compile Include="NetworkService\NsMessage.cs" />
-    <Compile Include="NetworkService\WritableNetworkService.cs" />
-    <Compile Include="NetworkService\WritableNsConnection.cs" />
-    <Compile Include="NetworkService\WritableNsMessage.cs" />
+    <Compile Include="NetworkService\StreamingNetworkService.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="Utilities\BlockingCollectionExtensions.cs" />
     <Compile Include="Utilities\Utils.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index babc26d..9f13c83 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -52,7 +52,6 @@ under the License.
     <Compile Include="StreamingRemoteManagerTest.cs" />
     <Compile Include="StreamingTransportTest.cs" />
     <Compile Include="TransportTest.cs" />
-    <Compile Include="WritableString.cs" />
   </ItemGroup>
   <ItemGroup>
     <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
index 20f75be..a0be7ee 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
@@ -25,7 +25,8 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
 
 namespace Org.Apache.REEF.Wake.Tests
 {
@@ -34,9 +35,6 @@ namespace Org.Apache.REEF.Wake.Tests
     {
         private readonly StreamingRemoteManagerFactory _remoteManagerFactory1 =
             TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
-
-        private readonly StreamingRemoteManagerFactory _remoteManagerFactory2 =
-        TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
         
         /// <summary>
         /// Tests one way communication between Remote Managers 
@@ -47,24 +45,25 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue = new BlockingCollection<string>();
             List<string> events = new List<string>();
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
-                var observer = Observer.Create<WritableString>(queue.Add);
+                var observer = Observer.Create<string>(queue.Add);
                 IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
                 remoteManager2.RegisterObserver(endpoint1, observer);
 
                 var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
-                remoteObserver.OnNext(new WritableString("ghi"));
+                remoteObserver.OnNext("abc");
+                remoteObserver.OnNext("def");
+                remoteObserver.OnNext("ghi");
 
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
             }
 
             Assert.AreEqual(3, events.Count);
@@ -78,42 +77,44 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
-            BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue1 = new BlockingCollection<string>();
+            BlockingCollection<string> queue2 = new BlockingCollection<string>();
             List<string> events1 = new List<string>();
             List<string> events2 = new List<string>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
                 // Register observers for remote manager 1 and remote manager 2
                 var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var observer1 = Observer.Create<WritableString>(queue1.Add);
-                var observer2 = Observer.Create<WritableString>(queue2.Add);
+                var observer1 = Observer.Create<string>(queue1.Add);
+                var observer2 = Observer.Create<string>(queue2.Add);
                 remoteManager1.RegisterObserver(remoteEndpoint, observer1);
                 remoteManager2.RegisterObserver(remoteEndpoint, observer2);
 
                 // Remote manager 1 sends 3 events to remote manager 2
                 var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("def"));
-                remoteObserver1.OnNext(new WritableString("ghi"));
+                remoteObserver1.OnNext("abc");
+                remoteObserver1.OnNext("def");
+                remoteObserver1.OnNext("ghi");
 
                 // Remote manager 2 sends 4 events to remote manager 1
                 var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
-                remoteObserver2.OnNext(new WritableString("jkl"));
-                remoteObserver2.OnNext(new WritableString("mno"));
-                remoteObserver2.OnNext(new WritableString("pqr"));
-                remoteObserver2.OnNext(new WritableString("stu"));
-
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
+                remoteObserver2.OnNext("jkl");
+                remoteObserver2.OnNext("mno");
+                remoteObserver2.OnNext("pqr");
+                remoteObserver2.OnNext("stu");
+
+                events1.Add(queue1.Take());
+                events1.Add(queue1.Take());
+                events1.Add(queue1.Take());
+                events1.Add(queue1.Take());
+
+                events2.Add(queue2.Take());
+                events2.Add(queue2.Take());
+                events2.Add(queue2.Take());
             }
 
             Assert.AreEqual(4, events1.Count);
@@ -129,29 +130,30 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue = new BlockingCollection<string>();
             List<string> events = new List<string>();
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager3 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
                 var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var observer = Observer.Create<WritableString>(queue.Add);
+                var observer = Observer.Create<string>(queue.Add);
                 remoteManager3.RegisterObserver(remoteEndpoint, observer);
 
                 var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
                 var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
 
-                remoteObserver2.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("def"));
-                remoteObserver2.OnNext(new WritableString("ghi"));
-                remoteObserver1.OnNext(new WritableString("jkl"));
-                remoteObserver2.OnNext(new WritableString("mno"));
+                remoteObserver2.OnNext("abc");
+                remoteObserver1.OnNext("def");
+                remoteObserver2.OnNext("ghi");
+                remoteObserver1.OnNext("jkl");
+                remoteObserver2.OnNext("mno");
 
                 for (int i = 0; i < 5; i++)
                 {
-                    events.Add(queue.Take().Data);
+                    events.Add(queue.Take());
                 }
             }
 
@@ -167,58 +169,60 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
-            BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
-            BlockingCollection<WritableString> queue3 = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue1 = new BlockingCollection<string>();
+            BlockingCollection<string> queue2 = new BlockingCollection<string>();
+            BlockingCollection<string> queue3 = new BlockingCollection<string>();
             List<string> events1 = new List<string>();
             List<string> events2 = new List<string>();
             List<string> events3 = new List<string>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager3 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
                 var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
 
-                var observer = Observer.Create<WritableString>(queue1.Add);
+                var observer = Observer.Create<string>(queue1.Add);
                 remoteManager1.RegisterObserver(remoteEndpoint, observer);
-                var observer2 = Observer.Create<WritableString>(queue2.Add);
+                var observer2 = Observer.Create<string>(queue2.Add);
                 remoteManager2.RegisterObserver(remoteEndpoint, observer2);
-                var observer3 = Observer.Create<WritableString>(queue3.Add);
+                var observer3 = Observer.Create<string>(queue3.Add);
                 remoteManager3.RegisterObserver(remoteEndpoint, observer3);
 
                 var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
                 var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
 
                 // Observer 1 and 2 send messages to observer 3
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver2.OnNext(new WritableString("def"));
-                remoteObserver2.OnNext(new WritableString("def"));
+                remoteObserver1.OnNext("abc");
+                remoteObserver1.OnNext("abc");
+                remoteObserver1.OnNext("abc");
+                remoteObserver2.OnNext("def");
+                remoteObserver2.OnNext("def");
 
                 // Observer 3 sends messages back to observers 1 and 2
                 var remoteObserver3A = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
                 var remoteObserver3B = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
 
-                remoteObserver3A.OnNext(new WritableString("ghi"));
-                remoteObserver3A.OnNext(new WritableString("ghi"));
-                remoteObserver3B.OnNext(new WritableString("jkl"));
-                remoteObserver3B.OnNext(new WritableString("jkl"));
-                remoteObserver3B.OnNext(new WritableString("jkl"));
+                remoteObserver3A.OnNext("ghi");
+                remoteObserver3A.OnNext("ghi");
+                remoteObserver3B.OnNext("jkl");
+                remoteObserver3B.OnNext("jkl");
+                remoteObserver3B.OnNext("jkl");
 
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take());
+                events1.Add(queue1.Take());
 
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take());
+                events2.Add(queue2.Take());
+                events2.Add(queue2.Take());
 
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take());
+                events3.Add(queue3.Take());
+                events3.Add(queue3.Take());
+                events3.Add(queue3.Take());
+                events3.Add(queue3.Take());
             }
 
             Assert.AreEqual(2, events1.Count);
@@ -234,34 +238,36 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue = new BlockingCollection<string>();
             List<string> events = new List<string>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
                 // Register handler for when remote manager 2 receives events; respond
                 // with an ack
                 var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
                 var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
 
-                var receiverObserver = Observer.Create<WritableString>(
-                    message => remoteObserver2.OnNext(new WritableString("received message: " + message.Data)));
+                var receiverObserver = Observer.Create<string>(
+                    message => remoteObserver2.OnNext("received message: " + message));
                 remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver);
 
                 // Register handler for remote manager 1 to record the ack
-                var senderObserver = Observer.Create<WritableString>(queue.Add);
+                var senderObserver = Observer.Create<string>(queue.Add);
                 remoteManager1.RegisterObserver(remoteEndpoint, senderObserver);
 
                 // Begin to send messages
                 var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver1.OnNext(new WritableString("hello"));
-                remoteObserver1.OnNext(new WritableString("there"));
-                remoteObserver1.OnNext(new WritableString("buddy"));
+                remoteObserver1.OnNext("hello");
+                remoteObserver1.OnNext("there");
+                remoteObserver1.OnNext("buddy");
 
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
             }
 
             Assert.AreEqual(3, events.Count);
@@ -278,25 +284,27 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue = new BlockingCollection<string>();
             List<string> events = new List<string>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
-                // RemoteManager2 listens and records events of type IRemoteEvent<WritableString>
-                var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message));
+                // RemoteManager2 listens and records events of type IRemoteMessage<string>
+                var observer = Observer.Create<IRemoteMessage<string>>(message => queue.Add(message.Message));
                 remoteManager2.RegisterObserver(observer);
 
                 // Remote manager 1 sends 3 events to remote manager 2
                 var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
-                remoteObserver.OnNext(new WritableString("ghi"));
+                remoteObserver.OnNext("abc");
+                remoteObserver.OnNext("def");
+                remoteObserver.OnNext("ghi");
 
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
             }
 
             Assert.AreEqual(3, events.Count);
@@ -310,28 +318,30 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue = new BlockingCollection<string>();
             List<string> events = new List<string>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
-                var observer = Observer.Create<WritableString>(queue.Add);
+                var observer = Observer.Create<string>(queue.Add);
                 IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
                 remoteManager2.RegisterObserver(endpoint1, observer);
 
                 var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
+                remoteObserver.OnNext("abc");
+                remoteObserver.OnNext("def");
 
                 var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                cachedObserver.OnNext(new WritableString("ghi"));
-                cachedObserver.OnNext(new WritableString("jkl"));
+                cachedObserver.OnNext("ghi");
+                cachedObserver.OnNext("jkl");
 
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
             }
 
             Assert.AreEqual(4, events.Count);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
deleted file mode 100644
index 30ff487..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
-    /// <summary>
-    /// Writable wrapper around the string class
-    /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableString : IWritable
-    {
-        /// <summary>
-        /// Returns the actual string data
-        /// </summary>
-        public string Data { get; set; }
-        
-        /// <summary>
-        /// Empty constructor for instantiation with reflection
-        /// </summary>
-        [Inject]
-        public WritableString()
-        {
-        }
-
-        /// <summary>
-        /// Constructor
-        /// </summary>
-        /// <param name="data">The string data</param>
-        public WritableString(string data)
-        {
-            Data = data;
-        }
-
-        /// <summary>
-        /// Reads the string
-        /// </summary>
-        /// <param name="reader">reader to read from</param>
-        public void Read(IDataReader reader)
-        {
-            Data = reader.ReadString();
-        }
-
-        /// <summary>
-        /// Writes the string
-        /// </summary>
-        /// <param name="writer">Writer to write</param>
-        public void Write(IDataWriter writer)
-        {
-            writer.WriteString(Data);
-        }
-
-        /// <summary>
-        /// Reads the string
-        /// </summary>
-        /// <param name="reader">reader to read from</param>
-        /// <param name="token">the cancellation token</param>
-        public async Task ReadAsync(IDataReader reader, CancellationToken token)
-        {
-            Data = await reader.ReadStringAsync(token);
-        }
-
-        /// <summary>
-        /// Writes the string
-        /// </summary>
-        /// <param name="writer">Writer to write</param>
-        /// <param name="token">the cancellation token</param>
-        public async Task WriteAsync(IDataWriter writer, CancellationToken token)
-        {
-            await writer.WriteStringAsync(Data, token);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index 4069d15..5767f8a 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -49,7 +49,6 @@ under the License.
     <Compile Include="IIdentifier.cs" />
     <Compile Include="IIdentifierFactory.cs" />
     <Compile Include="Remote\Impl\RemoteEventStreamingCodec.cs" />
-    <Compile Include="Remote\Impl\TemporaryWritableToStreamingCodec.cs" />
     <Compile Include="Remote\Impl\StreamingRemoteManagerFactory.cs" />
     <Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" />
     <Compile Include="Impl\LoggingEventHandler.cs" />
@@ -78,7 +77,6 @@ under the License.
     <Compile Include="Remote\Impl\StreamingTransportClient.cs" />
     <Compile Include="Remote\Impl\StreamingTransportServer.cs" />
     <Compile Include="Remote\IRemoteManagerFactory.cs" />
-    <Compile Include="Remote\IWritable.cs" />
     <Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" />
     <Compile Include="Remote\ICodec.cs" />
     <Compile Include="Remote\ICodecFactory.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
deleted file mode 100644
index 644cf82..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Org.Apache.REEF.Wake.Remote
-{
-    /// <summary>
-    /// Interface that classes should implement if they need to be readable to and writable 
-    /// from the stream. It is assumed that the classes inheriting this interface will have a 
-    /// default empty constructor
-    /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public interface IWritable
-    {
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        void Read(IDataReader reader);
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        void Write(IDataWriter writer);
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        /// <param name="token">The cancellation token</param>
-        Task ReadAsync(IDataReader reader, CancellationToken token);
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        /// <param name="token">The cancellation token</param>
-        Task WriteAsync(IDataWriter writer, CancellationToken token);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
index 90e3aca..da549ea 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
@@ -20,6 +20,7 @@
 using System.Net;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.StreamingCodec;
 
 namespace Org.Apache.REEF.Wake.Remote.Impl
 {
@@ -38,12 +39,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             _injector = injector;
         }
 
-        //ToDo: The port argument will be removed once changes are made in WritableNetworkService [REEF-447]
-        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
+        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, IStreamingCodec<T> codec)
         {
 #pragma warning disable 618
 // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
-            var codec = _injector.GetInstance<TemporaryWritableToStreamingCodec<T>>();
             return new StreamingRemoteManager<T>(localAddress, _tcpPortProvider, codec);
 #pragma warning disable 618
         }