You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2018/03/10 01:33:17 UTC
[08/28] reef git commit: [REEF-1936] Use dependency injection in C#
ProtocolSerializer
[REEF-1936] Use dependency injection in C# ProtocolSerializer
Summary of changes:
* Make `ProtocolSerializer` constructor injectable
* Create named parameters for the constructor's input
* Fix the `ProtocolSerializerTest` unit tests to use injection
* Bugfix: register the `Header` protocol object regardless of the namespace
* Bugfix for a future PR: relax the type constraints in `ProtocolSerializer.Read()` to make injection easier for the observers.
* Minor fixes to make code compliant with REEF coding standards and naming conventions
* Minor refactoring
JIRA:
[REEF-1936](https://issues.apache.org/jira/browse/REEF-1936)
Pull Request:
Closes #1402
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/94af38d3
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/94af38d3
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/94af38d3
Branch: refs/heads/REEF-335
Commit: 94af38d363b5c8ea4fef0d920082113523442b8a
Parents: eb522e9
Author: Sergiy Matusevych <mo...@apache.com>
Authored: Thu Oct 26 15:21:14 2017 -0700
Committer: Doug Service <do...@apache.org>
Committed: Thu Nov 2 23:16:44 2017 +0000
----------------------------------------------------------------------
.../ProtocolSerializerTest.cs | 34 ++++++---
.../Avro/ProtocolSerializer.cs | 78 +++++++++++++-------
2 files changed, 72 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/94af38d3/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
index cf3ac02..574f522 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
@@ -25,6 +25,7 @@ using Org.Apache.REEF.Wake.Remote.Impl;
using Org.Apache.REEF.Tang.Implementations.Tang;
using org.apache.reef.wake.tests.message;
using Xunit;
+using Org.Apache.REEF.Tang.Interface;
namespace Org.Apache.REEF.Wake.Tests
{
@@ -33,32 +34,36 @@ namespace Org.Apache.REEF.Wake.Tests
/// </summary>
internal sealed class TestMessageObserver : IObserver<IMessageInstance<AvroTestMessage>>
{
- private readonly IMessageInstance<AvroTestMessage> messageInstance;
+ private readonly IMessageInstance<AvroTestMessage> _messageInstance;
public TestMessageObserver(long seq, AvroTestMessage msg)
{
- messageInstance = new MessageInstance<AvroTestMessage>(seq, msg);
+ _messageInstance = new MessageInstance<AvroTestMessage>(seq, msg);
}
public void OnNext(IMessageInstance<AvroTestMessage> otherMessageInstance)
{
- Assert.Equal(messageInstance.Message.number, otherMessageInstance.Message.number);
- Assert.Equal(messageInstance.Message.data, otherMessageInstance.Message.data);
+ Assert.Equal(_messageInstance.Message.number, otherMessageInstance.Message.number);
+ Assert.Equal(_messageInstance.Message.data, otherMessageInstance.Message.data);
}
public void OnError(Exception error)
{
- throw new NotImplementedException();
+ throw new NotImplementedException("This method should never be called");
}
public void OnCompleted()
{
- throw new NotImplementedException();
+ throw new NotImplementedException("This method should never be called");
}
}
public sealed class TestProtocolSerializer
{
+ private static readonly ITang Tang = TangFactory.GetTang();
+ private static readonly IPAddress ListeningAddress = IPAddress.Parse("127.0.0.1");
+ private static readonly ByteCodec Codec = new ByteCodec();
+
/// <summary>
/// Setup two way communication between two remote managers through the loopback
/// network and verify that Avro messages are properly serialized and deserialzied
@@ -72,18 +77,23 @@ namespace Org.Apache.REEF.Wake.Tests
int[] numbers = { 12, 25 };
string[] strings = { "The first string", "The second string" };
- IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
BlockingCollection<byte[]> queue1 = new BlockingCollection<byte[]>();
BlockingCollection<byte[]> queue2 = new BlockingCollection<byte[]>();
- ProtocolSerializer serializer = new ProtocolSerializer(this.GetType().Assembly, "org.apache.reef.wake.tests.message");
- IRemoteManagerFactory _remoteManagerFactory = TangFactory.GetTang().NewInjector().GetInstance<IRemoteManagerFactory>();
+ IConfiguration config = Tang.NewConfigurationBuilder()
+ .BindStringNamedParam<ProtocolSerializer.AssemblyName>(this.GetType().Assembly.FullName)
+ .BindStringNamedParam<ProtocolSerializer.MessageNamespace>("org.apache.reef.wake.tests.message")
+ .Build();
+
+ var injector = Tang.NewInjector(config);
+ var remoteManagerFactory = injector.GetInstance<IRemoteManagerFactory>();
+ var serializer = injector.GetInstance<ProtocolSerializer>();
- using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
- using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
+ using (var remoteManager1 = remoteManagerFactory.GetInstance(ListeningAddress, Codec))
+ using (var remoteManager2 = remoteManagerFactory.GetInstance(ListeningAddress, Codec))
{
// Register observers for remote manager 1 and remote manager 2
- var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var remoteEndpoint = new IPEndPoint(ListeningAddress, 0);
remoteManager1.RegisterObserver(remoteEndpoint, Observer.Create<byte[]>(queue1.Add));
remoteManager2.RegisterObserver(remoteEndpoint, Observer.Create<byte[]>(queue2.Add));
http://git-wip-us.apache.org/repos/asf/reef/blob/94af38d3/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
index 4adcc88..df93138 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
@@ -22,6 +22,7 @@ using System.Reflection;
using Microsoft.Hadoop.Avro;
using Org.Apache.REEF.Utilities.Logging;
using org.apache.reef.wake.avro.message;
+using Org.Apache.REEF.Tang.Annotations;
namespace Org.Apache.REEF.Wake.Avro
{
@@ -34,7 +35,17 @@ namespace Org.Apache.REEF.Wake.Avro
/// </summary>
public sealed class ProtocolSerializer
{
- private static readonly Logger Logr = Logger.GetLogger(typeof(ProtocolSerializer));
+ [NamedParameter("Name of the assembly that contains serializable classes.")]
+ public class AssemblyName : Name<string>
+ {
+ }
+
+ [NamedParameter("Package name to search for serializabe classes.")]
+ public class MessageNamespace : Name<string>
+ {
+ }
+
+ private static readonly Logger Log = Logger.GetLogger(typeof(ProtocolSerializer));
/// <summary>
/// Delegate for message serializer.
@@ -50,16 +61,14 @@ namespace Org.Apache.REEF.Wake.Avro
/// <summary>
/// Map from message type (a string with the message class name) to serializer.
/// </summary>
- private readonly SortedDictionary<string, Serialize>
- serializeMap = new SortedDictionary<string, Serialize>();
+ private readonly IDictionary<string, Serialize> _serializeMap = new Dictionary<string, Serialize>();
/// <summary>
/// Map from message type (a string with the message class name) to deserializer.
/// </summary>
- private readonly SortedDictionary<string, Deserialize>
- deserializeMap = new SortedDictionary<string, Deserialize>();
+ private readonly IDictionary<string, Deserialize> _deserializeMap = new Dictionary<string, Deserialize>();
- private readonly IAvroSerializer<Header> headerSerializer = AvroSerializer.Create<Header>();
+ private readonly IAvroSerializer<Header> _headerSerializer = AvroSerializer.Create<Header>();
/// <summary>
/// Non-generic reflection record for the Register() method of this class. A constant.
@@ -68,40 +77,50 @@ namespace Org.Apache.REEF.Wake.Avro
typeof(ProtocolSerializer).GetMethod("Register", BindingFlags.Instance | BindingFlags.NonPublic);
/// <summary>
- /// Register all of the protocol messages using reflection.
+ /// Construct an initialized protocol serializer.
/// </summary>
- /// <param name="assembly">The Assembly object which contains the namespace of the message classes.</param>
+ /// <param name="assemblyName">The full name of the assembly
+ /// which contains the namespace of the message classes.</param>
/// <param name="messageNamespace">A string which contains the namespace the protocol messages.</param>
- public ProtocolSerializer(Assembly assembly, string messageNamespace)
+ [Inject]
+ public ProtocolSerializer(
+ [Parameter(typeof(AssemblyName))] string assemblyName,
+ [Parameter(typeof(MessageNamespace))] string messageNamespace)
{
- Logr.Log(Level.Verbose, "Retrieving types for assembly: {0}", assembly.FullName);
+ Log.Log(Level.Info, "Retrieving types for assembly: {0}", assemblyName);
+ Assembly assembly = Assembly.Load(assemblyName);
- var types = new List<Type>(assembly.GetTypes()) { typeof(Header) };
- foreach (Type type in types)
+ CreateProtocolObject(typeof(Header));
+ foreach (Type type in assembly.GetTypes())
{
if (type.FullName.StartsWith(messageNamespace))
{
- MethodInfo genericInfo = RegisterMethodInfo.MakeGenericMethod(new[] { type });
- genericInfo.Invoke(this, null);
+ CreateProtocolObject(type);
}
}
}
+ private void CreateProtocolObject(Type type)
+ {
+ MethodInfo genericInfo = RegisterMethodInfo.MakeGenericMethod(new[] { type });
+ genericInfo.Invoke(this, null);
+ }
+
/// <summary>
/// Generate and store the metadata necessary to serialze and deserialize a specific message type.
/// </summary>
/// <typeparam name="TMessage">The class type of the message being registered.</typeparam>
internal void Register<TMessage>()
{
- Logr.Log(Level.Info, "Registering message type: {0} {1}",
- typeof(TMessage).FullName, typeof(TMessage).Name);
+ string name = typeof(TMessage).FullName;
+ Log.Log(Level.Info, "Registering message type: {0}", name);
IAvroSerializer<TMessage> messageSerializer = AvroSerializer.Create<TMessage>();
Serialize serialize = (MemoryStream stream, object message) =>
{
messageSerializer.Serialize(stream, (TMessage)message);
};
- serializeMap.Add(typeof(TMessage).Name, serialize);
+ _serializeMap.Add(name, serialize);
Deserialize deserialize = (MemoryStream stream, object observer, long sequence) =>
{
@@ -109,14 +128,16 @@ namespace Org.Apache.REEF.Wake.Avro
var msgObserver = observer as IObserver<IMessageInstance<TMessage>>;
if (msgObserver != null)
{
+ Log.Log(Level.Verbose, "Invoking message observer {0} with message {1}", msgObserver, message);
msgObserver.OnNext(new MessageInstance<TMessage>(sequence, message));
}
else
{
- Logr.Log(Level.Warning, "Unhandled message received: {0}", message);
+ Log.Log(Level.Warning, "Unhandled message received: {0}", message);
}
};
- deserializeMap.Add(typeof(TMessage).Name, deserialize);
+
+ _deserializeMap.Add(name, deserialize);
}
/// <summary>
@@ -128,17 +149,17 @@ namespace Org.Apache.REEF.Wake.Avro
/// <returns>A byte array containing the serialized header and message.</returns>
public byte[] Write(object message, long sequence)
{
- string name = message.GetType().Name;
- Logr.Log(Level.Info, "Serializing message: {0}", name);
+ string name = message.GetType().FullName;
+ Log.Log(Level.Verbose, "Serializing message: {0}", name);
try
{
using (MemoryStream stream = new MemoryStream())
{
Header header = new Header(sequence, name);
- headerSerializer.Serialize(stream, header);
+ _headerSerializer.Serialize(stream, header);
Serialize serialize;
- if (serializeMap.TryGetValue(name, out serialize))
+ if (_serializeMap.TryGetValue(name, out serialize))
{
serialize(stream, message);
}
@@ -152,7 +173,7 @@ namespace Org.Apache.REEF.Wake.Avro
}
catch (Exception e)
{
- Logr.Log(Level.Error, "Failure writing message.", e);
+ Log.Log(Level.Error, "Failure writing message.", e);
throw e;
}
}
@@ -163,15 +184,16 @@ namespace Org.Apache.REEF.Wake.Avro
/// <param name="data">Byte array containing a header message and message to be deserialized.</param>
/// <param name="observer">An object which implements the IObserver<>
/// interface for the message being deserialized.</param>
- public void Read<T>(byte[] data, IObserver<IMessageInstance<T>> observer)
+ public void Read(byte[] data, object observer)
{
try
{
using (MemoryStream stream = new MemoryStream(data))
{
- Header head = headerSerializer.Deserialize(stream);
+ Header head = _headerSerializer.Deserialize(stream);
+ Log.Log(Level.Verbose, "Message header {0}", head);
Deserialize deserialize;
- if (deserializeMap.TryGetValue(head.className, out deserialize))
+ if (_deserializeMap.TryGetValue(head.className, out deserialize))
{
deserialize(stream, observer, head.sequence);
}
@@ -184,7 +206,7 @@ namespace Org.Apache.REEF.Wake.Avro
}
catch (Exception e)
{
- Logr.Log(Level.Error, "Failure reading message.", e);
+ Log.Log(Level.Error, "Failure reading message.", e);
throw e;
}
}