You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/07/15 23:48:56 UTC

[1/3] incubator-reef git commit: [REEF-447]Convert Network Service Layer from Writable to Streaming

Repository: incubator-reef
Updated Branches:
  refs/heads/master 5cdd655e4 -> 8505dee94


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
deleted file mode 100644
index 1af9b86..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.StreamingCodec;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
-    // TODO: This class will be removed shortly and is used to reduce the size of pull request.
-    internal sealed class TemporaryWritableToStreamingCodec<T> : IStreamingCodec<T> where T:IWritable
-    {
-        private readonly IInjector _injector;
-
-        [Inject]
-        public TemporaryWritableToStreamingCodec(IInjector injector)
-        {
-            _injector = injector;
-        }
-
-        public T Read(IDataReader reader)
-        {
-            string type = reader.ReadString();
-            var value = (T) _injector.ForkInjector().GetInstance(type);
-            value.Read(reader);
-            return value;
-        }
-
-        public void Write(T obj, IDataWriter writer)
-        {
-            writer.WriteString(obj.GetType().AssemblyQualifiedName);
-            obj.Write(writer);
-        }
-
-        public async Task<T> ReadAsync(IDataReader reader, CancellationToken token)
-        {
-            string type = await reader.ReadStringAsync(token);
-            var value = (T) _injector.ForkInjector().GetInstance(type);
-            await value.ReadAsync(reader, token);
-            return value;
-        }
-
-        public async Task WriteAsync(T obj, IDataWriter writer, CancellationToken token)
-        {
-            await writer.WriteStringAsync(obj.GetType().AssemblyQualifiedName, token);
-            await obj.WriteAsync(writer, token);
-        }
-    }
-}


