You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/07/15 23:48:56 UTC
[1/3] incubator-reef git commit: [REEF-447]Convert Network Service
Layer from Writable to Streaming
Repository: incubator-reef
Updated Branches:
refs/heads/master 5cdd655e4 -> 8505dee94
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
deleted file mode 100644
index 1af9b86..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TemporaryWritableToStreamingCodec.cs
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.StreamingCodec;
-
-namespace Org.Apache.REEF.Wake.Remote.Impl
-{
- // TODO: This class will be removed shortly and is used to reduce the size of pull request.
- internal sealed class TemporaryWritableToStreamingCodec<T> : IStreamingCodec<T> where T:IWritable
- {
- private readonly IInjector _injector;
-
- [Inject]
- public TemporaryWritableToStreamingCodec(IInjector injector)
- {
- _injector = injector;
- }
-
- public T Read(IDataReader reader)
- {
- string type = reader.ReadString();
- var value = (T) _injector.ForkInjector().GetInstance(type);
- value.Read(reader);
- return value;
- }
-
- public void Write(T obj, IDataWriter writer)
- {
- writer.WriteString(obj.GetType().AssemblyQualifiedName);
- obj.Write(writer);
- }
-
- public async Task<T> ReadAsync(IDataReader reader, CancellationToken token)
- {
- string type = await reader.ReadStringAsync(token);
- var value = (T) _injector.ForkInjector().GetInstance(type);
- await value.ReadAsync(reader, token);
- return value;
- }
-
- public async Task WriteAsync(T obj, IDataWriter writer, CancellationToken token)
- {
- await writer.WriteStringAsync(obj.GetType().AssemblyQualifiedName, token);
- await obj.WriteAsync(writer, token);
- }
- }
-}
[2/3] incubator-reef git commit: [REEF-447]Convert Network Service
Layer from Writable to Streaming
Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index b1b79d4..8fe19d6 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -32,7 +32,6 @@ using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.Network.Group.Task.Impl
{
@@ -43,18 +42,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Communication Group.
/// </summary>
/// <typeparam name="T">The message type</typeparam>
- // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
public sealed class OperatorTopology<T> : IOperatorTopology<T>, IObserver<GeneralGroupCommunicationMessage>
{
- private const int DefaultTimeout = 50000;
- private const int RetryCount = 10;
-
private static readonly Logger Logger = Logger.GetLogger(typeof(OperatorTopology<>));
private readonly string _groupName;
private readonly string _operatorName;
private readonly string _selfId;
- private string _driverId;
private readonly int _timeout;
private readonly int _retryCount;
private readonly int _sleepTime;
@@ -66,7 +60,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
private readonly Sender _sender;
private readonly BlockingCollection<NodeStruct<T>> _nodesWithData;
private readonly Object _thisLock = new Object();
- private readonly IStreamingCodec<T> _codec;
/// <summary>
/// Creates a new OperatorTopology object.
@@ -74,7 +67,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// <param name="operatorName">The name of the Group Communication Operator</param>
/// <param name="groupName">The name of the operator's Communication Group</param>
/// <param name="taskId">The operator's Task identifier</param>
- /// <param name="driverId">The identifer for the driver</param>
/// <param name="timeout">Timeout value for cancellation token</param>
/// <param name="retryCount">Number of times to retry wating for registration</param>
/// <param name="sleepTime">Sleep time between retry wating for registration</param>
@@ -82,26 +74,22 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// <param name="childIds">The set of child Task identifiers in the topology graph</param>
/// <param name="networkService">The network service</param>
/// <param name="sender">The Sender used to do point to point communication</param>
- /// <param name="codec">Streaming codec to encode objects</param>
[Inject]
private OperatorTopology(
[Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
[Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
- [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId,
[Parameter(typeof(GroupCommConfigurationOptions.Timeout))] int timeout,
[Parameter(typeof(GroupCommConfigurationOptions.RetryCountWaitingForRegistration))] int retryCount,
[Parameter(typeof(GroupCommConfigurationOptions.SleepTimeWaitingForRegistration))] int sleepTime,
[Parameter(typeof(GroupCommConfigurationOptions.TopologyRootTaskId))] string rootId,
[Parameter(typeof(GroupCommConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds,
- WritableNetworkService<GeneralGroupCommunicationMessage> networkService,
- Sender sender,
- IStreamingCodec<T> codec)
+ StreamingNetworkService<GeneralGroupCommunicationMessage> networkService,
+ Sender sender)
{
_operatorName = operatorName;
_groupName = groupName;
_selfId = taskId;
- _driverId = driverId;
_timeout = timeout;
_retryCount = retryCount;
_sleepTime = sleepTime;
@@ -110,7 +98,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
_nodesWithData = new BlockingCollection<NodeStruct<T>>();
_children = new List<NodeStruct<T>>();
_idToNodeMap = new Dictionary<string, NodeStruct<T>>();
- _codec = codec;
if (_selfId.Equals(rootId))
{
@@ -201,7 +188,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
throw new ArgumentException("No parent for node");
}
- SendToNode(message, MessageType.Data, _parent);
+ SendToNode(message, _parent);
}
/// <summary>
@@ -218,7 +205,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
foreach (var child in _children)
{
- SendToNode(message, MessageType.Data, child);
+ SendToNode(message, child);
}
}
@@ -444,10 +431,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// <param name="message">The message to send</param>
/// <param name="msgType">The message type</param>
/// <param name="node">The NodeStruct representing the Task to send to</param>
- private void SendToNode(T message, MessageType msgType, NodeStruct<T> node)
+ private void SendToNode(T message, NodeStruct<T> node)
{
GeneralGroupCommunicationMessage gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName,
- _selfId, node.Identifier, message, msgType, _codec);
+ _selfId, node.Identifier, message);
_sender.Send(gcm);
}
@@ -458,12 +445,12 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// <param name="messages">The list of messages to send</param>
/// <param name="msgType">The message type</param>
/// <param name="node">The NodeStruct representing the Task to send to</param>
- private void SendToNode(IList<T> messages, MessageType msgType, NodeStruct<T> node)
+ private void SendToNode(IList<T> messages, NodeStruct<T> node)
{
T[] encodedMessages = messages.ToArray();
GroupCommunicationMessage<T> gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName,
- _selfId, node.Identifier, encodedMessages, msgType, _codec);
+ _selfId, node.Identifier, encodedMessages);
_sender.Send(gcm);
}
@@ -511,7 +498,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
}
IList<T> sublist = messages.ToList().GetRange(i, size);
- SendToNode(sublist, MessageType.Data, nodeStruct);
+ SendToNode(sublist, nodeStruct);
i += size;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs
new file mode 100644
index 0000000..c30c1bd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.NetworkService.Codec
+{
+ /// <summary>
+ /// Codec to serialize NsMessages for NetworkService.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ internal class NsMessageStreamingCodec<T> : IStreamingCodec<NsMessage<T>>
+ {
+ private readonly IIdentifierFactory _idFactory;
+ private readonly StreamingCodecFunctionCache<T> _codecFunctionsCache;
+
+ /// <summary>
+ /// Create new NsMessageCodec.
+ /// </summary>
+ /// <param name="idFactory">Used to create identifier from string.</param>
+ /// <param name="injector">Injector to instantiate codecs.</param>
+ [Inject]
+ private NsMessageStreamingCodec(IIdentifierFactory idFactory, IInjector injector)
+ {
+ _idFactory = idFactory;
+ _codecFunctionsCache = new StreamingCodecFunctionCache<T>(injector);
+ }
+
+ /// <summary>
+ /// Instantiate the class from the reader.
+ /// </summary>
+ /// <param name="reader">The reader from which to read</param>
+ ///<returns>The instance of type NsMessage<T></T> read from the reader</returns>
+ public NsMessage<T> Read(IDataReader reader)
+ {
+ int metadataSize = reader.ReadInt32();
+ byte[] metadata = new byte[metadataSize];
+ reader.Read(ref metadata, 0, metadataSize);
+ var res = GenerateMetaDataDecoding(metadata);
+
+ Type messageType = res.Item3;
+ NsMessage<T> message = res.Item1;
+
+ var codecReadFunc = _codecFunctionsCache.ReadFunction(messageType);
+ int messageCount = res.Item2;
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ message.Data.Add(codecReadFunc(reader));
+ }
+
+ return message;
+ }
+
+ /// <summary>
+ /// Writes the class fields to the writer.
+ /// </summary>
+ /// <param name="obj">The object of type NsMessage<T></T> to be encoded</param>
+ /// <param name="writer">The writer to which to write</param>
+ public void Write(NsMessage<T> obj, IDataWriter writer)
+ {
+ byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+ byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+ byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray();
+ writer.Write(totalEncoding, 0, totalEncoding.Length);
+
+ Type messageType = obj.Data[0].GetType();
+ var codecWriteFunc = _codecFunctionsCache.WriteFunction(messageType);
+
+ foreach (var data in obj.Data)
+ {
+ codecWriteFunc(data, writer);
+ }
+ }
+
+ /// <summary>
+ /// Instantiate the class from the reader.
+ /// </summary>
+ /// <param name="reader">The reader from which to read</param>
+ /// <param name="token">Cancellation token</param>
+ /// <returns>The instance of type NsMessage<T> read from the reader</returns>
+ public async Task<NsMessage<T>> ReadAsync(IDataReader reader, CancellationToken token)
+ {
+ int metadataSize = await reader.ReadInt32Async(token);
+ byte[] metadata = new byte[metadataSize];
+ await reader.ReadAsync(metadata, 0, metadataSize, token);
+ var res = GenerateMetaDataDecoding(metadata);
+ Type messageType = res.Item3;
+ NsMessage<T> message = res.Item1;
+ var codecReadFunc = _codecFunctionsCache.ReadAsyncFunction(messageType);
+ int messageCount = res.Item2;
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ message.Data.Add(codecReadFunc(reader, token));
+ }
+
+ return message;
+ }
+
+ /// <summary>
+ /// Writes the class fields to the writer.
+ /// </summary>
+ /// <param name="obj">The object of type NsMessage<T> to be encoded</param>
+ /// <param name="writer">The writer to which to write</param>
+ /// <param name="token">Cancellation token</param>
+ public async Task WriteAsync(NsMessage<T> obj, IDataWriter writer, CancellationToken token)
+ {
+ byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+ byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+ byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray();
+ await writer.WriteAsync(totalEncoding, 0, totalEncoding.Length, token);
+
+ Type messageType = obj.Data[0].GetType();
+
+ var codecWriteFunc = _codecFunctionsCache.WriteAsyncFunction(messageType);
+
+ foreach (var data in obj.Data)
+ {
+ var asyncResult = codecWriteFunc.BeginInvoke(data, writer, token, null, null);
+ codecWriteFunc.EndInvoke(asyncResult);
+ }
+ }
+
+ private static byte[] GenerateMetaDataEncoding(NsMessage<T> obj )
+ {
+ List<byte[]> metadataBytes = new List<byte[]>();
+ byte[] sourceBytes = StringToBytes(obj.SourceId.ToString());
+ byte[] dstBytes = StringToBytes(obj.DestId.ToString());
+ byte[] messageTypeBytes = StringToBytes(obj.Data[0].GetType().AssemblyQualifiedName);
+ byte[] messageCount = BitConverter.GetBytes(obj.Data.Count);
+
+ metadataBytes.Add(BitConverter.GetBytes(sourceBytes.Length));
+ metadataBytes.Add(BitConverter.GetBytes(dstBytes.Length));
+ metadataBytes.Add(BitConverter.GetBytes(messageTypeBytes.Length));
+ metadataBytes.Add(sourceBytes);
+ metadataBytes.Add(dstBytes);
+ metadataBytes.Add(messageTypeBytes);
+ metadataBytes.Add(messageCount);
+
+ return metadataBytes.SelectMany(i => i).ToArray();
+ }
+
+ private Tuple<NsMessage<T>, int, Type> GenerateMetaDataDecoding(byte[] obj)
+ {
+ int srcCount = BitConverter.ToInt32(obj, 0);
+ int dstCount = BitConverter.ToInt32(obj, sizeof (int));
+ int msgTypeCount = BitConverter.ToInt32(obj, 2*sizeof (int));
+
+ int offset = 3*sizeof (int);
+ string srcString = BytesToString(obj.Skip(offset).Take(srcCount).ToArray());
+ offset += srcCount;
+ string dstString = BytesToString(obj.Skip(offset).Take(dstCount).ToArray());
+ offset += dstCount;
+ Type msgType = Type.GetType(BytesToString(obj.Skip(offset).Take(msgTypeCount).ToArray()));
+ offset += msgTypeCount;
+ int messageCount = BitConverter.ToInt32(obj, offset);
+
+ NsMessage<T> msg = new NsMessage<T>(_idFactory.Create(srcString), _idFactory.Create(dstString));
+ return new Tuple<NsMessage<T>, int, Type>(msg, messageCount, msgType);
+ }
+
+ private static byte[] StringToBytes(string str)
+ {
+ byte[] bytes = new byte[str.Length * sizeof(char)];
+ Buffer.BlockCopy(str.ToCharArray(), 0, bytes, 0, bytes.Length);
+ return bytes;
+ }
+
+ private static string BytesToString(byte[] bytes)
+ {
+ char[] chars = new char[bytes.Length / sizeof(char)];
+ Buffer.BlockCopy(bytes, 0, chars, 0, bytes.Length);
+ return new string(chars);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
new file mode 100644
index 0000000..6d91298
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.NetworkService.Codec
+{
+ /// <summary>
+ /// Cache of StreamingCodec functions used to store codec functions for messages
+ /// to avoid reflection cost. Each message type is assumed to have a unique
+ /// associated codec
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ internal class StreamingCodecFunctionCache<T>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof (StreamingCodecFunctionCache<T>));
+ private readonly Dictionary<Type, Func<IDataReader, T>> _readFuncCache;
+ private readonly Dictionary<Type, Func<IDataReader, CancellationToken, T>> _readAsyncFuncCache;
+ private readonly Dictionary<Type, Action<T, IDataWriter>> _writeFuncCache;
+ private readonly Dictionary<Type, Func<T, IDataWriter, CancellationToken, Task>> _writeAsyncFuncCache;
+ private readonly IInjector _injector;
+ private readonly Type _streamingCodecType;
+
+ /// <summary>
+ /// Create new StreamingCodecFunctionCache.
+ /// </summary>
+ /// <param name="injector"> Injector</param>
+ internal StreamingCodecFunctionCache(IInjector injector)
+ {
+ _injector = injector;
+ _readFuncCache = new Dictionary<Type, Func<IDataReader, T>>();
+ _readAsyncFuncCache = new Dictionary<Type, Func<IDataReader, CancellationToken, T>>();
+ _writeFuncCache = new Dictionary<Type, Action<T, IDataWriter>>();
+ _writeAsyncFuncCache = new Dictionary<Type, Func<T, IDataWriter, CancellationToken, Task>>();
+ _streamingCodecType = typeof(IStreamingCodec<>);
+ }
+
+ /// <summary>
+ /// Creates the read delegate function of StreamingCodec from the message type
+ /// </summary>
+ /// <param name="messageType">Type of message</param>
+ /// <returns>The read delegate function</returns>
+ internal Func<IDataReader, T> ReadFunction(Type messageType)
+ {
+ Func<IDataReader, T> readFunc;
+
+ if (!_readFuncCache.TryGetValue(messageType, out readFunc))
+ {
+ AddCodecFunctions(messageType);
+ readFunc = _readFuncCache[messageType];
+ }
+
+ return readFunc;
+ }
+
+ /// <summary>
+ /// Creates the read async delegate function of StreamingCodec from the message type
+ /// </summary>
+ /// <param name="messageType">Type of message</param>
+ /// <returns>The read async delegate function</returns>
+ internal Func<IDataReader, CancellationToken, T> ReadAsyncFunction(Type messageType)
+ {
+ Func<IDataReader, CancellationToken, T> readFunc;
+
+ if (!_readAsyncFuncCache.TryGetValue(messageType, out readFunc))
+ {
+ AddCodecFunctions(messageType);
+ readFunc = _readAsyncFuncCache[messageType];
+ }
+
+ return readFunc;
+ }
+
+ /// <summary>
+ /// Creates the write delegate function of StreamingCodec from the message type
+ /// </summary>
+ /// <param name="messageType">Type of message</param>
+ /// <returns>The write delegate function</returns>
+ internal Action<T, IDataWriter> WriteFunction(Type messageType)
+ {
+ Action<T, IDataWriter> writeFunc;
+
+ if (!_writeFuncCache.TryGetValue(messageType, out writeFunc))
+ {
+ AddCodecFunctions(messageType);
+ writeFunc = _writeFuncCache[messageType];
+ }
+
+ return writeFunc;
+ }
+
+ /// <summary>
+ /// Creates the write async delegate function of StreamingCodec from the message type
+ /// </summary>
+ /// <param name="messageType">Type of message</param>
+ /// <returns>The write async delegate function</returns>
+ internal Func<T, IDataWriter, CancellationToken, Task> WriteAsyncFunction(Type messageType)
+ {
+ Func<T, IDataWriter, CancellationToken, Task> writeFunc;
+
+ if (!_writeAsyncFuncCache.TryGetValue(messageType, out writeFunc))
+ {
+ AddCodecFunctions(messageType);
+ writeFunc = _writeAsyncFuncCache[messageType];
+ }
+
+ return writeFunc;
+ }
+
+ private void AddCodecFunctions(Type messageType)
+ {
+ if (!typeof(T).IsAssignableFrom(messageType))
+ {
+ Exceptions.CaughtAndThrow(new Exception("Message type not assignable to base type"), Level.Error,
+ Logger);
+ }
+
+ Type codecType = _streamingCodecType.MakeGenericType(messageType);
+ var codec = _injector.GetInstance(codecType);
+
+ MethodInfo readMethod = codec.GetType().GetMethod("Read");
+ _readFuncCache[messageType] = (Func<IDataReader, T>) Delegate.CreateDelegate
+ (typeof (Func<IDataReader, T>), codec, readMethod);
+
+ MethodInfo readAsyncMethod = codec.GetType().GetMethod("ReadAsync");
+ MethodInfo genericHelper = GetType()
+ .GetMethod("ReadAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance);
+ MethodInfo constructedHelper = genericHelper.MakeGenericMethod(messageType);
+ _readAsyncFuncCache[messageType] =
+ (Func<IDataReader, CancellationToken, T>)constructedHelper.Invoke(this, new[] { readAsyncMethod, codec });
+
+ MethodInfo writeMethod = codec.GetType().GetMethod("Write");
+ genericHelper = GetType().GetMethod("WriteHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance);
+ constructedHelper = genericHelper.MakeGenericMethod(messageType);
+ _writeFuncCache[messageType] =
+ (Action<T, IDataWriter>) constructedHelper.Invoke(this, new[] {writeMethod, codec});
+
+ MethodInfo writeAsyncMethod = codec.GetType().GetMethod("WriteAsync");
+ genericHelper = GetType().GetMethod("WriteAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance);
+ constructedHelper = genericHelper.MakeGenericMethod(messageType);
+ _writeAsyncFuncCache[messageType] =
+ (Func<T, IDataWriter, CancellationToken, Task>)
+ constructedHelper.Invoke(this, new[] {writeAsyncMethod, codec});
+ }
+
+ private Action<T, IDataWriter> WriteHelperFunc<T1>(MethodInfo method, object codec) where T1 : class
+ {
+ Action<T1, IDataWriter> func = (Action<T1, IDataWriter>) Delegate.CreateDelegate
+ (typeof (Action<T1, IDataWriter>), codec, method);
+
+ Action<T, IDataWriter> ret = (obj, writer) => func(obj as T1, writer);
+ return ret;
+ }
+
+ private Func<T, IDataWriter, CancellationToken, Task> WriteAsyncHelperFunc<T1>(MethodInfo method, object codec)
+ where T1 : class
+ {
+ Func<T1, IDataWriter, CancellationToken, Task> func =
+ (Func<T1, IDataWriter, CancellationToken, Task>) Delegate.CreateDelegate
+ (typeof (Func<T1, IDataWriter, CancellationToken, Task>), codec, method);
+
+ Func<T, IDataWriter, CancellationToken, Task> ret = (obj, writer, token) => func(obj as T1, writer, token);
+ return ret;
+ }
+
+ private Func<IDataReader, CancellationToken, T> ReadAsyncHelperFunc<T1>(MethodInfo method, object codec)
+ where T1 : class
+ {
+ Func<IDataReader, CancellationToken, Task<T1>> func =
+ (Func<IDataReader, CancellationToken, Task<T1>>) Delegate.CreateDelegate
+ (typeof (Func<IDataReader, CancellationToken, Task<T1>>), codec, method);
+
+ Func<IDataReader, CancellationToken, T1> func1 = (writer, token) => func(writer, token).Result;
+ Func<IDataReader, CancellationToken, T> func2 = (writer, token) => ((T)(object)func1(writer, token));
+ return func2;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
new file mode 100644
index 0000000..1ff2517
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+ /// <summary>
+ /// Writable Network service used for Reef Task communication.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public class StreamingNetworkService<T> : INetworkService<T>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(StreamingNetworkService<>));
+
+ private readonly IRemoteManager<NsMessage<T>> _remoteManager;
+ private IIdentifier _localIdentifier;
+ private readonly IDisposable _messageHandlerDisposable;
+ private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
+ private readonly INameClient _nameClient;
+
+ /// <summary>
+ /// Create a new Writable NetworkService.
+ /// </summary>
+ /// <param name="messageHandler">The observer to handle incoming messages</param>
+ /// <param name="idFactory">The factory used to create IIdentifiers</param>
+ /// <param name="nameClient">The name client used to register Ids</param>
+ /// <param name="remoteManagerFactory">Writable RemoteManagerFactory to create a
+ /// Writable RemoteManager</param>
+ /// <param name="codec">Codec for Network Service message</param>
+ /// <param name="injector">Fork of the injector that created the Network service</param>
+ [Inject]
+ private StreamingNetworkService(
+ IObserver<NsMessage<T>> messageHandler,
+ IIdentifierFactory idFactory,
+ INameClient nameClient,
+ StreamingRemoteManagerFactory remoteManagerFactory,
+ NsMessageStreamingCodec<T> codec,
+ IInjector injector)
+ {
+ IPAddress localAddress = NetworkUtils.LocalIPAddress;
+ _remoteManager = remoteManagerFactory.GetInstance(localAddress, codec);
+
+ // Create and register incoming message handler
+ // TODO[REEF-419] This should use the TcpPortProvider mechanism
+ var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
+ _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, messageHandler);
+
+ _nameClient = nameClient;
+ _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
+
+ Logger.Log(Level.Info, "Started network service");
+ }
+
+ /// <summary>
+ /// Name client for registering ids
+ /// </summary>
+ public INameClient NamingClient
+ {
+ get { return _nameClient; }
+ }
+
+ /// <summary>
+ /// Open a new connection to the remote host registered to
+ /// the name service with the given identifier
+ /// </summary>
+ /// <param name="destinationId">The identifier of the remote host</param>
+ /// <returns>The IConnection used for communication</returns>
+ public IConnection<T> NewConnection(IIdentifier destinationId)
+ {
+ if (_localIdentifier == null)
+ {
+ throw new IllegalStateException("Cannot open connection without first registering an ID");
+ }
+
+ IConnection<T> connection;
+ if (_connectionMap.TryGetValue(destinationId, out connection))
+ {
+ return connection;
+ }
+ else
+ {
+ connection = new NsConnection<T>(_localIdentifier, destinationId,
+ NamingClient, _remoteManager, _connectionMap);
+
+ _connectionMap[destinationId] = connection;
+ return connection;
+ }
+ }
+
+ /// <summary>
+ /// Register the identifier for the NetworkService with the NameService.
+ /// </summary>
+ /// <param name="id">The identifier to register</param>
+ public void Register(IIdentifier id)
+ {
+ Logger.Log(Level.Info, "Registering id {0} with network service.", id);
+
+ _localIdentifier = id;
+ NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
+
+ Logger.Log(Level.Info, "End of Registering id {0} with network service.", id);
+ }
+
+ /// <summary>
+ /// Unregister the identifier for the NetworkService with the NameService.
+ /// </summary>
+ public void Unregister()
+ {
+ if (_localIdentifier == null)
+ {
+ throw new IllegalStateException("Cannot unregister a non existant identifier");
+ }
+
+ NamingClient.Unregister(_localIdentifier.ToString());
+ _localIdentifier = null;
+ _messageHandlerDisposable.Dispose();
+ }
+
+ /// <summary>
+ /// Dispose of the NetworkService's resources
+ /// </summary>
+ public void Dispose()
+ {
+ NamingClient.Dispose();
+ _remoteManager.Dispose();
+
+ Logger.Log(Level.Info, "Disposed of network service");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
deleted file mode 100644
index 93da126..0000000
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Net;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Network.NetworkService
-{
- /// <summary>
- /// Writable Network service used for Reef Task communication.
- /// </summary>
- /// <typeparam name="T">The message type</typeparam>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableNetworkService<T> : INetworkService<T> where T : IWritable
- {
- private static readonly Logger Logger = Logger.GetLogger(typeof(NetworkService<>));
-
- private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager;
- private readonly IObserver<WritableNsMessage<T>> _messageHandler;
- private IIdentifier _localIdentifier;
- private IDisposable _messageHandlerDisposable;
- private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
- private readonly INameClient _nameClient;
-
- /// <summary>
- /// Create a new Writable NetworkService.
- /// </summary>
- /// <param name="nsPort">The port that the NetworkService will listen on</param>
- /// <param name="messageHandler">The observer to handle incoming messages</param>
- /// <param name="idFactory">The factory used to create IIdentifiers</param>
- /// <param name="nameClient">The name client used to register Ids</param>
- /// <param name="remoteManagerFactory">Writable RemoteManagerFactory to create a
- /// Writable RemoteManager</param>
- [Inject]
- private WritableNetworkService(
- [Parameter(typeof (NetworkServiceOptions.NetworkServicePort))] int nsPort,
- IObserver<WritableNsMessage<T>> messageHandler,
- IIdentifierFactory idFactory,
- INameClient nameClient,
- StreamingRemoteManagerFactory remoteManagerFactory)
- {
-
- IPAddress localAddress = NetworkUtils.LocalIPAddress;
- _remoteManager = remoteManagerFactory.GetInstance<WritableNsMessage<T>>(localAddress, nsPort);
- _messageHandler = messageHandler;
-
- // Create and register incoming message handler
- // TODO[REEF-419] This should use the TcpPortProvider mechanism
- var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
- _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
-
- _nameClient = nameClient;
- _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
-
- Logger.Log(Level.Info, "Started network service");
- }
-
- /// <summary>
- /// Name client for registering ids
- /// </summary>
- public INameClient NamingClient
- {
- get { return _nameClient; }
- }
-
- /// <summary>
- /// Open a new connection to the remote host registered to
- /// the name service with the given identifier
- /// </summary>
- /// <param name="destinationId">The identifier of the remote host</param>
- /// <returns>The IConnection used for communication</returns>
- public IConnection<T> NewConnection(IIdentifier destinationId)
- {
- if (_localIdentifier == null)
- {
- throw new IllegalStateException("Cannot open connection without first registering an ID");
- }
-
- IConnection<T> connection;
- if (_connectionMap.TryGetValue(destinationId, out connection))
- {
- return connection;
- }
- else
- {
- connection = new WritableNsConnection<T>(_localIdentifier, destinationId,
- NamingClient, _remoteManager, _connectionMap);
-
- _connectionMap[destinationId] = connection;
- return connection;
- }
- }
-
- /// <summary>
- /// Register the identifier for the NetworkService with the NameService.
- /// </summary>
- /// <param name="id">The identifier to register</param>
- public void Register(IIdentifier id)
- {
- Logger.Log(Level.Info, "Registering id {0} with network service.", id);
-
- _localIdentifier = id;
- NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
-
- Logger.Log(Level.Info, "End of Registering id {0} with network service.", id);
- }
-
- /// <summary>
- /// Unregister the identifier for the NetworkService with the NameService.
- /// </summary>
- public void Unregister()
- {
- if (_localIdentifier == null)
- {
- throw new IllegalStateException("Cannot unregister a non existant identifier");
- }
-
- NamingClient.Unregister(_localIdentifier.ToString());
- _localIdentifier = null;
- _messageHandlerDisposable.Dispose();
- }
-
- /// <summary>
- /// Dispose of the NetworkService's resources
- /// </summary>
- public void Dispose()
- {
- NamingClient.Dispose();
- _remoteManager.Dispose();
-
- Logger.Log(Level.Info, "Disposed of network service");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
deleted file mode 100644
index c20238c..0000000
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Net;
-using System.Net.Sockets;
-using System.Runtime.Remoting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.NetworkService
-{
- /// <summary>
- /// Represents a connection between two hosts using the Writable NetworkService.
- /// </summary>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableNsConnection<T> : IConnection<T> where T : IWritable
- {
- private static readonly Logger Logger = Logger.GetLogger(typeof (WritableNsConnection<T>));
-
- private readonly IIdentifier _sourceId;
- private readonly IIdentifier _destId;
- private readonly INameClient _nameClient;
- private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager;
- private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
- private IObserver<WritableNsMessage<T>> _remoteSender;
-
- /// <summary>
- /// Creates a new NsConnection between two hosts.
- /// </summary>
- /// <param name="sourceId">The identifier of the sender</param>
- /// <param name="destId">The identifier of the receiver</param>
- /// <param name="nameClient">The NameClient used for naming lookup</param>
- /// <param name="remoteManager">The remote manager used for network communication</param>
- /// <param name="connectionMap">A cache of opened connections. Will remove itself from
- /// the cache when the NsConnection is disposed.</param>
- public WritableNsConnection(
- IIdentifier sourceId,
- IIdentifier destId,
- INameClient nameClient,
- IRemoteManager<WritableNsMessage<T>> remoteManager,
- Dictionary<IIdentifier, IConnection<T>> connectionMap)
- {
- _sourceId = sourceId;
- _destId = destId;
- _nameClient = nameClient;
- _remoteManager = remoteManager;
- _connectionMap = connectionMap;
- }
-
- /// <summary>
- /// Opens the connection to the remote host.
- /// </summary>
- public void Open()
- {
- string destStr = _destId.ToString();
- Logger.Log(Level.Verbose, "Network service opening connection to {0}...", destStr);
-
- IPEndPoint destAddr = _nameClient.Lookup(destStr);
- if (null == destAddr)
- {
- throw new RemotingException("Destination Address identifier cannot be found");
- }
-
- try
- {
- _remoteSender = _remoteManager.GetRemoteObserver(destAddr);
- Logger.Log(Level.Verbose, "Network service completed connection to {0}.", destStr);
- }
- catch (SocketException)
- {
- Logger.Log(Level.Error, "Network Service cannot open connection to " + destAddr);
- throw;
- }
- catch (ObjectDisposedException)
- {
- Logger.Log(Level.Error, "Network Service cannot open connection to " + destAddr);
- throw;
- }
- }
-
- /// <summary>
- /// Writes the object to the remote host.
- /// </summary>
- /// <param name="message">The message to send</param>
- public void Write(T message)
- {
- if (_remoteSender == null)
- {
- throw new IllegalStateException("NsConnection has not been opened yet.");
- }
-
- try
- {
- _remoteSender.OnNext(new WritableNsMessage<T>(_sourceId, _destId, message));
- }
- catch (IOException)
- {
- Logger.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
- throw;
- }
- catch (ObjectDisposedException)
- {
- Logger.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
- throw;
- }
- }
-
- /// <summary>
- /// Closes the connection
- /// </summary>
- public void Dispose()
- {
- _connectionMap.Remove(_destId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
deleted file mode 100644
index a9299bb..0000000
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Runtime.Serialization;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Hadoop.Avro;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-
-
-namespace Org.Apache.REEF.Network.NetworkService
-{
- /// <summary>
- /// Writable Message sent between NetworkServices.</summary>
- /// <typeparam name="T">The type of data being sent. It is assumed to be Writable</typeparam>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableNsMessage<T> : IWritable where T : IWritable
- {
- private IIdentifierFactory _factory;
- private IInjector _injection;
- /// <summary>
- /// Constructor to allow instantiation by reflection
- /// </summary>
- [Inject]
- public WritableNsMessage(IIdentifierFactory factory, IInjector injection)
- {
- _factory = factory;
- _injection = injection;
- }
-
- /// <summary>
- /// Create a new Writable NsMessage with no data.
- /// </summary>
- /// <param name="sourceId">The identifier of the sender</param>
- /// <param name="destId">The identifier of the receiver</param>
- public WritableNsMessage(IIdentifier sourceId, IIdentifier destId)
- {
- SourceId = sourceId;
- DestId = destId;
- Data = new List<T>();
- }
-
- /// <summary>
- /// Create a new Writable NsMessage with data.
- /// </summary>
- /// <param name="sourceId">The identifier of the sender</param>
- /// <param name="destId">The identifier of the receiver</param>
- /// <param name="message">The message to send</param>
- public WritableNsMessage(IIdentifier sourceId, IIdentifier destId, T message)
- {
- SourceId = sourceId;
- DestId = destId;
- Data = new List<T> {message};
- }
-
- /// <summary>
- /// The identifier of the sender of the message.
- /// </summary>
- internal IIdentifier SourceId { get; private set; }
-
- /// <summary>
- /// The identifier of the receiver of the message.
- /// </summary>
- internal IIdentifier DestId { get; private set; }
-
- /// <summary>
- /// A list of data being sent in the message.
- /// </summary>
- public IList<T> Data { get; set; }
-
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- public void Read(IDataReader reader)
- {
- SourceId = _factory.Create(reader.ReadString());
- DestId = _factory.Create(reader.ReadString());
- int messageCount = reader.ReadInt32();
- string dataType = reader.ReadString();
-
- Data = new List<T>();
-
- for (int index = 0; index < messageCount; index++)
- {
- var dataPoint = (T)_injection.ForkInjector().GetInstance(Type.GetType(dataType));
-
- if (null == dataPoint)
- {
- throw new Exception("T type instance cannot be created from the stream data in Network Service Message");
- }
-
- dataPoint.Read(reader);
- Data.Add(dataPoint);
- }
- }
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- public void Write(IDataWriter writer)
- {
- writer.WriteString(SourceId.ToString());
- writer.WriteString(DestId.ToString());
- writer.WriteInt32(Data.Count);
- writer.WriteString(Data[0].GetType().AssemblyQualifiedName);
-
- foreach (var data in Data)
- {
- data.Write(writer);
- }
- }
-
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- /// <param name="token">The cancellation token</param>
- public async Task ReadAsync(IDataReader reader, CancellationToken token)
- {
- SourceId = _factory.Create(await reader.ReadStringAsync(token));
- DestId = _factory.Create(await reader.ReadStringAsync(token));
- int messageCount = await reader.ReadInt32Async(token);
- string dataType = await reader.ReadStringAsync(token);
-
- Data = new List<T>();
-
- for (int index = 0; index < messageCount; index++)
- {
- var dataPoint = (T) _injection.ForkInjector().GetInstance(Type.GetType(dataType));
-
- if (null == dataPoint)
- {
- throw new Exception("T type instance cannot be created from the stream data in Network Service Message");
- }
-
- await dataPoint.ReadAsync(reader, token);
- Data.Add(dataPoint);
- }
- }
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">The cancellation token</param>
- public async Task WriteAsync(IDataWriter writer, CancellationToken token)
- {
- await writer.WriteStringAsync(SourceId.ToString(), token);
- await writer.WriteStringAsync(DestId.ToString(), token);
- await writer.WriteInt32Async(Data.Count, token);
- await writer.WriteStringAsync(Data[0].GetType().AssemblyQualifiedName, token);
-
- foreach (var data in Data)
- {
- data.Write(writer);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 73b6d9d..a0e6038 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -61,6 +61,7 @@ under the License.
<Compile Include="Group\Driver\IGroupCommDriver.cs" />
<Compile Include="Group\Driver\Impl\CommunicationGroupDriver.cs" />
<Compile Include="Group\Driver\Impl\GroupCommunicationMessage.cs" />
+ <Compile Include="Group\Driver\Impl\GroupCommunicationMessageStreamingCodec.cs" />
<Compile Include="Group\Driver\Impl\MessageType.cs" />
<Compile Include="Group\Driver\Impl\GroupCommDriver.cs" />
<Compile Include="Group\Driver\Impl\TaskStarter.cs" />
@@ -139,6 +140,8 @@ under the License.
<Compile Include="NetworkService\Codec\ControlMessageCodec.cs" />
<Compile Include="NetworkService\Codec\NsMessageCodec.cs" />
<Compile Include="NetworkService\Codec\NsMessageProto.cs" />
+ <Compile Include="NetworkService\Codec\NsMessageStreamingCodec.cs" />
+ <Compile Include="NetworkService\Codec\StreamingCodecFunctionCache.cs" />
<Compile Include="NetworkService\ControlMessage.cs" />
<Compile Include="NetworkService\IConnection.cs" />
<Compile Include="NetworkService\INetworkService.cs" />
@@ -147,9 +150,7 @@ under the License.
<Compile Include="NetworkService\NetworkServiceOptions.cs" />
<Compile Include="NetworkService\NsConnection.cs" />
<Compile Include="NetworkService\NsMessage.cs" />
- <Compile Include="NetworkService\WritableNetworkService.cs" />
- <Compile Include="NetworkService\WritableNsConnection.cs" />
- <Compile Include="NetworkService\WritableNsMessage.cs" />
+ <Compile Include="NetworkService\StreamingNetworkService.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Utilities\BlockingCollectionExtensions.cs" />
<Compile Include="Utilities\Utils.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index babc26d..9f13c83 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -52,7 +52,6 @@ under the License.
<Compile Include="StreamingRemoteManagerTest.cs" />
<Compile Include="StreamingTransportTest.cs" />
<Compile Include="TransportTest.cs" />
- <Compile Include="WritableString.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
index 20f75be..a0be7ee 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
@@ -25,7 +25,8 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
namespace Org.Apache.REEF.Wake.Tests
{
@@ -34,9 +35,6 @@ namespace Org.Apache.REEF.Wake.Tests
{
private readonly StreamingRemoteManagerFactory _remoteManagerFactory1 =
TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
-
- private readonly StreamingRemoteManagerFactory _remoteManagerFactory2 =
- TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
/// <summary>
/// Tests one way communication between Remote Managers
@@ -47,24 +45,25 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
- var observer = Observer.Create<WritableString>(queue.Add);
+ var observer = Observer.Create<string>(queue.Add);
IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
remoteManager2.RegisterObserver(endpoint1, observer);
var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
- remoteObserver.OnNext(new WritableString("ghi"));
+ remoteObserver.OnNext("abc");
+ remoteObserver.OnNext("def");
+ remoteObserver.OnNext("ghi");
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
}
Assert.AreEqual(3, events.Count);
@@ -78,42 +77,44 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
- BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue1 = new BlockingCollection<string>();
+ BlockingCollection<string> queue2 = new BlockingCollection<string>();
List<string> events1 = new List<string>();
List<string> events2 = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
// Register observers for remote manager 1 and remote manager 2
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer1 = Observer.Create<WritableString>(queue1.Add);
- var observer2 = Observer.Create<WritableString>(queue2.Add);
+ var observer1 = Observer.Create<string>(queue1.Add);
+ var observer2 = Observer.Create<string>(queue2.Add);
remoteManager1.RegisterObserver(remoteEndpoint, observer1);
remoteManager2.RegisterObserver(remoteEndpoint, observer2);
// Remote manager 1 sends 3 events to remote manager 2
var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("def"));
- remoteObserver1.OnNext(new WritableString("ghi"));
+ remoteObserver1.OnNext("abc");
+ remoteObserver1.OnNext("def");
+ remoteObserver1.OnNext("ghi");
// Remote manager 2 sends 4 events to remote manager 1
var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
- remoteObserver2.OnNext(new WritableString("jkl"));
- remoteObserver2.OnNext(new WritableString("mno"));
- remoteObserver2.OnNext(new WritableString("pqr"));
- remoteObserver2.OnNext(new WritableString("stu"));
-
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
-
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
+ remoteObserver2.OnNext("jkl");
+ remoteObserver2.OnNext("mno");
+ remoteObserver2.OnNext("pqr");
+ remoteObserver2.OnNext("stu");
+
+ events1.Add(queue1.Take());
+ events1.Add(queue1.Take());
+ events1.Add(queue1.Take());
+ events1.Add(queue1.Take());
+
+ events2.Add(queue2.Take());
+ events2.Add(queue2.Take());
+ events2.Add(queue2.Take());
}
Assert.AreEqual(4, events1.Count);
@@ -129,29 +130,30 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager3 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer = Observer.Create<WritableString>(queue.Add);
+ var observer = Observer.Create<string>(queue.Add);
remoteManager3.RegisterObserver(remoteEndpoint, observer);
var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
- remoteObserver2.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("def"));
- remoteObserver2.OnNext(new WritableString("ghi"));
- remoteObserver1.OnNext(new WritableString("jkl"));
- remoteObserver2.OnNext(new WritableString("mno"));
+ remoteObserver2.OnNext("abc");
+ remoteObserver1.OnNext("def");
+ remoteObserver2.OnNext("ghi");
+ remoteObserver1.OnNext("jkl");
+ remoteObserver2.OnNext("mno");
for (int i = 0; i < 5; i++)
{
- events.Add(queue.Take().Data);
+ events.Add(queue.Take());
}
}
@@ -167,58 +169,60 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
- BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
- BlockingCollection<WritableString> queue3 = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue1 = new BlockingCollection<string>();
+ BlockingCollection<string> queue2 = new BlockingCollection<string>();
+ BlockingCollection<string> queue3 = new BlockingCollection<string>();
List<string> events1 = new List<string>();
List<string> events2 = new List<string>();
List<string> events3 = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager3 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer = Observer.Create<WritableString>(queue1.Add);
+ var observer = Observer.Create<string>(queue1.Add);
remoteManager1.RegisterObserver(remoteEndpoint, observer);
- var observer2 = Observer.Create<WritableString>(queue2.Add);
+ var observer2 = Observer.Create<string>(queue2.Add);
remoteManager2.RegisterObserver(remoteEndpoint, observer2);
- var observer3 = Observer.Create<WritableString>(queue3.Add);
+ var observer3 = Observer.Create<string>(queue3.Add);
remoteManager3.RegisterObserver(remoteEndpoint, observer3);
var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
// Observer 1 and 2 send messages to observer 3
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver2.OnNext(new WritableString("def"));
- remoteObserver2.OnNext(new WritableString("def"));
+ remoteObserver1.OnNext("abc");
+ remoteObserver1.OnNext("abc");
+ remoteObserver1.OnNext("abc");
+ remoteObserver2.OnNext("def");
+ remoteObserver2.OnNext("def");
// Observer 3 sends messages back to observers 1 and 2
var remoteObserver3A = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
var remoteObserver3B = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver3A.OnNext(new WritableString("ghi"));
- remoteObserver3A.OnNext(new WritableString("ghi"));
- remoteObserver3B.OnNext(new WritableString("jkl"));
- remoteObserver3B.OnNext(new WritableString("jkl"));
- remoteObserver3B.OnNext(new WritableString("jkl"));
+ remoteObserver3A.OnNext("ghi");
+ remoteObserver3A.OnNext("ghi");
+ remoteObserver3B.OnNext("jkl");
+ remoteObserver3B.OnNext("jkl");
+ remoteObserver3B.OnNext("jkl");
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take());
+ events1.Add(queue1.Take());
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take());
+ events2.Add(queue2.Take());
+ events2.Add(queue2.Take());
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take());
+ events3.Add(queue3.Take());
+ events3.Add(queue3.Take());
+ events3.Add(queue3.Take());
+ events3.Add(queue3.Take());
}
Assert.AreEqual(2, events1.Count);
@@ -234,34 +238,36 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
// Register handler for when remote manager 2 receives events; respond
// with an ack
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
- var receiverObserver = Observer.Create<WritableString>(
- message => remoteObserver2.OnNext(new WritableString("received message: " + message.Data)));
+ var receiverObserver = Observer.Create<string>(
+ message => remoteObserver2.OnNext("received message: " + message));
remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver);
// Register handler for remote manager 1 to record the ack
- var senderObserver = Observer.Create<WritableString>(queue.Add);
+ var senderObserver = Observer.Create<string>(queue.Add);
remoteManager1.RegisterObserver(remoteEndpoint, senderObserver);
// Begin to send messages
var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver1.OnNext(new WritableString("hello"));
- remoteObserver1.OnNext(new WritableString("there"));
- remoteObserver1.OnNext(new WritableString("buddy"));
+ remoteObserver1.OnNext("hello");
+ remoteObserver1.OnNext("there");
+ remoteObserver1.OnNext("buddy");
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
}
Assert.AreEqual(3, events.Count);
@@ -278,25 +284,27 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
- // RemoteManager2 listens and records events of type IRemoteEvent<WritableString>
- var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message));
+ // RemoteManager2 listens and records events of type IRemoteEvent<string>
+ var observer = Observer.Create<IRemoteMessage<string>>(message => queue.Add(message.Message));
remoteManager2.RegisterObserver(observer);
// Remote manager 1 sends 3 events to remote manager 2
var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
- remoteObserver.OnNext(new WritableString("ghi"));
+ remoteObserver.OnNext("abc");
+ remoteObserver.OnNext("def");
+ remoteObserver.OnNext("ghi");
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
}
Assert.AreEqual(3, events.Count);
@@ -310,28 +318,30 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
- var observer = Observer.Create<WritableString>(queue.Add);
+ var observer = Observer.Create<string>(queue.Add);
IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
remoteManager2.RegisterObserver(endpoint1, observer);
var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
+ remoteObserver.OnNext("abc");
+ remoteObserver.OnNext("def");
var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- cachedObserver.OnNext(new WritableString("ghi"));
- cachedObserver.OnNext(new WritableString("jkl"));
+ cachedObserver.OnNext("ghi");
+ cachedObserver.OnNext("jkl");
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
}
Assert.AreEqual(4, events.Count);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
deleted file mode 100644
index 30ff487..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
- /// <summary>
- /// Writable wrapper around the string class
- /// </summary>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableString : IWritable
- {
- /// <summary>
- /// Returns the actual string data
- /// </summary>
- public string Data { get; set; }
-
- /// <summary>
- /// Empty constructor for instantiation with reflection
- /// </summary>
- [Inject]
- public WritableString()
- {
- }
-
- /// <summary>
- /// Constructor
- /// </summary>
- /// <param name="data">The string data</param>
- public WritableString(string data)
- {
- Data = data;
- }
-
- /// <summary>
- /// Reads the string
- /// </summary>
- /// <param name="reader">reader to read from</param>
- public void Read(IDataReader reader)
- {
- Data = reader.ReadString();
- }
-
- /// <summary>
- /// Writes the string
- /// </summary>
- /// <param name="writer">Writer to write</param>
- public void Write(IDataWriter writer)
- {
- writer.WriteString(Data);
- }
-
- /// <summary>
- /// Reads the string
- /// </summary>
- /// <param name="reader">reader to read from</param>
- /// <param name="token">the cancellation token</param>
- public async Task ReadAsync(IDataReader reader, CancellationToken token)
- {
- Data = await reader.ReadStringAsync(token);
- }
-
- /// <summary>
- /// Writes the string
- /// </summary>
- /// <param name="writer">Writer to write</param>
- /// <param name="token">the cancellation token</param>
- public async Task WriteAsync(IDataWriter writer, CancellationToken token)
- {
- await writer.WriteStringAsync(Data, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index 4069d15..5767f8a 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -49,7 +49,6 @@ under the License.
<Compile Include="IIdentifier.cs" />
<Compile Include="IIdentifierFactory.cs" />
<Compile Include="Remote\Impl\RemoteEventStreamingCodec.cs" />
- <Compile Include="Remote\Impl\TemporaryWritableToStreamingCodec.cs" />
<Compile Include="Remote\Impl\StreamingRemoteManagerFactory.cs" />
<Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" />
<Compile Include="Impl\LoggingEventHandler.cs" />
@@ -78,7 +77,6 @@ under the License.
<Compile Include="Remote\Impl\StreamingTransportClient.cs" />
<Compile Include="Remote\Impl\StreamingTransportServer.cs" />
<Compile Include="Remote\IRemoteManagerFactory.cs" />
- <Compile Include="Remote\IWritable.cs" />
<Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" />
<Compile Include="Remote\ICodec.cs" />
<Compile Include="Remote\ICodecFactory.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
deleted file mode 100644
index 644cf82..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Org.Apache.REEF.Wake.Remote
-{
- /// <summary>
- /// Interface that classes should implement if they need to be readable to and writable
- /// from the stream. It is assumed that the classes inheriting this interface will have a
- /// default empty constructor
- /// </summary>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public interface IWritable
- {
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- void Read(IDataReader reader);
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- void Write(IDataWriter writer);
-
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- /// <param name="token">The cancellation token</param>
- Task ReadAsync(IDataReader reader, CancellationToken token);
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">The cancellation token</param>
- Task WriteAsync(IDataWriter writer, CancellationToken token);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
index 90e3aca..da549ea 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
@@ -20,6 +20,7 @@
using System.Net;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.Wake.Remote.Impl
{
@@ -38,12 +39,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
_injector = injector;
}
- //ToDo: The port argument will be removed once changes are made in WritableNetworkService [REEF-447]
- public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
+ public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, IStreamingCodec<T> codec)
{
#pragma warning disable 618
// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
- var codec = _injector.GetInstance<TemporaryWritableToStreamingCodec<T>>();
return new StreamingRemoteManager<T>(localAddress, _tcpPortProvider, codec);
#pragma warning disable 618
}
[3/3] incubator-reef git commit: [REEF-447]Convert Network Service
Layer from Writable to Streaming
Posted by ju...@apache.org.
[REEF-447]Convert Network Service Layer from Writable to Streaming
This addressed the issue by
* Converting WritableNetworkSerbvice to StreamingNetworkService
* Intoducing NsMessageStreamingCodec which caches and use StreamingCodecs of various GroupCommunicationMessage types.
* Introducing GroupCommunicationMessage StreamingCodec
JIRA:
[REEF-447](https://issues.apache.org/jira/browse/REEF-447)
This Closes #289
Author: Dhruv <dh...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8505dee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8505dee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8505dee9
Branch: refs/heads/master
Commit: 8505dee942f924cbfd72d631bdaabcb20c125f02
Parents: 5cdd655
Author: Dhruv <dh...@gmail.com>
Authored: Thu Jul 9 14:52:20 2015 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Wed Jul 15 14:44:34 2015 -0700
----------------------------------------------------------------------
.../GroupCommunicationTests.cs | 22 +-
.../GroupCommunicationTreeTopologyTests.cs | 5 -
.../StreamingNetworkServiceTests.cs | 361 +++++++++++++++++++
.../WritableNetworkServiceTests.cs | 262 --------------
.../NetworkService/WritableString.cs | 94 -----
.../Org.Apache.REEF.Network.Tests.csproj | 3 +-
.../CodecToStreamingCodecConfiguration.cs | 8 +-
.../Group/Config/StreamingCodecConfiguration.cs | 8 +-
.../Driver/Impl/CommunicationGroupDriver.cs | 2 -
.../Impl/GeneralGroupCommunicationMessage.cs | 51 +--
.../Group/Driver/Impl/GroupCommDriver.cs | 5 +-
.../Driver/Impl/GroupCommunicationMessage.cs | 145 +-------
.../GroupCommunicationMessageStreamingCodec.cs | 223 ++++++++++++
.../Group/Operators/Impl/Sender.cs | 7 +-
.../Task/ICommunicationGroupNetworkObserver.cs | 3 +-
.../Group/Task/IGroupCommNetworkObserver.cs | 3 +-
.../Group/Task/Impl/CommunicationGroupClient.cs | 1 -
.../Impl/CommunicationGroupNetworkObserver.cs | 5 +-
.../Group/Task/Impl/GroupCommClient.cs | 4 +-
.../Group/Task/Impl/GroupCommNetworkObserver.cs | 7 +-
.../Group/Task/Impl/NodeStruct.cs | 11 +-
.../Group/Task/Impl/OperatorTopology.cs | 31 +-
.../Codec/NsMessageStreamingCodec.cs | 202 +++++++++++
.../Codec/StreamingCodecFunctionCache.cs | 203 +++++++++++
.../NetworkService/StreamingNetworkService.cs | 160 ++++++++
.../NetworkService/WritableNetworkService.cs | 159 --------
.../NetworkService/WritableNsConnection.cs | 138 -------
.../NetworkService/WritableNsMessage.cs | 185 ----------
.../Org.Apache.REEF.Network.csproj | 7 +-
.../Org.Apache.REEF.Wake.Tests.csproj | 1 -
.../StreamingRemoteManagerTest.cs | 232 ++++++------
.../WritableString.cs | 95 -----
.../Org.Apache.REEF.Wake.csproj | 2 -
.../cs/Org.Apache.REEF.Wake/Remote/IWritable.cs | 61 ----
.../Impl/StreamingRemoteManagerFactory.cs | 5 +-
.../Impl/TemporaryWritableToStreamingCodec.cs | 70 ----
36 files changed, 1350 insertions(+), 1431 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
index 61813af..62a6a3f 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs
@@ -72,17 +72,17 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
new BlockingCollection<GeneralGroupCommunicationMessage>();
var handler1 =
- Observer.Create<WritableNsMessage<GeneralGroupCommunicationMessage>>(msg => messages1.Add(msg.Data.First()));
+ Observer.Create<NsMessage<GeneralGroupCommunicationMessage>>(msg => messages1.Add(msg.Data.First()));
var handler2 =
- Observer.Create<WritableNsMessage<GeneralGroupCommunicationMessage>>(msg => messages2.Add(msg.Data.First()));
+ Observer.Create<NsMessage<GeneralGroupCommunicationMessage>>(msg => messages2.Add(msg.Data.First()));
var networkServiceInjector1 = BuildNetworkServiceInjector(endpoint, handler1);
var networkServiceInjector2 = BuildNetworkServiceInjector(endpoint, handler2);
var networkService1 = networkServiceInjector1.GetInstance<
- WritableNetworkService<GeneralGroupCommunicationMessage>>();
+ StreamingNetworkService<GeneralGroupCommunicationMessage>>();
var networkService2 = networkServiceInjector2.GetInstance<
- WritableNetworkService<GeneralGroupCommunicationMessage>>();
+ StreamingNetworkService<GeneralGroupCommunicationMessage>>();
networkService1.Register(new StringIdentifier("id1"));
networkService2.Register(new StringIdentifier("id2"));
@@ -822,7 +822,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
}
public static IInjector BuildNetworkServiceInjector(
- IPEndPoint nameServerEndpoint, IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>> handler)
+ IPEndPoint nameServerEndpoint, IObserver<NsMessage<GeneralGroupCommunicationMessage>> handler)
{
var config = TangFactory.GetTang().NewConfigurationBuilder()
.BindNamedParameter(typeof (NamingConfigurationOptions.NameServerAddress),
@@ -832,20 +832,24 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication
.BindNamedParameter(typeof (NetworkServiceOptions.NetworkServicePort),
(0).ToString(CultureInfo.InvariantCulture))
.BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class)
- .BindImplementation(GenericType<IStreamingCodec<string>>.Class, GenericType<StringStreamingCodec>.Class)
.Build();
+ var codecConfig = StreamingCodecConfiguration<string>.Conf
+ .Set(StreamingCodecConfiguration<string>.Codec, GenericType<StringStreamingCodec>.Class)
+ .Build();
+
+ config = Configurations.Merge(config, codecConfig);
+
var injector = TangFactory.GetTang().NewInjector(config);
injector.BindVolatileInstance(
- GenericType<IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>>.Class, handler);
+ GenericType<IObserver<NsMessage<GeneralGroupCommunicationMessage>>>.Class, handler);
return injector;
}
private GroupCommunicationMessage<string> CreateGcmStringType(string message, string from, string to)
{
- var stringCodec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
- return new GroupCommunicationMessage<string>("g1", "op1", from, to, message, MessageType.Data, stringCodec);
+ return new GroupCommunicationMessage<string>("g1", "op1", from, to, message);
}
private static void ScatterReceiveReduce(IScatterReceiver<int> receiver, IReduceSender<int> sumSender)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
index 6b9b8c7..c194bcb 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs
@@ -18,21 +18,16 @@
*/
using System.Collections.Generic;
-using System.Globalization;
using System.Linq;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Operators;
using Org.Apache.REEF.Network.Group.Operators.Impl;
using Org.Apache.REEF.Network.Group.Pipelining.Impl;
using Org.Apache.REEF.Network.Group.Topology;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.Remote.Impl;
using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
namespace Org.Apache.REEF.Network.Tests.GroupCommunication
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs
new file mode 100644
index 0000000..1e0378e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/StreamingNetworkServiceTests.cs
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Network.Tests.NamingService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.Tests.NetworkService
+{
+ /// <summary>
+ /// Tests for Streaming Network Service
+ /// </summary>
+ [TestClass]
+ public class StreamingNetworkServiceTests
+ {
+ /// <summary>
+ /// Tests one way communication between two network services
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingNetworkServiceOneWayCommunication()
+ {
+ int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000);
+ int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000);
+
+ BlockingCollection<string> queue;
+
+ using (var nameServer = NameServerTests.BuildNameServer())
+ {
+ IPEndPoint endpoint = nameServer.LocalEndpoint;
+ int nameServerPort = endpoint.Port;
+ string nameServerAddr = endpoint.Address.ToString();
+
+ var handlerConf1 =
+ TangFactory.GetTang()
+ .NewConfigurationBuilder()
+ .BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class,
+ GenericType<NetworkMessageHandler>.Class)
+ .Build();
+
+ var handlerConf2 =
+ TangFactory.GetTang()
+ .NewConfigurationBuilder()
+ .BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class,
+ GenericType<MessageHandler>.Class)
+ .Build();
+
+ var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
+ handlerConf1);
+
+ var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
+ handlerConf2);
+
+ using (INetworkService<string> networkService1 = networkServiceInjection1.GetInstance<StreamingNetworkService<string>>())
+ using (INetworkService<string> networkService2 = networkServiceInjection2.GetInstance<StreamingNetworkService<string>>())
+ {
+ queue = networkServiceInjection2.GetInstance<MessageHandler>().Queue;
+ IIdentifier id1 = new StringIdentifier("service1");
+ IIdentifier id2 = new StringIdentifier("service2");
+ networkService1.Register(id1);
+ networkService2.Register(id2);
+
+ using (IConnection<string> connection = networkService1.NewConnection(id2))
+ {
+ connection.Open();
+ connection.Write("abc");
+ connection.Write("def");
+ connection.Write("ghi");
+
+ Assert.AreEqual("abc", queue.Take());
+ Assert.AreEqual("def", queue.Take());
+ Assert.AreEqual("ghi", queue.Take());
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Tests two way communication between two network services
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingNetworkServiceTwoWayCommunication()
+ {
+ int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000);
+ int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000);
+
+ BlockingCollection<string> queue1;
+ BlockingCollection<string> queue2;
+
+ using (var nameServer = NameServerTests.BuildNameServer())
+ {
+ IPEndPoint endpoint = nameServer.LocalEndpoint;
+ int nameServerPort = endpoint.Port;
+ string nameServerAddr = endpoint.Address.ToString();
+
+ var handlerConf =
+ TangFactory.GetTang()
+ .NewConfigurationBuilder()
+ .BindImplementation(GenericType<IObserver<NsMessage<string>>>.Class,
+ GenericType<MessageHandler>.Class)
+ .Build();
+
+ var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
+ handlerConf);
+
+ var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
+ handlerConf);
+
+ using (INetworkService<string> networkService1 = networkServiceInjection1.GetInstance<StreamingNetworkService<string>>())
+ using (INetworkService<string> networkService2 = networkServiceInjection2.GetInstance<StreamingNetworkService<string>>())
+ {
+ queue1 = networkServiceInjection1.GetInstance<MessageHandler>().Queue;
+ queue2 = networkServiceInjection2.GetInstance<MessageHandler>().Queue;
+
+ IIdentifier id1 = new StringIdentifier("service1");
+ IIdentifier id2 = new StringIdentifier("service2");
+ networkService1.Register(id1);
+ networkService2.Register(id2);
+
+ using (IConnection<string> connection1 = networkService1.NewConnection(id2))
+ using (IConnection<string> connection2 = networkService2.NewConnection(id1))
+ {
+ connection1.Open();
+ connection1.Write("abc");
+ connection1.Write("def");
+ connection1.Write("ghi");
+
+ connection2.Open();
+ connection2.Write("jkl");
+ connection2.Write("nop");
+
+ Assert.AreEqual("abc", queue2.Take());
+ Assert.AreEqual("def", queue2.Take());
+ Assert.AreEqual("ghi", queue2.Take());
+
+ Assert.AreEqual("jkl", queue1.Take());
+ Assert.AreEqual("nop", queue1.Take());
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Tests StreamingCodecFunctionCache
+ /// </summary>
+ [TestMethod]
+ public void TestStreamingCodecFunctionCache()
+ {
+ IConfiguration conf = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindImplementation(GenericType<IStreamingCodec<B>>.Class, GenericType<BStreamingCodec>.Class)
+ .Build();
+ IInjector injector = TangFactory.GetTang().NewInjector(conf);
+
+ StreamingCodecFunctionCache<A> cache = new StreamingCodecFunctionCache<A>(injector);
+
+ var readFunc = cache.ReadFunction(typeof(B));
+ var writeFunc = cache.WriteFunction(typeof (B));
+ var readAsyncFunc = cache.ReadAsyncFunction(typeof(B));
+ var writeAsyncFunc = cache.WriteAsyncFunction(typeof(B));
+
+ var stream = new MemoryStream();
+ IDataWriter writer = new StreamDataWriter(stream);
+ IDataReader reader = new StreamDataReader(stream);
+
+ B val = new B();
+ val.Value1 = "hello";
+ val.Value2 = "reef";
+
+ writeFunc(val, writer);
+
+ val.Value1 = "helloasync";
+ val.Value2 = "reefasync";
+ CancellationToken token = new CancellationToken();
+
+ var asyncResult = writeAsyncFunc.BeginInvoke(val, writer, token, null, null);
+ writeAsyncFunc.EndInvoke(asyncResult);
+
+ stream.Position = 0;
+ A res = readFunc(reader);
+ B resB1 = res as B;
+
+ asyncResult = readAsyncFunc.BeginInvoke(reader, token, null, null);
+ res = readAsyncFunc.EndInvoke(asyncResult);
+ B resB2 = res as B;
+
+ Assert.AreEqual("hello", resB1.Value1);
+ Assert.AreEqual("reef", resB1.Value2);
+ Assert.AreEqual("helloasync", resB2.Value1);
+ Assert.AreEqual("reefasync", resB2.Value2);
+ }
+
+ /// <summary>
+ /// Creates an instance of network service.
+ /// </summary>
+ /// <param name="networkServicePort">The port that the NetworkService will listen on</param>
+ /// <param name="nameServicePort">The port of the NameServer</param>
+ /// <param name="nameServiceAddr">The ip address of the NameServer</param>
+ /// <param name="handlerConf">The configuration of observer to handle incoming messages</param>
+ /// <returns></returns>
+ private IInjector BuildNetworkService(
+ int networkServicePort,
+ int nameServicePort,
+ string nameServiceAddr,
+ IConfiguration handlerConf)
+ {
+ var networkServiceConf = TangFactory.GetTang().NewConfigurationBuilder(handlerConf)
+ .BindNamedParameter<NetworkServiceOptions.NetworkServicePort, int>(
+ GenericType<NetworkServiceOptions.NetworkServicePort>.Class,
+ networkServicePort.ToString(CultureInfo.CurrentCulture))
+ .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
+ GenericType<NamingConfigurationOptions.NameServerPort>.Class,
+ nameServicePort.ToString(CultureInfo.CurrentCulture))
+ .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
+ GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
+ nameServiceAddr)
+ .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class)
+ .BindImplementation(GenericType<IStreamingCodec<string>>.Class, GenericType<StringStreamingCodec>.Class)
+ .Build();
+
+ return TangFactory.GetTang().NewInjector(networkServiceConf);
+ }
+
+ public class A
+ {
+ public string Value1;
+ }
+
+ public class B : A
+ {
+ public string Value2;
+ }
+
+ public class BStreamingCodec : IStreamingCodec<B>
+ {
+ [Inject]
+ public BStreamingCodec()
+ {
+ }
+
+ public B Read(IDataReader reader)
+ {
+ B val = new B();
+ val.Value1 = reader.ReadString();
+ val.Value2 = reader.ReadString();
+ return val;
+ }
+
+ public void Write(B obj, IDataWriter writer)
+ {
+ writer.WriteString(obj.Value1);
+ writer.WriteString(obj.Value2);
+ }
+
+ public async Task<B> ReadAsync(IDataReader reader, CancellationToken token)
+ {
+ B val = new B();
+ val.Value1 = await reader.ReadStringAsync(token);
+ val.Value2 = await reader.ReadStringAsync(token);
+ return val;
+ }
+
+ public async Task WriteAsync(B obj, IDataWriter writer, CancellationToken token)
+ {
+ await writer.WriteStringAsync(obj.Value1, token);
+ await writer.WriteStringAsync(obj.Value2, token);
+ }
+ }
+ /// <summary>
+ /// The observer to handle incoming messages for string
+ /// </summary>
+ private class MessageHandler : IObserver<NsMessage<string>>
+ {
+ private readonly BlockingCollection<string> _queue;
+
+ public BlockingCollection<string> Queue
+ {
+ get { return _queue; }
+ }
+
+ [Inject]
+ private MessageHandler()
+ {
+ _queue = new BlockingCollection<string>();
+ }
+
+ public void OnNext(NsMessage<string> value)
+ {
+ _queue.Add(value.Data.First());
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ /// <summary>
+ /// The network handler to handle incoming Streaming NsMessages
+ /// </summary>
+ private class NetworkMessageHandler : IObserver<NsMessage<string>>
+ {
+ [Inject]
+ public NetworkMessageHandler()
+ {
+ }
+
+ public void OnNext(NsMessage<string> value)
+ {
+ }
+
+ public void OnError(Exception error)
+ {
+ }
+
+ public void OnCompleted()
+ {
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
deleted file mode 100644
index 07464ff..0000000
--- a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableNetworkServiceTests.cs
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Concurrent;
-using System.Globalization;
-using System.Linq;
-using System.Net;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Network.Naming;
-using Org.Apache.REEF.Network.NetworkService;
-using Org.Apache.REEF.Network.Tests.NamingService;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Network.Tests.NetworkService
-{
- /// <summary>
- /// Tests for Writable Network Service
- /// </summary>
- [TestClass]
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableNetworkServiceTests
- {
- /// <summary>
- /// Tests one way communication between two network services
- /// </summary>
- [TestMethod]
- public void TestWritableNetworkServiceOneWayCommunication()
- {
- int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000);
- int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000);
-
- BlockingCollection<WritableString> queue;
-
- using (var nameServer = NameServerTests.BuildNameServer())
- {
- IPEndPoint endpoint = nameServer.LocalEndpoint;
- int nameServerPort = endpoint.Port;
- string nameServerAddr = endpoint.Address.ToString();
-
- var handlerConf1 =
- TangFactory.GetTang()
- .NewConfigurationBuilder()
- .BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
- GenericType<NetworkMessageHandler>.Class)
- .Build();
-
- var handlerConf2 =
- TangFactory.GetTang()
- .NewConfigurationBuilder()
- .BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
- GenericType<MessageHandler>.Class)
- .Build();
-
- var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
- handlerConf1);
-
- var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
- handlerConf2);
-
- using (INetworkService<WritableString> networkService1 = networkServiceInjection1.GetInstance<WritableNetworkService<WritableString>>())
- using (INetworkService<WritableString> networkService2 = networkServiceInjection2.GetInstance<WritableNetworkService<WritableString>>())
- {
- queue = networkServiceInjection2.GetInstance<MessageHandler>().Queue;
- IIdentifier id1 = new StringIdentifier("service1");
- IIdentifier id2 = new StringIdentifier("service2");
- networkService1.Register(id1);
- networkService2.Register(id2);
-
- using (IConnection<WritableString> connection = networkService1.NewConnection(id2))
- {
- connection.Open();
- connection.Write(new WritableString("abc"));
- connection.Write(new WritableString("def"));
- connection.Write(new WritableString("ghi"));
-
- Assert.AreEqual("abc", queue.Take().Data);
- Assert.AreEqual("def", queue.Take().Data);
- Assert.AreEqual("ghi", queue.Take().Data);
- }
- }
- }
- }
-
- /// <summary>
- /// Tests two way communication between two network services
- /// </summary>
- [TestMethod]
- public void TestWritableNetworkServiceTwoWayCommunication()
- {
- int networkServicePort1 = NetworkUtils.GenerateRandomPort(6000, 7000);
- int networkServicePort2 = NetworkUtils.GenerateRandomPort(7001, 8000);
-
- BlockingCollection<WritableString> queue1;
- BlockingCollection<WritableString> queue2;
-
- using (var nameServer = NameServerTests.BuildNameServer())
- {
- IPEndPoint endpoint = nameServer.LocalEndpoint;
- int nameServerPort = endpoint.Port;
- string nameServerAddr = endpoint.Address.ToString();
-
- var handlerConf =
- TangFactory.GetTang()
- .NewConfigurationBuilder()
- .BindImplementation(GenericType<IObserver<WritableNsMessage<WritableString>>>.Class,
- GenericType<MessageHandler>.Class)
- .Build();
-
- var networkServiceInjection1 = BuildNetworkService(networkServicePort1, nameServerPort, nameServerAddr,
- handlerConf);
-
- var networkServiceInjection2 = BuildNetworkService(networkServicePort2, nameServerPort, nameServerAddr,
- handlerConf);
-
- using (INetworkService<WritableString> networkService1 = networkServiceInjection1.GetInstance<WritableNetworkService<WritableString>>())
- using (INetworkService<WritableString> networkService2 = networkServiceInjection2.GetInstance<WritableNetworkService<WritableString>>())
- {
- queue1 = networkServiceInjection1.GetInstance<MessageHandler>().Queue;
- queue2 = networkServiceInjection2.GetInstance<MessageHandler>().Queue;
-
- IIdentifier id1 = new StringIdentifier("service1");
- IIdentifier id2 = new StringIdentifier("service2");
- networkService1.Register(id1);
- networkService2.Register(id2);
-
- using (IConnection<WritableString> connection1 = networkService1.NewConnection(id2))
- using (IConnection<WritableString> connection2 = networkService2.NewConnection(id1))
- {
- connection1.Open();
- connection1.Write(new WritableString("abc"));
- connection1.Write(new WritableString("def"));
- connection1.Write(new WritableString("ghi"));
-
- connection2.Open();
- connection2.Write(new WritableString("jkl"));
- connection2.Write(new WritableString("nop"));
-
- Assert.AreEqual("abc", queue2.Take().Data);
- Assert.AreEqual("def", queue2.Take().Data);
- Assert.AreEqual("ghi", queue2.Take().Data);
-
- Assert.AreEqual("jkl", queue1.Take().Data);
- Assert.AreEqual("nop", queue1.Take().Data);
- }
- }
- }
- }
-
- /// <summary>
- /// Creates an instance of network service.
- /// </summary>
- /// <param name="networkServicePort">The port that the NetworkService will listen on</param>
- /// <param name="nameServicePort">The port of the NameServer</param>
- /// <param name="nameServiceAddr">The ip address of the NameServer</param>
- /// <param name="factory">Identifier factory for WritableString</param>
- /// <param name="handler">The observer to handle incoming messages</param>
- /// <returns></returns>
- private IInjector BuildNetworkService(
- int networkServicePort,
- int nameServicePort,
- string nameServiceAddr,
- IConfiguration handlerConf)
- {
- var networkServiceConf = TangFactory.GetTang().NewConfigurationBuilder(handlerConf)
- .BindNamedParameter<NetworkServiceOptions.NetworkServicePort, int>(
- GenericType<NetworkServiceOptions.NetworkServicePort>.Class,
- networkServicePort.ToString(CultureInfo.CurrentCulture))
- .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>(
- GenericType<NamingConfigurationOptions.NameServerPort>.Class,
- nameServicePort.ToString(CultureInfo.CurrentCulture))
- .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
- GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
- nameServiceAddr)
- .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class)
- .Build();
-
- return TangFactory.GetTang().NewInjector(networkServiceConf);
- }
-
- /// <summary>
- /// The observer to handle incoming messages for WritableString
- /// </summary>
- private class MessageHandler : IObserver<WritableNsMessage<WritableString>>
- {
- private readonly BlockingCollection<WritableString> _queue;
-
- public BlockingCollection<WritableString> Queue
- {
- get { return _queue; }
- }
-
- [Inject]
- private MessageHandler()
- {
- _queue = new BlockingCollection<WritableString>();
- }
-
- public void OnNext(WritableNsMessage<WritableString> value)
- {
- _queue.Add(value.Data.First());
- }
-
- public void OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- public void OnCompleted()
- {
- throw new NotImplementedException();
- }
- }
-
- /// <summary>
- /// The network handler to handle incoming Writable NsMessages
- /// </summary>
- private class NetworkMessageHandler : IObserver<WritableNsMessage<WritableString>>
- {
- [Inject]
- public NetworkMessageHandler()
- {
- }
-
- public void OnNext(WritableNsMessage<WritableString> value)
- {
- }
-
- public void OnError(Exception error)
- {
- }
-
- public void OnCompleted()
- {
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
deleted file mode 100644
index 400aa52..0000000
--- a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/WritableString.cs
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Tang.Annotations;
-
-namespace Org.Apache.REEF.Network.Tests.NetworkService
-{
- /// <summary>
- /// Writable wrapper around the string class
- /// </summary>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableString : IWritable
- {
- /// <summary>
- /// Returns the actual string data
- /// </summary>
- public string Data { get; set; }
-
- /// <summary>
- /// Empty constructor for instantiation with reflection
- /// </summary>
- [Inject]
- public WritableString()
- {
- }
-
- /// <summary>
- /// Constructor
- /// </summary>
- /// <param name="data">The string data</param>
- public WritableString(string data)
- {
- Data = data;
- }
-
- /// <summary>
- /// Reads the string
- /// </summary>
- /// <param name="reader">reader to read from</param>
- public void Read(IDataReader reader)
- {
- Data = reader.ReadString();
- }
-
- /// <summary>
- /// Writes the string
- /// </summary>
- /// <param name="writer">Writer to write</param>
- public void Write(IDataWriter writer)
- {
- writer.WriteString(Data);
- }
-
- /// <summary>
- /// Reads the string
- /// </summary>
- /// <param name="reader">reader to read from</param>
- /// <param name="token">the cancellation token</param>
- public async Task ReadAsync(IDataReader reader, CancellationToken token)
- {
- Data = await reader.ReadStringAsync(token);
- }
-
- /// <summary>
- /// Writes the string
- /// </summary>
- /// <param name="writer">Writer to write</param>
- /// <param name="token">the cancellation token</param>
- public async Task WriteAsync(IDataWriter writer, CancellationToken token)
- {
- await writer.WriteStringAsync(Data, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
index e0568aa..3355ac7 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -55,9 +55,8 @@ under the License.
<Compile Include="GroupCommunication\GroupCommunicationTreeTopologyTests.cs" />
<Compile Include="GroupCommunication\StreamingCodecTests.cs" />
<Compile Include="NamingService\NameServerTests.cs" />
- <Compile Include="NetworkService\WritableNetworkServiceTests.cs" />
<Compile Include="NetworkService\NetworkServiceTests.cs" />
- <Compile Include="NetworkService\WritableString.cs" />
+ <Compile Include="NetworkService\StreamingNetworkServiceTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
index f8e1483..da156ca 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs
@@ -17,6 +17,7 @@
* under the License.
*/
+using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Pipelining;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Util;
@@ -38,7 +39,12 @@ namespace Org.Apache.REEF.Network.Group.Config
public static ConfigurationModule Conf = new CodecToStreamingCodecConfiguration<T>()
.BindImplementation(GenericType<ICodec<T>>.Class, Codec)
.BindImplementation(GenericType<IStreamingCodec<T>>.Class, GenericType<CodecToStreamingCodec<T>>.Class)
- .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, GenericType<StreamingPipelineMessageCodec<T>>.Class)
+ .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class,
+ GenericType<StreamingPipelineMessageCodec<T>>.Class)
+ .BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<T>>>.Class,
+ GenericType<GroupCommunicationMessageStreamingCodec<T>>.Class)
+ .BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<PipelineMessage<T>>>>.Class,
+ GenericType<GroupCommunicationMessageStreamingCodec<PipelineMessage<T>>>.Class)
.Build();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
index 2a30047..20c6ade 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfiguration.cs
@@ -17,6 +17,7 @@
* under the License.
*/
+using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Pipelining;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Util;
@@ -41,7 +42,12 @@ namespace Org.Apache.REEF.Network.Group.Config
/// </summary>
public static ConfigurationModule Conf = new StreamingCodecConfiguration<T>()
.BindImplementation(GenericType<IStreamingCodec<T>>.Class, Codec)
- .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, GenericType<StreamingPipelineMessageCodec<T>>.Class)
+ .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class,
+ GenericType<StreamingPipelineMessageCodec<T>>.Class)
+ .BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<T>>>.Class,
+ GenericType<GroupCommunicationMessageStreamingCodec<T>>.Class)
+ .BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<PipelineMessage<T>>>>.Class,
+ GenericType<GroupCommunicationMessageStreamingCodec<PipelineMessage<T>>>.Class)
.Build();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
index 5ebb357..001c110 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
@@ -17,7 +17,6 @@
* under the License.
*/
-using System;
using System.Collections.Generic;
using System.Reflection;
using Org.Apache.REEF.Network.Group.Config;
@@ -39,7 +38,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
/// All operators in the same Communication Group run on the the
/// same set of tasks.
/// </summary>
- // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.]
public sealed class CommunicationGroupDriver : ICommunicationGroupDriver
{
private static readonly Logger LOGGER = Logger.GetLogger(typeof (CommunicationGroupDriver));
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
index e807a4e..9aa49a4 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs
@@ -25,11 +25,10 @@ using Org.Apache.REEF.Tang.Annotations;
namespace Org.Apache.REEF.Network.Group.Driver.Impl
{
/// <summary>
- /// Messages sent by MPI Operators. This is the abstract class inherited by
- /// WritableGroupCommunicationMessage but seen by Network Service
+ /// Messages sent by MPI Operators. This is the class inherited by
+ /// GroupCommunicationMessage but seen by Network Service
/// </summary>
- // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
- public abstract class GeneralGroupCommunicationMessage : IWritable
+ public class GeneralGroupCommunicationMessage
{
/// <summary>
/// Empty constructor to allow instantiation by reflection
@@ -45,70 +44,36 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
/// <param name="operatorName">The name of the MPI operator</param>
/// <param name="source">The message source</param>
/// <param name="destination">The message destination</param>
- /// <param name="messageType">The type of message to send</param>
protected GeneralGroupCommunicationMessage(
string groupName,
string operatorName,
string source,
- string destination,
- MessageType messageType)
+ string destination)
{
GroupName = groupName;
OperatorName = operatorName;
Source = source;
Destination = destination;
- MsgType = messageType;
}
/// <summary>
/// Returns the Communication Group name.
/// </summary>
- public string GroupName { get; internal set; }
+ internal string GroupName { get; set; }
/// <summary>
/// Returns the MPI Operator name.
/// </summary>
- public string OperatorName { get; internal set; }
+ internal string OperatorName { get; set; }
/// <summary>
/// Returns the source of the message.
/// </summary>
- public string Source { get; internal set; }
+ internal string Source { get; set; }
/// <summary>
/// Returns the destination of the message.
/// </summary>
- public string Destination { get; internal set; }
-
- /// <summary>
- /// Returns the type of message being sent.
- /// </summary>
- public MessageType MsgType { get; internal set; }
-
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- public abstract void Read(IDataReader reader);
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- public abstract void Write(IDataWriter writer);
-
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- /// <param name="token">The cancellation token</param>
- public abstract System.Threading.Tasks.Task ReadAsync(IDataReader reader, CancellationToken token);
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">The cancellation token</param>
- public abstract System.Threading.Tasks.Task WriteAsync(IDataWriter writer, CancellationToken token);
+ internal string Destination { get; set; }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
index ade5834..5bf0848 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
@@ -42,7 +42,6 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
/// Used to create Communication Groups for Group Communication Operators on the Reef driver.
/// Also manages configuration for Group Communication tasks/services.
/// </summary>
- // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
public sealed class GroupCommDriver : IGroupCommDriver
{
private const string MasterTaskContextName = "MasterTaskContext";
@@ -158,12 +157,12 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
public IConfiguration GetServiceConfiguration()
{
IConfiguration serviceConfig = ServiceConfiguration.ConfigurationModule
- .Set(ServiceConfiguration.Services, GenericType<WritableNetworkService<GeneralGroupCommunicationMessage>>.Class)
+ .Set(ServiceConfiguration.Services, GenericType<StreamingNetworkService<GeneralGroupCommunicationMessage>>.Class)
.Build();
return TangFactory.GetTang().NewConfigurationBuilder(serviceConfig)
.BindImplementation(
- GenericType<IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>>.Class,
+ GenericType<IObserver<NsMessage<GeneralGroupCommunicationMessage>>>.Class,
GenericType<GroupCommNetworkObserver>.Class)
.BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>(
GenericType<NamingConfigurationOptions.NameServerAddress>.Class,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
index ed7855b..c53cdb0 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs
@@ -17,30 +17,21 @@
* under the License.
*/
-using System;
-using System.Threading;
-using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.Network.Group.Driver.Impl
{
/// <summary>
- /// Messages sent by MPI Operators. This is the Writable version of GroupCommunicationMessage
- /// class and will eventually replace it once everybody agrees with the design
+ /// Messages sent by MPI Operators.
/// </summary>
- // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
- public sealed class GroupCommunicationMessage<T> : GeneralGroupCommunicationMessage
+ internal sealed class GroupCommunicationMessage<T> : GeneralGroupCommunicationMessage
{
- private readonly IStreamingCodec<T> _codec;
-
/// <summary>
/// Empty constructor to allow instantiation by reflection
/// </summary>
[Inject]
- private GroupCommunicationMessage(IStreamingCodec<T> codec)
+ private GroupCommunicationMessage()
{
- _codec = codec;
}
/// <summary>
@@ -51,20 +42,15 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
/// <param name="source">The message source</param>
/// <param name="destination">The message destination</param>
/// <param name="message">The actual Writable message</param>
- /// <param name="messageType">The type of message to send</param>
- /// <param name="codec">Streaming Codec</param>
- public GroupCommunicationMessage(
+ internal GroupCommunicationMessage(
string groupName,
string operatorName,
string source,
string destination,
- T message,
- MessageType messageType,
- IStreamingCodec<T> codec)
- : base(groupName, operatorName, source, destination, messageType)
+ T message)
+ : base(groupName, operatorName, source, destination)
{
- _codec = codec;
- Data = new T[] { message };
+ Data = new[] { message };
}
/// <summary>
@@ -75,133 +61,24 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
/// <param name="source">The message source</param>
/// <param name="destination">The message destination</param>
/// <param name="message">The actual Writable message array</param>
- /// <param name="messageType">The type of message to send</param>
- /// <param name="codec">Streaming Codec</param>
- public GroupCommunicationMessage(
+ internal GroupCommunicationMessage(
string groupName,
string operatorName,
string source,
string destination,
- T[] message,
- MessageType messageType,
- IStreamingCodec<T> codec)
- : base(groupName, operatorName, source, destination, messageType)
+ T[] message)
+ : base(groupName, operatorName, source, destination)
{
- _codec = codec;
Data = message;
}
/// <summary>
/// Returns the array of messages.
/// </summary>
- public T[] Data
+ internal T[] Data
{
get;
set;
}
-
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- public override void Read(IDataReader reader)
- {
- GroupName = reader.ReadString();
- OperatorName = reader.ReadString();
- Source = reader.ReadString();
- Destination = reader.ReadString();
-
- int dataCount = reader.ReadInt32();
-
- if (dataCount == 0)
- {
- throw new Exception("Data Count in Group COmmunication Message cannot be zero");
- }
-
- MsgType = (MessageType)Enum.Parse(typeof(MessageType), reader.ReadString());
- Data = new T[dataCount];
-
- for (int index = 0; index < dataCount; index++)
- {
- Data[index] = _codec.Read(reader);
-
- if (Data[index] == null)
- {
- throw new Exception("message instance cannot be created from the IDataReader in Group Communication Message");
- }
- }
- }
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- public override void Write(IDataWriter writer)
- {
- writer.WriteString(GroupName);
- writer.WriteString(OperatorName);
- writer.WriteString(Source);
- writer.WriteString(Destination);
- writer.WriteInt32(Data.Length);
- writer.WriteString(MsgType.ToString());
-
- foreach (var data in Data)
- {
- _codec.Write(data, writer);
- }
- }
-
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- /// <param name="token">The cancellation token</param>
- public override async System.Threading.Tasks.Task ReadAsync(IDataReader reader, CancellationToken token)
- {
- GroupName = await reader.ReadStringAsync(token);
- OperatorName = await reader.ReadStringAsync(token);
- Source = await reader.ReadStringAsync(token);
- Destination = await reader.ReadStringAsync(token);
-
- int dataCount = await reader.ReadInt32Async(token);
-
- if (dataCount == 0)
- {
- throw new Exception("Data Count in Group COmmunication Message cannot be zero");
- }
-
- MsgType = (MessageType)Enum.Parse(typeof(MessageType), await reader.ReadStringAsync(token));
- Data = new T[dataCount];
-
- for (int index = 0; index < dataCount; index++)
- {
- Data[index] = await _codec.ReadAsync(reader, token);
-
- if (Data[index] == null)
- {
- throw new Exception("message instance cannot be created from the IDataReader in Group Communication Message");
- }
- }
- }
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">The cancellation token</param>
- public override async System.Threading.Tasks.Task WriteAsync(IDataWriter writer, CancellationToken token)
- {
- await writer.WriteStringAsync(GroupName, token);
- await writer.WriteStringAsync(OperatorName, token);
- await writer.WriteStringAsync(Source, token);
- await writer.WriteStringAsync(Destination, token);
- await writer.WriteInt32Async(Data.Length, token);
- await writer.WriteStringAsync(MsgType.ToString(), token);
-
- foreach (var data in Data)
- {
- await _codec.WriteAsync(data, writer, token);
- }
- }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs
new file mode 100644
index 0000000..d619e64
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessageStreamingCodec.cs
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.Group.Driver.Impl
+{
+ /// <summary>
+ /// Streaming Codec for the Group Communication Message
+ /// </summary>
+ internal sealed class GroupCommunicationMessageStreamingCodec<T> : IStreamingCodec<GroupCommunicationMessage<T>>
+ {
+ private readonly IStreamingCodec<T> _codec;
+
+ /// <summary>
+ /// Empty constructor to allow instantiation by reflection
+ /// </summary>
+ [Inject]
+ private GroupCommunicationMessageStreamingCodec(IStreamingCodec<T> codec)
+ {
+ _codec = codec;
+ }
+
+ /// <summary>
+ /// Read the class fields.
+ /// </summary>
+ /// <param name="reader">The reader from which to read </param>
+ /// <returns>The Group Communication Message</returns>
+ public GroupCommunicationMessage<T> Read(IDataReader reader)
+ {
+ int metadataSize = reader.ReadInt32();
+ byte[] metadata = new byte[metadataSize];
+ reader.Read(ref metadata, 0, metadataSize);
+ var res = GenerateMetaDataDecoding(metadata);
+
+ string groupName = res.Item1;
+ string operatorName = res.Item2;
+ string source = res.Item3;
+ string destination = res.Item4;
+ int dataCount = res.Item5;
+
+ if (dataCount == 0)
+ {
+ throw new Exception("Data Count in Group Communication Message cannot be zero");
+ }
+
+ var data = new T[dataCount];
+
+ for (int index = 0; index < dataCount; index++)
+ {
+ data[index] = _codec.Read(reader);
+
+ if (data[index] == null)
+ {
+ throw new Exception("message instance cannot be created from the IDataReader in Group Communication Message");
+ }
+ }
+
+ return new GroupCommunicationMessage<T>(groupName, operatorName, source, destination, data);
+ }
+
+ /// <summary>
+ /// Writes the class fields.
+ /// </summary>
+ /// <param name="obj">The message to write</param>
+ /// <param name="writer">The writer to which to write</param>
+ public void Write(GroupCommunicationMessage<T> obj, IDataWriter writer)
+ {
+ byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+ byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+ byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray();
+ writer.Write(totalEncoding, 0, totalEncoding.Length);
+
+ foreach (var data in obj.Data)
+ {
+ _codec.Write(data, writer);
+ }
+ }
+
+ /// <summary>
+ /// Read the class fields.
+ /// </summary>
+ /// <param name="reader">The reader from which to read </param>
+ /// <param name="token">The cancellation token</param>
+ /// <returns>The Group Communication Message</returns>
+ public async Task<GroupCommunicationMessage<T>> ReadAsync(IDataReader reader,
+ CancellationToken token)
+ {
+ int metadataSize = await reader.ReadInt32Async(token);
+ byte[] metadata = new byte[metadataSize];
+ await reader.ReadAsync(metadata, 0, metadataSize, token);
+ var res = GenerateMetaDataDecoding(metadata);
+
+ string groupName = res.Item1;
+ string operatorName = res.Item2;
+ string source = res.Item3;
+ string destination = res.Item4;
+ int dataCount = res.Item5;
+
+ if (dataCount == 0)
+ {
+ throw new Exception("Data Count in Group Communication Message cannot be zero");
+ }
+
+ var data = new T[dataCount];
+
+ for (int index = 0; index < dataCount; index++)
+ {
+ data[index] = await _codec.ReadAsync(reader, token);
+
+ if (data[index] == null)
+ {
+ throw new Exception(
+ "message instance cannot be created from the IDataReader in Group Communication Message");
+ }
+ }
+
+ return new GroupCommunicationMessage<T>(groupName, operatorName, source, destination, data);
+ }
+
+ /// <summary>
+ /// Writes the class fields.
+ /// </summary>
+ /// <param name="obj">The message to write</param>
+ /// <param name="writer">The writer to which to write</param>
+ /// <param name="token">The cancellation token</param>
+ public async System.Threading.Tasks.Task WriteAsync(GroupCommunicationMessage<T> obj, IDataWriter writer, CancellationToken token)
+ {
+ byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+ byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+ byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray();
+ await writer.WriteAsync(totalEncoding, 0, totalEncoding.Length, token);
+
+ foreach (var data in obj.Data)
+ {
+ await _codec.WriteAsync(data, writer, token);
+ }
+ }
+
+ private static byte[] GenerateMetaDataEncoding(GroupCommunicationMessage<T> obj)
+ {
+ List<byte[]> metadataBytes = new List<byte[]>();
+
+ byte[] groupBytes = StringToBytes(obj.GroupName);
+ byte[] operatorBytes = StringToBytes(obj.OperatorName);
+ byte[] sourceBytes = StringToBytes(obj.Source);
+ byte[] dstBytes = StringToBytes(obj.Destination);
+ byte[] messageCount = BitConverter.GetBytes(obj.Data.Length);
+
+ metadataBytes.Add(BitConverter.GetBytes(groupBytes.Length));
+ metadataBytes.Add(BitConverter.GetBytes(operatorBytes.Length));
+ metadataBytes.Add(BitConverter.GetBytes(sourceBytes.Length));
+ metadataBytes.Add(BitConverter.GetBytes(dstBytes.Length));
+ metadataBytes.Add(groupBytes);
+ metadataBytes.Add(operatorBytes);
+ metadataBytes.Add(sourceBytes);
+ metadataBytes.Add(dstBytes);
+ metadataBytes.Add(messageCount);
+
+ return metadataBytes.SelectMany(i => i).ToArray();
+ }
+
+ private static Tuple<string, string, string, string, int> GenerateMetaDataDecoding(byte[] obj)
+ {
+ int groupCount = BitConverter.ToInt32(obj, 0);
+ int operatorCount = BitConverter.ToInt32(obj, sizeof (int));
+ int srcCount = BitConverter.ToInt32(obj, 2*sizeof (int));
+ int dstCount = BitConverter.ToInt32(obj, 3*sizeof (int));
+
+ int offset = 4 * sizeof(int);
+
+ string groupString = BytesToString(obj.Skip(offset).Take(groupCount).ToArray());
+ offset += groupCount;
+ string operatorString = BytesToString(obj.Skip(offset).Take(operatorCount).ToArray());
+ offset += operatorCount;
+ string srcString = BytesToString(obj.Skip(offset).Take(srcCount).ToArray());
+ offset += srcCount;
+ string dstString = BytesToString(obj.Skip(offset).Take(dstCount).ToArray());
+ offset += dstCount;
+ int messageCount = BitConverter.ToInt32(obj, offset);
+
+ return new Tuple<string, string, string, string, int>(groupString, operatorString, srcString, dstString,
+ messageCount);
+ }
+
+ private static byte[] StringToBytes(string str)
+ {
+ byte[] bytes = new byte[str.Length * sizeof(char)];
+ Buffer.BlockCopy(str.ToCharArray(), 0, bytes, 0, bytes.Length);
+ return bytes;
+ }
+
+ private static string BytesToString(byte[] bytes)
+ {
+ char[] chars = new char[bytes.Length / sizeof(char)];
+ Buffer.BlockCopy(bytes, 0, chars, 0, bytes.Length);
+ return new string(chars);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
index a78b13c..41b34a9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs
@@ -29,8 +29,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// Group Communication operator used to do point-to-point communication between named Tasks.
/// It uses Writable classes
/// </summary>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public sealed class Sender
+ internal sealed class Sender
{
private readonly INetworkService<GeneralGroupCommunicationMessage> _networkService;
private readonly IIdentifierFactory _idFactory;
@@ -42,7 +41,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// <param name="idFactory">Used to create IIdentifier for GroupCommunicationMessages.</param>
[Inject]
private Sender(
- WritableNetworkService<GeneralGroupCommunicationMessage> networkService,
+ StreamingNetworkService<GeneralGroupCommunicationMessage> networkService,
IIdentifierFactory idFactory)
{
_networkService = networkService;
@@ -54,7 +53,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
/// included in the message.
/// </summary>
/// <param name="message">The message to send.</param>
- public void Send(GeneralGroupCommunicationMessage message)
+ internal void Send(GeneralGroupCommunicationMessage message)
{
if (message == null)
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
index 96c36e1..14dbd96 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs
@@ -29,8 +29,7 @@ namespace Org.Apache.REEF.Network.Group.Task
/// Writable Version
/// </summary>
[DefaultImplementation(typeof(CommunicationGroupNetworkObserver))]
- // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
- public interface ICommunicationGroupNetworkObserver : IObserver<GeneralGroupCommunicationMessage>
+ internal interface ICommunicationGroupNetworkObserver : IObserver<GeneralGroupCommunicationMessage>
{
/// <summary>
/// Registers the handler with the WritableCommunicationGroupNetworkObserver.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
index de19754..c700d88 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs
@@ -30,8 +30,7 @@ namespace Org.Apache.REEF.Network.Group.Task
/// Writable Version
/// </summary>
[DefaultImplementation(typeof(GroupCommNetworkObserver))]
- // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
- public interface IGroupCommNetworkObserver : IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>
+ internal interface IGroupCommNetworkObserver : IObserver<NsMessage<GeneralGroupCommunicationMessage>>
{
/// <summary>
/// Registers the network handler for the given CommunicationGroup.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
index 305d245..cf3e559 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
@@ -19,7 +19,6 @@
using System;
using System.Collections.Generic;
-using System.Linq;
using System.Reflection;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Operators;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
index 5be1457..c3989a9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs
@@ -30,8 +30,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Handles incoming messages sent to this Communication Group.
/// Writable version
/// </summary>
- // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
- public sealed class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver
+ internal sealed class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver
{
private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupNetworkObserver));
private readonly Dictionary<string, IObserver<GeneralGroupCommunicationMessage>> _handlers;
@@ -54,7 +53,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// will be invoked</param>
/// <param name="observer">The writable handler to invoke when messages are sent
/// to the operator specified by operatorName</param>
- public void Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer)
+ void ICommunicationGroupNetworkObserver.Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer)
{
if (string.IsNullOrEmpty(operatorName))
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
index e79df55..8a54ede 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
@@ -34,7 +34,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Used by Tasks to fetch CommunicationGroupClients.
/// Writable version
/// </summary>
- // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
public sealed class GroupCommClient : IGroupCommClient
{
private readonly Dictionary<string, ICommunicationGroupClientInternal> _commGroups;
@@ -50,11 +49,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// <param name="configSerializer">Used to deserialize Group Communication configuration</param>
/// <param name="injector">injector forked from the injector that creates this instance</param>
[Inject]
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
public GroupCommClient(
[Parameter(typeof(GroupCommConfigurationOptions.SerializedGroupConfigs))] ISet<string> groupConfigs,
[Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
- WritableNetworkService<GeneralGroupCommunicationMessage> networkService,
+ StreamingNetworkService<GeneralGroupCommunicationMessage> networkService,
AvroConfigurationSerializer configSerializer,
IInjector injector)
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
index 9d35ff1..d0f35fe 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs
@@ -31,8 +31,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Handles all incoming messages for this Task.
/// Writable version
/// </summary>
- // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
- public sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver
+ internal sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver
{
private static readonly Logger LOGGER = Logger.GetLogger(typeof(GroupCommNetworkObserver));
@@ -42,7 +41,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Creates a new GroupCommNetworkObserver.
/// </summary>
[Inject]
- public GroupCommNetworkObserver()
+ private GroupCommNetworkObserver()
{
_commGroupHandlers = new Dictionary<string, IObserver<GeneralGroupCommunicationMessage>>();
}
@@ -53,7 +52,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// WritableCommunicationGroupNetworkObserver.
/// </summary>
/// <param name="nsMessage"></param>
- public void OnNext(WritableNsMessage<GeneralGroupCommunicationMessage> nsMessage)
+ public void OnNext(NsMessage<GeneralGroupCommunicationMessage> nsMessage)
{
if (nsMessage == null)
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
index aa5de1e..00fa9a5 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -29,7 +29,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Writable version
/// </summary>
/// <typeparam name="T"> Generic type of message</typeparam>
- // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
internal sealed class NodeStruct<T>
{
private readonly BlockingCollection<GroupCommunicationMessage<T>> _messageQueue;
@@ -38,7 +37,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Creates a new NodeStruct.
/// </summary>
/// <param name="id">The Task identifier</param>
- public NodeStruct(string id)
+ internal NodeStruct(string id)
{
Identifier = id;
_messageQueue = new BlockingCollection<GroupCommunicationMessage<T>>();
@@ -48,13 +47,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Returns the identifier for the Task that sent all
/// messages in the message queue.
/// </summary>
- public string Identifier { get; private set; }
+ internal string Identifier { get; private set; }
/// <summary>
/// Gets the first message in the message queue.
/// </summary>
/// <returns>The first available message.</returns>
- public T[] GetData()
+ internal T[] GetData()
{
return _messageQueue.Take().Data;
}
@@ -63,7 +62,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Adds an incoming message to the message queue.
/// </summary>
/// <param name="gcm">The incoming message</param>
- public void AddData(GroupCommunicationMessage<T> gcm)
+ internal void AddData(GroupCommunicationMessage<T> gcm)
{
_messageQueue.Add(gcm);
}
@@ -72,7 +71,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Tells whether there is a message in queue or not.
/// </summary>
/// <returns>True if queue is non empty, false otherwise.</returns>
- public bool HasMessage()
+ internal bool HasMessage()
{
if (_messageQueue.Count != 0)
{