You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2018/03/10 01:33:17 UTC

[08/28] reef git commit: [REEF-1936] Use dependency injection in C# ProtocolSerializer

[REEF-1936] Use dependency injection in C# ProtocolSerializer

  Summary of changes:
    * Make `ProtocolSerializer` constructor injectable
    * Create named parameters for the constructor's input
    * Fix the `ProtocolSerializerTest` unit tests to use injection
    * Bugfix: register the `Header` protocol object regardless of the namespace
    * Bugfix for a future PR: relax the type constraints in `ProtocolSerializer.Read()` to make injection easier for the observers.
    * Minor fixes to make code compliant with REEF coding standards and naming conventions
    * Minor refactoring

JIRA:
  [REEF-1936](https://issues.apache.org/jira/browse/REEF-1936)

Pull Request:
  Closes #1402


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/94af38d3
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/94af38d3
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/94af38d3

Branch: refs/heads/REEF-335
Commit: 94af38d363b5c8ea4fef0d920082113523442b8a
Parents: eb522e9
Author: Sergiy Matusevych <mo...@apache.com>
Authored: Thu Oct 26 15:21:14 2017 -0700
Committer: Doug Service <do...@apache.org>
Committed: Thu Nov 2 23:16:44 2017 +0000

----------------------------------------------------------------------
 .../ProtocolSerializerTest.cs                   | 34 ++++++---
 .../Avro/ProtocolSerializer.cs                  | 78 +++++++++++++-------
 2 files changed, 72 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/94af38d3/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
index cf3ac02..574f522 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
@@ -25,6 +25,7 @@ using Org.Apache.REEF.Wake.Remote.Impl;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using org.apache.reef.wake.tests.message;
 using Xunit;
+using Org.Apache.REEF.Tang.Interface;
 
 namespace Org.Apache.REEF.Wake.Tests
 {
@@ -33,32 +34,36 @@ namespace Org.Apache.REEF.Wake.Tests
     /// </summary>
     internal sealed class TestMessageObserver : IObserver<IMessageInstance<AvroTestMessage>>
     {
-        private readonly IMessageInstance<AvroTestMessage> messageInstance;
+        private readonly IMessageInstance<AvroTestMessage> _messageInstance;
 
         public TestMessageObserver(long seq, AvroTestMessage msg)
         {
-            messageInstance = new MessageInstance<AvroTestMessage>(seq, msg);
+            _messageInstance = new MessageInstance<AvroTestMessage>(seq, msg);
         }
 
         public void OnNext(IMessageInstance<AvroTestMessage> otherMessageInstance)
         {
-            Assert.Equal(messageInstance.Message.number, otherMessageInstance.Message.number);
-            Assert.Equal(messageInstance.Message.data, otherMessageInstance.Message.data);
+            Assert.Equal(_messageInstance.Message.number, otherMessageInstance.Message.number);
+            Assert.Equal(_messageInstance.Message.data, otherMessageInstance.Message.data);
         }
 
         public void OnError(Exception error)
         {
-            throw new NotImplementedException();
+            throw new NotImplementedException("This method should never be called");
         }
 
         public void OnCompleted()
         {
-            throw new NotImplementedException();
+            throw new NotImplementedException("This method should never be called");
         }
     }
 
     public sealed class TestProtocolSerializer
     {
+        private static readonly ITang Tang = TangFactory.GetTang();
+        private static readonly IPAddress ListeningAddress = IPAddress.Parse("127.0.0.1");
+        private static readonly ByteCodec Codec = new ByteCodec();
+
         /// <summary>
         /// Setup two way communication between two remote managers through the loopback
         /// network and verify that Avro messages are properly serialized and deserialzied
@@ -72,18 +77,23 @@ namespace Org.Apache.REEF.Wake.Tests
             int[] numbers = { 12, 25 };
             string[] strings = { "The first string", "The second string" };
 
-            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
             BlockingCollection<byte[]> queue1 = new BlockingCollection<byte[]>();
             BlockingCollection<byte[]> queue2 = new BlockingCollection<byte[]>();
 
-            ProtocolSerializer serializer = new ProtocolSerializer(this.GetType().Assembly, "org.apache.reef.wake.tests.message");
-            IRemoteManagerFactory _remoteManagerFactory = TangFactory.GetTang().NewInjector().GetInstance<IRemoteManagerFactory>();
+            IConfiguration config = Tang.NewConfigurationBuilder()
+                .BindStringNamedParam<ProtocolSerializer.AssemblyName>(this.GetType().Assembly.FullName)
+                .BindStringNamedParam<ProtocolSerializer.MessageNamespace>("org.apache.reef.wake.tests.message")
+                .Build();
+
+            var injector = Tang.NewInjector(config);
+            var remoteManagerFactory = injector.GetInstance<IRemoteManagerFactory>();
+            var serializer = injector.GetInstance<ProtocolSerializer>();
 
-            using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
-            using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
+            using (var remoteManager1 = remoteManagerFactory.GetInstance(ListeningAddress, Codec))
+            using (var remoteManager2 = remoteManagerFactory.GetInstance(ListeningAddress, Codec))
             {
                 // Register observers for remote manager 1 and remote manager 2
-                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var remoteEndpoint = new IPEndPoint(ListeningAddress, 0);
                 remoteManager1.RegisterObserver(remoteEndpoint, Observer.Create<byte[]>(queue1.Add));
                 remoteManager2.RegisterObserver(remoteEndpoint, Observer.Create<byte[]>(queue2.Add));
 

http://git-wip-us.apache.org/repos/asf/reef/blob/94af38d3/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
index 4adcc88..df93138 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
@@ -22,6 +22,7 @@ using System.Reflection;
 using Microsoft.Hadoop.Avro;
 using Org.Apache.REEF.Utilities.Logging;
 using org.apache.reef.wake.avro.message;
+using Org.Apache.REEF.Tang.Annotations;
 
 namespace Org.Apache.REEF.Wake.Avro
 {
@@ -34,7 +35,17 @@ namespace Org.Apache.REEF.Wake.Avro
     /// </summary>
     public sealed class ProtocolSerializer
     {
-        private static readonly Logger Logr = Logger.GetLogger(typeof(ProtocolSerializer));
+        [NamedParameter("Name of the assembly that contains serializable classes.")]
+        public class AssemblyName : Name<string>
+        {
+        }
+
+        [NamedParameter("Package name to search for serializabe classes.")]
+        public class MessageNamespace : Name<string>
+        {
+        }
+
+        private static readonly Logger Log = Logger.GetLogger(typeof(ProtocolSerializer));
 
         /// <summary>
         /// Delegate for message serializer.
@@ -50,16 +61,14 @@ namespace Org.Apache.REEF.Wake.Avro
         /// <summary>
         /// Map from message type (a string with the message class name) to serializer.
         /// </summary>
-        private readonly SortedDictionary<string, Serialize>
-            serializeMap = new SortedDictionary<string, Serialize>();
+        private readonly IDictionary<string, Serialize> _serializeMap = new Dictionary<string, Serialize>();
 
         /// <summary>
         /// Map from message type (a string with the message class name) to deserializer.
         /// </summary>
-        private readonly SortedDictionary<string, Deserialize>
-            deserializeMap = new SortedDictionary<string, Deserialize>();
+        private readonly IDictionary<string, Deserialize> _deserializeMap = new Dictionary<string, Deserialize>();
 
-        private readonly IAvroSerializer<Header> headerSerializer = AvroSerializer.Create<Header>();
+        private readonly IAvroSerializer<Header> _headerSerializer = AvroSerializer.Create<Header>();
 
         /// <summary>
         /// Non-generic reflection record for the Register() method of this class. A constant.
@@ -68,40 +77,50 @@ namespace Org.Apache.REEF.Wake.Avro
             typeof(ProtocolSerializer).GetMethod("Register", BindingFlags.Instance | BindingFlags.NonPublic);
 
         /// <summary>
-        /// Register all of the protocol messages using reflection.
+        /// Construct an initialized protocol serializer.
         /// </summary>
-        /// <param name="assembly">The Assembly object which contains the namespace of the message classes.</param>
+        /// <param name="assemblyName">The full name of the assembly
+        /// which contains the namespace of the message classes.</param>
         /// <param name="messageNamespace">A string which contains the namespace the protocol messages.</param>
-        public ProtocolSerializer(Assembly assembly, string messageNamespace)
+        [Inject]
+        public ProtocolSerializer(
+            [Parameter(typeof(AssemblyName))] string assemblyName,
+            [Parameter(typeof(MessageNamespace))] string messageNamespace)
         {
-            Logr.Log(Level.Verbose, "Retrieving types for assembly: {0}", assembly.FullName);
+            Log.Log(Level.Info, "Retrieving types for assembly: {0}", assemblyName);
+            Assembly assembly = Assembly.Load(assemblyName);
 
-            var types = new List<Type>(assembly.GetTypes()) { typeof(Header) };
-            foreach (Type type in types)
+            CreateProtocolObject(typeof(Header));
+            foreach (Type type in assembly.GetTypes())
             {
                 if (type.FullName.StartsWith(messageNamespace))
                 {
-                    MethodInfo genericInfo = RegisterMethodInfo.MakeGenericMethod(new[] { type });
-                    genericInfo.Invoke(this, null);
+                    CreateProtocolObject(type);
                 }
             }
         }
 
+        private void CreateProtocolObject(Type type)
+        {
+            MethodInfo genericInfo = RegisterMethodInfo.MakeGenericMethod(new[] { type });
+            genericInfo.Invoke(this, null);
+        }
+
         /// <summary>
         /// Generate and store the metadata necessary to serialze and deserialize a specific message type.
         /// </summary>
         /// <typeparam name="TMessage">The class type of the message being registered.</typeparam>
         internal void Register<TMessage>()
         {
-            Logr.Log(Level.Info, "Registering message type: {0} {1}",
-                typeof(TMessage).FullName, typeof(TMessage).Name);
+            string name = typeof(TMessage).FullName;
+            Log.Log(Level.Info, "Registering message type: {0}", name);
 
             IAvroSerializer<TMessage> messageSerializer = AvroSerializer.Create<TMessage>();
             Serialize serialize = (MemoryStream stream, object message) =>
             {
                 messageSerializer.Serialize(stream, (TMessage)message);
             };
-            serializeMap.Add(typeof(TMessage).Name, serialize);
+            _serializeMap.Add(name, serialize);
 
             Deserialize deserialize = (MemoryStream stream, object observer, long sequence) =>
             {
@@ -109,14 +128,16 @@ namespace Org.Apache.REEF.Wake.Avro
                 var msgObserver = observer as IObserver<IMessageInstance<TMessage>>;
                 if (msgObserver != null)
                 {
+                    Log.Log(Level.Verbose, "Invoking message observer {0} with message {1}", msgObserver, message);
                     msgObserver.OnNext(new MessageInstance<TMessage>(sequence, message));
                 }
                 else
                 {
-                    Logr.Log(Level.Warning, "Unhandled message received: {0}", message);
+                    Log.Log(Level.Warning, "Unhandled message received: {0}", message);
                 }
             };
-            deserializeMap.Add(typeof(TMessage).Name, deserialize);
+
+            _deserializeMap.Add(name, deserialize);
         }
 
         /// <summary>
@@ -128,17 +149,17 @@ namespace Org.Apache.REEF.Wake.Avro
         /// <returns>A byte array containing the serialized header and message.</returns>
         public byte[] Write(object message, long sequence) 
         {
-            string name = message.GetType().Name;
-            Logr.Log(Level.Info, "Serializing message: {0}", name);
+            string name = message.GetType().FullName;
+            Log.Log(Level.Verbose, "Serializing message: {0}", name);
             try
             { 
                 using (MemoryStream stream = new MemoryStream())
                 {
                     Header header = new Header(sequence, name);
-                    headerSerializer.Serialize(stream, header);
+                    _headerSerializer.Serialize(stream, header);
 
                     Serialize serialize;
-                    if (serializeMap.TryGetValue(name, out serialize))
+                    if (_serializeMap.TryGetValue(name, out serialize))
                     {
                         serialize(stream, message);
                     }
@@ -152,7 +173,7 @@ namespace Org.Apache.REEF.Wake.Avro
             }
             catch (Exception e)
             {
-                Logr.Log(Level.Error, "Failure writing message.", e);
+                Log.Log(Level.Error, "Failure writing message.", e);
                 throw e;
             }
         }
@@ -163,15 +184,16 @@ namespace Org.Apache.REEF.Wake.Avro
         /// <param name="data">Byte array containing a header message and message to be deserialized.</param>
         /// <param name="observer">An object which implements the IObserver<>
         /// interface for the message being deserialized.</param>
-        public void Read<T>(byte[] data, IObserver<IMessageInstance<T>> observer)
+        public void Read(byte[] data, object observer)
         {
             try
             {
                 using (MemoryStream stream = new MemoryStream(data))
                 {
-                    Header head = headerSerializer.Deserialize(stream);
+                    Header head = _headerSerializer.Deserialize(stream);
+                    Log.Log(Level.Verbose, "Message header {0}", head);
                     Deserialize deserialize;
-                    if (deserializeMap.TryGetValue(head.className, out deserialize))
+                    if (_deserializeMap.TryGetValue(head.className, out deserialize))
                     {
                         deserialize(stream, observer, head.sequence);
                     }
@@ -184,7 +206,7 @@ namespace Org.Apache.REEF.Wake.Avro
             }
             catch (Exception e)
             {
-                Logr.Log(Level.Error, "Failure reading message.", e);
+                Log.Log(Level.Error, "Failure reading message.", e);
                 throw e;
             }
         }