[2/3] incubator-reef git commit: [REEF-447]Convert Network Service Layer from Writable to Streaming

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index b1b79d4..8fe19d6 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -32,7 +32,6 @@ using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.StreamingCodec;
 
 namespace Org.Apache.REEF.Network.Group.Task.Impl
 {
@@ -43,18 +42,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Communication Group.
     /// </summary>
     /// <typeparam name="T">The message type</typeparam>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
     public sealed class OperatorTopology<T> : IOperatorTopology<T>, IObserver<GeneralGroupCommunicationMessage>
     {
-        private const int DefaultTimeout = 50000;
-        private const int RetryCount = 10;
-
         private static readonly Logger Logger = Logger.GetLogger(typeof(OperatorTopology<>));
 
         private readonly string _groupName;
         private readonly string _operatorName;
         private readonly string _selfId;
-        private string _driverId;
         private readonly int _timeout;
         private readonly int _retryCount;
         private readonly int _sleepTime;
@@ -66,7 +60,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         private readonly Sender _sender;
         private readonly BlockingCollection<NodeStruct<T>> _nodesWithData;
         private readonly Object _thisLock = new Object();
-        private readonly IStreamingCodec<T> _codec;
 
         /// <summary>
         /// Creates a new OperatorTopology object.
@@ -74,7 +67,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="operatorName">The name of the Group Communication Operator</param>
         /// <param name="groupName">The name of the operator's Communication Group</param>
         /// <param name="taskId">The operator's Task identifier</param>
-        /// <param name="driverId">The identifer for the driver</param>
         /// <param name="timeout">Timeout value for cancellation token</param>
         /// <param name="retryCount">Number of times to retry wating for registration</param>
         /// <param name="sleepTime">Sleep time between retry wating for registration</param>
@@ -82,26 +74,22 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="childIds">The set of child Task identifiers in the topology graph</param>
         /// <param name="networkService">The network service</param>
         /// <param name="sender">The Sender used to do point to point communication</param>
-        /// <param name="codec">Streaming codec to encode objects</param>
         [Inject]
         private OperatorTopology(
             [Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
             [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
             [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
-            [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId,
             [Parameter(typeof(GroupCommConfigurationOptions.Timeout))] int timeout,
             [Parameter(typeof(GroupCommConfigurationOptions.RetryCountWaitingForRegistration))] int retryCount,
             [Parameter(typeof(GroupCommConfigurationOptions.SleepTimeWaitingForRegistration))] int sleepTime,
             [Parameter(typeof(GroupCommConfigurationOptions.TopologyRootTaskId))] string rootId,
             [Parameter(typeof(GroupCommConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds,
-            WritableNetworkService<GeneralGroupCommunicationMessage> networkService,
-            Sender sender,
-            IStreamingCodec<T> codec)
+            StreamingNetworkService<GeneralGroupCommunicationMessage> networkService,
+            Sender sender)
         {
             _operatorName = operatorName;
             _groupName = groupName;
             _selfId = taskId;
-            _driverId = driverId;
             _timeout = timeout;
             _retryCount = retryCount;
             _sleepTime = sleepTime;
@@ -110,7 +98,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             _nodesWithData = new BlockingCollection<NodeStruct<T>>();
             _children = new List<NodeStruct<T>>();
             _idToNodeMap = new Dictionary<string, NodeStruct<T>>();
-            _codec = codec;
 
             if (_selfId.Equals(rootId))
             {
@@ -201,7 +188,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 throw new ArgumentException("No parent for node");
             }
 
-            SendToNode(message, MessageType.Data, _parent);
+            SendToNode(message, _parent);
         }
 
         /// <summary>
@@ -218,7 +205,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 
             foreach (var child in _children)
             {
-                SendToNode(message, MessageType.Data, child);
+                SendToNode(message, child);
             }
         }
 
@@ -444,10 +431,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="message">The message to send</param>
         /// <param name="msgType">The message type</param>
         /// <param name="node">The NodeStruct representing the Task to send to</param>
-        private void SendToNode(T message, MessageType msgType, NodeStruct<T> node)
+        private void SendToNode(T message, NodeStruct<T> node)
         {
             GeneralGroupCommunicationMessage gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName,
-                _selfId, node.Identifier, message, msgType, _codec);
+                _selfId, node.Identifier, message);
 
             _sender.Send(gcm);
         }
@@ -458,12 +445,12 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="messages">The list of messages to send</param>
         /// <param name="msgType">The message type</param>
         /// <param name="node">The NodeStruct representing the Task to send to</param>
-        private void SendToNode(IList<T> messages, MessageType msgType, NodeStruct<T> node)
+        private void SendToNode(IList<T> messages, NodeStruct<T> node)
         {
             T[] encodedMessages = messages.ToArray();
 
             GroupCommunicationMessage<T> gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName,
-                _selfId, node.Identifier, encodedMessages, msgType, _codec);
+                _selfId, node.Identifier, encodedMessages);
 
             _sender.Send(gcm);
         }
@@ -511,7 +498,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 }
 
                 IList<T> sublist = messages.ToList().GetRange(i, size);
-                SendToNode(sublist, MessageType.Data, nodeStruct);
+                SendToNode(sublist, nodeStruct);
 
                 i += size;
             }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs
new file mode 100644
index 0000000..c30c1bd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.NetworkService.Codec
+{
+    /// <summary>
+    /// Codec to serialize NsMessages for NetworkService.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    internal class NsMessageStreamingCodec<T> : IStreamingCodec<NsMessage<T>>
+    {
+        private readonly IIdentifierFactory _idFactory;
+        private readonly StreamingCodecFunctionCache<T> _codecFunctionsCache;
+
+        /// <summary>
+        /// Create new NsMessageCodec.
+        /// </summary>
+        /// <param name="idFactory">Used to create identifier from string.</param>
+        /// <param name="injector">Injector to instantiate codecs.</param>
+        [Inject]
+        private NsMessageStreamingCodec(IIdentifierFactory idFactory, IInjector injector)
+        {
+            _idFactory = idFactory;
+            _codecFunctionsCache = new StreamingCodecFunctionCache<T>(injector);
+        }
+
+        /// <summary>
+        /// Instantiate the class from the reader.
+        /// </summary>
+        /// <param name="reader">The reader from which to read</param>
+        ///<returns>The instance of type NsMessage<T></T> read from the reader</returns>
+        public NsMessage<T> Read(IDataReader reader)
+        {
+            int metadataSize = reader.ReadInt32();
+            byte[] metadata = new byte[metadataSize];
+            reader.Read(ref metadata, 0, metadataSize);
+            var res = GenerateMetaDataDecoding(metadata);
+
+            Type messageType = res.Item3;
+            NsMessage<T> message = res.Item1;
+
+            var codecReadFunc = _codecFunctionsCache.ReadFunction(messageType);
+            int messageCount = res.Item2;
+
+            for (int i = 0; i < messageCount; i++)
+            {
+                message.Data.Add(codecReadFunc(reader));
+            }
+
+            return message;
+        }
+
+        /// <summary>
+        /// Writes the class fields to the writer.
+        /// </summary>
+        /// <param name="obj">The object of type NsMessage<T></T> to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(NsMessage<T> obj, IDataWriter writer)
+        {
+            byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+            byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+            byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray();
+            writer.Write(totalEncoding, 0, totalEncoding.Length);
+
+            Type messageType = obj.Data[0].GetType();        
+            var codecWriteFunc = _codecFunctionsCache.WriteFunction(messageType);
+          
+            foreach (var data in obj.Data)
+            {
+                codecWriteFunc(data, writer);
+            }
+        }
+
+        ///  <summary>
+        ///  Instantiate the class from the reader.
+        ///  </summary>
+        ///  <param name="reader">The reader from which to read</param>
+        /// <param name="token">Cancellation token</param>
+        /// <returns>The instance of type NsMessage<T> read from the reader</returns>
+        public async Task<NsMessage<T>> ReadAsync(IDataReader reader, CancellationToken token)
+        {
+            int metadataSize = await reader.ReadInt32Async(token);
+            byte[] metadata = new byte[metadataSize];
+            await reader.ReadAsync(metadata, 0, metadataSize, token);
+            var res = GenerateMetaDataDecoding(metadata);
+            Type messageType = res.Item3;
+            NsMessage<T> message = res.Item1;
+            var codecReadFunc = _codecFunctionsCache.ReadAsyncFunction(messageType);
+            int messageCount = res.Item2;
+
+            for (int i = 0; i < messageCount; i++)
+            {
+                message.Data.Add(codecReadFunc(reader, token));
+            }
+
+            return message;
+        }
+
+        /// <summary>
+        /// Writes the class fields to the writer.
+        /// </summary>
+        /// <param name="obj">The object of type NsMessage<T> to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        public async Task WriteAsync(NsMessage<T> obj, IDataWriter writer, CancellationToken token)
+        {
+            byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+            byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+            byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray();
+            await writer.WriteAsync(totalEncoding, 0, totalEncoding.Length, token);
+
+            Type messageType = obj.Data[0].GetType();
+
+            var codecWriteFunc = _codecFunctionsCache.WriteAsyncFunction(messageType);
+
+            foreach (var data in obj.Data)
+            {
+                var asyncResult = codecWriteFunc.BeginInvoke(data, writer, token, null, null);
+                codecWriteFunc.EndInvoke(asyncResult);
+            }
+        }
+
+        private static byte[] GenerateMetaDataEncoding(NsMessage<T> obj )
+        {
+            List<byte[]> metadataBytes = new List<byte[]>();
+            byte[] sourceBytes = StringToBytes(obj.SourceId.ToString());
+            byte[] dstBytes = StringToBytes(obj.DestId.ToString());
+            byte[] messageTypeBytes = StringToBytes(obj.Data[0].GetType().AssemblyQualifiedName);
+            byte[] messageCount = BitConverter.GetBytes(obj.Data.Count);
+
+            metadataBytes.Add(BitConverter.GetBytes(sourceBytes.Length));
+            metadataBytes.Add(BitConverter.GetBytes(dstBytes.Length));
+            metadataBytes.Add(BitConverter.GetBytes(messageTypeBytes.Length));
+            metadataBytes.Add(sourceBytes);
+            metadataBytes.Add(dstBytes);
+            metadataBytes.Add(messageTypeBytes);
+            metadataBytes.Add(messageCount);
+
+            return metadataBytes.SelectMany(i => i).ToArray();
+        }
+
+        private Tuple<NsMessage<T>, int, Type> GenerateMetaDataDecoding(byte[] obj)
+        {
+            int srcCount = BitConverter.ToInt32(obj, 0);
+            int dstCount = BitConverter.ToInt32(obj, sizeof (int));
+            int msgTypeCount = BitConverter.ToInt32(obj, 2*sizeof (int));
+
+            int offset = 3*sizeof (int);
+            string srcString = BytesToString(obj.Skip(offset).Take(srcCount).ToArray());
+            offset += srcCount;
+            string dstString = BytesToString(obj.Skip(offset).Take(dstCount).ToArray());
+            offset += dstCount;
+            Type msgType = Type.GetType(BytesToString(obj.Skip(offset).Take(msgTypeCount).ToArray()));
+            offset += msgTypeCount;
+            int messageCount = BitConverter.ToInt32(obj, offset);
+
+            NsMessage<T> msg = new NsMessage<T>(_idFactory.Create(srcString), _idFactory.Create(dstString));
+            return new Tuple<NsMessage<T>, int, Type>(msg, messageCount, msgType);
+        }
+
+        private static byte[] StringToBytes(string str)
+        {
+            byte[] bytes = new byte[str.Length * sizeof(char)];
+            Buffer.BlockCopy(str.ToCharArray(), 0, bytes, 0, bytes.Length);
+            return bytes;
+        }
+
+        private static string BytesToString(byte[] bytes)
+        {
+            char[] chars = new char[bytes.Length / sizeof(char)];
+            Buffer.BlockCopy(bytes, 0, chars, 0, bytes.Length);
+            return new string(chars);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
new file mode 100644
index 0000000..6d91298
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.NetworkService.Codec
+{
+    /// <summary>
+    /// Cache of StreamingCodec functions used to store codec functions for messages
+    /// to avoid reflection cost. Each message type is assumed to have a unique 
+    /// associated codec
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    internal class StreamingCodecFunctionCache<T>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof (StreamingCodecFunctionCache<T>));
+        private readonly Dictionary<Type, Func<IDataReader, T>> _readFuncCache;
+        private readonly Dictionary<Type, Func<IDataReader, CancellationToken, T>> _readAsyncFuncCache;
+        private readonly Dictionary<Type, Action<T, IDataWriter>> _writeFuncCache;
+        private readonly Dictionary<Type, Func<T, IDataWriter, CancellationToken, Task>> _writeAsyncFuncCache;
+        private readonly IInjector _injector;
+        private readonly Type _streamingCodecType;
+
+        /// <summary>
+        /// Create new StreamingCodecFunctionCache.
+        /// </summary>
+        /// <param name="injector"> Injector</param>
+        internal StreamingCodecFunctionCache(IInjector injector)
+        {
+            _injector = injector;
+            _readFuncCache = new Dictionary<Type, Func<IDataReader, T>>();
+            _readAsyncFuncCache = new Dictionary<Type, Func<IDataReader, CancellationToken, T>>();
+            _writeFuncCache = new Dictionary<Type, Action<T, IDataWriter>>();
+            _writeAsyncFuncCache = new Dictionary<Type, Func<T, IDataWriter, CancellationToken, Task>>();
+            _streamingCodecType = typeof(IStreamingCodec<>);
+        }
+
+        /// <summary>
+        /// Creates the read delegate function of StreamingCodec from the message type
+        /// </summary>
+        /// <param name="messageType">Type of message</param>
+        /// <returns>The read delegate function</returns>
+        internal Func<IDataReader, T> ReadFunction(Type messageType)
+        {
+            Func<IDataReader, T> readFunc;
+
+            if (!_readFuncCache.TryGetValue(messageType, out readFunc))
+            {
+                AddCodecFunctions(messageType);
+                readFunc = _readFuncCache[messageType];
+            }
+
+            return readFunc;
+        }
+
+        /// <summary>
+        /// Creates the read async delegate function of StreamingCodec from the message type
+        /// </summary>
+        /// <param name="messageType">Type of message</param>
+        /// <returns>The read async delegate function</returns>
+        internal Func<IDataReader, CancellationToken, T> ReadAsyncFunction(Type messageType)
+        {
+            Func<IDataReader, CancellationToken, T> readFunc;
+
+            if (!_readAsyncFuncCache.TryGetValue(messageType, out readFunc))
+            {
+                AddCodecFunctions(messageType);
+                readFunc = _readAsyncFuncCache[messageType];
+            }
+
+            return readFunc;
+        }
+
+        /// <summary>
+        /// Creates the write delegate function of StreamingCodec from the message type
+        /// </summary>
+        /// <param name="messageType">Type of message</param>
+        /// <returns>The write delegate function</returns>
+        internal Action<T, IDataWriter> WriteFunction(Type messageType)
+        {
+            Action<T, IDataWriter> writeFunc;
+
+            if (!_writeFuncCache.TryGetValue(messageType, out writeFunc))
+            {
+                AddCodecFunctions(messageType);
+                writeFunc = _writeFuncCache[messageType];
+            }
+
+            return writeFunc;
+        }
+
+        /// <summary>
+        /// Creates the write async delegate function of StreamingCodec from the message type
+        /// </summary>
+        /// <param name="messageType">Type of message</param>
+        /// <returns>The write async delegate function</returns>
+        internal Func<T, IDataWriter, CancellationToken, Task> WriteAsyncFunction(Type messageType)
+        {
+            Func<T, IDataWriter, CancellationToken, Task> writeFunc;
+
+            if (!_writeAsyncFuncCache.TryGetValue(messageType, out writeFunc))
+            {
+                AddCodecFunctions(messageType);
+                writeFunc = _writeAsyncFuncCache[messageType];
+            }
+
+            return writeFunc;
+        }
+
+        private void AddCodecFunctions(Type messageType)
+        {
+            if (!typeof(T).IsAssignableFrom(messageType))
+            {
+                Exceptions.CaughtAndThrow(new Exception("Message type not assignable to base type"), Level.Error,
+                    Logger);
+            }
+
+            Type codecType = _streamingCodecType.MakeGenericType(messageType);
+            var codec = _injector.GetInstance(codecType);
+
+            MethodInfo readMethod = codec.GetType().GetMethod("Read");
+            _readFuncCache[messageType] = (Func<IDataReader, T>) Delegate.CreateDelegate
+                (typeof (Func<IDataReader, T>), codec, readMethod);
+
+            MethodInfo readAsyncMethod = codec.GetType().GetMethod("ReadAsync");
+            MethodInfo genericHelper = GetType()
+                .GetMethod("ReadAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance);
+            MethodInfo constructedHelper = genericHelper.MakeGenericMethod(messageType);
+            _readAsyncFuncCache[messageType] =
+                (Func<IDataReader, CancellationToken, T>)constructedHelper.Invoke(this, new[] { readAsyncMethod, codec });
+
+            MethodInfo writeMethod = codec.GetType().GetMethod("Write");
+            genericHelper = GetType().GetMethod("WriteHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance);
+            constructedHelper = genericHelper.MakeGenericMethod(messageType);
+            _writeFuncCache[messageType] =
+                (Action<T, IDataWriter>) constructedHelper.Invoke(this, new[] {writeMethod, codec});
+            
+            MethodInfo writeAsyncMethod = codec.GetType().GetMethod("WriteAsync");
+            genericHelper = GetType().GetMethod("WriteAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance);
+            constructedHelper = genericHelper.MakeGenericMethod(messageType);
+            _writeAsyncFuncCache[messageType] =
+                (Func<T, IDataWriter, CancellationToken, Task>)
+                    constructedHelper.Invoke(this, new[] {writeAsyncMethod, codec});
+        }
+
+        private Action<T, IDataWriter> WriteHelperFunc<T1>(MethodInfo method, object codec) where T1 : class
+        {
+            Action<T1, IDataWriter> func = (Action<T1, IDataWriter>) Delegate.CreateDelegate
+                (typeof (Action<T1, IDataWriter>), codec, method);
+
+            Action<T, IDataWriter> ret = (obj, writer) => func(obj as T1, writer);
+            return ret;
+        }
+
+        private Func<T, IDataWriter, CancellationToken, Task> WriteAsyncHelperFunc<T1>(MethodInfo method, object codec)
+            where T1 : class
+        {
+            Func<T1, IDataWriter, CancellationToken, Task> func =
+                (Func<T1, IDataWriter, CancellationToken, Task>) Delegate.CreateDelegate
+                    (typeof (Func<T1, IDataWriter, CancellationToken, Task>), codec, method);
+
+            Func<T, IDataWriter, CancellationToken, Task> ret = (obj, writer, token) => func(obj as T1, writer, token);
+            return ret;
+        }
+
+        private Func<IDataReader, CancellationToken, T> ReadAsyncHelperFunc<T1>(MethodInfo method, object codec)
+            where T1 : class
+        {
+            Func<IDataReader, CancellationToken, Task<T1>> func =
+                (Func<IDataReader, CancellationToken, Task<T1>>) Delegate.CreateDelegate
+                    (typeof (Func<IDataReader, CancellationToken, Task<T1>>), codec, method);
+
+            Func<IDataReader, CancellationToken, T1> func1 = (writer, token) => func(writer, token).Result;
+            Func<IDataReader, CancellationToken, T> func2 = (writer, token) => ((T)(object)func1(writer, token));
+            return func2;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
new file mode 100644
index 0000000..1ff2517
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+    /// <summary>
+    /// Writable Network service used for Reef Task communication.
+    /// </summary>
+    /// <typeparam name="T">The message type</typeparam>
+    public class StreamingNetworkService<T> : INetworkService<T>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(StreamingNetworkService<>));
+
+        private readonly IRemoteManager<NsMessage<T>> _remoteManager;
+        private IIdentifier _localIdentifier;
+        private readonly IDisposable _messageHandlerDisposable;
+        private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
+        private readonly INameClient _nameClient;
+
+        /// <summary>
+        /// Create a new Writable NetworkService.
+        /// </summary>
+        /// <param name="messageHandler">The observer to handle incoming messages</param>
+        /// <param name="idFactory">The factory used to create IIdentifiers</param>
+        /// <param name="nameClient">The name client used to register Ids</param>
+        /// <param name="remoteManagerFactory">Writable RemoteManagerFactory to create a 
+        /// Writable RemoteManager</param>
+        /// <param name="codec">Codec for Network Service message</param>
+        /// <param name="injector">Fork of the injector that created the Network service</param>
+        [Inject]
+        private StreamingNetworkService(
+            IObserver<NsMessage<T>> messageHandler,
+            IIdentifierFactory idFactory,
+            INameClient nameClient,
+            StreamingRemoteManagerFactory remoteManagerFactory,
+            NsMessageStreamingCodec<T> codec,
+            IInjector injector)
+        {
+            IPAddress localAddress = NetworkUtils.LocalIPAddress;
+            _remoteManager = remoteManagerFactory.GetInstance(localAddress, codec);
+
+            // Create and register incoming message handler
+            // TODO[REEF-419] This should use the TcpPortProvider mechanism
+            var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
+            _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, messageHandler);
+
+            _nameClient = nameClient;
+            _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
+
+            Logger.Log(Level.Info, "Started network service");
+        }
+
+        /// <summary>
+        /// Name client for registering ids
+        /// </summary>
+        public INameClient NamingClient
+        {
+            get { return _nameClient; }
+        }
+
+        /// <summary>
+        /// Open a new connection to the remote host registered to
+        /// the name service with the given identifier
+        /// </summary>
+        /// <param name="destinationId">The identifier of the remote host</param>
+        /// <returns>The IConnection used for communication</returns>
+        public IConnection<T> NewConnection(IIdentifier destinationId)
+        {
+            if (_localIdentifier == null)
+            {
+                throw new IllegalStateException("Cannot open connection without first registering an ID");
+            }
+
+            IConnection<T> connection;
+            if (_connectionMap.TryGetValue(destinationId, out connection))
+            {
+                return connection;
+            }
+            else
+            {
+                connection = new NsConnection<T>(_localIdentifier, destinationId,
+                    NamingClient, _remoteManager, _connectionMap);
+
+                _connectionMap[destinationId] = connection;
+                return connection;
+            }
+        }
+
+        /// <summary>
+        /// Register the identifier for the NetworkService with the NameService.
+        /// </summary>
+        /// <param name="id">The identifier to register</param>
+        public void Register(IIdentifier id)
+        {
+            Logger.Log(Level.Info, "Registering id {0} with network service.", id);
+
+            _localIdentifier = id;
+            NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
+
+            Logger.Log(Level.Info, "End of Registering id {0} with network service.", id);
+        }
+
+        /// <summary>
+        /// Unregister the identifier for the NetworkService with the NameService.
+        /// </summary>
+        public void Unregister()
+        {
+            if (_localIdentifier == null)
+            {
+                throw new IllegalStateException("Cannot unregister a non existant identifier");
+            }
+
+            NamingClient.Unregister(_localIdentifier.ToString());
+            _localIdentifier = null;
+            _messageHandlerDisposable.Dispose();
+        }
+
+        /// <summary>
+        /// Dispose of the NetworkService's resources
+        /// </summary>
+        public void Dispose()
+        {
+            NamingClient.Dispose();
+            _remoteManager.Dispose();
+
+            Logger.Log(Level.Info, "Disposed of network service");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
deleted file mode 100644
index 93da126..0000000
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Net;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Network.NetworkService
-{
-    /// <summary>
-    /// Writable Network service used for Reef Task communication.
-    /// </summary>
-    /// <typeparam name="T">The message type</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableNetworkService<T> : INetworkService<T> where T : IWritable
-    {
-        private static readonly Logger Logger = Logger.GetLogger(typeof(NetworkService<>));
-
-        private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager;
-        private readonly IObserver<WritableNsMessage<T>> _messageHandler;
-        private IIdentifier _localIdentifier;
-        private IDisposable _messageHandlerDisposable;
-        private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
-        private readonly INameClient _nameClient;
-
-        /// <summary>
-        /// Create a new Writable NetworkService.
-        /// </summary>
-        /// <param name="nsPort">The port that the NetworkService will listen on</param>
-        /// <param name="messageHandler">The observer to handle incoming messages</param>
-        /// <param name="idFactory">The factory used to create IIdentifiers</param>
-        /// <param name="nameClient">The name client used to register Ids</param>
-        /// <param name="remoteManagerFactory">Writable RemoteManagerFactory to create a 
-        /// Writable RemoteManager</param>
-        [Inject]
-        private WritableNetworkService(
-            [Parameter(typeof (NetworkServiceOptions.NetworkServicePort))] int nsPort,
-            IObserver<WritableNsMessage<T>> messageHandler,
-            IIdentifierFactory idFactory,
-            INameClient nameClient,
-            StreamingRemoteManagerFactory remoteManagerFactory)
-        {
- 
-            IPAddress localAddress = NetworkUtils.LocalIPAddress;
-            _remoteManager = remoteManagerFactory.GetInstance<WritableNsMessage<T>>(localAddress, nsPort);
-            _messageHandler = messageHandler;
-
-            // Create and register incoming message handler
-            // TODO[REEF-419] This should use the TcpPortProvider mechanism
-            var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
-            _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
-
-            _nameClient = nameClient;
-            _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
-
-            Logger.Log(Level.Info, "Started network service");
-        }
-
-        /// <summary>
-        /// Name client for registering ids
-        /// </summary>
-        public INameClient NamingClient
-        {
-            get { return _nameClient; }
-        }
-
-        /// <summary>
-        /// Open a new connection to the remote host registered to
-        /// the name service with the given identifier
-        /// </summary>
-        /// <param name="destinationId">The identifier of the remote host</param>
-        /// <returns>The IConnection used for communication</returns>
-        public IConnection<T> NewConnection(IIdentifier destinationId)
-        {
-            if (_localIdentifier == null)
-            {
-                throw new IllegalStateException("Cannot open connection without first registering an ID");
-            }
-
-            IConnection<T> connection;
-            if (_connectionMap.TryGetValue(destinationId, out connection))
-            {
-                return connection;
-            }
-            else
-            {
-                connection = new WritableNsConnection<T>(_localIdentifier, destinationId,
-                    NamingClient, _remoteManager, _connectionMap);
-
-                _connectionMap[destinationId] = connection;
-                return connection;
-            }
-        }
-
-        /// <summary>
-        /// Register the identifier for the NetworkService with the NameService.
-        /// </summary>
-        /// <param name="id">The identifier to register</param>
-        public void Register(IIdentifier id)
-        {
-            Logger.Log(Level.Info, "Registering id {0} with network service.", id);
-
-            _localIdentifier = id;
-            NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
-
-            Logger.Log(Level.Info, "End of Registering id {0} with network service.", id);
-        }
-
-        /// <summary>
-        /// Unregister the identifier for the NetworkService with the NameService.
-        /// </summary>
-        public void Unregister()
-        {
-            if (_localIdentifier == null)
-            {
-                throw new IllegalStateException("Cannot unregister a non existant identifier");
-            }
-
-            NamingClient.Unregister(_localIdentifier.ToString());
-            _localIdentifier = null;
-            _messageHandlerDisposable.Dispose();
-        }
-
-        /// <summary>
-        /// Dispose of the NetworkService's resources
-        /// </summary>
-        public void Dispose()
-        {
-            NamingClient.Dispose();
-            _remoteManager.Dispose();
-
-            Logger.Log(Level.Info, "Disposed of network service");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
deleted file mode 100644
index c20238c..0000000
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Net;
-using System.Net.Sockets;
-using System.Runtime.Remoting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.NetworkService
-{
-    /// <summary>
-    /// Represents a connection between two hosts using the Writable NetworkService.
-    /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableNsConnection<T> : IConnection<T> where T : IWritable
-    {
-        private static readonly Logger Logger = Logger.GetLogger(typeof (WritableNsConnection<T>));
-
-        private readonly IIdentifier _sourceId;
-        private readonly IIdentifier _destId;
-        private readonly INameClient _nameClient;
-        private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager;
-        private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
-        private IObserver<WritableNsMessage<T>> _remoteSender;
-
-        /// <summary>
-        /// Creates a new NsConnection between two hosts.
-        /// </summary>
-        /// <param name="sourceId">The identifier of the sender</param>
-        /// <param name="destId">The identifier of the receiver</param>
-        /// <param name="nameClient">The NameClient used for naming lookup</param>
-        /// <param name="remoteManager">The remote manager used for network communication</param>
-        /// <param name="connectionMap">A cache of opened connections.  Will remove itself from
-        /// the cache when the NsConnection is disposed.</param>
-        public WritableNsConnection(
-            IIdentifier sourceId,
-            IIdentifier destId,
-            INameClient nameClient,
-            IRemoteManager<WritableNsMessage<T>> remoteManager,
-            Dictionary<IIdentifier, IConnection<T>> connectionMap)
-        {
-            _sourceId = sourceId;
-            _destId = destId;
-            _nameClient = nameClient;
-            _remoteManager = remoteManager;
-            _connectionMap = connectionMap;
-        }
-
-        /// <summary>
-        /// Opens the connection to the remote host.
-        /// </summary>
-        public void Open()
-        {
-            string destStr = _destId.ToString();
-            Logger.Log(Level.Verbose, "Network service opening connection to {0}...", destStr);
-
-            IPEndPoint destAddr = _nameClient.Lookup(destStr);
-            if (null == destAddr)
-            {
-                throw new RemotingException("Destination Address identifier cannot be found");
-            }
-
-            try
-            {
-                _remoteSender = _remoteManager.GetRemoteObserver(destAddr);
-                Logger.Log(Level.Verbose, "Network service completed connection to {0}.", destStr);
-            }
-            catch (SocketException)
-            {
-                Logger.Log(Level.Error, "Network Service cannot open connection to " + destAddr);
-                throw;
-            }
-            catch (ObjectDisposedException)
-            {
-                Logger.Log(Level.Error, "Network Service cannot open connection to " + destAddr);
-                throw;
-            }
-        }
-
-        /// <summary>
-        /// Writes the object to the remote host.
-        /// </summary>
-        /// <param name="message">The message to send</param>
-        public void Write(T message)
-        {
-            if (_remoteSender == null)
-            {
-                throw new IllegalStateException("NsConnection has not been opened yet.");
-            }
-
-            try
-            {
-                _remoteSender.OnNext(new WritableNsMessage<T>(_sourceId, _destId, message));
-            }
-            catch (IOException)
-            {
-                Logger.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
-                throw;
-            }
-            catch (ObjectDisposedException)
-            {
-                Logger.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
-                throw;
-            }
-        }
-
-        /// <summary>
-        /// Closes the connection
-        /// </summary>
-        public void Dispose()
-        {
-            _connectionMap.Remove(_destId);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
deleted file mode 100644
index a9299bb..0000000
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Runtime.Serialization;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Hadoop.Avro;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-
-
-namespace Org.Apache.REEF.Network.NetworkService
-{
-    /// <summary>
-    /// Writable Message sent between NetworkServices.</summary>
-    /// <typeparam name="T">The type of data being sent. It is assumed to be Writable</typeparam>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableNsMessage<T> : IWritable where T : IWritable
-    {
-        private IIdentifierFactory _factory;
-        private IInjector _injection;
-        /// <summary>
-        /// Constructor to allow instantiation by reflection
-        /// </summary>
-        [Inject]
-        public WritableNsMessage(IIdentifierFactory factory, IInjector injection)
-        {
-            _factory = factory;
-            _injection = injection;
-        }
-        
-        /// <summary>
-        /// Create a new Writable NsMessage with no data.
-        /// </summary>
-        /// <param name="sourceId">The identifier of the sender</param>
-        /// <param name="destId">The identifier of the receiver</param>
-        public WritableNsMessage(IIdentifier sourceId, IIdentifier destId)
-        {
-            SourceId = sourceId;
-            DestId = destId;
-            Data = new List<T>();
-        }
-
-        /// <summary>
-        /// Create a new Writable NsMessage with data.
-        /// </summary>
-        /// <param name="sourceId">The identifier of the sender</param>
-        /// <param name="destId">The identifier of the receiver</param>
-        /// <param name="message">The message to send</param>
-        public WritableNsMessage(IIdentifier sourceId, IIdentifier destId, T message)
-        {
-            SourceId = sourceId;
-            DestId = destId;
-            Data = new List<T> {message};
-        }
-
-        /// <summary>
-        /// The identifier of the sender of the message.
-        /// </summary>
-        internal IIdentifier SourceId { get; private set; }
-
-        /// <summary>
-        /// The identifier of the receiver of the message.
-        /// </summary>
-        internal IIdentifier DestId { get; private set; }
-
-        /// <summary>
-        /// A list of data being sent in the message.
-        /// </summary>
-        public IList<T> Data { get; set; }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        public void Read(IDataReader reader)
-        {
-            SourceId = _factory.Create(reader.ReadString());
-            DestId = _factory.Create(reader.ReadString());
-            int messageCount = reader.ReadInt32();
-            string dataType = reader.ReadString();
-
-            Data = new List<T>();
-
-            for (int index = 0; index < messageCount; index++)
-            {
-                var dataPoint = (T)_injection.ForkInjector().GetInstance(Type.GetType(dataType));
-
-                if (null == dataPoint)
-                {
-                    throw new Exception("T type instance cannot be created from the stream data in Network Service Message");
-                }
-
-                dataPoint.Read(reader);
-                Data.Add(dataPoint);
-            }
-        }
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        public void Write(IDataWriter writer)
-        {
-            writer.WriteString(SourceId.ToString());
-            writer.WriteString(DestId.ToString());
-            writer.WriteInt32(Data.Count);
-            writer.WriteString(Data[0].GetType().AssemblyQualifiedName);
-
-            foreach (var data in Data)
-            {
-                data.Write(writer);
-            }
-        }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        /// <param name="token">The cancellation token</param>
-        public async Task ReadAsync(IDataReader reader, CancellationToken token)
-        {
-            SourceId = _factory.Create(await reader.ReadStringAsync(token));
-            DestId = _factory.Create(await reader.ReadStringAsync(token));
-            int messageCount = await reader.ReadInt32Async(token);
-            string dataType = await reader.ReadStringAsync(token);
-
-            Data = new List<T>();
-
-            for (int index = 0; index < messageCount; index++)
-            {
-                var dataPoint = (T) _injection.ForkInjector().GetInstance(Type.GetType(dataType));
-
-                if (null == dataPoint)
-                {
-                    throw new Exception("T type instance cannot be created from the stream data in Network Service Message");
-                }
-
-                await dataPoint.ReadAsync(reader, token);
-                Data.Add(dataPoint);
-            }
-        }
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        /// <param name="token">The cancellation token</param>
-        public async Task WriteAsync(IDataWriter writer, CancellationToken token)
-        {
-            await writer.WriteStringAsync(SourceId.ToString(), token);
-            await writer.WriteStringAsync(DestId.ToString(), token);
-            await writer.WriteInt32Async(Data.Count, token);
-            await writer.WriteStringAsync(Data[0].GetType().AssemblyQualifiedName, token);
-
-            foreach (var data in Data)
-            {
-                data.Write(writer);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 73b6d9d..a0e6038 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -61,6 +61,7 @@ under the License.
     <Compile Include="Group\Driver\IGroupCommDriver.cs" />
     <Compile Include="Group\Driver\Impl\CommunicationGroupDriver.cs" />
     <Compile Include="Group\Driver\Impl\GroupCommunicationMessage.cs" />
+    <Compile Include="Group\Driver\Impl\GroupCommunicationMessageStreamingCodec.cs" />
     <Compile Include="Group\Driver\Impl\MessageType.cs" />
     <Compile Include="Group\Driver\Impl\GroupCommDriver.cs" />
     <Compile Include="Group\Driver\Impl\TaskStarter.cs" />
@@ -139,6 +140,8 @@ under the License.
     <Compile Include="NetworkService\Codec\ControlMessageCodec.cs" />
     <Compile Include="NetworkService\Codec\NsMessageCodec.cs" />
     <Compile Include="NetworkService\Codec\NsMessageProto.cs" />
+    <Compile Include="NetworkService\Codec\NsMessageStreamingCodec.cs" />
+    <Compile Include="NetworkService\Codec\StreamingCodecFunctionCache.cs" />
     <Compile Include="NetworkService\ControlMessage.cs" />
     <Compile Include="NetworkService\IConnection.cs" />
     <Compile Include="NetworkService\INetworkService.cs" />
@@ -147,9 +150,7 @@ under the License.
     <Compile Include="NetworkService\NetworkServiceOptions.cs" />
     <Compile Include="NetworkService\NsConnection.cs" />
     <Compile Include="NetworkService\NsMessage.cs" />
-    <Compile Include="NetworkService\WritableNetworkService.cs" />
-    <Compile Include="NetworkService\WritableNsConnection.cs" />
-    <Compile Include="NetworkService\WritableNsMessage.cs" />
+    <Compile Include="NetworkService\StreamingNetworkService.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="Utilities\BlockingCollectionExtensions.cs" />
     <Compile Include="Utilities\Utils.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index babc26d..9f13c83 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -52,7 +52,6 @@ under the License.
     <Compile Include="StreamingRemoteManagerTest.cs" />
     <Compile Include="StreamingTransportTest.cs" />
     <Compile Include="TransportTest.cs" />
-    <Compile Include="WritableString.cs" />
   </ItemGroup>
   <ItemGroup>
     <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
index 20f75be..a0be7ee 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
@@ -25,7 +25,8 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
 
 namespace Org.Apache.REEF.Wake.Tests
 {
@@ -34,9 +35,6 @@ namespace Org.Apache.REEF.Wake.Tests
     {
         private readonly StreamingRemoteManagerFactory _remoteManagerFactory1 =
             TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
-
-        private readonly StreamingRemoteManagerFactory _remoteManagerFactory2 =
-        TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
         
         /// <summary>
         /// Tests one way communication between Remote Managers 
@@ -47,24 +45,25 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue = new BlockingCollection<string>();
             List<string> events = new List<string>();
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
-                var observer = Observer.Create<WritableString>(queue.Add);
+                var observer = Observer.Create<string>(queue.Add);
                 IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
                 remoteManager2.RegisterObserver(endpoint1, observer);
 
                 var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
-                remoteObserver.OnNext(new WritableString("ghi"));
+                remoteObserver.OnNext("abc");
+                remoteObserver.OnNext("def");
+                remoteObserver.OnNext("ghi");
 
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
             }
 
             Assert.AreEqual(3, events.Count);
@@ -78,42 +77,44 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
-            BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue1 = new BlockingCollection<string>();
+            BlockingCollection<string> queue2 = new BlockingCollection<string>();
             List<string> events1 = new List<string>();
             List<string> events2 = new List<string>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
                 // Register observers for remote manager 1 and remote manager 2
                 var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var observer1 = Observer.Create<WritableString>(queue1.Add);
-                var observer2 = Observer.Create<WritableString>(queue2.Add);
+                var observer1 = Observer.Create<string>(queue1.Add);
+                var observer2 = Observer.Create<string>(queue2.Add);
                 remoteManager1.RegisterObserver(remoteEndpoint, observer1);
                 remoteManager2.RegisterObserver(remoteEndpoint, observer2);
 
                 // Remote manager 1 sends 3 events to remote manager 2
                 var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("def"));
-                remoteObserver1.OnNext(new WritableString("ghi"));
+                remoteObserver1.OnNext("abc");
+                remoteObserver1.OnNext("def");
+                remoteObserver1.OnNext("ghi");
 
                 // Remote manager 2 sends 4 events to remote manager 1
                 var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
-                remoteObserver2.OnNext(new WritableString("jkl"));
-                remoteObserver2.OnNext(new WritableString("mno"));
-                remoteObserver2.OnNext(new WritableString("pqr"));
-                remoteObserver2.OnNext(new WritableString("stu"));
-
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
-
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
+                remoteObserver2.OnNext("jkl");
+                remoteObserver2.OnNext("mno");
+                remoteObserver2.OnNext("pqr");
+                remoteObserver2.OnNext("stu");
+
+                events1.Add(queue1.Take());
+                events1.Add(queue1.Take());
+                events1.Add(queue1.Take());
+                events1.Add(queue1.Take());
+
+                events2.Add(queue2.Take());
+                events2.Add(queue2.Take());
+                events2.Add(queue2.Take());
             }
 
             Assert.AreEqual(4, events1.Count);
@@ -129,29 +130,30 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue = new BlockingCollection<string>();
             List<string> events = new List<string>();
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager3 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
                 var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var observer = Observer.Create<WritableString>(queue.Add);
+                var observer = Observer.Create<string>(queue.Add);
                 remoteManager3.RegisterObserver(remoteEndpoint, observer);
 
                 var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
                 var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
 
-                remoteObserver2.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("def"));
-                remoteObserver2.OnNext(new WritableString("ghi"));
-                remoteObserver1.OnNext(new WritableString("jkl"));
-                remoteObserver2.OnNext(new WritableString("mno"));
+                remoteObserver2.OnNext("abc");
+                remoteObserver1.OnNext("def");
+                remoteObserver2.OnNext("ghi");
+                remoteObserver1.OnNext("jkl");
+                remoteObserver2.OnNext("mno");
 
                 for (int i = 0; i < 5; i++)
                 {
-                    events.Add(queue.Take().Data);
+                    events.Add(queue.Take());
                 }
             }
 
@@ -167,58 +169,60 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
-            BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
-            BlockingCollection<WritableString> queue3 = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue1 = new BlockingCollection<string>();
+            BlockingCollection<string> queue2 = new BlockingCollection<string>();
+            BlockingCollection<string> queue3 = new BlockingCollection<string>();
             List<string> events1 = new List<string>();
             List<string> events2 = new List<string>();
             List<string> events3 = new List<string>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager3 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
                 var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
 
-                var observer = Observer.Create<WritableString>(queue1.Add);
+                var observer = Observer.Create<string>(queue1.Add);
                 remoteManager1.RegisterObserver(remoteEndpoint, observer);
-                var observer2 = Observer.Create<WritableString>(queue2.Add);
+                var observer2 = Observer.Create<string>(queue2.Add);
                 remoteManager2.RegisterObserver(remoteEndpoint, observer2);
-                var observer3 = Observer.Create<WritableString>(queue3.Add);
+                var observer3 = Observer.Create<string>(queue3.Add);
                 remoteManager3.RegisterObserver(remoteEndpoint, observer3);
 
                 var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
                 var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
 
                 // Observer 1 and 2 send messages to observer 3
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver1.OnNext(new WritableString("abc"));
-                remoteObserver2.OnNext(new WritableString("def"));
-                remoteObserver2.OnNext(new WritableString("def"));
+                remoteObserver1.OnNext("abc");
+                remoteObserver1.OnNext("abc");
+                remoteObserver1.OnNext("abc");
+                remoteObserver2.OnNext("def");
+                remoteObserver2.OnNext("def");
 
                 // Observer 3 sends messages back to observers 1 and 2
                 var remoteObserver3A = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
                 var remoteObserver3B = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
 
-                remoteObserver3A.OnNext(new WritableString("ghi"));
-                remoteObserver3A.OnNext(new WritableString("ghi"));
-                remoteObserver3B.OnNext(new WritableString("jkl"));
-                remoteObserver3B.OnNext(new WritableString("jkl"));
-                remoteObserver3B.OnNext(new WritableString("jkl"));
+                remoteObserver3A.OnNext("ghi");
+                remoteObserver3A.OnNext("ghi");
+                remoteObserver3B.OnNext("jkl");
+                remoteObserver3B.OnNext("jkl");
+                remoteObserver3B.OnNext("jkl");
 
-                events1.Add(queue1.Take().Data);
-                events1.Add(queue1.Take().Data);
+                events1.Add(queue1.Take());
+                events1.Add(queue1.Take());
 
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
-                events2.Add(queue2.Take().Data);
+                events2.Add(queue2.Take());
+                events2.Add(queue2.Take());
+                events2.Add(queue2.Take());
 
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
-                events3.Add(queue3.Take().Data);
+                events3.Add(queue3.Take());
+                events3.Add(queue3.Take());
+                events3.Add(queue3.Take());
+                events3.Add(queue3.Take());
+                events3.Add(queue3.Take());
             }
 
             Assert.AreEqual(2, events1.Count);
@@ -234,34 +238,36 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue = new BlockingCollection<string>();
             List<string> events = new List<string>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
                 // Register handler for when remote manager 2 receives events; respond
                 // with an ack
                 var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
                 var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
 
-                var receiverObserver = Observer.Create<WritableString>(
-                    message => remoteObserver2.OnNext(new WritableString("received message: " + message.Data)));
+                var receiverObserver = Observer.Create<string>(
+                    message => remoteObserver2.OnNext("received message: " + message));
                 remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver);
 
                 // Register handler for remote manager 1 to record the ack
-                var senderObserver = Observer.Create<WritableString>(queue.Add);
+                var senderObserver = Observer.Create<string>(queue.Add);
                 remoteManager1.RegisterObserver(remoteEndpoint, senderObserver);
 
                 // Begin to send messages
                 var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver1.OnNext(new WritableString("hello"));
-                remoteObserver1.OnNext(new WritableString("there"));
-                remoteObserver1.OnNext(new WritableString("buddy"));
+                remoteObserver1.OnNext("hello");
+                remoteObserver1.OnNext("there");
+                remoteObserver1.OnNext("buddy");
 
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
             }
 
             Assert.AreEqual(3, events.Count);
@@ -278,25 +284,27 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue = new BlockingCollection<string>();
             List<string> events = new List<string>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
-                // RemoteManager2 listens and records events of type IRemoteEvent<WritableString>
-                var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message));
+                // RemoteManager2 listens and records events of type IRemoteEvent<string>
+                var observer = Observer.Create<IRemoteMessage<string>>(message => queue.Add(message.Message));
                 remoteManager2.RegisterObserver(observer);
 
                 // Remote manager 1 sends 3 events to remote manager 2
                 var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
-                remoteObserver.OnNext(new WritableString("ghi"));
+                remoteObserver.OnNext("abc");
+                remoteObserver.OnNext("def");
+                remoteObserver.OnNext("ghi");
 
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
             }
 
             Assert.AreEqual(3, events.Count);
@@ -310,28 +318,30 @@ namespace Org.Apache.REEF.Wake.Tests
         {
             IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
 
-            BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+            BlockingCollection<string> queue = new BlockingCollection<string>();
             List<string> events = new List<string>();
 
-            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
-            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+            IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+            using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+            using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
             {
-                var observer = Observer.Create<WritableString>(queue.Add);
+                var observer = Observer.Create<string>(queue.Add);
                 IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
                 remoteManager2.RegisterObserver(endpoint1, observer);
 
                 var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver.OnNext(new WritableString("abc"));
-                remoteObserver.OnNext(new WritableString("def"));
+                remoteObserver.OnNext("abc");
+                remoteObserver.OnNext("def");
 
                 var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                cachedObserver.OnNext(new WritableString("ghi"));
-                cachedObserver.OnNext(new WritableString("jkl"));
+                cachedObserver.OnNext("ghi");
+                cachedObserver.OnNext("jkl");
 
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
-                events.Add(queue.Take().Data);
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
+                events.Add(queue.Take());
             }
 
             Assert.AreEqual(4, events.Count);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
deleted file mode 100644
index 30ff487..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
-    /// <summary>
-    /// Writable wrapper around the string class
-    /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableString : IWritable
-    {
-        /// <summary>
-        /// Returns the actual string data
-        /// </summary>
-        public string Data { get; set; }
-        
-        /// <summary>
-        /// Empty constructor for instantiation with reflection
-        /// </summary>
-        [Inject]
-        public WritableString()
-        {
-        }
-
-        /// <summary>
-        /// Constructor
-        /// </summary>
-        /// <param name="data">The string data</param>
-        public WritableString(string data)
-        {
-            Data = data;
-        }
-
-        /// <summary>
-        /// Reads the string
-        /// </summary>
-        /// <param name="reader">reader to read from</param>
-        public void Read(IDataReader reader)
-        {
-            Data = reader.ReadString();
-        }
-
-        /// <summary>
-        /// Writes the string
-        /// </summary>
-        /// <param name="writer">Writer to write</param>
-        public void Write(IDataWriter writer)
-        {
-            writer.WriteString(Data);
-        }
-
-        /// <summary>
-        /// Reads the string
-        /// </summary>
-        /// <param name="reader">reader to read from</param>
-        /// <param name="token">the cancellation token</param>
-        public async Task ReadAsync(IDataReader reader, CancellationToken token)
-        {
-            Data = await reader.ReadStringAsync(token);
-        }
-
-        /// <summary>
-        /// Writes the string
-        /// </summary>
-        /// <param name="writer">Writer to write</param>
-        /// <param name="token">the cancellation token</param>
-        public async Task WriteAsync(IDataWriter writer, CancellationToken token)
-        {
-            await writer.WriteStringAsync(Data, token);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index 4069d15..5767f8a 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -49,7 +49,6 @@ under the License.
     <Compile Include="IIdentifier.cs" />
     <Compile Include="IIdentifierFactory.cs" />
     <Compile Include="Remote\Impl\RemoteEventStreamingCodec.cs" />
-    <Compile Include="Remote\Impl\TemporaryWritableToStreamingCodec.cs" />
     <Compile Include="Remote\Impl\StreamingRemoteManagerFactory.cs" />
     <Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" />
     <Compile Include="Impl\LoggingEventHandler.cs" />
@@ -78,7 +77,6 @@ under the License.
     <Compile Include="Remote\Impl\StreamingTransportClient.cs" />
     <Compile Include="Remote\Impl\StreamingTransportServer.cs" />
     <Compile Include="Remote\IRemoteManagerFactory.cs" />
-    <Compile Include="Remote\IWritable.cs" />
     <Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" />
     <Compile Include="Remote\ICodec.cs" />
     <Compile Include="Remote\ICodecFactory.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
deleted file mode 100644
index 644cf82..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Org.Apache.REEF.Wake.Remote
-{
-    /// <summary>
-    /// Interface that classes should implement if they need to be readable to and writable 
-    /// from the stream. It is assumed that the classes inheriting this interface will have a 
-    /// default empty constructor
-    /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public interface IWritable
-    {
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        void Read(IDataReader reader);
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        void Write(IDataWriter writer);
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        /// <param name="token">The cancellation token</param>
-        Task ReadAsync(IDataReader reader, CancellationToken token);
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        /// <param name="token">The cancellation token</param>
-        Task WriteAsync(IDataWriter writer, CancellationToken token);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
index 90e3aca..da549ea 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
@@ -20,6 +20,7 @@
 using System.Net;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.StreamingCodec;
 
 namespace Org.Apache.REEF.Wake.Remote.Impl
 {
@@ -38,12 +39,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             _injector = injector;
         }
 
-        //ToDo: The port argument will be removed once changes are made in WritableNetworkService [REEF-447]
-        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
+        public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, IStreamingCodec<T> codec)
         {
 #pragma warning disable 618
 // This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
-            var codec = _injector.GetInstance<TemporaryWritableToStreamingCodec<T>>();
             return new StreamingRemoteManager<T>(localAddress, _tcpPortProvider, codec);
 #pragma warning disable 618
         }



[3/3] incubator-reef git commit: [REEF-447]Convert Network Service Layer from Writable to Streaming

Posted by ju...@apache.org.
[REEF-447]Convert Network Service Layer from Writable to Streaming

This addressed the issue by
  * Converting WritableNetworkSerbvice to StreamingNetworkService
  * Intoducing NsMessageStreamingCodec which caches and use StreamingCodecs of various GroupCommunicationMessage types.
  * Introducing GroupCommunicationMessage StreamingCodec

JIRA:
  [REEF-447](https://issues.apache.org/jira/browse/REEF-447)

 This Closes #289

Author:    Dhruv <dh...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8505dee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8505dee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8505dee9

Branch: refs/heads/master
Commit: 8505dee942f924cbfd72d631bdaabcb20c125f02
Parents: 5cdd655
Author: Dhruv <dh...@gmail.com>
Authored: Thu Jul 9 14:52:20 2015 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Wed Jul 15 14:44:34 2015 -0700

----------------------------------------------------------------------
 .../GroupCommunicationTests.cs                  |  22 +-
 .../GroupCommunicationTreeTopologyTests.cs      |   5 -
 .../StreamingNetworkServiceTests.cs             | 361 +++++++++++++++++++
 .../WritableNetworkServiceTests.cs              | 262 --------------
 .../NetworkService/WritableString.cs            |  94 -----
 .../Org.Apache.REEF.Network.Tests.csproj        |   3 +-
 .../CodecToStreamingCodecConfiguration.cs       |   8 +-
 .../Group/Config/StreamingCodecConfiguration.cs |   8 +-
 .../Driver/Impl/CommunicationGroupDriver.cs     |   2 -
 .../Impl/GeneralGroupCommunicationMessage.cs    |  51 +--
 .../Group/Driver/Impl/GroupCommDriver.cs        |   5 +-
 .../Driver/Impl/GroupCommunicationMessage.cs    | 145 +-------
 .../GroupCommunicationMessageStreamingCodec.cs  | 223 ++++++++++++
 .../Group/Operators/Impl/Sender.cs              |   7 +-
 .../Task/ICommunicationGroupNetworkObserver.cs  |   3 +-
 .../Group/Task/IGroupCommNetworkObserver.cs     |   3 +-
 .../Group/Task/Impl/CommunicationGroupClient.cs |   1 -
 .../Impl/CommunicationGroupNetworkObserver.cs   |   5 +-
 .../Group/Task/Impl/GroupCommClient.cs          |   4 +-
 .../Group/Task/Impl/GroupCommNetworkObserver.cs |   7 +-
 .../Group/Task/Impl/NodeStruct.cs               |  11 +-
 .../Group/Task/Impl/OperatorTopology.cs         |  31 +-
 .../Codec/NsMessageStreamingCodec.cs            | 202 +++++++++++
 .../Codec/StreamingCodecFunctionCache.cs        | 203 +++++++++++
 .../NetworkService/StreamingNetworkService.cs   | 160 ++++++++
 .../NetworkService/WritableNetworkService.cs    | 159 --------
 .../NetworkService/WritableNsConnection.cs      | 138 -------
 .../NetworkService/WritableNsMessage.cs         | 185 ----------
 .../Org.Apache.REEF.Network.csproj              |   7 +-
 .../Org.Apache.REEF.Wake.Tests.csproj           |   1 -
 .../StreamingRemoteManagerTest.cs               | 232 ++++++------
 .../WritableString.cs                           |  95 -----
 .../Org.Apache.REEF.Wake.csproj                 |   2 -
 .../cs/Org.Apache.REEF.Wake/Remote/IWritable.cs |  61 ----
 .../Impl/StreamingRemoteManagerFactory.cs       |   5 +-
 .../Impl/TemporaryWritableToStreamingCodec.cs   |  70 ----
 36 files changed, 1350 insertions(+), 1431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
index 61813af..62a6a3f 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
@@ -72,17 +72,17 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                     new BlockingCollection<GeneralGroupCommunicationMessage>();
 
                 var handler1 =
-                    Observer.Create<WritableNsMessage<GeneralGroupCommunicationMessage>>(msg => messages1.Add(msg.Data.First()));
+                    Observer.Create<NsMessage<GeneralGroupCommunicationMessage>>(msg => messages1.Add(msg.Data.First()));
                 var handler2 =
-                    Observer.Create<WritableNsMessage<GeneralGroupCommunicationMessage>>(msg => messages2.Add(msg.Data.First()));
+                    Observer.Create<NsMessage<GeneralGroupCommunicationMessage>>(msg => messages2.Add(msg.Data.First()));
 
                 var networkServiceInjector1 = BuildNetworkServiceInjector(endpoint, handler1);
                 var networkServiceInjector2 = BuildNetworkServiceInjector(endpoint, handler2);
 
                 var networkService1 = networkServiceInjector1.GetInstance<
-                  WritableNetworkService<GeneralGroupCommunicationMessage>>();
+                  StreamingNetworkService<GeneralGroupCommunicationMessage>>();
                 var networkService2 = networkServiceInjector2.GetInstance<
-                    WritableNetworkService<GeneralGroupCommunicationMessage>>();
+                    StreamingNetworkService<GeneralGroupCommunicationMessage>>();
                 networkService1.Register(new StringIdentifier("id1"));
                 networkService2.Register(new StringIdentifier("id2"));
 
@@ -822,7 +822,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
         }
 
         public static IInjector BuildNetworkServiceInjector(
-            IPEndPoint nameServerEndpoint, IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>> handler)
+            IPEndPoint nameServerEndpoint, IObserver<NsMessage<GeneralGroupCommunicationMessage>> handler)
         {
             var config = TangFactory.GetTang().NewConfigurationBuilder()
                 .BindNamedParameter(typeof (NamingConfigurationOptions.NameServerAddress),
@@ -832,20 +832,24 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
                 .BindNamedParameter(typeof (NetworkServiceOptions.NetworkServicePort),
                     (0).ToString(CultureInfo.InvariantCulture))
                 .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class)
-                .BindImplementation(GenericType<IStreamingCodec<string>>.Class, GenericType<StringStreamingCodec>.Class)
                 .Build();
 
+            var codecConfig = StreamingCodecConfiguration<string>.Conf
+                .Set(StreamingCodecConfiguration<string>.Codec, GenericType<StringStreamingCodec>.Class)
+                .Build();
+
+            config = Configurations.Merge(config, codecConfig);
+
             var injector = TangFactory.GetTang().NewInjector(config);
             injector.BindVolatileInstance(
-                GenericType<IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>>.Class, handler);
+                GenericType<IObserver<NsMessage<GeneralGroupCommunicationMessage>>>.Class, handler);
 
             return injector;
         }
 
         private GroupCommunicationMessage<string> CreateGcmStringType(string message, string from, string to)
         {
-            var stringCodec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
-            return new GroupCommunicationMessage<string>("g1", "op1", from, to, message, MessageType.Data, stringCodec);
+            return new GroupCommunicationMessage<string>("g1", "op1", from, to, message);
         }
 
         private static void ScatterReceiveReduce(IScatterReceiver<int> receiver, IReduceSender<int> sumSender)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
index 6b9b8c7..c194bcb 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
@@ -18,21 +18,16 @@
  */
 
 using System.Collections.Generic;
-using System.Globalization;
 using System.Linq;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining.Impl;
 using Org.Apache.REEF.Network.Group.Topology;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote.Impl;
 using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
 
 namespace Org.Apache.REEF.Network.Tests.GroupCommunication

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs
new file mode 100644
index 0000000..1e0378e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Network.Tests.NamingService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.Tests.NetworkService
+{
+    /// <summary>
+    /// Tests for Streaming Network Service
+    /// </summary>
+    [TestClass]
+    public class StreamingNetworkServiceTests
+    {
+        /// <summary>
+        /// Tests one way communication between two network services
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingNetworkServiceOneWayCommunication()
+        {
+            int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000);
+            int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000);
+
+            BlockingCollection<string> queue;
+
+            using (var nameServer = NameServerTests.BuildNameServer())
+            {
+                IPEndPoint endpoint = nameServer.LocalEndpoint;
+                int nameServerPort = endpoint.Port;
+                string nameServerAddr = endpoint.Address.ToString();
+
+                var handlerConf1 =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder()
+                        .BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class,
+                            GenericType<NetworkMessageHandler>.Class)
+                        .Build();
+
+                var handlerConf2 =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder()
+                        .BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class,
+                            GenericType<MessageHandler>.Class)
+                        .Build();
+
+                var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
+                    handlerConf1);
+
+                var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
+                   handlerConf2);
+
+                using (INetworkService<string> networkService1 = networkServiceInjection1.GetInstance<StreamingNetworkService<string>>())
+                using (INetworkService<string> networkService2 = networkServiceInjection2.GetInstance<StreamingNetworkService<string>>())
+                {
+                    queue = networkServiceInjection2.GetInstance<MessageHandler>().Queue;
+                    IIdentifier id1 = new StringIdentifier("service1");
+                    IIdentifier id2 = new StringIdentifier("service2");
+                    networkService1.Register(id1);
+                    networkService2.Register(id2);
+
+                    using (IConnection<string> connection = networkService1.NewConnection(id2))
+                    {
+                        connection.Open();
+                        connection.Write("abc");
+                        connection.Write("def");
+                        connection.Write("ghi");
+
+                        Assert.AreEqual("abc", queue.Take());
+                        Assert.AreEqual("def", queue.Take());
+                        Assert.AreEqual("ghi", queue.Take());
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tests two way communication between two network services
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingNetworkServiceTwoWayCommunication()
+        {
+            int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000);
+            int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000);
+
+            BlockingCollection<string> queue1;
+            BlockingCollection<string> queue2;
+
+            using (var nameServer = NameServerTests.BuildNameServer())
+            {
+                IPEndPoint endpoint = nameServer.LocalEndpoint;
+                int nameServerPort = endpoint.Port;
+                string nameServerAddr = endpoint.Address.ToString();
+
+                var handlerConf =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder()
+                        .BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class,
+                            GenericType<MessageHandler>.Class)
+                        .Build();
+
+                var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
+                    handlerConf);
+
+                var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
+                   handlerConf);
+
+                using (INetworkService<string> networkService1 = networkServiceInjection1.GetInstance<StreamingNetworkService<string>>())
+                using (INetworkService<string> networkService2 = networkServiceInjection2.GetInstance<StreamingNetworkService<string>>())
+                {
+                    queue1 = networkServiceInjection1.GetInstance<MessageHandler>().Queue;
+                    queue2 = networkServiceInjection2.GetInstance<MessageHandler>().Queue;
+
+                    IIdentifier id1 = new StringIdentifier("service1");
+                    IIdentifier id2 = new StringIdentifier("service2");
+                    networkService1.Register(id1);
+                    networkService2.Register(id2);
+
+                    using (IConnection<string> connection1 = networkService1.NewConnection(id2))
+                    using (IConnection<string> connection2 = networkService2.NewConnection(id1))
+                    {
+                        connection1.Open();
+                        connection1.Write("abc");
+                        connection1.Write("def");
+                        connection1.Write("ghi");
+
+                        connection2.Open();
+                        connection2.Write("jkl");
+                        connection2.Write("nop");
+
+                        Assert.AreEqual("abc", queue2.Take());
+                        Assert.AreEqual("def", queue2.Take());
+                        Assert.AreEqual("ghi", queue2.Take());
+
+                        Assert.AreEqual("jkl", queue1.Take());
+                        Assert.AreEqual("nop", queue1.Take());
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tests StreamingCodecFunctionCache
+        /// </summary>
+        [TestMethod]
+        public void TestStreamingCodecFunctionCache()
+        {
+            IConfiguration conf = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindImplementation(GenericType<IStreamingCodec<B>>.Class, GenericType<BStreamingCodec>.Class)
+                .Build();
+            IInjector injector = TangFactory.GetTang().NewInjector(conf);
+            
+            StreamingCodecFunctionCache<A> cache = new StreamingCodecFunctionCache<A>(injector);
+
+            var readFunc = cache.ReadFunction(typeof(B));
+            var writeFunc = cache.WriteFunction(typeof (B));
+            var readAsyncFunc = cache.ReadAsyncFunction(typeof(B));
+            var writeAsyncFunc = cache.WriteAsyncFunction(typeof(B));
+
+            var stream = new MemoryStream();
+            IDataWriter writer = new StreamDataWriter(stream);
+            IDataReader reader = new StreamDataReader(stream);
+
+            B val = new B();
+            val.Value1 = "hello";
+            val.Value2 = "reef";
+
+            writeFunc(val, writer);
+
+            val.Value1 = "helloasync";
+            val.Value2 = "reefasync";
+            CancellationToken token = new CancellationToken();
+            
+            var asyncResult = writeAsyncFunc.BeginInvoke(val, writer, token, null, null);
+            writeAsyncFunc.EndInvoke(asyncResult);
+            
+            stream.Position = 0;
+            A res = readFunc(reader);
+            B resB1 = res as B;
+
+            asyncResult = readAsyncFunc.BeginInvoke(reader, token, null, null);
+            res = readAsyncFunc.EndInvoke(asyncResult);
+            B resB2 = res as B;
+            
+            Assert.AreEqual("hello", resB1.Value1);
+            Assert.AreEqual("reef", resB1.Value2);
+            Assert.AreEqual("helloasync", resB2.Value1);
+            Assert.AreEqual("reefasync", resB2.Value2);
+        }
+
+        /// <summary>
+        /// Creates an instance of network service.
+        /// </summary>
+        /// <param name="networkServicePort">The port that the NetworkService will listen on</param>
+        /// <param name="nameServicePort">The port of the NameServer</param>
+        /// <param name="nameServiceAddr">The ip address of the NameServer</param>
+        /// <param name="handlerConf">The configuration of observer to handle incoming messages</param>
+        /// <returns></returns>
+        private IInjector BuildNetworkService(
+            int networkServicePort,
+            int nameServicePort,
+            string nameServiceAddr,
+            IConfiguration handlerConf)
+        {
+            var networkServiceConf = TangFactory.GetTang().NewConfigurationBuilder(handlerConf)
+                .BindNamedParameter<NetworkServiceOptions.NetworkServicePort, int>(
+                    GenericType<NetworkServiceOptions.NetworkServicePort>.Class,
+                    networkServicePort.ToString(CultureInfo.CurrentCulture))
+                .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
+                    GenericType<NamingConfigurationOptions.NameServerPort>.Class,
+                    nameServicePort.ToString(CultureInfo.CurrentCulture))
+                .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
+                    GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
+                    nameServiceAddr)
+                .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class)
+                .BindImplementation(GenericType<IStreamingCodec<string>>.Class, GenericType<StringStreamingCodec>.Class)
+                .Build();
+
+            return TangFactory.GetTang().NewInjector(networkServiceConf);
+        }
+
+        public class A
+        {
+            public string Value1;
+        }
+
+        public class B : A
+        {
+            public string Value2;
+        }
+
+        public class BStreamingCodec : IStreamingCodec<B>
+        {
+            [Inject]
+            public BStreamingCodec()
+            {
+            }
+
+            public B Read(IDataReader reader)
+            {
+                B val = new B();
+                val.Value1 = reader.ReadString();
+                val.Value2 = reader.ReadString();
+                return val;
+            }
+
+            public void Write(B obj, IDataWriter writer)
+            {
+                writer.WriteString(obj.Value1);
+                writer.WriteString(obj.Value2);
+            }
+
+            public async Task<B> ReadAsync(IDataReader reader, CancellationToken token)
+            {
+                B val = new B();
+                val.Value1 = await reader.ReadStringAsync(token);
+                val.Value2 = await reader.ReadStringAsync(token);
+                return val;
+            }
+
+            public async Task WriteAsync(B obj, IDataWriter writer, CancellationToken token)
+            {
+                await writer.WriteStringAsync(obj.Value1, token);
+                await writer.WriteStringAsync(obj.Value2, token);
+            }
+        } 
+        /// <summary>
+        /// The observer to handle incoming messages for string
+        /// </summary>
+        private class MessageHandler : IObserver<NsMessage<string>>
+        {
+            private readonly BlockingCollection<string> _queue;
+
+            public BlockingCollection<string> Queue
+            {
+                get { return _queue; }
+            } 
+
+            [Inject]
+            private MessageHandler()
+            {
+                _queue = new BlockingCollection<string>();
+            }
+
+            public void OnNext(NsMessage<string> value)
+            {
+                _queue.Add(value.Data.First());
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        /// <summary>
+        /// The network handler to handle incoming Streaming NsMessages
+        /// </summary>
+        private class NetworkMessageHandler : IObserver<NsMessage<string>>
+        {
+            [Inject]
+            public NetworkMessageHandler()
+            {
+            }
+
+            public void OnNext(NsMessage<string> value)
+            {
+            }
+
+            public void OnError(Exception error)
+            {
+            }
+
+            public void OnCompleted()
+            {
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
deleted file mode 100644
index 07464ff..0000000
--- a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Globalization;
-using System.Linq;
-using System.Net;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Network.Naming;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Network.Tests.NamingService;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Network.Tests.NetworkService
-{
-    /// <summary>
-    /// Tests for Writable Network Service
-    /// </summary>
-    [TestClass]
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableNetworkServiceTests
-    {
-        /// <summary>
-        /// Tests one way communication between two network services
-        /// </summary>
-        [TestMethod]
-        public void TestWritableNetworkServiceOneWayCommunication()
-        {
-            int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000);
-            int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000);
-
-            BlockingCollection<WritableString> queue;
-
-            using (var nameServer = NameServerTests.BuildNameServer())
-            {
-                IPEndPoint endpoint = nameServer.LocalEndpoint;
-                int nameServerPort = endpoint.Port;
-                string nameServerAddr = endpoint.Address.ToString();
-
-                var handlerConf1 =
-                    TangFactory.GetTang()
-                        .NewConfigurationBuilder()
-                        .BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
-                            GenericType<NetworkMessageHandler>.Class)
-                        .Build();
-
-                var handlerConf2 =
-                    TangFactory.GetTang()
-                        .NewConfigurationBuilder()
-                        .BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
-                            GenericType<MessageHandler>.Class)
-                        .Build();
-
-                var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
-                    handlerConf1);
-
-                var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
-                   handlerConf2);
-
-                using (INetworkService<WritableString> networkService1 = networkServiceInjection1.GetInstance<WritableNetworkService<WritableString>>())
-                using (INetworkService<WritableString> networkService2 = networkServiceInjection2.GetInstance<WritableNetworkService<WritableString>>())
-                {
-                    queue = networkServiceInjection2.GetInstance<MessageHandler>().Queue;
-                    IIdentifier id1 = new StringIdentifier("service1");
-                    IIdentifier id2 = new StringIdentifier("service2");
-                    networkService1.Register(id1);
-                    networkService2.Register(id2);
-
-                    using (IConnection<WritableString> connection = networkService1.NewConnection(id2))
-                    {
-                        connection.Open();
-                        connection.Write(new WritableString("abc"));
-                        connection.Write(new WritableString("def"));
-                        connection.Write(new WritableString("ghi"));
-
-                        Assert.AreEqual("abc", queue.Take().Data);
-                        Assert.AreEqual("def", queue.Take().Data);
-                        Assert.AreEqual("ghi", queue.Take().Data);
-                    }
-                }
-            }
-        }
-
-        /// <summary>
-        /// Tests two way communication between two network services
-        /// </summary>
-        [TestMethod]
-        public void TestWritableNetworkServiceTwoWayCommunication()
-        {
-            int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000);
-            int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000);
-
-            BlockingCollection<WritableString> queue1;
-            BlockingCollection<WritableString> queue2;
-
-            using (var nameServer = NameServerTests.BuildNameServer())
-            {
-                IPEndPoint endpoint = nameServer.LocalEndpoint;
-                int nameServerPort = endpoint.Port;
-                string nameServerAddr = endpoint.Address.ToString();
-
-                var handlerConf =
-                    TangFactory.GetTang()
-                        .NewConfigurationBuilder()
-                        .BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
-                            GenericType<MessageHandler>.Class)
-                        .Build();
-
-                var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
-                    handlerConf);
-
-                var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
-                   handlerConf);
-
-                using (INetworkService<WritableString> networkService1 = networkServiceInjection1.GetInstance<WritableNetworkService<WritableString>>())
-                using (INetworkService<WritableString> networkService2 = networkServiceInjection2.GetInstance<WritableNetworkService<WritableString>>())
-                {
-                    queue1 = networkServiceInjection1.GetInstance<MessageHandler>().Queue;
-                    queue2 = networkServiceInjection2.GetInstance<MessageHandler>().Queue;
-
-                    IIdentifier id1 = new StringIdentifier("service1");
-                    IIdentifier id2 = new StringIdentifier("service2");
-                    networkService1.Register(id1);
-                    networkService2.Register(id2);
-
-                    using (IConnection<WritableString> connection1 = networkService1.NewConnection(id2))
-                    using (IConnection<WritableString> connection2 = networkService2.NewConnection(id1))
-                    {
-                        connection1.Open();
-                        connection1.Write(new WritableString("abc"));
-                        connection1.Write(new WritableString("def"));
-                        connection1.Write(new WritableString("ghi"));
-
-                        connection2.Open();
-                        connection2.Write(new WritableString("jkl"));
-                        connection2.Write(new WritableString("nop"));
-
-                        Assert.AreEqual("abc", queue2.Take().Data);
-                        Assert.AreEqual("def", queue2.Take().Data);
-                        Assert.AreEqual("ghi", queue2.Take().Data);
-
-                        Assert.AreEqual("jkl", queue1.Take().Data);
-                        Assert.AreEqual("nop", queue1.Take().Data);
-                    }
-                }
-            }
-        }
-
-        /// <summary>
-        /// Creates an instance of network service.
-        /// </summary>
-        /// <param name="networkServicePort">The port that the NetworkService will listen on</param>
-        /// <param name="nameServicePort">The port of the NameServer</param>
-        /// <param name="nameServiceAddr">The ip address of the NameServer</param>
-        /// <param name="factory">Identifier factory for WritableString</param>
-        /// <param name="handler">The observer to handle incoming messages</param>
-        /// <returns></returns>
-        private IInjector BuildNetworkService(
-            int networkServicePort,
-            int nameServicePort,
-            string nameServiceAddr,
-            IConfiguration handlerConf)
-        {
-            var networkServiceConf = TangFactory.GetTang().NewConfigurationBuilder(handlerConf)
-                .BindNamedParameter<NetworkServiceOptions.NetworkServicePort, int>(
-                    GenericType<NetworkServiceOptions.NetworkServicePort>.Class,
-                    networkServicePort.ToString(CultureInfo.CurrentCulture))
-                .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
-                    GenericType<NamingConfigurationOptions.NameServerPort>.Class,
-                    nameServicePort.ToString(CultureInfo.CurrentCulture))
-                .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
-                    GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
-                    nameServiceAddr)
-                .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class)
-                .Build();
-
-            return TangFactory.GetTang().NewInjector(networkServiceConf);
-        }
-
-        /// <summary>
-        /// The observer to handle incoming messages for WritableString
-        /// </summary>
-        private class MessageHandler : IObserver<WritableNsMessage<WritableString>>
-        {
-            private readonly BlockingCollection<WritableString> _queue;
-
-            public BlockingCollection<WritableString> Queue
-            {
-                get { return _queue; }
-            } 
-
-            [Inject]
-            private MessageHandler()
-            {
-                _queue = new BlockingCollection<WritableString>();
-            }
-
-            public void OnNext(WritableNsMessage<WritableString> value)
-            {
-                _queue.Add(value.Data.First());
-            }
-
-            public void OnError(Exception error)
-            {
-                throw new NotImplementedException();
-            }
-
-            public void OnCompleted()
-            {
-                throw new NotImplementedException();
-            }
-        }
-
-        /// <summary>
-        /// The network handler to handle incoming Writable NsMessages
-        /// </summary>
-        private class NetworkMessageHandler : IObserver<WritableNsMessage<WritableString>>
-        {
-            [Inject]
-            public NetworkMessageHandler()
-            {
-            }
-
-            public void OnNext(WritableNsMessage<WritableString> value)
-            {
-            }
-
-            public void OnError(Exception error)
-            {
-            }
-
-            public void OnCompleted()
-            {
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
deleted file mode 100644
index 400aa52..0000000
--- a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.Network.Tests.NetworkService
-{
-    /// <summary>
-    /// Writable wrapper around the string class
-    /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public class WritableString : IWritable
-    {
-        /// <summary>
-        /// Returns the actual string data
-        /// </summary>
-        public string Data { get; set; }
-        
-        /// <summary>
-        /// Empty constructor for instantiation with reflection
-        /// </summary>
-        [Inject]
-        public WritableString()
-        {
-        }
-
-        /// <summary>
-        /// Constructor
-        /// </summary>
-        /// <param name="data">The string data</param>
-        public WritableString(string data)
-        {
-            Data = data;
-        }
-
-        /// <summary>
-        /// Reads the string
-        /// </summary>
-        /// <param name="reader">reader to read from</param>
-        public void Read(IDataReader reader)
-        {
-            Data = reader.ReadString();
-        }
-
-        /// <summary>
-        /// Writes the string
-        /// </summary>
-        /// <param name="writer">Writer to write</param>
-        public void Write(IDataWriter writer)
-        {
-            writer.WriteString(Data);
-        }
-
-        /// <summary>
-        /// Reads the string
-        /// </summary>
-        /// <param name="reader">reader to read from</param>
-        /// <param name="token">the cancellation token</param>
-        public async Task ReadAsync(IDataReader reader, CancellationToken token)
-        {
-            Data = await reader.ReadStringAsync(token);
-        }
-
-        /// <summary>
-        /// Writes the string
-        /// </summary>
-        /// <param name="writer">Writer to write</param>
-        /// <param name="token">the cancellation token</param>
-        public async Task WriteAsync(IDataWriter writer, CancellationToken token)
-        {
-            await writer.WriteStringAsync(Data, token);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
index e0568aa..3355ac7 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -55,9 +55,8 @@ under the License.
     <Compile Include="GroupCommunication\GroupCommunicationTreeTopologyTests.cs" />
     <Compile Include="GroupCommunication\StreamingCodecTests.cs" />
     <Compile Include="NamingService\NameServerTests.cs" />
-    <Compile Include="NetworkService\WritableNetworkServiceTests.cs" />
     <Compile Include="NetworkService\NetworkServiceTests.cs" />
-    <Compile Include="NetworkService\WritableString.cs" />
+    <Compile Include="NetworkService\StreamingNetworkServiceTests.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>
   <ItemGroup>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
index f8e1483..da156ca 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Util;
@@ -38,7 +39,12 @@ namespace Org.Apache.REEF.Network.Group.Config
         public static ConfigurationModule Conf = new CodecToStreamingCodecConfiguration<T>()
             .BindImplementation(GenericType<ICodec<T>>.Class, Codec)
             .BindImplementation(GenericType<IStreamingCodec<T>>.Class, GenericType<CodecToStreamingCodec<T>>.Class)
-            .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, GenericType<StreamingPipelineMessageCodec<T>>.Class)
+            .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class,
+                GenericType<StreamingPipelineMessageCodec<T>>.Class)
+            .BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<T>>>.Class,
+                GenericType<GroupCommunicationMessageStreamingCodec<T>>.Class)
+            .BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<PipelineMessage<T>>>>.Class,
+                GenericType<GroupCommunicationMessageStreamingCodec<PipelineMessage<T>>>.Class)
             .Build();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
index 2a30047..20c6ade 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Util;
@@ -41,7 +42,12 @@ namespace Org.Apache.REEF.Network.Group.Config
         /// </summary>
         public static ConfigurationModule Conf = new StreamingCodecConfiguration<T>()
             .BindImplementation(GenericType<IStreamingCodec<T>>.Class, Codec)
-            .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, GenericType<StreamingPipelineMessageCodec<T>>.Class)
+            .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class,
+                GenericType<StreamingPipelineMessageCodec<T>>.Class)
+            .BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<T>>>.Class,
+                GenericType<GroupCommunicationMessageStreamingCodec<T>>.Class)
+            .BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<PipelineMessage<T>>>>.Class,
+                GenericType<GroupCommunicationMessageStreamingCodec<PipelineMessage<T>>>.Class)
             .Build();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
