You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2017/07/22 01:46:05 UTC
reef git commit: [REEF-1835] Use stricter type constraints in Avro
ProtocolSerializer
Repository: reef
Updated Branches:
refs/heads/master 5abb8b13a -> 184e4d954
[REEF-1835] Use stricter type constraints in Avro ProtocolSerializer
* Also, minor refactoring in `ProtocolSerializer` and unit tests
JIRA: [REEF-1835](https://issues.apache.org/jira/browse/REEF-1835)
Pull request:
This closes #1337
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/184e4d95
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/184e4d95
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/184e4d95
Branch: refs/heads/master
Commit: 184e4d9541da9016a1ed08a4fb084bbba02de984
Parents: 5abb8b1
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Wed Jul 19 18:32:54 2017 -0700
Committer: Doug Service <do...@apache.org>
Committed: Sat Jul 22 01:44:34 2017 +0000
----------------------------------------------------------------------
.../ProtocolSerializerTest.cs | 36 +++++------
.../Avro/IMessageInstance.cs | 36 +++++++++++
.../Avro/MessageInstance.cs | 27 +++++---
.../Avro/ProtocolSerializer.cs | 65 +++++++++++++-------
.../Org.Apache.REEF.Wake.csproj | 3 +-
5 files changed, 117 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
index 8f4a1ca..cf3ac02 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
@@ -31,21 +31,19 @@ namespace Org.Apache.REEF.Wake.Tests
/// <summary>
/// Observer to receive and verify test message contents.
/// </summary>
- internal sealed class TestMessageObserver : IObserver<MessageInstance<AvroTestMessage>>
+ internal sealed class TestMessageObserver : IObserver<IMessageInstance<AvroTestMessage>>
{
- int number;
- string data;
+ private readonly IMessageInstance<AvroTestMessage> messageInstance;
- public TestMessageObserver(int number, string data)
+ public TestMessageObserver(long seq, AvroTestMessage msg)
{
- this.number = number;
- this.data = data;
+ messageInstance = new MessageInstance<AvroTestMessage>(seq, msg);
}
- public void OnNext(MessageInstance<AvroTestMessage> instance)
+ public void OnNext(IMessageInstance<AvroTestMessage> otherMessageInstance)
{
- Assert.Equal(instance.message.number, this.number);
- Assert.Equal(instance.message.data, this.data);
+ Assert.Equal(messageInstance.Message.number, otherMessageInstance.Message.number);
+ Assert.Equal(messageInstance.Message.data, otherMessageInstance.Message.data);
}
public void OnError(Exception error)
@@ -59,8 +57,7 @@ namespace Org.Apache.REEF.Wake.Tests
}
}
- [Collection("FunctionalTests")]
- public class TestProtocolSerializer
+ public sealed class TestProtocolSerializer
{
/// <summary>
/// Setup two way communication between two remote managers through the loopback
@@ -87,22 +84,23 @@ namespace Org.Apache.REEF.Wake.Tests
{
// Register observers for remote manager 1 and remote manager 2
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
- var observer1 = Observer.Create<byte[]>(queue1.Add);
- var observer2 = Observer.Create<byte[]>(queue2.Add);
- remoteManager1.RegisterObserver(remoteEndpoint, observer1);
- remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+ remoteManager1.RegisterObserver(remoteEndpoint, Observer.Create<byte[]>(queue1.Add));
+ remoteManager2.RegisterObserver(remoteEndpoint, Observer.Create<byte[]>(queue2.Add));
+
+ var msg1 = new AvroTestMessage(numbers[0], strings[0]);
+ var msg2 = new AvroTestMessage(numbers[1], strings[1]);
// Remote manager 1 sends avro message to remote manager 2
var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
- remoteObserver1.OnNext(serializer.Write(new AvroTestMessage(numbers[0], strings[0]), 1));
+ remoteObserver1.OnNext(serializer.Write(msg1, 1));
// Remote manager 2 sends avro message to remote manager 1
var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
- remoteObserver2.OnNext(serializer.Write(new AvroTestMessage(numbers[1], strings[1]), 2));
+ remoteObserver2.OnNext(serializer.Write(msg2, 2));
// Verify the messages are properly received.
- serializer.Read(queue1.Take(), new TestMessageObserver(numbers[1], strings[1]));
- serializer.Read(queue2.Take(), new TestMessageObserver(numbers[0], strings[0]));
+ serializer.Read(queue1.Take(), new TestMessageObserver(2, msg2));
+ serializer.Read(queue2.Take(), new TestMessageObserver(1, msg1));
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake/Avro/IMessageInstance.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/IMessageInstance.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/IMessageInstance.cs
new file mode 100755
index 0000000..1060d6f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/IMessageInstance.cs
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+ /// <summary>
+ /// An interface to a readonly pair of (message, sequence number).
+ /// </summary>
+ /// <typeparam name="T">Message payload type.</typeparam>
+ public interface IMessageInstance<out T>
+ {
+ /// <summary>
+ /// Get the sequence number of a message.
+ /// </summary>
+ long Sequence { get; }
+
+ /// <summary>
+ /// Return the data payload of message instance.
+ /// </summary>
+ T Message { get; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
index 96ac8a4..8560f1d 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
@@ -18,19 +18,30 @@
namespace Org.Apache.REEF.Wake.Avro
{
/// <summary>
- /// Wrapper class to bind a specific instance of an Avro messagage
- /// with it associated sequence number.
+ /// Wrapper class to bind a specific instance of a message with the associated sequence number.
/// </summary>
- /// <typeparam name="T"></typeparam>
- public struct MessageInstance<T>
+ /// <typeparam name="T">Message payload type.</typeparam>
+ public sealed class MessageInstance<T> : IMessageInstance<T>
{
- public long sequence;
- public T message;
+ /// <summary>
+ /// Get the sequence number of a message.
+ /// </summary>
+ public long Sequence { get; private set; }
+ /// <summary>
+ /// Return the data payload of message instance.
+ /// </summary>
+ public T Message { get; private set; }
+
+ /// <summary>
+ /// Create a new instance of the (sequence number, message payload) pair.
+ /// </summary>
+ /// <param name="sequence">The message sequence number.</param>
+ /// <param name="message">The message payload.</param>
public MessageInstance(long sequence, T message)
{
- this.sequence = sequence;
- this.message = message;
+ Sequence = sequence;
+ Message = message;
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
index dff30b1..4adcc88 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
@@ -36,36 +36,52 @@ namespace Org.Apache.REEF.Wake.Avro
{
private static readonly Logger Logr = Logger.GetLogger(typeof(ProtocolSerializer));
- // Delagates for message serializers and deserializers.
+ /// <summary>
+ /// Delegate for message serializer.
+ /// </summary>
private delegate void Serialize(MemoryStream stream, object message);
+
+ /// <summary>
+ /// Delegate for message deserializer.
+ /// </summary>
+ /// <param name="observer">Must be of type IObserver<IMessageInstance<?></param>
private delegate void Deserialize(MemoryStream stream, object observer, long sequence);
- // Message type to serialize/derserialize delagate.
- private readonly SortedDictionary<string, Serialize> serializeMap = new SortedDictionary<string, Serialize>();
- private readonly SortedDictionary<string, Deserialize> deserializeMap = new SortedDictionary<string, Deserialize>();
+ /// <summary>
+ /// Map from message type (a string with the message class name) to serializer.
+ /// </summary>
+ private readonly SortedDictionary<string, Serialize>
+ serializeMap = new SortedDictionary<string, Serialize>();
+
+ /// <summary>
+ /// Map from message type (a string with the message class name) to deserializer.
+ /// </summary>
+ private readonly SortedDictionary<string, Deserialize>
+ deserializeMap = new SortedDictionary<string, Deserialize>();
private readonly IAvroSerializer<Header> headerSerializer = AvroSerializer.Create<Header>();
/// <summary>
+ /// Non-generic reflection record for the Register() method of this class. A constant.
+ /// </summary>
+ private static readonly MethodInfo RegisterMethodInfo =
+ typeof(ProtocolSerializer).GetMethod("Register", BindingFlags.Instance | BindingFlags.NonPublic);
+
+ /// <summary>
/// Register all of the protocol messages using reflection.
/// </summary>
- /// <param name="assembly">The Assembley object which contains the namespace of the message classes.</param>
+ /// <param name="assembly">The Assembly object which contains the namespace of the message classes.</param>
/// <param name="messageNamespace">A string which contains the namespace the protocol messages.</param>
public ProtocolSerializer(Assembly assembly, string messageNamespace)
{
- MethodInfo registerInfo = typeof(ProtocolSerializer).GetMethod("Register", BindingFlags.Instance | BindingFlags.NonPublic);
- MethodInfo genericInfo;
-
- Logr.Log(Level.Info, "Retrieving types for assembly: {0}", assembly.FullName);
- List<Type> types = new List<Type>(assembly.GetTypes());
- types.Add(typeof(Header));
+ Logr.Log(Level.Verbose, "Retrieving types for assembly: {0}", assembly.FullName);
+ var types = new List<Type>(assembly.GetTypes()) { typeof(Header) };
foreach (Type type in types)
{
- string name = type.FullName;
- if (name.StartsWith(messageNamespace))
+ if (type.FullName.StartsWith(messageNamespace))
{
- genericInfo = registerInfo.MakeGenericMethod(new[] { type });
+ MethodInfo genericInfo = RegisterMethodInfo.MakeGenericMethod(new[] { type });
genericInfo.Invoke(this, null);
}
}
@@ -77,7 +93,8 @@ namespace Org.Apache.REEF.Wake.Avro
/// <typeparam name="TMessage">The class type of the message being registered.</typeparam>
internal void Register<TMessage>()
{
- Logr.Log(Level.Info, "Registering message type: {0} {1}", typeof(TMessage).FullName, typeof(TMessage).Name);
+ Logr.Log(Level.Info, "Registering message type: {0} {1}",
+ typeof(TMessage).FullName, typeof(TMessage).Name);
IAvroSerializer<TMessage> messageSerializer = AvroSerializer.Create<TMessage>();
Serialize serialize = (MemoryStream stream, object message) =>
@@ -89,7 +106,7 @@ namespace Org.Apache.REEF.Wake.Avro
Deserialize deserialize = (MemoryStream stream, object observer, long sequence) =>
{
TMessage message = messageSerializer.Deserialize(stream);
- IObserver<MessageInstance<TMessage>> msgObserver = observer as IObserver<MessageInstance<TMessage>>;
+ var msgObserver = observer as IObserver<IMessageInstance<TMessage>>;
if (msgObserver != null)
{
msgObserver.OnNext(new MessageInstance<TMessage>(sequence, message));
@@ -106,7 +123,8 @@ namespace Org.Apache.REEF.Wake.Avro
/// Serialize the input message and return a byte array.
/// </summary>
/// <param name="message">An object reference to the messeage to be serialized</param>
- /// <param name="sequence">A long which cotains the higher level protocols sequence number for the message.</param>
+ /// <param name="sequence">
+ /// A long which cotains the higher level protocols sequence number for the message.</param>
/// <returns>A byte array containing the serialized header and message.</returns>
public byte[] Write(object message, long sequence)
{
@@ -126,7 +144,8 @@ namespace Org.Apache.REEF.Wake.Avro
}
else
{
- throw new SeializationException("Request to serialize unknown message type: " + name);
+ throw new SeializationException(
+ "Request to serialize unknown message type: " + name);
}
return stream.GetBuffer();
}
@@ -141,9 +160,10 @@ namespace Org.Apache.REEF.Wake.Avro
/// <summary>
/// Read a message from the input byte array.
/// </summary>
- /// <param name="data">The byte array containing a header message and message to be deserialized.</param>
- /// <param name="observer">An object which implements the IObserver<> interface for the message being deserialized.</param>
- public void Read(byte[] data, object observer)
+ /// <param name="data">Byte array containing a header message and message to be deserialized.</param>
+ /// <param name="observer">An object which implements the IObserver<>
+ /// interface for the message being deserialized.</param>
+ public void Read<T>(byte[] data, IObserver<IMessageInstance<T>> observer)
{
try
{
@@ -157,7 +177,8 @@ namespace Org.Apache.REEF.Wake.Avro
}
else
{
- throw new SeializationException("Request to deserialize unknown message type: " + head.className);
+ throw new SeializationException(
+ "Request to deserialize unknown message type: " + head.className);
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index ea76e5d..6e75ff3 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
+<?xml version="1.0" encoding="utf-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -62,6 +62,7 @@ under the License.
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
<Compile Include="AbstractEStage.cs" />
+ <Compile Include="Avro\IMessageInstance.cs" />
<Compile Include="Avro\MessageInstance.cs" />
<Compile Include="Avro\Message\Header.cs" />
<Compile Include="Avro\ProtocolSerializer.cs" />