You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/07/15 23:48:57 UTC
[2/3] incubator-reef git commit: [REEF-447]Convert Network Service
Layer from Writable to Streaming
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index b1b79d4..8fe19d6 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -32,7 +32,6 @@ using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.Network.Group.Task.Impl
{
@@ -43,18 +42,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// Communication Group.
/// </summary>
/// <typeparam name="T">The message type</typeparam>
- // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.
public sealed class OperatorTopology<T> : IOperatorTopology<T>, IObserver<GeneralGroupCommunicationMessage>
{
- private const int DefaultTimeout = 50000;
- private const int RetryCount = 10;
-
private static readonly Logger Logger = Logger.GetLogger(typeof(OperatorTopology<>));
private readonly string _groupName;
private readonly string _operatorName;
private readonly string _selfId;
- private string _driverId;
private readonly int _timeout;
private readonly int _retryCount;
private readonly int _sleepTime;
@@ -66,7 +60,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
private readonly Sender _sender;
private readonly BlockingCollection<NodeStruct<T>> _nodesWithData;
private readonly Object _thisLock = new Object();
- private readonly IStreamingCodec<T> _codec;
/// <summary>
/// Creates a new OperatorTopology object.
@@ -74,7 +67,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// <param name="operatorName">The name of the Group Communication Operator</param>
/// <param name="groupName">The name of the operator's Communication Group</param>
/// <param name="taskId">The operator's Task identifier</param>
- /// <param name="driverId">The identifer for the driver</param>
/// <param name="timeout">Timeout value for cancellation token</param>
/// <param name="retryCount">Number of times to retry waiting for registration</param>
/// <param name="sleepTime">Sleep time between retries waiting for registration</param>
@@ -82,26 +74,22 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// <param name="childIds">The set of child Task identifiers in the topology graph</param>
/// <param name="networkService">The network service</param>
/// <param name="sender">The Sender used to do point to point communication</param>
- /// <param name="codec">Streaming codec to encode objects</param>
[Inject]
private OperatorTopology(
[Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName,
[Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName,
[Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
- [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId,
[Parameter(typeof(GroupCommConfigurationOptions.Timeout))] int timeout,
[Parameter(typeof(GroupCommConfigurationOptions.RetryCountWaitingForRegistration))] int retryCount,
[Parameter(typeof(GroupCommConfigurationOptions.SleepTimeWaitingForRegistration))] int sleepTime,
[Parameter(typeof(GroupCommConfigurationOptions.TopologyRootTaskId))] string rootId,
[Parameter(typeof(GroupCommConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds,
- WritableNetworkService<GeneralGroupCommunicationMessage> networkService,
- Sender sender,
- IStreamingCodec<T> codec)
+ StreamingNetworkService<GeneralGroupCommunicationMessage> networkService,
+ Sender sender)
{
_operatorName = operatorName;
_groupName = groupName;
_selfId = taskId;
- _driverId = driverId;
_timeout = timeout;
_retryCount = retryCount;
_sleepTime = sleepTime;
@@ -110,7 +98,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
_nodesWithData = new BlockingCollection<NodeStruct<T>>();
_children = new List<NodeStruct<T>>();
_idToNodeMap = new Dictionary<string, NodeStruct<T>>();
- _codec = codec;
if (_selfId.Equals(rootId))
{
@@ -201,7 +188,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
throw new ArgumentException("No parent for node");
}
- SendToNode(message, MessageType.Data, _parent);
+ SendToNode(message, _parent);
}
/// <summary>
@@ -218,7 +205,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
foreach (var child in _children)
{
- SendToNode(message, MessageType.Data, child);
+ SendToNode(message, child);
}
}
@@ -444,10 +431,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// <param name="message">The message to send</param>
/// <param name="msgType">The message type</param>
/// <param name="node">The NodeStruct representing the Task to send to</param>
- private void SendToNode(T message, MessageType msgType, NodeStruct<T> node)
+ private void SendToNode(T message, NodeStruct<T> node)
{
GeneralGroupCommunicationMessage gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName,
- _selfId, node.Identifier, message, msgType, _codec);
+ _selfId, node.Identifier, message);
_sender.Send(gcm);
}
@@ -458,12 +445,12 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
/// <param name="messages">The list of messages to send</param>
/// <param name="msgType">The message type</param>
/// <param name="node">The NodeStruct representing the Task to send to</param>
- private void SendToNode(IList<T> messages, MessageType msgType, NodeStruct<T> node)
+ private void SendToNode(IList<T> messages, NodeStruct<T> node)
{
T[] encodedMessages = messages.ToArray();
GroupCommunicationMessage<T> gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName,
- _selfId, node.Identifier, encodedMessages, msgType, _codec);
+ _selfId, node.Identifier, encodedMessages);
_sender.Send(gcm);
}
@@ -511,7 +498,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
}
IList<T> sublist = messages.ToList().GetRange(i, size);
- SendToNode(sublist, MessageType.Data, nodeStruct);
+ SendToNode(sublist, nodeStruct);
i += size;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs
new file mode 100644
index 0000000..c30c1bd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/NsMessageStreamingCodec.cs
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.NetworkService.Codec
+{
+ /// <summary>
+ /// Codec to serialize NsMessages for NetworkService.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ internal class NsMessageStreamingCodec<T> : IStreamingCodec<NsMessage<T>>
+ {
+ private readonly IIdentifierFactory _idFactory;
+ private readonly StreamingCodecFunctionCache<T> _codecFunctionsCache;
+
+ /// <summary>
+        /// Create new NsMessageStreamingCodec.
+ /// </summary>
+ /// <param name="idFactory">Used to create identifier from string.</param>
+ /// <param name="injector">Injector to instantiate codecs.</param>
+ [Inject]
+ private NsMessageStreamingCodec(IIdentifierFactory idFactory, IInjector injector)
+ {
+ _idFactory = idFactory;
+ _codecFunctionsCache = new StreamingCodecFunctionCache<T>(injector);
+ }
+
+ /// <summary>
+ /// Instantiate the class from the reader.
+ /// </summary>
+ /// <param name="reader">The reader from which to read</param>
+        /// <returns>The instance of type NsMessage{T} read from the reader</returns>
+ public NsMessage<T> Read(IDataReader reader)
+ {
+ int metadataSize = reader.ReadInt32();
+ byte[] metadata = new byte[metadataSize];
+ reader.Read(ref metadata, 0, metadataSize);
+ var res = GenerateMetaDataDecoding(metadata);
+
+ Type messageType = res.Item3;
+ NsMessage<T> message = res.Item1;
+
+ var codecReadFunc = _codecFunctionsCache.ReadFunction(messageType);
+ int messageCount = res.Item2;
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ message.Data.Add(codecReadFunc(reader));
+ }
+
+ return message;
+ }
+
+ /// <summary>
+ /// Writes the class fields to the writer.
+ /// </summary>
+        /// <param name="obj">The object of type NsMessage{T} to be encoded</param>
+ /// <param name="writer">The writer to which to write</param>
+ public void Write(NsMessage<T> obj, IDataWriter writer)
+ {
+ byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+ byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+ byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray();
+ writer.Write(totalEncoding, 0, totalEncoding.Length);
+
+ Type messageType = obj.Data[0].GetType();
+ var codecWriteFunc = _codecFunctionsCache.WriteFunction(messageType);
+
+ foreach (var data in obj.Data)
+ {
+ codecWriteFunc(data, writer);
+ }
+ }
+
+ /// <summary>
+ /// Instantiate the class from the reader.
+ /// </summary>
+ /// <param name="reader">The reader from which to read</param>
+ /// <param name="token">Cancellation token</param>
+        /// <returns>The instance of type NsMessage{T} read from the reader</returns>
+ public async Task<NsMessage<T>> ReadAsync(IDataReader reader, CancellationToken token)
+ {
+ int metadataSize = await reader.ReadInt32Async(token);
+ byte[] metadata = new byte[metadataSize];
+ await reader.ReadAsync(metadata, 0, metadataSize, token);
+ var res = GenerateMetaDataDecoding(metadata);
+ Type messageType = res.Item3;
+ NsMessage<T> message = res.Item1;
+ var codecReadFunc = _codecFunctionsCache.ReadAsyncFunction(messageType);
+ int messageCount = res.Item2;
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ message.Data.Add(codecReadFunc(reader, token));
+ }
+
+ return message;
+ }
+
+        /// <summary>
+        /// Asynchronously writes the length-prefixed metadata followed by each data item to the writer.
+        /// </summary>
+        /// <param name="obj">The object of type NsMessage{T} to be encoded</param>
+        /// <param name="writer">The writer to which to write</param>
+        /// <param name="token">Cancellation token</param>
+        public async Task WriteAsync(NsMessage<T> obj, IDataWriter writer, CancellationToken token)
+        {
+            byte[] encodedMetadata = GenerateMetaDataEncoding(obj);
+            byte[] encodedInt = BitConverter.GetBytes(encodedMetadata.Length);
+            byte[] totalEncoding = encodedInt.Concat(encodedMetadata).ToArray();
+            await writer.WriteAsync(totalEncoding, 0, totalEncoding.Length, token);
+
+            // The codec is selected from the runtime type of the first data item.
+            Type messageType = obj.Data[0].GetType();
+            var codecWriteFunc = _codecFunctionsCache.WriteAsyncFunction(messageType);
+
+            foreach (var data in obj.Data)
+            {
+                // Await each write so items go out in order and failures or cancellation reach the caller.
+                await codecWriteFunc(data, writer, token);
+            }
+        }
+
+ private static byte[] GenerateMetaDataEncoding(NsMessage<T> obj )
+ {
+ List<byte[]> metadataBytes = new List<byte[]>();
+ byte[] sourceBytes = StringToBytes(obj.SourceId.ToString());
+ byte[] dstBytes = StringToBytes(obj.DestId.ToString());
+ byte[] messageTypeBytes = StringToBytes(obj.Data[0].GetType().AssemblyQualifiedName);
+ byte[] messageCount = BitConverter.GetBytes(obj.Data.Count);
+
+ metadataBytes.Add(BitConverter.GetBytes(sourceBytes.Length));
+ metadataBytes.Add(BitConverter.GetBytes(dstBytes.Length));
+ metadataBytes.Add(BitConverter.GetBytes(messageTypeBytes.Length));
+ metadataBytes.Add(sourceBytes);
+ metadataBytes.Add(dstBytes);
+ metadataBytes.Add(messageTypeBytes);
+ metadataBytes.Add(messageCount);
+
+ return metadataBytes.SelectMany(i => i).ToArray();
+ }
+
+ private Tuple<NsMessage<T>, int, Type> GenerateMetaDataDecoding(byte[] obj)
+ {
+ int srcCount = BitConverter.ToInt32(obj, 0);
+ int dstCount = BitConverter.ToInt32(obj, sizeof (int));
+ int msgTypeCount = BitConverter.ToInt32(obj, 2*sizeof (int));
+
+ int offset = 3*sizeof (int);
+ string srcString = BytesToString(obj.Skip(offset).Take(srcCount).ToArray());
+ offset += srcCount;
+ string dstString = BytesToString(obj.Skip(offset).Take(dstCount).ToArray());
+ offset += dstCount;
+ Type msgType = Type.GetType(BytesToString(obj.Skip(offset).Take(msgTypeCount).ToArray()));
+ offset += msgTypeCount;
+ int messageCount = BitConverter.ToInt32(obj, offset);
+
+ NsMessage<T> msg = new NsMessage<T>(_idFactory.Create(srcString), _idFactory.Create(dstString));
+ return new Tuple<NsMessage<T>, int, Type>(msg, messageCount, msgType);
+ }
+
+ private static byte[] StringToBytes(string str)
+ {
+ byte[] bytes = new byte[str.Length * sizeof(char)];
+ Buffer.BlockCopy(str.ToCharArray(), 0, bytes, 0, bytes.Length);
+ return bytes;
+ }
+
+ private static string BytesToString(byte[] bytes)
+ {
+ char[] chars = new char[bytes.Length / sizeof(char)];
+ Buffer.BlockCopy(bytes, 0, chars, 0, bytes.Length);
+ return new string(chars);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
new file mode 100644
index 0000000..6d91298
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Threading;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.NetworkService.Codec
+{
+ /// <summary>
+ /// Cache of StreamingCodec functions used to store codec functions for messages
+ /// to avoid reflection cost. Each message type is assumed to have a unique
+ /// associated codec
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ internal class StreamingCodecFunctionCache<T>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof (StreamingCodecFunctionCache<T>));
+ private readonly Dictionary<Type, Func<IDataReader, T>> _readFuncCache;
+ private readonly Dictionary<Type, Func<IDataReader, CancellationToken, T>> _readAsyncFuncCache;
+ private readonly Dictionary<Type, Action<T, IDataWriter>> _writeFuncCache;
+ private readonly Dictionary<Type, Func<T, IDataWriter, CancellationToken, Task>> _writeAsyncFuncCache;
+ private readonly IInjector _injector;
+ private readonly Type _streamingCodecType;
+
+ /// <summary>
+ /// Create new StreamingCodecFunctionCache.
+ /// </summary>
+ /// <param name="injector"> Injector</param>
+ internal StreamingCodecFunctionCache(IInjector injector)
+ {
+ _injector = injector;
+ _readFuncCache = new Dictionary<Type, Func<IDataReader, T>>();
+ _readAsyncFuncCache = new Dictionary<Type, Func<IDataReader, CancellationToken, T>>();
+ _writeFuncCache = new Dictionary<Type, Action<T, IDataWriter>>();
+ _writeAsyncFuncCache = new Dictionary<Type, Func<T, IDataWriter, CancellationToken, Task>>();
+ _streamingCodecType = typeof(IStreamingCodec<>);
+ }
+
+ /// <summary>
+ /// Creates the read delegate function of StreamingCodec from the message type
+ /// </summary>
+ /// <param name="messageType">Type of message</param>
+ /// <returns>The read delegate function</returns>
+ internal Func<IDataReader, T> ReadFunction(Type messageType)
+ {
+ Func<IDataReader, T> readFunc;
+
+ if (!_readFuncCache.TryGetValue(messageType, out readFunc))
+ {
+ AddCodecFunctions(messageType);
+ readFunc = _readFuncCache[messageType];
+ }
+
+ return readFunc;
+ }
+
+ /// <summary>
+ /// Creates the read async delegate function of StreamingCodec from the message type
+ /// </summary>
+ /// <param name="messageType">Type of message</param>
+ /// <returns>The read async delegate function</returns>
+ internal Func<IDataReader, CancellationToken, T> ReadAsyncFunction(Type messageType)
+ {
+ Func<IDataReader, CancellationToken, T> readFunc;
+
+ if (!_readAsyncFuncCache.TryGetValue(messageType, out readFunc))
+ {
+ AddCodecFunctions(messageType);
+ readFunc = _readAsyncFuncCache[messageType];
+ }
+
+ return readFunc;
+ }
+
+ /// <summary>
+ /// Creates the write delegate function of StreamingCodec from the message type
+ /// </summary>
+ /// <param name="messageType">Type of message</param>
+ /// <returns>The write delegate function</returns>
+ internal Action<T, IDataWriter> WriteFunction(Type messageType)
+ {
+ Action<T, IDataWriter> writeFunc;
+
+ if (!_writeFuncCache.TryGetValue(messageType, out writeFunc))
+ {
+ AddCodecFunctions(messageType);
+ writeFunc = _writeFuncCache[messageType];
+ }
+
+ return writeFunc;
+ }
+
+ /// <summary>
+ /// Creates the write async delegate function of StreamingCodec from the message type
+ /// </summary>
+ /// <param name="messageType">Type of message</param>
+ /// <returns>The write async delegate function</returns>
+ internal Func<T, IDataWriter, CancellationToken, Task> WriteAsyncFunction(Type messageType)
+ {
+ Func<T, IDataWriter, CancellationToken, Task> writeFunc;
+
+ if (!_writeAsyncFuncCache.TryGetValue(messageType, out writeFunc))
+ {
+ AddCodecFunctions(messageType);
+ writeFunc = _writeAsyncFuncCache[messageType];
+ }
+
+ return writeFunc;
+ }
+
+ private void AddCodecFunctions(Type messageType)
+ {
+ if (!typeof(T).IsAssignableFrom(messageType))
+ {
+ Exceptions.CaughtAndThrow(new Exception("Message type not assignable to base type"), Level.Error,
+ Logger);
+ }
+
+ Type codecType = _streamingCodecType.MakeGenericType(messageType);
+ var codec = _injector.GetInstance(codecType);
+
+ MethodInfo readMethod = codec.GetType().GetMethod("Read");
+ _readFuncCache[messageType] = (Func<IDataReader, T>) Delegate.CreateDelegate
+ (typeof (Func<IDataReader, T>), codec, readMethod);
+
+ MethodInfo readAsyncMethod = codec.GetType().GetMethod("ReadAsync");
+ MethodInfo genericHelper = GetType()
+ .GetMethod("ReadAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance);
+ MethodInfo constructedHelper = genericHelper.MakeGenericMethod(messageType);
+ _readAsyncFuncCache[messageType] =
+ (Func<IDataReader, CancellationToken, T>)constructedHelper.Invoke(this, new[] { readAsyncMethod, codec });
+
+ MethodInfo writeMethod = codec.GetType().GetMethod("Write");
+ genericHelper = GetType().GetMethod("WriteHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance);
+ constructedHelper = genericHelper.MakeGenericMethod(messageType);
+ _writeFuncCache[messageType] =
+ (Action<T, IDataWriter>) constructedHelper.Invoke(this, new[] {writeMethod, codec});
+
+ MethodInfo writeAsyncMethod = codec.GetType().GetMethod("WriteAsync");
+ genericHelper = GetType().GetMethod("WriteAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance);
+ constructedHelper = genericHelper.MakeGenericMethod(messageType);
+ _writeAsyncFuncCache[messageType] =
+ (Func<T, IDataWriter, CancellationToken, Task>)
+ constructedHelper.Invoke(this, new[] {writeAsyncMethod, codec});
+ }
+
+        private Action<T, IDataWriter> WriteHelperFunc<T1>(MethodInfo method, object codec) where T1 : class
+        {
+            // Bind the codec's write method to a delegate typed for the concrete message type T1.
+            Action<T1, IDataWriter> func = (Action<T1, IDataWriter>) Delegate.CreateDelegate
+                (typeof (Action<T1, IDataWriter>), codec, method);
+            // Cast directly instead of using "as": a wrongly typed message must fail here, not become null.
+            return (obj, writer) => func((T1)(object)obj, writer);
+        }
+
+        private Func<T, IDataWriter, CancellationToken, Task> WriteAsyncHelperFunc<T1>(MethodInfo method, object codec)
+            where T1 : class
+        {
+            // Bind the codec's async write method to a delegate typed for the concrete message type T1.
+            Func<T1, IDataWriter, CancellationToken, Task> func =
+                (Func<T1, IDataWriter, CancellationToken, Task>) Delegate.CreateDelegate
+                    (typeof (Func<T1, IDataWriter, CancellationToken, Task>), codec, method);
+            // Cast directly instead of using "as": a wrongly typed message must fail here, not become null.
+            return (obj, writer, token) => func((T1)(object)obj, writer, token);
+        }
+
+ private Func<IDataReader, CancellationToken, T> ReadAsyncHelperFunc<T1>(MethodInfo method, object codec)
+ where T1 : class
+ {
+ Func<IDataReader, CancellationToken, Task<T1>> func =
+ (Func<IDataReader, CancellationToken, Task<T1>>) Delegate.CreateDelegate
+ (typeof (Func<IDataReader, CancellationToken, Task<T1>>), codec, method);
+
+ Func<IDataReader, CancellationToken, T1> func1 = (writer, token) => func(writer, token).Result;
+ Func<IDataReader, CancellationToken, T> func2 = (writer, token) => ((T)(object)func1(writer, token));
+ return func2;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
new file mode 100644
index 0000000..1ff2517
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/StreamingNetworkService.cs
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Network.NetworkService.Codec;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.Util;
+
+namespace Org.Apache.REEF.Network.NetworkService
+{
+ /// <summary>
+    /// Streaming network service used for Reef Task communication.
+ /// </summary>
+ /// <typeparam name="T">The message type</typeparam>
+ public class StreamingNetworkService<T> : INetworkService<T>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(StreamingNetworkService<>));
+
+ private readonly IRemoteManager<NsMessage<T>> _remoteManager;
+ private IIdentifier _localIdentifier;
+ private readonly IDisposable _messageHandlerDisposable;
+ private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
+ private readonly INameClient _nameClient;
+
+ /// <summary>
+        /// Create a new StreamingNetworkService.
+ /// </summary>
+ /// <param name="messageHandler">The observer to handle incoming messages</param>
+ /// <param name="idFactory">The factory used to create IIdentifiers</param>
+ /// <param name="nameClient">The name client used to register Ids</param>
+        /// <param name="remoteManagerFactory">StreamingRemoteManagerFactory used to create the
+        /// RemoteManager for this service</param>
+ /// <param name="codec">Codec for Network Service message</param>
+ /// <param name="injector">Fork of the injector that created the Network service</param>
+ [Inject]
+ private StreamingNetworkService(
+ IObserver<NsMessage<T>> messageHandler,
+ IIdentifierFactory idFactory,
+ INameClient nameClient,
+ StreamingRemoteManagerFactory remoteManagerFactory,
+ NsMessageStreamingCodec<T> codec,
+ IInjector injector)
+ {
+ IPAddress localAddress = NetworkUtils.LocalIPAddress;
+ _remoteManager = remoteManagerFactory.GetInstance(localAddress, codec);
+
+ // Create and register incoming message handler
+ // TODO[REEF-419] This should use the TcpPortProvider mechanism
+ var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
+ _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, messageHandler);
+
+ _nameClient = nameClient;
+ _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
+
+ Logger.Log(Level.Info, "Started network service");
+ }
+
+ /// <summary>
+ /// Name client for registering ids
+ /// </summary>
+ public INameClient NamingClient
+ {
+ get { return _nameClient; }
+ }
+
+ /// <summary>
+ /// Open a new connection to the remote host registered to
+ /// the name service with the given identifier
+ /// </summary>
+ /// <param name="destinationId">The identifier of the remote host</param>
+ /// <returns>The IConnection used for communication</returns>
+ public IConnection<T> NewConnection(IIdentifier destinationId)
+ {
+ if (_localIdentifier == null)
+ {
+ throw new IllegalStateException("Cannot open connection without first registering an ID");
+ }
+
+ IConnection<T> connection;
+ if (_connectionMap.TryGetValue(destinationId, out connection))
+ {
+ return connection;
+ }
+ else
+ {
+ connection = new NsConnection<T>(_localIdentifier, destinationId,
+ NamingClient, _remoteManager, _connectionMap);
+
+ _connectionMap[destinationId] = connection;
+ return connection;
+ }
+ }
+
+ /// <summary>
+ /// Register the identifier for the NetworkService with the NameService.
+ /// </summary>
+ /// <param name="id">The identifier to register</param>
+ public void Register(IIdentifier id)
+ {
+ Logger.Log(Level.Info, "Registering id {0} with network service.", id);
+
+ _localIdentifier = id;
+ NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
+
+ Logger.Log(Level.Info, "End of Registering id {0} with network service.", id);
+ }
+
+ /// <summary>
+ /// Unregister the identifier for the NetworkService with the NameService.
+ /// </summary>
+ public void Unregister()
+ {
+ if (_localIdentifier == null)
+ {
+ throw new IllegalStateException("Cannot unregister a non existant identifier");
+ }
+
+ NamingClient.Unregister(_localIdentifier.ToString());
+ _localIdentifier = null;
+ _messageHandlerDisposable.Dispose();
+ }
+
+ /// <summary>
+ /// Dispose of the NetworkService's resources
+ /// </summary>
+ public void Dispose()
+ {
+ NamingClient.Dispose();
+ _remoteManager.Dispose();
+
+ Logger.Log(Level.Info, "Disposed of network service");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
deleted file mode 100644
index 93da126..0000000
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNetworkService.cs
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.Net;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
-
-namespace Org.Apache.REEF.Network.NetworkService
-{
- /// <summary>
- /// Writable Network service used for Reef Task communication.
- /// </summary>
- /// <typeparam name="T">The message type</typeparam>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableNetworkService<T> : INetworkService<T> where T : IWritable
- {
- private static readonly Logger Logger = Logger.GetLogger(typeof(NetworkService<>));
-
- private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager;
- private readonly IObserver<WritableNsMessage<T>> _messageHandler;
- private IIdentifier _localIdentifier;
- private IDisposable _messageHandlerDisposable;
- private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
- private readonly INameClient _nameClient;
-
- /// <summary>
- /// Create a new Writable NetworkService.
- /// </summary>
- /// <param name="nsPort">The port that the NetworkService will listen on</param>
- /// <param name="messageHandler">The observer to handle incoming messages</param>
- /// <param name="idFactory">The factory used to create IIdentifiers</param>
- /// <param name="nameClient">The name client used to register Ids</param>
- /// <param name="remoteManagerFactory">Writable RemoteManagerFactory to create a
- /// Writable RemoteManager</param>
- [Inject]
- private WritableNetworkService(
- [Parameter(typeof (NetworkServiceOptions.NetworkServicePort))] int nsPort,
- IObserver<WritableNsMessage<T>> messageHandler,
- IIdentifierFactory idFactory,
- INameClient nameClient,
- StreamingRemoteManagerFactory remoteManagerFactory)
- {
-
- IPAddress localAddress = NetworkUtils.LocalIPAddress;
- _remoteManager = remoteManagerFactory.GetInstance<WritableNsMessage<T>>(localAddress, nsPort);
- _messageHandler = messageHandler;
-
- // Create and register incoming message handler
- // TODO[REEF-419] This should use the TcpPortProvider mechanism
- var anyEndpoint = new IPEndPoint(IPAddress.Any, 0);
- _messageHandlerDisposable = _remoteManager.RegisterObserver(anyEndpoint, _messageHandler);
-
- _nameClient = nameClient;
- _connectionMap = new Dictionary<IIdentifier, IConnection<T>>();
-
- Logger.Log(Level.Info, "Started network service");
- }
-
- /// <summary>
- /// Name client for registering ids
- /// </summary>
- public INameClient NamingClient
- {
- get { return _nameClient; }
- }
-
- /// <summary>
- /// Open a new connection to the remote host registered to
- /// the name service with the given identifier
- /// </summary>
- /// <param name="destinationId">The identifier of the remote host</param>
- /// <returns>The IConnection used for communication</returns>
- public IConnection<T> NewConnection(IIdentifier destinationId)
- {
- if (_localIdentifier == null)
- {
- throw new IllegalStateException("Cannot open connection without first registering an ID");
- }
-
- IConnection<T> connection;
- if (_connectionMap.TryGetValue(destinationId, out connection))
- {
- return connection;
- }
- else
- {
- connection = new WritableNsConnection<T>(_localIdentifier, destinationId,
- NamingClient, _remoteManager, _connectionMap);
-
- _connectionMap[destinationId] = connection;
- return connection;
- }
- }
-
- /// <summary>
- /// Register the identifier for the NetworkService with the NameService.
- /// </summary>
- /// <param name="id">The identifier to register</param>
- public void Register(IIdentifier id)
- {
- Logger.Log(Level.Info, "Registering id {0} with network service.", id);
-
- _localIdentifier = id;
- NamingClient.Register(id.ToString(), _remoteManager.LocalEndpoint);
-
- Logger.Log(Level.Info, "End of Registering id {0} with network service.", id);
- }
-
- /// <summary>
- /// Unregister the identifier for the NetworkService with the NameService.
- /// </summary>
- public void Unregister()
- {
- if (_localIdentifier == null)
- {
- throw new IllegalStateException("Cannot unregister a non existant identifier");
- }
-
- NamingClient.Unregister(_localIdentifier.ToString());
- _localIdentifier = null;
- _messageHandlerDisposable.Dispose();
- }
-
- /// <summary>
- /// Dispose of the NetworkService's resources
- /// </summary>
- public void Dispose()
- {
- NamingClient.Dispose();
- _remoteManager.Dispose();
-
- Logger.Log(Level.Info, "Disposed of network service");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
deleted file mode 100644
index c20238c..0000000
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsConnection.cs
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Net;
-using System.Net.Sockets;
-using System.Runtime.Remoting;
-using Org.Apache.REEF.Common.Io;
-using Org.Apache.REEF.Tang.Exceptions;
-using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Network.NetworkService
-{
- /// <summary>
- /// Represents a connection between two hosts using the Writable NetworkService.
- /// </summary>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableNsConnection<T> : IConnection<T> where T : IWritable
- {
- private static readonly Logger Logger = Logger.GetLogger(typeof (WritableNsConnection<T>));
-
- private readonly IIdentifier _sourceId;
- private readonly IIdentifier _destId;
- private readonly INameClient _nameClient;
- private readonly IRemoteManager<WritableNsMessage<T>> _remoteManager;
- private readonly Dictionary<IIdentifier, IConnection<T>> _connectionMap;
- private IObserver<WritableNsMessage<T>> _remoteSender;
-
- /// <summary>
- /// Creates a new NsConnection between two hosts.
- /// </summary>
- /// <param name="sourceId">The identifier of the sender</param>
- /// <param name="destId">The identifier of the receiver</param>
- /// <param name="nameClient">The NameClient used for naming lookup</param>
- /// <param name="remoteManager">The remote manager used for network communication</param>
- /// <param name="connectionMap">A cache of opened connections. Will remove itself from
- /// the cache when the NsConnection is disposed.</param>
- public WritableNsConnection(
- IIdentifier sourceId,
- IIdentifier destId,
- INameClient nameClient,
- IRemoteManager<WritableNsMessage<T>> remoteManager,
- Dictionary<IIdentifier, IConnection<T>> connectionMap)
- {
- _sourceId = sourceId;
- _destId = destId;
- _nameClient = nameClient;
- _remoteManager = remoteManager;
- _connectionMap = connectionMap;
- }
-
- /// <summary>
- /// Opens the connection to the remote host.
- /// </summary>
- public void Open()
- {
- string destStr = _destId.ToString();
- Logger.Log(Level.Verbose, "Network service opening connection to {0}...", destStr);
-
- IPEndPoint destAddr = _nameClient.Lookup(destStr);
- if (null == destAddr)
- {
- throw new RemotingException("Destination Address identifier cannot be found");
- }
-
- try
- {
- _remoteSender = _remoteManager.GetRemoteObserver(destAddr);
- Logger.Log(Level.Verbose, "Network service completed connection to {0}.", destStr);
- }
- catch (SocketException)
- {
- Logger.Log(Level.Error, "Network Service cannot open connection to " + destAddr);
- throw;
- }
- catch (ObjectDisposedException)
- {
- Logger.Log(Level.Error, "Network Service cannot open connection to " + destAddr);
- throw;
- }
- }
-
- /// <summary>
- /// Writes the object to the remote host.
- /// </summary>
- /// <param name="message">The message to send</param>
- public void Write(T message)
- {
- if (_remoteSender == null)
- {
- throw new IllegalStateException("NsConnection has not been opened yet.");
- }
-
- try
- {
- _remoteSender.OnNext(new WritableNsMessage<T>(_sourceId, _destId, message));
- }
- catch (IOException)
- {
- Logger.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
- throw;
- }
- catch (ObjectDisposedException)
- {
- Logger.Log(Level.Error, "Network Service cannot write message to {0}", _destId);
- throw;
- }
- }
-
- /// <summary>
- /// Closes the connection
- /// </summary>
- public void Dispose()
- {
- _connectionMap.Remove(_destId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
deleted file mode 100644
index a9299bb..0000000
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Runtime.Serialization;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Hadoop.Avro;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Wake;
-using Org.Apache.REEF.Wake.Remote;
-
-
-namespace Org.Apache.REEF.Network.NetworkService
-{
- /// <summary>
- /// Writable Message sent between NetworkServices.</summary>
- /// <typeparam name="T">The type of data being sent. It is assumed to be Writable</typeparam>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableNsMessage<T> : IWritable where T : IWritable
- {
- private IIdentifierFactory _factory;
- private IInjector _injection;
- /// <summary>
- /// Constructor to allow instantiation by reflection
- /// </summary>
- [Inject]
- public WritableNsMessage(IIdentifierFactory factory, IInjector injection)
- {
- _factory = factory;
- _injection = injection;
- }
-
- /// <summary>
- /// Create a new Writable NsMessage with no data.
- /// </summary>
- /// <param name="sourceId">The identifier of the sender</param>
- /// <param name="destId">The identifier of the receiver</param>
- public WritableNsMessage(IIdentifier sourceId, IIdentifier destId)
- {
- SourceId = sourceId;
- DestId = destId;
- Data = new List<T>();
- }
-
- /// <summary>
- /// Create a new Writable NsMessage with data.
- /// </summary>
- /// <param name="sourceId">The identifier of the sender</param>
- /// <param name="destId">The identifier of the receiver</param>
- /// <param name="message">The message to send</param>
- public WritableNsMessage(IIdentifier sourceId, IIdentifier destId, T message)
- {
- SourceId = sourceId;
- DestId = destId;
- Data = new List<T> {message};
- }
-
- /// <summary>
- /// The identifier of the sender of the message.
- /// </summary>
- internal IIdentifier SourceId { get; private set; }
-
- /// <summary>
- /// The identifier of the receiver of the message.
- /// </summary>
- internal IIdentifier DestId { get; private set; }
-
- /// <summary>
- /// A list of data being sent in the message.
- /// </summary>
- public IList<T> Data { get; set; }
-
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- public void Read(IDataReader reader)
- {
- SourceId = _factory.Create(reader.ReadString());
- DestId = _factory.Create(reader.ReadString());
- int messageCount = reader.ReadInt32();
- string dataType = reader.ReadString();
-
- Data = new List<T>();
-
- for (int index = 0; index < messageCount; index++)
- {
- var dataPoint = (T)_injection.ForkInjector().GetInstance(Type.GetType(dataType));
-
- if (null == dataPoint)
- {
- throw new Exception("T type instance cannot be created from the stream data in Network Service Message");
- }
-
- dataPoint.Read(reader);
- Data.Add(dataPoint);
- }
- }
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- public void Write(IDataWriter writer)
- {
- writer.WriteString(SourceId.ToString());
- writer.WriteString(DestId.ToString());
- writer.WriteInt32(Data.Count);
- writer.WriteString(Data[0].GetType().AssemblyQualifiedName);
-
- foreach (var data in Data)
- {
- data.Write(writer);
- }
- }
-
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- /// <param name="token">The cancellation token</param>
- public async Task ReadAsync(IDataReader reader, CancellationToken token)
- {
- SourceId = _factory.Create(await reader.ReadStringAsync(token));
- DestId = _factory.Create(await reader.ReadStringAsync(token));
- int messageCount = await reader.ReadInt32Async(token);
- string dataType = await reader.ReadStringAsync(token);
-
- Data = new List<T>();
-
- for (int index = 0; index < messageCount; index++)
- {
- var dataPoint = (T) _injection.ForkInjector().GetInstance(Type.GetType(dataType));
-
- if (null == dataPoint)
- {
- throw new Exception("T type instance cannot be created from the stream data in Network Service Message");
- }
-
- await dataPoint.ReadAsync(reader, token);
- Data.Add(dataPoint);
- }
- }
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">The cancellation token</param>
- public async Task WriteAsync(IDataWriter writer, CancellationToken token)
- {
- await writer.WriteStringAsync(SourceId.ToString(), token);
- await writer.WriteStringAsync(DestId.ToString(), token);
- await writer.WriteInt32Async(Data.Count, token);
- await writer.WriteStringAsync(Data[0].GetType().AssemblyQualifiedName, token);
-
- foreach (var data in Data)
- {
- data.Write(writer);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 73b6d9d..a0e6038 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -61,6 +61,7 @@ under the License.
<Compile Include="Group\Driver\IGroupCommDriver.cs" />
<Compile Include="Group\Driver\Impl\CommunicationGroupDriver.cs" />
<Compile Include="Group\Driver\Impl\GroupCommunicationMessage.cs" />
+ <Compile Include="Group\Driver\Impl\GroupCommunicationMessageStreamingCodec.cs" />
<Compile Include="Group\Driver\Impl\MessageType.cs" />
<Compile Include="Group\Driver\Impl\GroupCommDriver.cs" />
<Compile Include="Group\Driver\Impl\TaskStarter.cs" />
@@ -139,6 +140,8 @@ under the License.
<Compile Include="NetworkService\Codec\ControlMessageCodec.cs" />
<Compile Include="NetworkService\Codec\NsMessageCodec.cs" />
<Compile Include="NetworkService\Codec\NsMessageProto.cs" />
+ <Compile Include="NetworkService\Codec\NsMessageStreamingCodec.cs" />
+ <Compile Include="NetworkService\Codec\StreamingCodecFunctionCache.cs" />
<Compile Include="NetworkService\ControlMessage.cs" />
<Compile Include="NetworkService\IConnection.cs" />
<Compile Include="NetworkService\INetworkService.cs" />
@@ -147,9 +150,7 @@ under the License.
<Compile Include="NetworkService\NetworkServiceOptions.cs" />
<Compile Include="NetworkService\NsConnection.cs" />
<Compile Include="NetworkService\NsMessage.cs" />
- <Compile Include="NetworkService\WritableNetworkService.cs" />
- <Compile Include="NetworkService\WritableNsConnection.cs" />
- <Compile Include="NetworkService\WritableNsMessage.cs" />
+ <Compile Include="NetworkService\StreamingNetworkService.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Utilities\BlockingCollectionExtensions.cs" />
<Compile Include="Utilities\Utils.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index babc26d..9f13c83 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -52,7 +52,6 @@ under the License.
<Compile Include="StreamingRemoteManagerTest.cs" />
<Compile Include="StreamingTransportTest.cs" />
<Compile Include="TransportTest.cs" />
- <Compile Include="WritableString.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
index 20f75be..a0be7ee 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/StreamingRemoteManagerTest.cs
@@ -25,7 +25,8 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
-using Org.Apache.REEF.Wake.Util;
+using Org.Apache.REEF.Wake.StreamingCodec;
+using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
namespace Org.Apache.REEF.Wake.Tests
{
@@ -34,9 +35,6 @@ namespace Org.Apache.REEF.Wake.Tests
{
private readonly StreamingRemoteManagerFactory _remoteManagerFactory1 =
TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
-
- private readonly StreamingRemoteManagerFactory _remoteManagerFactory2 =
- TangFactory.GetTang().NewInjector().GetInstance<StreamingRemoteManagerFactory>();
/// <summary>
/// Tests one way communication between Remote Managers
@@ -47,24 +45,25 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
- var observer = Observer.Create<WritableString>(queue.Add);
+ var observer = Observer.Create<string>(queue.Add);
IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
remoteManager2.RegisterObserver(endpoint1, observer);
var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
- remoteObserver.OnNext(new WritableString("ghi"));
+ remoteObserver.OnNext("abc");
+ remoteObserver.OnNext("def");
+ remoteObserver.OnNext("ghi");
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
}
Assert.AreEqual(3, events.Count);
@@ -78,42 +77,44 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
- BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue1 = new BlockingCollection<string>();
+ BlockingCollection<string> queue2 = new BlockingCollection<string>();
List<string> events1 = new List<string>();
List<string> events2 = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
// Register observers for remote manager 1 and remote manager 2
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer1 = Observer.Create<WritableString>(queue1.Add);
- var observer2 = Observer.Create<WritableString>(queue2.Add);
+ var observer1 = Observer.Create<string>(queue1.Add);
+ var observer2 = Observer.Create<string>(queue2.Add);
remoteManager1.RegisterObserver(remoteEndpoint, observer1);
remoteManager2.RegisterObserver(remoteEndpoint, observer2);
// Remote manager 1 sends 3 events to remote manager 2
var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("def"));
- remoteObserver1.OnNext(new WritableString("ghi"));
+ remoteObserver1.OnNext("abc");
+ remoteObserver1.OnNext("def");
+ remoteObserver1.OnNext("ghi");
// Remote manager 2 sends 4 events to remote manager 1
var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
- remoteObserver2.OnNext(new WritableString("jkl"));
- remoteObserver2.OnNext(new WritableString("mno"));
- remoteObserver2.OnNext(new WritableString("pqr"));
- remoteObserver2.OnNext(new WritableString("stu"));
-
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
-
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
+ remoteObserver2.OnNext("jkl");
+ remoteObserver2.OnNext("mno");
+ remoteObserver2.OnNext("pqr");
+ remoteObserver2.OnNext("stu");
+
+ events1.Add(queue1.Take());
+ events1.Add(queue1.Take());
+ events1.Add(queue1.Take());
+ events1.Add(queue1.Take());
+
+ events2.Add(queue2.Take());
+ events2.Add(queue2.Take());
+ events2.Add(queue2.Take());
}
Assert.AreEqual(4, events1.Count);
@@ -129,29 +130,30 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager3 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer = Observer.Create<WritableString>(queue.Add);
+ var observer = Observer.Create<string>(queue.Add);
remoteManager3.RegisterObserver(remoteEndpoint, observer);
var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
- remoteObserver2.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("def"));
- remoteObserver2.OnNext(new WritableString("ghi"));
- remoteObserver1.OnNext(new WritableString("jkl"));
- remoteObserver2.OnNext(new WritableString("mno"));
+ remoteObserver2.OnNext("abc");
+ remoteObserver1.OnNext("def");
+ remoteObserver2.OnNext("ghi");
+ remoteObserver1.OnNext("jkl");
+ remoteObserver2.OnNext("mno");
for (int i = 0; i < 5; i++)
{
- events.Add(queue.Take().Data);
+ events.Add(queue.Take());
}
}
@@ -167,58 +169,60 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue1 = new BlockingCollection<WritableString>();
- BlockingCollection<WritableString> queue2 = new BlockingCollection<WritableString>();
- BlockingCollection<WritableString> queue3 = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue1 = new BlockingCollection<string>();
+ BlockingCollection<string> queue2 = new BlockingCollection<string>();
+ BlockingCollection<string> queue3 = new BlockingCollection<string>();
List<string> events1 = new List<string>();
List<string> events2 = new List<string>();
List<string> events3 = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager3 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager3 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer = Observer.Create<WritableString>(queue1.Add);
+ var observer = Observer.Create<string>(queue1.Add);
remoteManager1.RegisterObserver(remoteEndpoint, observer);
- var observer2 = Observer.Create<WritableString>(queue2.Add);
+ var observer2 = Observer.Create<string>(queue2.Add);
remoteManager2.RegisterObserver(remoteEndpoint, observer2);
- var observer3 = Observer.Create<WritableString>(queue3.Add);
+ var observer3 = Observer.Create<string>(queue3.Add);
remoteManager3.RegisterObserver(remoteEndpoint, observer3);
var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager3.LocalEndpoint);
var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager3.LocalEndpoint);
// Observer 1 and 2 send messages to observer 3
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver1.OnNext(new WritableString("abc"));
- remoteObserver2.OnNext(new WritableString("def"));
- remoteObserver2.OnNext(new WritableString("def"));
+ remoteObserver1.OnNext("abc");
+ remoteObserver1.OnNext("abc");
+ remoteObserver1.OnNext("abc");
+ remoteObserver2.OnNext("def");
+ remoteObserver2.OnNext("def");
// Observer 3 sends messages back to observers 1 and 2
var remoteObserver3A = remoteManager3.GetRemoteObserver(remoteManager1.LocalEndpoint);
var remoteObserver3B = remoteManager3.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver3A.OnNext(new WritableString("ghi"));
- remoteObserver3A.OnNext(new WritableString("ghi"));
- remoteObserver3B.OnNext(new WritableString("jkl"));
- remoteObserver3B.OnNext(new WritableString("jkl"));
- remoteObserver3B.OnNext(new WritableString("jkl"));
+ remoteObserver3A.OnNext("ghi");
+ remoteObserver3A.OnNext("ghi");
+ remoteObserver3B.OnNext("jkl");
+ remoteObserver3B.OnNext("jkl");
+ remoteObserver3B.OnNext("jkl");
- events1.Add(queue1.Take().Data);
- events1.Add(queue1.Take().Data);
+ events1.Add(queue1.Take());
+ events1.Add(queue1.Take());
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
- events2.Add(queue2.Take().Data);
+ events2.Add(queue2.Take());
+ events2.Add(queue2.Take());
+ events2.Add(queue2.Take());
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
- events3.Add(queue3.Take().Data);
+ events3.Add(queue3.Take());
+ events3.Add(queue3.Take());
+ events3.Add(queue3.Take());
+ events3.Add(queue3.Take());
+ events3.Add(queue3.Take());
}
Assert.AreEqual(2, events1.Count);
@@ -234,34 +238,36 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
// Register handler for when remote manager 2 receives events; respond
// with an ack
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
- var receiverObserver = Observer.Create<WritableString>(
- message => remoteObserver2.OnNext(new WritableString("received message: " + message.Data)));
+ var receiverObserver = Observer.Create<string>(
+ message => remoteObserver2.OnNext("received message: " + message));
remoteManager2.RegisterObserver(remoteEndpoint, receiverObserver);
// Register handler for remote manager 1 to record the ack
- var senderObserver = Observer.Create<WritableString>(queue.Add);
+ var senderObserver = Observer.Create<string>(queue.Add);
remoteManager1.RegisterObserver(remoteEndpoint, senderObserver);
// Begin to send messages
var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver1.OnNext(new WritableString("hello"));
- remoteObserver1.OnNext(new WritableString("there"));
- remoteObserver1.OnNext(new WritableString("buddy"));
+ remoteObserver1.OnNext("hello");
+ remoteObserver1.OnNext("there");
+ remoteObserver1.OnNext("buddy");
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
}
Assert.AreEqual(3, events.Count);
@@ -278,25 +284,27 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
- // RemoteManager2 listens and records events of type IRemoteEvent<WritableString>
- var observer = Observer.Create<IRemoteMessage<WritableString>>(message => queue.Add(message.Message));
+ // RemoteManager2 listens and records events of type IRemoteMessage<string>
+ var observer = Observer.Create<IRemoteMessage<string>>(message => queue.Add(message.Message));
remoteManager2.RegisterObserver(observer);
// Remote manager 1 sends 3 events to remote manager 2
var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
- remoteObserver.OnNext(new WritableString("ghi"));
+ remoteObserver.OnNext("abc");
+ remoteObserver.OnNext("def");
+ remoteObserver.OnNext("ghi");
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
}
Assert.AreEqual(3, events.Count);
@@ -310,28 +318,30 @@ namespace Org.Apache.REEF.Wake.Tests
{
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
- BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>();
+ BlockingCollection<string> queue = new BlockingCollection<string>();
List<string> events = new List<string>();
- using (var remoteManager1 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
- using (var remoteManager2 = _remoteManagerFactory1.GetInstance<WritableString>(listeningAddress, 0))
+ IStreamingCodec<string> codec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>();
+
+ using (var remoteManager1 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
+ using (var remoteManager2 = _remoteManagerFactory1.GetInstance<string>(listeningAddress, codec))
{
- var observer = Observer.Create<WritableString>(queue.Add);
+ var observer = Observer.Create<string>(queue.Add);
IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0);
remoteManager2.RegisterObserver(endpoint1, observer);
var remoteObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver.OnNext(new WritableString("abc"));
- remoteObserver.OnNext(new WritableString("def"));
+ remoteObserver.OnNext("abc");
+ remoteObserver.OnNext("def");
var cachedObserver = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- cachedObserver.OnNext(new WritableString("ghi"));
- cachedObserver.OnNext(new WritableString("jkl"));
+ cachedObserver.OnNext("ghi");
+ cachedObserver.OnNext("jkl");
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
- events.Add(queue.Take().Data);
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
+ events.Add(queue.Take());
}
Assert.AreEqual(4, events.Count);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
deleted file mode 100644
index 30ff487..0000000
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableString.cs
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.Remote;
-
-namespace Org.Apache.REEF.Wake.Tests
-{
- /// <summary>
- /// Writable wrapper around the string class
- /// </summary>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public class WritableString : IWritable
- {
- /// <summary>
- /// Returns the actual string data
- /// </summary>
- public string Data { get; set; }
-
- /// <summary>
- /// Empty constructor for instantiation with reflection
- /// </summary>
- [Inject]
- public WritableString()
- {
- }
-
- /// <summary>
- /// Constructor
- /// </summary>
- /// <param name="data">The string data</param>
- public WritableString(string data)
- {
- Data = data;
- }
-
- /// <summary>
- /// Reads the string
- /// </summary>
- /// <param name="reader">reader to read from</param>
- public void Read(IDataReader reader)
- {
- Data = reader.ReadString();
- }
-
- /// <summary>
- /// Writes the string
- /// </summary>
- /// <param name="writer">Writer to write</param>
- public void Write(IDataWriter writer)
- {
- writer.WriteString(Data);
- }
-
- /// <summary>
- /// Reads the string
- /// </summary>
- /// <param name="reader">reader to read from</param>
- /// <param name="token">the cancellation token</param>
- public async Task ReadAsync(IDataReader reader, CancellationToken token)
- {
- Data = await reader.ReadStringAsync(token);
- }
-
- /// <summary>
- /// Writes the string
- /// </summary>
- /// <param name="writer">Writer to write</param>
- /// <param name="token">the cancellation token</param>
- public async Task WriteAsync(IDataWriter writer, CancellationToken token)
- {
- await writer.WriteStringAsync(Data, token);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index 4069d15..5767f8a 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -49,7 +49,6 @@ under the License.
<Compile Include="IIdentifier.cs" />
<Compile Include="IIdentifierFactory.cs" />
<Compile Include="Remote\Impl\RemoteEventStreamingCodec.cs" />
- <Compile Include="Remote\Impl\TemporaryWritableToStreamingCodec.cs" />
<Compile Include="Remote\Impl\StreamingRemoteManagerFactory.cs" />
<Compile Include="Remote\Impl\DefaultRemoteManagerFactory.cs" />
<Compile Include="Impl\LoggingEventHandler.cs" />
@@ -78,7 +77,6 @@ under the License.
<Compile Include="Remote\Impl\StreamingTransportClient.cs" />
<Compile Include="Remote\Impl\StreamingTransportServer.cs" />
<Compile Include="Remote\IRemoteManagerFactory.cs" />
- <Compile Include="Remote\IWritable.cs" />
<Compile Include="Remote\Proto\WakeRemoteProtosGen.cs" />
<Compile Include="Remote\ICodec.cs" />
<Compile Include="Remote\ICodecFactory.cs" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
deleted file mode 100644
index 644cf82..0000000
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/IWritable.cs
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Org.Apache.REEF.Wake.Remote
-{
- /// <summary>
- /// Interface that classes should implement if they need to be readable to and writable
- /// from the stream. It is assumed that the classes inheriting this interface will have a
- /// default empty constructor
- /// </summary>
- [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)]
- public interface IWritable
- {
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- void Read(IDataReader reader);
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- void Write(IDataWriter writer);
-
- /// <summary>
- /// Read the class fields.
- /// </summary>
- /// <param name="reader">The reader from which to read </param>
- /// <param name="token">The cancellation token</param>
- Task ReadAsync(IDataReader reader, CancellationToken token);
-
- /// <summary>
- /// Writes the class fields.
- /// </summary>
- /// <param name="writer">The writer to which to write</param>
- /// <param name="token">The cancellation token</param>
- Task WriteAsync(IDataWriter writer, CancellationToken token);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8505dee9/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
index 90e3aca..da549ea 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingRemoteManagerFactory.cs
@@ -20,6 +20,7 @@
using System.Net;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.StreamingCodec;
namespace Org.Apache.REEF.Wake.Remote.Impl
{
@@ -38,12 +39,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
_injector = injector;
}
- //ToDo: The port argument will be removed once changes are made in WritableNetworkService [REEF-447]
- public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable
+ public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, IStreamingCodec<T> codec)
{
#pragma warning disable 618
// This is the one place allowed to call this constructor. Hence, disabling the warning is OK.
- var codec = _injector.GetInstance<TemporaryWritableToStreamingCodec<T>>();
return new StreamingRemoteManager<T>(localAddress, _tcpPortProvider, codec);
#pragma warning disable 618
}