You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2017/07/22 01:46:05 UTC

reef git commit: [REEF-1835] Use stricter type constraints in Avro ProtocolSerializer

Repository: reef
Updated Branches:
  refs/heads/master 5abb8b13a -> 184e4d954


[REEF-1835] Use stricter type constraints in Avro ProtocolSerializer

  * Also, minor refactoring in `ProtocolSerializer` and unit tests

JIRA: [REEF-1835](https://issues.apache.org/jira/browse/REEF-1835)

Pull request:
  This closes #1337


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/184e4d95
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/184e4d95
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/184e4d95

Branch: refs/heads/master
Commit: 184e4d9541da9016a1ed08a4fb084bbba02de984
Parents: 5abb8b1
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Wed Jul 19 18:32:54 2017 -0700
Committer: Doug Service <do...@apache.org>
Committed: Sat Jul 22 01:44:34 2017 +0000

----------------------------------------------------------------------
 .../ProtocolSerializerTest.cs                   | 36 +++++------
 .../Avro/IMessageInstance.cs                    | 36 +++++++++++
 .../Avro/MessageInstance.cs                     | 27 +++++---
 .../Avro/ProtocolSerializer.cs                  | 65 +++++++++++++-------
 .../Org.Apache.REEF.Wake.csproj                 |  3 +-
 5 files changed, 117 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
index 8f4a1ca..cf3ac02 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
@@ -31,21 +31,19 @@ namespace Org.Apache.REEF.Wake.Tests
     /// <summary>
     /// Observer to receive and verify test message contents.
     /// </summary>
-    internal sealed class TestMessageObserver : IObserver<MessageInstance<AvroTestMessage>>
+    internal sealed class TestMessageObserver : IObserver<IMessageInstance<AvroTestMessage>>
     {
-        int number;
-        string data;
+        private readonly IMessageInstance<AvroTestMessage> messageInstance;
 
-        public TestMessageObserver(int number, string data)
+        public TestMessageObserver(long seq, AvroTestMessage msg)
         {
-            this.number = number;
-            this.data = data;
+            messageInstance = new MessageInstance<AvroTestMessage>(seq, msg);
         }
 
-        public void OnNext(MessageInstance<AvroTestMessage> instance)
+        public void OnNext(IMessageInstance<AvroTestMessage> otherMessageInstance)
         {
-            Assert.Equal(instance.message.number, this.number);
-            Assert.Equal(instance.message.data, this.data);
+            Assert.Equal(messageInstance.Message.number, otherMessageInstance.Message.number);
+            Assert.Equal(messageInstance.Message.data, otherMessageInstance.Message.data);
         }
 
         public void OnError(Exception error)
@@ -59,8 +57,7 @@ namespace Org.Apache.REEF.Wake.Tests
         }
     }
 