index 5ebb357..001c110 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-using System;
 using System.Collections.Generic;
 using System.Reflection;
 using Org.Apache.REEF.Network.Group.Config;
@@ -39,7 +38,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
     /// All operators in the same Communication Group run on the the 
     /// same set of tasks.
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.]
     public sealed class CommunicationGroupDriver : ICommunicationGroupDriver
     {
         private static readonly Logger LOGGER = Logger.GetLogger(typeof (CommunicationGroupDriver));

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
index e807a4e..9aa49a4 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
@@ -25,11 +25,10 @@ using Org.Apache.REEF.Tang.Annotations;
 namespace Org.Apache.REEF.Network.Group.Driver.Impl
 {
     /// <summary>
-    /// Messages sent by MPI Operators. This is the abstract class inherited by 
-    /// WritableGroupCommunicationMessage but seen by Network Service
+    /// Messages sent by MPI Operators. This is the class inherited by 
+    /// GroupCommunicationMessage but seen by Network Service
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
-    public abstract class GeneralGroupCommunicationMessage : IWritable
+    public class GeneralGroupCommunicationMessage
     {        
         /// <summary>
         /// Empty constructor to allow instantiation by reflection
@@ -45,70 +44,36 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="operatorName">The name of the MPI operator</param>
         /// <param name="source">The message source</param>
         /// <param name="destination">The message destination</param>
-        /// <param name="messageType">The type of message to send</param>
         protected GeneralGroupCommunicationMessage(
             string groupName,
             string operatorName,
             string source,
-            string destination,
-            MessageType messageType)
+            string destination)
         {
             GroupName = groupName;
             OperatorName = operatorName;
             Source = source;
             Destination = destination;
-            MsgType = messageType;
         }
 
         /// <summary>
         /// Returns the Communication Group name.
         /// </summary>
-        public string GroupName { get; internal set; }
+        internal string GroupName { get; set; }
 
         /// <summary>
         /// Returns the MPI Operator name.
         /// </summary>
-        public string OperatorName { get; internal set; }
+        internal string OperatorName { get; set; }
 
         /// <summary>
         /// Returns the source of the message.
         /// </summary>
-        public string Source { get; internal set; }
+        internal string Source { get; set; }
 
         /// <summary>
         /// Returns the destination of the message.
         /// </summary>
-        public string Destination { get; internal set; }
-
-        /// <summary>
-        /// Returns the type of message being sent.
-        /// </summary>
-        public MessageType MsgType { get; internal set; }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        public abstract void Read(IDataReader reader);
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        public abstract void Write(IDataWriter writer);
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        /// <param name="token">The cancellation token</param>
-        public abstract System.Threading.Tasks.Task ReadAsync(IDataReader reader, CancellationToken token);
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        /// <param name="token">The cancellation token</param>
-        public abstract System.Threading.Tasks.Task WriteAsync(IDataWriter writer, CancellationToken token);
+        internal string Destination { get; set; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
index ade5834..5bf0848 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
@@ -42,7 +42,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
     /// Used to create Communication Groups for Group Communication Operators on the Reef driver.
     /// Also manages configuration for Group Communication tasks/services.
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
     public sealed class GroupCommDriver : IGroupCommDriver
     {
         private const string MasterTaskContextName = "MasterTaskContext";
@@ -158,12 +157,12 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         public IConfiguration GetServiceConfiguration()
         {
             IConfiguration serviceConfig = ServiceConfiguration.ConfigurationModule
-                .Set(ServiceConfiguration.Services, GenericType<WritableNetworkService<GeneralGroupCommunicationMessage>>.Class)
+                .Set(ServiceConfiguration.Services, GenericType<StreamingNetworkService<GeneralGroupCommunicationMessage>>.Class)
                 .Build();
 
             return TangFactory.GetTang().NewConfigurationBuilder(serviceConfig)
                 .BindImplementation(
-                    GenericType<IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>>.Class,
+                    GenericType<IObserver<NsMessage<GeneralGroupCommunicationMessage>>>.Class,
                     GenericType<GroupCommNetworkObserver>.Class)
                 .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
                     GenericType<NamingConfigurationOptions.NameServerAddress>.Class,

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
index ed7855b..c53cdb0 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
@@ -17,30 +17,21 @@
  * under the License.
  */
 
-using System;
-using System.Threading;
-using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.StreamingCodec;
 
 namespace Org.Apache.REEF.Network.Group.Driver.Impl
 {
     /// <summary>
-    /// Messages sent by MPI Operators. This is the Writable version of GroupCommunicationMessage
-    ///  class and will eventually replace it once everybody agrees with the design
+    /// Messages sent by MPI Operators.
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
-    public sealed class GroupCommunicationMessage<T> : GeneralGroupCommunicationMessage
+    internal sealed class GroupCommunicationMessage<T> : GeneralGroupCommunicationMessage
     {
-        private readonly IStreamingCodec<T> _codec;
-
         /// <summary>
         /// Empty constructor to allow instantiation by reflection
         /// </summary>
         [Inject]
-        private GroupCommunicationMessage(IStreamingCodec<T> codec)
+        private GroupCommunicationMessage()
         {
-            _codec = codec;
         }
 
         /// <summary>
@@ -51,20 +42,15 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="source">The message source</param>
         /// <param name="destination">The message destination</param>
         /// <param name="message">The actual Writable message</param>
-        /// <param name="messageType">The type of message to send</param>
-        /// <param name="codec">Streaming Codec</param>
-        public GroupCommunicationMessage(
+        internal GroupCommunicationMessage(
             string groupName,
             string operatorName,
             string source,
             string destination,
-            T message,
-            MessageType messageType,
-            IStreamingCodec<T> codec)
-            : base(groupName, operatorName, source, destination, messageType)
+            T message)
+            : base(groupName, operatorName, source, destination)
         {
-            _codec = codec;
-            Data = new T[] { message };
+            Data = new[] { message };
         }
 
         /// <summary>
@@ -75,133 +61,24 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="source">The message source</param>
         /// <param name="destination">The message destination</param>
         /// <param name="message">The actual Writable message array</param>
-        /// <param name="messageType">The type of message to send</param>
-        /// <param name="codec">Streaming Codec</param>
-        public GroupCommunicationMessage(
+        internal GroupCommunicationMessage(
             string groupName,
             string operatorName,
             string source,
             string destination,
-            T[] message,
-            MessageType messageType,
-            IStreamingCodec<T> codec)
-            : base(groupName, operatorName, source, destination, messageType)
+            T[] message)
+            : base(groupName, operatorName, source, destination)
         {
-            _codec = codec;
             Data = message;
         }
 
         /// <summary>
         /// Returns the array of messages.
         /// </summary>
-        public T[] Data
+        internal T[] Data
         {
             get;
             set;
         }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        public override void Read(IDataReader reader)
-        {
-            GroupName = reader.ReadString();
-            OperatorName = reader.ReadString();
-            Source = reader.ReadString();
-            Destination = reader.ReadString();
-
-            int dataCount = reader.ReadInt32();
-
-            if (dataCount == 0)
-            {
-                throw new Exception("Data Count in Group COmmunication Message cannot be zero");
-            }
-
-            MsgType = (MessageType)Enum.Parse(typeof(MessageType), reader.ReadString());
-            Data = new T[dataCount];
-
-            for (int index = 0; index < dataCount; index++)
-            {
-                Data[index] = _codec.Read(reader);
-
-                if (Data[index] == null)
-                {
-                    throw new Exception("message instance cannot be created from the IDataReader in Group Communication Message");
-                }
-            }
-        }
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        public override void Write(IDataWriter writer)
-        {
-            writer.WriteString(GroupName);
-            writer.WriteString(OperatorName);
-            writer.WriteString(Source);
-            writer.WriteString(Destination);
-            writer.WriteInt32(Data.Length);
-            writer.WriteString(MsgType.ToString());
-
-            foreach (var data in Data)
-            {
-                _codec.Write(data, writer);
-            }
-        }
-
-        /// <summary>
-        /// Read the class fields.
-        /// </summary>
-        /// <param name="reader">The reader from which to read </param>
-        /// <param name="token">The cancellation token</param>
-        public override async System.Threading.Tasks.Task ReadAsync(IDataReader reader, CancellationToken token)
-        {
-            GroupName = await reader.ReadStringAsync(token);
-            OperatorName = await reader.ReadStringAsync(token);
-            Source = await reader.ReadStringAsync(token);
-            Destination = await reader.ReadStringAsync(token);
-
-            int dataCount = await reader.ReadInt32Async(token);
-
-            if (dataCount == 0)
-            {
-                throw new Exception("Data Count in Group COmmunication Message cannot be zero");
-            }
-
-            MsgType = (MessageType)Enum.Parse(typeof(MessageType), await reader.ReadStringAsync(token));
-            Data = new T[dataCount];
-
-            for (int index = 0; index < dataCount; index++)
-            {
-                Data[index] = await _codec.ReadAsync(reader, token);
-
-                if (Data[index] == null)
-                {
-                    throw new Exception("message instance cannot be created from the IDataReader in Group Communication Message");
-                }
-            }
-        }
-
-        /// <summary>
-        /// Writes the class fields.
-        /// </summary>
-        /// <param name="writer">The writer to which to write</param>
-        /// <param name="token">The cancellation token</param>
-        public override async System.Threading.Tasks.Task WriteAsync(IDataWriter writer, CancellationToken token)
-        {
-            await writer.WriteStringAsync(GroupName, token);
-            await writer.WriteStringAsync(OperatorName, token);
-            await writer.WriteStringAsync(Source, token);
-            await writer.WriteStringAsync(Destination, token);
-            await writer.WriteInt32Async(Data.Length, token);
-            await writer.WriteStringAsync(MsgType.ToString(), token);
-
-            foreach (var data in Data)
-            {
-                await _codec.WriteAsync(data, writer, token);
-            }
-        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs
new file mode 100644
index 0000000..d619e64
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+    /// <summary>
+    /// Streaming Codec for the Group Communication Message
+    /// </summary>
+    internal sealed class GroupCommunicationMessageStreamingCodec<T> : IStreamingCodec<GroupCommunicationMessage<T>>
+    {
+        private readonly IStreamingCodec<T> _codec;
+
+        /// <summary>
+        /// Empty constructor to allow instantiation by reflection
+        /// </summary>
+        [Inject]
+        private GroupCommunicationMessageStreamingCodec(IStreamingCodec<T> codec)
+        {
+            _codec = codec;
+        }
+
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <returns>The Group Communication Message</returns>
+        public GroupCommunicationMessage<T> Read(IDataReader reader)
+        {
+            int metadataSize = reader.ReadInt32();
+            byte[] metadata = new byte[metadataSize];
+            reader.Read(ref metadata, 0, metadataSize);
+            var res = GenerateMetaDataDecoding(metadata);
+
+            string groupName = res.Item1;
+            string operatorName = res.Item2;
+            string source = res.Item3;
+            string destination = res.Item4;
+            int dataCount = res.Item5;
+
+            if (dataCount == 0)
+            {
+                throw new Exception("Data Count in Group Communication Message cannot be zero");
+            }
+
+            var data = new T[dataCount];
+
+            for (int index = 0; index < dataCount; index++)
+            {
+                data[index] = _codec.Read(reader);
+
+                if (data[index] == null)
+                {
+                    throw new Exception("message instance cannot be created from the IDataReader in Group Communication Message");
+                }
+            }
+
+            return new GroupCommunicationMessage<T>(groupName, operatorName, source, destination, data);
+        }
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="obj">The message to write</param>
+        /// <param name="writer">The writer to which to write</param>
+        public void Write(GroupCommunicationMessage<T> obj, IDataWriter writer)
+        {
+            byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+            byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+            byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray();
+            writer.Write(totalEncoding, 0, totalEncoding.Length);
+
+            foreach (var data in obj.Data)
+            {
+                _codec.Write(data, writer);
+            }
+        }
+
+        /// <summary>
+        /// Read the class fields.
+        /// </summary>
+        /// <param name="reader">The reader from which to read </param>
+        /// <param name="token">The cancellation token</param>
+        /// <returns>The Group Communication Message</returns>
+        public async Task<GroupCommunicationMessage<T>> ReadAsync(IDataReader reader,
+            CancellationToken token)
+        {
+            int metadataSize = await reader.ReadInt32Async(token);
+            byte[] metadata = new byte[metadataSize];
+            await reader.ReadAsync(metadata, 0, metadataSize, token);
+            var res = GenerateMetaDataDecoding(metadata);
+
+            string groupName = res.Item1;
+            string operatorName = res.Item2;
+            string source = res.Item3;
+            string destination = res.Item4;
+            int dataCount = res.Item5;
+
+            if (dataCount == 0)
+            {
+                throw new Exception("Data Count in Group Communication Message cannot be zero");
+            }
+
+            var data = new T[dataCount];
+
+            for (int index = 0; index < dataCount; index++)
+            {
+                data[index] = await _codec.ReadAsync(reader, token);
+
+                if (data[index] == null)
+                {
+                    throw new Exception(
+                        "message instance cannot be created from the IDataReader in Group Communication Message");
+                }
+            }
+
+            return new GroupCommunicationMessage<T>(groupName, operatorName, source, destination, data);
+        }
+
+        /// <summary>
+        /// Writes the class fields.
+        /// </summary>
+        /// <param name="obj">The message to write</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">The cancellation token</param>
+        public async System.Threading.Tasks.Task WriteAsync(GroupCommunicationMessage<T> obj, IDataWriter writer, CancellationToken token)
+        {
+            byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+            byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+            byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray();
+            await writer.WriteAsync(totalEncoding, 0, totalEncoding.Length, token);
+
+            foreach (var data in obj.Data)
+            {
+                await _codec.WriteAsync(data, writer, token);
+            }
+        }
+
+        private static byte[] GenerateMetaDataEncoding(GroupCommunicationMessage<T> obj)
+        {
+            List<byte[]> metadataBytes = new List<byte[]>();
+
+            byte[] groupBytes = StringToBytes(obj.GroupName);
+            byte[] operatorBytes = StringToBytes(obj.OperatorName);
+            byte[] sourceBytes = StringToBytes(obj.Source);
+            byte[] dstBytes = StringToBytes(obj.Destination);
+            byte[] messageCount = BitConverter.GetBytes(obj.Data.Length);
+
+            metadataBytes.Add(BitConverter.GetBytes(groupBytes.Length));
+            metadataBytes.Add(BitConverter.GetBytes(operatorBytes.Length));
+            metadataBytes.Add(BitConverter.GetBytes(sourceBytes.Length));
+            metadataBytes.Add(BitConverter.GetBytes(dstBytes.Length));
+            metadataBytes.Add(groupBytes);
+            metadataBytes.Add(operatorBytes);
+            metadataBytes.Add(sourceBytes);
+            metadataBytes.Add(dstBytes);
+            metadataBytes.Add(messageCount);
+
+            return metadataBytes.SelectMany(i => i).ToArray();
+        }
+
+        private static Tuple<string, string, string, string, int> GenerateMetaDataDecoding(byte[] obj)
+        {
+            int groupCount = BitConverter.ToInt32(obj, 0);
+            int operatorCount = BitConverter.ToInt32(obj, sizeof (int));
+            int srcCount = BitConverter.ToInt32(obj, 2*sizeof (int));
+            int dstCount = BitConverter.ToInt32(obj, 3*sizeof (int));
+
+            int offset = 4 * sizeof(int);
+
+            string groupString = BytesToString(obj.Skip(offset).Take(groupCount).ToArray());
+            offset += groupCount;
+            string operatorString = BytesToString(obj.Skip(offset).Take(operatorCount).ToArray());
+            offset += operatorCount;
+            string srcString = BytesToString(obj.Skip(offset).Take(srcCount).ToArray());
+            offset += srcCount;
+            string dstString = BytesToString(obj.Skip(offset).Take(dstCount).ToArray());
+            offset += dstCount;
+            int messageCount = BitConverter.ToInt32(obj, offset);
+
+            return new Tuple<string, string, string, string, int>(groupString, operatorString, srcString, dstString,
+                messageCount);
+        }
+
+        private static byte[] StringToBytes(string str)
+        {
+            byte[] bytes = new byte[str.Length * sizeof(char)];
+            Buffer.BlockCopy(str.ToCharArray(), 0, bytes, 0, bytes.Length);
+            return bytes;
+        }
+
+        private static string BytesToString(byte[] bytes)
+        {
+            char[] chars = new char[bytes.Length / sizeof(char)];
+            Buffer.BlockCopy(bytes, 0, chars, 0, bytes.Length);
+            return new string(chars);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
index a78b13c..41b34a9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
@@ -29,8 +29,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
     /// Group Communication operator used to do point-to-point communication between named Tasks.
     /// It uses Writable classes
     /// </summary>
-    [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
-    public sealed class Sender
+    internal sealed class Sender
     {
         private readonly INetworkService<GeneralGroupCommunicationMessage> _networkService;
         private readonly IIdentifierFactory _idFactory;
@@ -42,7 +41,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <param name="idFactory">Used to create IIdentifier for GroupCommunicationMessages.</param>
         [Inject]
         private Sender(
-            WritableNetworkService<GeneralGroupCommunicationMessage> networkService,
+            StreamingNetworkService<GeneralGroupCommunicationMessage> networkService,
             IIdentifierFactory idFactory)
         {
             _networkService = networkService;
@@ -54,7 +53,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// included in the message.
         /// </summary>
         /// <param name="message">The message to send.</param>
-        public void Send(GeneralGroupCommunicationMessage message)
+        internal void Send(GeneralGroupCommunicationMessage message)
         {
             if (message == null)
             {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
index 96c36e1..14dbd96 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
@@ -29,8 +29,7 @@ namespace Org.Apache.REEF.Network.Group.Task
     /// Writable Version
     /// </summary>
     [DefaultImplementation(typeof(CommunicationGroupNetworkObserver))]
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
-    public interface ICommunicationGroupNetworkObserver : IObserver<GeneralGroupCommunicationMessage>
+    internal interface ICommunicationGroupNetworkObserver : IObserver<GeneralGroupCommunicationMessage>
     {
         /// <summary>
         /// Registers the handler with the WritableCommunicationGroupNetworkObserver.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
index de19754..c700d88 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
@@ -30,8 +30,7 @@ namespace Org.Apache.REEF.Network.Group.Task
     /// Writable Version
     /// </summary>
     [DefaultImplementation(typeof(GroupCommNetworkObserver))]
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
-    public interface IGroupCommNetworkObserver : IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>
+    internal interface IGroupCommNetworkObserver : IObserver<NsMessage<GeneralGroupCommunicationMessage>>
     {
         /// <summary>
         /// Registers the network handler for the given CommunicationGroup.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
index 305d245..cf3e559 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
@@ -19,7 +19,6 @@
 
 using System;
 using System.Collections.Generic;
-using System.Linq;
 using System.Reflection;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Operators;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
index 5be1457..c3989a9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
@@ -30,8 +30,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Handles incoming messages sent to this Communication Group.
     /// Writable version
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
-    public sealed class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver
+    internal sealed class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver
     {
         private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupNetworkObserver));
         private readonly Dictionary<string, IObserver<GeneralGroupCommunicationMessage>> _handlers;
@@ -54,7 +53,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// will be invoked</param>
         /// <param name="observer">The writable handler to invoke when messages are sent
         /// to the operator specified by operatorName</param>
-        public void Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer)
+        void ICommunicationGroupNetworkObserver.Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer)
         {
             if (string.IsNullOrEmpty(operatorName))
             {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
index e79df55..8a54ede 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
@@ -34,7 +34,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Used by Tasks to fetch CommunicationGroupClients.
     /// Writable version
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
     public sealed class GroupCommClient : IGroupCommClient
     {
         private readonly Dictionary<string, ICommunicationGroupClientInternal> _commGroups;
@@ -50,11 +49,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <param name="configSerializer">Used to deserialize Group Communication configuration</param>
         /// <param name="injector">injector forked from the injector that creates this instance</param>
         [Inject]
-        [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
         public GroupCommClient(
             [Parameter(typeof(GroupCommConfigurationOptions.SerializedGroupConfigs))] ISet<string> groupConfigs,
             [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
-            WritableNetworkService<GeneralGroupCommunicationMessage> networkService,
+            StreamingNetworkService<GeneralGroupCommunicationMessage> networkService,
             AvroConfigurationSerializer configSerializer,
             IInjector injector)
         {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
index 9d35ff1..d0f35fe 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
@@ -31,8 +31,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Handles all incoming messages for this Task.
     /// Writable version
     /// </summary>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
-    public sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver
+    internal sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver
     {
         private static readonly Logger LOGGER = Logger.GetLogger(typeof(GroupCommNetworkObserver));
 
@@ -42,7 +41,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Creates a new GroupCommNetworkObserver.
         /// </summary>
         [Inject]
-        public GroupCommNetworkObserver()
+        private GroupCommNetworkObserver()
         {
             _commGroupHandlers = new Dictionary<string, IObserver<GeneralGroupCommunicationMessage>>();
         }
@@ -53,7 +52,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// WritableCommunicationGroupNetworkObserver.
         /// </summary>
         /// <param name="nsMessage"></param>
-        public void OnNext(WritableNsMessage<GeneralGroupCommunicationMessage> nsMessage)
+        public void OnNext(NsMessage<GeneralGroupCommunicationMessage> nsMessage)
         {
             if (nsMessage == null)
             {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
index aa5de1e..00fa9a5 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -29,7 +29,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// Writable version
     /// </summary>
     /// <typeparam name="T"> Generic type of message</typeparam>
-    // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
     internal sealed class NodeStruct<T>
     {
         private readonly BlockingCollection<GroupCommunicationMessage<T>> _messageQueue;
@@ -38,7 +37,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Creates a new NodeStruct.
         /// </summary>
         /// <param name="id">The Task identifier</param>
-        public NodeStruct(string id)
+        internal NodeStruct(string id)
         {
             Identifier = id;
             _messageQueue = new BlockingCollection<GroupCommunicationMessage<T>>();
@@ -48,13 +47,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Returns the identifier for the Task that sent all
         /// messages in the message queue.
         /// </summary>
-        public string Identifier { get; private set; }
+        internal string Identifier { get; private set; }
 
         /// <summary>
         /// Gets the first message in the message queue.
         /// </summary>
         /// <returns>The first available message.</returns>
-        public T[] GetData()
+        internal T[] GetData()
         {
             return _messageQueue.Take().Data;
         }
@@ -63,7 +62,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Adds an incoming message to the message queue.
         /// </summary>
         /// <param name="gcm">The incoming message</param>
-        public void AddData(GroupCommunicationMessage<T> gcm)
+        internal void AddData(GroupCommunicationMessage<T> gcm)
         {
             _messageQueue.Add(gcm);
         }
@@ -72,7 +71,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Tells whether there is a message in queue or not.
         /// </summary>
         /// <returns>True if queue is non empty, false otherwise.</returns>
-        public bool HasMessage()
+        internal bool HasMessage()
         {
             if (_messageQueue.Count != 0)
             {