-    [Collection("FunctionalTests")]
-    public class TestProtocolSerializer
+    public sealed class TestProtocolSerializer
     {
         /// <summary>
         /// Setup two way communication between two remote managers through the loopback
@@ -87,22 +84,23 @@ namespace Org.Apache.REEF.Wake.Tests
             {
                 // Register observers for remote manager 1 and remote manager 2
                 var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
-                var observer1 = Observer.Create<byte[]>(queue1.Add);
-                var observer2 = Observer.Create<byte[]>(queue2.Add);
-                remoteManager1.RegisterObserver(remoteEndpoint, observer1);
-                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+                remoteManager1.RegisterObserver(remoteEndpoint, Observer.Create<byte[]>(queue1.Add));
+                remoteManager2.RegisterObserver(remoteEndpoint, Observer.Create<byte[]>(queue2.Add));
+
+                var msg1 = new AvroTestMessage(numbers[0], strings[0]);
+                var msg2 = new AvroTestMessage(numbers[1], strings[1]);
 
                 // Remote manager 1 sends avro message to remote manager 2
                 var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
-                remoteObserver1.OnNext(serializer.Write(new AvroTestMessage(numbers[0], strings[0]), 1));
+                remoteObserver1.OnNext(serializer.Write(msg1, 1));
 
                 // Remote manager 2 sends avro message to remote manager 1
                 var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
-                remoteObserver2.OnNext(serializer.Write(new AvroTestMessage(numbers[1], strings[1]), 2));
+                remoteObserver2.OnNext(serializer.Write(msg2, 2));
 
                 // Verify the messages are properly received.
-                serializer.Read(queue1.Take(), new TestMessageObserver(numbers[1], strings[1]));
-                serializer.Read(queue2.Take(), new TestMessageObserver(numbers[0], strings[0]));
+                serializer.Read(queue1.Take(), new TestMessageObserver(2, msg2));
+                serializer.Read(queue2.Take(), new TestMessageObserver(1, msg1));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake/Avro/IMessageInstance.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/IMessageInstance.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/IMessageInstance.cs
new file mode 100755
index 0000000..1060d6f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/IMessageInstance.cs
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+    /// <summary>
+    /// An interface to a readonly pair of (message, sequence number).
+    /// </summary>
+    /// <typeparam name="T">Message payload type.</typeparam>
+    public interface IMessageInstance<out T>
+    {
+        /// <summary>
+        /// Get the sequence number of a message.
+        /// </summary>
+        long Sequence { get; }
+
+        /// <summary>
+        /// Return the data payload of message instance.
+        /// </summary>
+        T Message { get; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
index 96ac8a4..8560f1d 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
@@ -18,19 +18,30 @@
 namespace Org.Apache.REEF.Wake.Avro
 {
     /// <summary>
-    /// Wrapper class to bind a specific instance of an Avro messagage
-    /// with it associated sequence number.
+    /// Wrapper class to bind a specific instance of a message with the associated sequence number.
     /// </summary>
-    /// <typeparam name="T"></typeparam>
-    public struct MessageInstance<T>
+    /// <typeparam name="T">Message payload type.</typeparam>
+    public sealed class MessageInstance<T> : IMessageInstance<T>
     {
-        public long sequence;
-        public T message;
+        /// <summary>
+        /// Get the sequence number of a message.
+        /// </summary>
+        public long Sequence { get; private set; }
 
+        /// <summary>
+        /// Return the data payload of message instance.
+        /// </summary>
+        public T Message { get; private set; }
+
+        /// <summary>
+        /// Create a new instance of the (sequence number, message payload) pair.
+        /// </summary>
+        /// <param name="sequence">The message sequence number.</param>
+        /// <param name="message">The message payload.</param>
         public MessageInstance(long sequence, T message)
         {
-            this.sequence = sequence;
-            this.message = message;
+            Sequence = sequence;
+            Message = message;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
index dff30b1..4adcc88 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
@@ -36,36 +36,52 @@ namespace Org.Apache.REEF.Wake.Avro
     {
         private static readonly Logger Logr = Logger.GetLogger(typeof(ProtocolSerializer));
 
-        // Delagates for message serializers and deserializers.
+        /// <summary>
+        /// Delegate for message serializer.
+        /// </summary>
         private delegate void Serialize(MemoryStream stream, object message);
+
+        /// <summary>
+        /// Delegate for message deserializer.
+        /// </summary>
+        /// <param name="observer">Must be of type IObserver&lt;IMessageInstance&lt;?&gt;&gt;</param>
         private delegate void Deserialize(MemoryStream stream, object observer, long sequence);
 
-        // Message type to serialize/derserialize delagate.
-        private readonly SortedDictionary<string, Serialize> serializeMap = new SortedDictionary<string, Serialize>();
-        private readonly SortedDictionary<string, Deserialize> deserializeMap = new SortedDictionary<string, Deserialize>();
+        /// <summary>
+        /// Map from message type (a string with the message class name) to serializer.
+        /// </summary>
+        private readonly SortedDictionary<string, Serialize>
+            serializeMap = new SortedDictionary<string, Serialize>();
+
+        /// <summary>
+        /// Map from message type (a string with the message class name) to deserializer.
+        /// </summary>
+        private readonly SortedDictionary<string, Deserialize>
+            deserializeMap = new SortedDictionary<string, Deserialize>();
 
         private readonly IAvroSerializer<Header> headerSerializer = AvroSerializer.Create<Header>();
 
         /// <summary>
+        /// Non-generic reflection record for the Register() method of this class. A constant.
+        /// </summary>
+        private static readonly MethodInfo RegisterMethodInfo =
+            typeof(ProtocolSerializer).GetMethod("Register", BindingFlags.Instance | BindingFlags.NonPublic);
+
+        /// <summary>
         /// Register all of the protocol messages using reflection.
         /// </summary>
-        /// <param name="assembly">The Assembley object which contains the namespace of the message classes.</param>
+        /// <param name="assembly">The Assembly object which contains the namespace of the message classes.</param>
         /// <param name="messageNamespace">A string which contains the namespace the protocol messages.</param>
         public ProtocolSerializer(Assembly assembly, string messageNamespace)
         {
-            MethodInfo registerInfo = typeof(ProtocolSerializer).GetMethod("Register", BindingFlags.Instance | BindingFlags.NonPublic);
-            MethodInfo genericInfo;
-
-            Logr.Log(Level.Info, "Retrieving types for assembly: {0}", assembly.FullName);
-            List<Type> types = new List<Type>(assembly.GetTypes());
-            types.Add(typeof(Header));
+            Logr.Log(Level.Verbose, "Retrieving types for assembly: {0}", assembly.FullName);
 
+            var types = new List<Type>(assembly.GetTypes()) { typeof(Header) };
             foreach (Type type in types)
             {
-                string name = type.FullName;
-                if (name.StartsWith(messageNamespace))
+                if (type.FullName.StartsWith(messageNamespace))
                 {
-                    genericInfo = registerInfo.MakeGenericMethod(new[] { type });
+                    MethodInfo genericInfo = RegisterMethodInfo.MakeGenericMethod(new[] { type });
                     genericInfo.Invoke(this, null);
                 }
             }
@@ -77,7 +93,8 @@ namespace Org.Apache.REEF.Wake.Avro
         /// <typeparam name="TMessage">The class type of the message being registered.</typeparam>
         internal void Register<TMessage>()
         {
-            Logr.Log(Level.Info, "Registering message type: {0} {1}", typeof(TMessage).FullName, typeof(TMessage).Name);
+            Logr.Log(Level.Info, "Registering message type: {0} {1}",
+                typeof(TMessage).FullName, typeof(TMessage).Name);
 
             IAvroSerializer<TMessage> messageSerializer = AvroSerializer.Create<TMessage>();
             Serialize serialize = (MemoryStream stream, object message) =>
@@ -89,7 +106,7 @@ namespace Org.Apache.REEF.Wake.Avro
             Deserialize deserialize = (MemoryStream stream, object observer, long sequence) =>
             {
                 TMessage message = messageSerializer.Deserialize(stream);
-                IObserver<MessageInstance<TMessage>> msgObserver = observer as IObserver<MessageInstance<TMessage>>;
+                var msgObserver = observer as IObserver<IMessageInstance<TMessage>>;
                 if (msgObserver != null)
                 {
                     msgObserver.OnNext(new MessageInstance<TMessage>(sequence, message));
@@ -106,7 +123,8 @@ namespace Org.Apache.REEF.Wake.Avro
         /// Serialize the input message and return a byte array.
         /// </summary>
         /// <param name="message">An object reference to the messeage to be serialized</param>
-        /// <param name="sequence">A long which cotains the higher level protocols sequence number for the message.</param>
+        /// <param name="sequence">
+        /// A long which contains the higher level protocol's sequence number for the message.</param>
         /// <returns>A byte array containing the serialized header and message.</returns>
         public byte[] Write(object message, long sequence) 
         {
@@ -126,7 +144,8 @@ namespace Org.Apache.REEF.Wake.Avro
                     }
                     else
                     {
-                        throw new SeializationException("Request to serialize unknown message type: " + name);
+                        throw new SeializationException(
+                            "Request to serialize unknown message type: " + name);
                     }
                     return stream.GetBuffer();
                 }
@@ -141,9 +160,10 @@ namespace Org.Apache.REEF.Wake.Avro
         /// <summary>
         /// Read a message from the input byte array.
         /// </summary>
-        /// <param name="data">The byte array containing a header message and message to be deserialized.</param>
-        /// <param name="observer">An object which implements the IObserver<> interface for the message being deserialized.</param>
-        public void Read(byte[] data, object observer)
+        /// <param name="data">Byte array containing a header message and message to be deserialized.</param>
+        /// <param name="observer">An object which implements the IObserver&lt;&gt;
+        /// interface for the message being deserialized.</param>
+        public void Read<T>(byte[] data, IObserver<IMessageInstance<T>> observer)
         {
             try
             {
@@ -157,7 +177,8 @@ namespace Org.Apache.REEF.Wake.Avro
                     }
                     else
                     {
-                        throw new SeializationException("Request to deserialize unknown message type: " + head.className);
+                        throw new SeializationException(
+                            "Request to deserialize unknown message type: " + head.className);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index ea76e5d..6e75ff3 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
+<?xml version="1.0" encoding="utf-8"?>
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -62,6 +62,7 @@ under the License.
       <Link>Properties\SharedAssemblyInfo.cs</Link>
     </Compile>
     <Compile Include="AbstractEStage.cs" />
+    <Compile Include="Avro\IMessageInstance.cs" />
     <Compile Include="Avro\MessageInstance.cs" />
     <Compile Include="Avro\Message\Header.cs" />
     <Compile Include="Avro\ProtocolSerializer.cs" />