You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by do...@apache.org on 2017/10/24 04:58:11 UTC

reef git commit: [REEF-1906] Make ProtocolSerializer java class injectable by Tang

Repository: reef
Updated Branches:
  refs/heads/master 49157eeda -> f8a835751


[REEF-1906] Make ProtocolSerializer java class injectable by Tang

Summary of changes:
   * Add `@Inject` annotation to `ProtocolSerializer` constructor
   * Create a named class `ProtocolSerializerNamespace`
   * Implement static `.getClassId()` method to get unified class ID available to all serialization code
   * Use injection in the `ProtocolSerializerTest` unit tests
   * Minor refactoring and logging improvements in message serialization code and around

JIRA: [REEF-1906](https://issues.apache.org/jira/browse/REEF-1906)

Pull request:
  This closes #1395


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/f8a83575
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/f8a83575
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/f8a83575

Branch: refs/heads/master
Commit: f8a8357518b3a25a6ac8c6ba9031e615696ff326
Parents: 49157ee
Author: Sergiy Matusevych <mo...@apache.com>
Authored: Wed Oct 18 17:10:33 2017 -0700
Committer: Doug Service <do...@apache.org>
Committed: Tue Oct 24 04:56:17 2017 +0000

----------------------------------------------------------------------
 .../reef/wake/avro/ProtocolSerializer.java      | 71 ++++++++++++++------
 .../wake/avro/ProtocolSerializerNamespace.java  | 32 +++++++++
 .../wake/avro/impl/MessageSerializerImpl.java   |  3 +-
 .../reef/wake/impl/MultiObserverImpl.java       | 13 ++--
 .../wake/test/avro/ProtocolSerializerTest.java  | 64 +++++++++---------
 5 files changed, 120 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/f8a83575/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializer.java
index ad10d5a..a0c5cff 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializer.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializer.java
@@ -21,12 +21,15 @@ import org.apache.avro.io.*;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.MultiObserver;
 import org.apache.reef.wake.avro.message.Header;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
@@ -35,6 +38,8 @@ import java.util.logging.Logger;
 import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner;
 import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult;
 
+import javax.inject.Inject;
+
 /**
  * The ProtocolSerializer generates serializers and deserializers for
  * all of the Avro messages contained in a specified package. The name
@@ -43,10 +48,13 @@ import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult;
  * would sit in the org.foo.me.messages package.
  */
 public final class ProtocolSerializer {
+
   private static final Logger LOG = Logger.getLogger(ProtocolSerializer.class.getName());
+
   // Maps for mapping message class names to serializer and deserializer classes.
   private final Map<String, IMessageSerializer> nameToSerializerMap = new HashMap<>();
   private final Map<String, IMessageDeserializer> nameToDeserializerMap = new HashMap<>();
+
   private final SpecificDatumReader<Header> headerReader = new SpecificDatumReader<>(Header.class);
 
   /**
@@ -54,7 +62,10 @@ public final class ProtocolSerializer {
    * @param messagePackage A string which contains the full name of the
    *                       package containing the protocol messages.
    */
-  public ProtocolSerializer(final String messagePackage) {
+  @Inject
+  private ProtocolSerializer(
+      @Parameter(ProtocolSerializerNamespace.class) final String messagePackage) {
+
     // Build a list of the message reflection classes.
     final ScanResult scanResult = new FastClasspathScanner(messagePackage).scan();
     final List<String> scanNames = scanResult.getNamesOfSubclassesOf(SpecificRecordBase.class);
@@ -63,25 +74,33 @@ public final class ProtocolSerializer {
     // Add the header message from the org.apache.reef.wake.avro.message package.
     messageClasses.add(Header.class);
 
-    try {
-      // Register all of the messages in the specified package.
-      for (final Class<?> cls : messageClasses) {
-        this.register(cls);
-      }
-    } catch (final Exception e) {
-      throw new RuntimeException("Message registration failed", e);
+    // Register all of the messages in the specified package.
+    for (final Class<?> cls : messageClasses) {
+      this.register(cls);
     }
   }
 
   /**
+   * Get a canonical string ID of the class. This ID is then used as a key to find
+   * serializer and deserializer of the message payload. We need a separate method
+   * for it to make sure all parties use the same algorithm to get the class ID.
+   * @param clazz class of the message to be serialized/deserialized.
+   * @return canonical string ID of the class.
+   */
+  public static String getClassId(final Class<?> clazz) {
+    return clazz.getCanonicalName();
+  }
+
+  /**
    * Instantiates and adds a message serializer/deserializer for the message.
    * @param msgMetaClass The reflection class for the message.
    * @param <TMessage> The Java type of the message being registered.
    */
   public <TMessage> void register(final Class<TMessage> msgMetaClass) {
-    LOG.log(Level.INFO, "Registering message: {0}", msgMetaClass.getSimpleName());
-    nameToSerializerMap.put(msgMetaClass.getSimpleName(), SerializationFactory.createSerializer(msgMetaClass));
-    nameToDeserializerMap.put(msgMetaClass.getSimpleName(), SerializationFactory.createDeserializer(msgMetaClass));
+    final String classId = getClassId(msgMetaClass);
+    LOG.log(Level.INFO, "Registering message: {0}", classId);
+    this.nameToSerializerMap.put(classId, SerializationFactory.createSerializer(msgMetaClass));
+    this.nameToDeserializerMap.put(classId, SerializationFactory.createDeserializer(msgMetaClass));
   }
 
   /**
@@ -90,18 +109,21 @@ public final class ProtocolSerializer {
    * @param sequence The unique sequence number of the message.
    */
   public byte[] write(final SpecificRecord message, final long sequence) {
+
+    final String classId = getClassId(message.getClass());
     try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
-      final String name = message.getClass().getSimpleName();
-      LOG.log(Level.FINE, "Serializing message: {0}", name);
 
-      final IMessageSerializer serializer = nameToSerializerMap.get(name);
+      LOG.log(Level.FINEST, "Serializing message: {0}", classId);
+
+      final IMessageSerializer serializer = this.nameToSerializerMap.get(classId);
       if (serializer != null) {
         serializer.serialize(outputStream, message, sequence);
       }
 
       return outputStream.toByteArray();
-    } catch (final Exception e) {
-      throw new RuntimeException("Failure writing message: " + message.getClass().getCanonicalName(), e);
+
+    } catch (final IOException e) {
+      throw new RuntimeException("Failure writing message: " + classId, e);
     }
   }
 
@@ -112,24 +134,29 @@ public final class ProtocolSerializer {
    *                 to process the deserialized message.
    */
   public void read(final byte[] messageBytes, final MultiObserver observer) {
+
     try (final InputStream inputStream = new ByteArrayInputStream(messageBytes)) {
+
       // Binary decoder for both the header and the message.
       final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
 
       // Read the header message.
-      final Header header = headerReader.read(null, decoder);
-      LOG.log(Level.FINE, "Deserializing Avro message: {0}", header.getClassName());
+      final Header header = this.headerReader.read(null, decoder);
+      final String classId = header.getClassName().toString();
+      LOG.log(Level.FINEST, "Deserializing Avro message: {0}", classId);
 
       // Get the appropriate deserializer and deserialize the message.
-      final IMessageDeserializer deserializer = nameToDeserializerMap.get(header.getClassName().toString());
+      final IMessageDeserializer deserializer = this.nameToDeserializerMap.get(classId);
       if (deserializer != null) {
         deserializer.deserialize(decoder, observer, header.getSequence());
       } else {
-        throw new RuntimeException("Request to deserialize unknown message type: " +  header.getClassName());
+        throw new RuntimeException("Request to deserialize unknown message type: " + classId);
       }
 
-    } catch (final Exception e) {
-      throw new RuntimeException("Failure reading message: ", e);
+    } catch (final IOException e) {
+      throw new RuntimeException("Failure reading message", e);
+    } catch (final InvocationTargetException | IllegalAccessException e) {
+      throw new RuntimeException("Error deserializing message body", e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/f8a83575/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializerNamespace.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializerNamespace.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializerNamespace.java
new file mode 100644
index 0000000..df0fe35
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/ProtocolSerializerNamespace.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.avro;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * ProtocolSerializer parameter: full name of the package containing protocol messages.
+ */
+@NamedParameter(doc = "full name of the package containing protocol messages",
+    short_name = "protocol_serializer_namespace")
+public final class ProtocolSerializerNamespace implements Name<String> {
+  /** Do not instantiate that class. */
+  private ProtocolSerializerNamespace() { }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/f8a83575/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageSerializerImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageSerializerImpl.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageSerializerImpl.java
index f225cbb..6844d24 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageSerializerImpl.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/avro/impl/MessageSerializerImpl.java
@@ -23,6 +23,7 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.reef.wake.avro.IMessageSerializer;
+import org.apache.reef.wake.avro.ProtocolSerializer;
 import org.apache.reef.wake.avro.message.Header;
 
 import java.io.ByteArrayOutputStream;
@@ -43,7 +44,7 @@ public class MessageSerializerImpl<TMessage> implements IMessageSerializer {
    * @param msgMetaClass The reflection class for the message.
    */
   public MessageSerializerImpl(final Class<TMessage> msgMetaClass) {
-    this.msgMetaClassName = msgMetaClass.getSimpleName();
+    this.msgMetaClassName = ProtocolSerializer.getClassId(msgMetaClass);
     this.messageWriter = new SpecificDatumWriter<>(msgMetaClass);
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/f8a83575/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MultiObserverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MultiObserverImpl.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MultiObserverImpl.java
index fb3935d..1ca7b05 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MultiObserverImpl.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MultiObserverImpl.java
@@ -31,10 +31,11 @@ import java.util.logging.Logger;
 /**
  * The MultiObserverImpl class uses reflection to discover which onNext()
  * event processing methods are defined and then map events to them.
- * @param <TSubCls> The subclass derived from MultiObserverImpl.
  */
-public abstract class MultiObserverImpl<TSubCls> implements MultiObserver {
+public abstract class MultiObserverImpl implements MultiObserver {
+
   private static final Logger LOG = Logger.getLogger(MultiObserverImpl.class.getName());
+
   private final Map<String, Method> methodMap = new HashMap<>();
 
   /**
@@ -62,8 +63,8 @@ public abstract class MultiObserverImpl<TSubCls> implements MultiObserver {
    * @param <TEvent> The type of the event being processed.
    */
   private <TEvent> void unimplemented(final long identifier, final TEvent event) {
-    LOG.log(Level.INFO, "Unimplemented event: [{0}]: {1}",
-        new String[]{String.valueOf(identifier), event.getClass().getName()});
+    LOG.log(Level.SEVERE, "Unimplemented event: [{0}]: {1}", new Object[] {identifier, event});
+    throw new RuntimeException("Event not supported: " + event);
   }
 
   /**
@@ -74,13 +75,13 @@ public abstract class MultiObserverImpl<TSubCls> implements MultiObserver {
    */
   @Override
   public <TEvent> void onNext(final long identifier, final TEvent event)
-    throws IllegalAccessException, InvocationTargetException {
+      throws IllegalAccessException, InvocationTargetException {
 
     // Get the reflection method for this call.
     final Method onNext = methodMap.get(event.getClass().getName());
     if (onNext != null) {
       // Process the event.
-      onNext.invoke((TSubCls) this, identifier, event);
+      onNext.invoke(this, identifier, event);
     } else {
       // Log the unprocessed event.
       unimplemented(identifier, event);

http://git-wip-us.apache.org/repos/asf/reef/blob/f8a83575/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/ProtocolSerializerTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/ProtocolSerializerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/ProtocolSerializerTest.java
index 31d142a..33a8c2b 100644
--- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/ProtocolSerializerTest.java
+++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/avro/ProtocolSerializerTest.java
@@ -18,19 +18,18 @@
  */
 package org.apache.reef.wake.test.avro;
 
+import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.avro.ProtocolSerializer;
-import org.apache.reef.wake.impl.LoggingEventHandler;
+import org.apache.reef.wake.avro.ProtocolSerializerNamespace;
 import org.apache.reef.wake.impl.MultiObserverImpl;
 import org.apache.reef.wake.remote.*;
-import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.impl.ByteCodec;
-import org.apache.reef.wake.remote.ports.TcpPortProvider;
 import org.apache.reef.wake.test.avro.message.AvroTestMessage;
 
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -46,17 +45,37 @@ import static org.junit.Assert.assertEquals;
  *  exchanged between two remote manager classes.
  */
 public final class ProtocolSerializerTest {
-  private static final Logger LOG = Logger.getLogger(ProtocolSerializer.class.getName());
+
+  private static final Logger LOG = Logger.getLogger(ProtocolSerializerTest.class.getName());
 
   @Rule
   public final TestName name = new TestName();
 
+  private RemoteManagerFactory remoteManagerFactory;
+  private ProtocolSerializer serializer;
+
+  @Before
+  public void setup() throws InjectionException {
+
+    final Tang tang = Tang.Factory.getTang();
+
+    final Configuration config = tang.newConfigurationBuilder()
+        .bindNamedParameter(ProtocolSerializerNamespace.class, "org.apache.reef.wake.test.avro.message")
+        .build();
+
+    final Injector injector = tang.newInjector(config);
+
+    remoteManagerFactory = injector.getInstance(RemoteManagerFactory.class);
+    serializer = injector.getInstance(ProtocolSerializer.class);
+  }
+
   /**
    * Verify Avro message can be serialized and deserialized
    * between two remote managers.
    */
   @Test
-  public void testProtocolSerializerTest() throws Exception {
+  public void testProtocolSerializerTest() throws InterruptedException {
+
     final int[] numbers = {12, 25};
     final String[] strings = {"The first string", "The second string"};
 
@@ -65,8 +84,8 @@ public final class ProtocolSerializerTest {
     final BlockingQueue<byte[]> queue2 = new LinkedBlockingQueue<>();
 
     // Remote managers for sending and receiving byte messages.
-    final RemoteManager remoteManager1 = getTestRemoteManager("RemoteManagerOne");
-    final RemoteManager remoteManager2 = getTestRemoteManager("RemoteManagerTwo");
+    final RemoteManager remoteManager1 = remoteManagerFactory.getInstance("RemoteManagerOne");
+    final RemoteManager remoteManager2 = remoteManagerFactory.getInstance("RemoteManagerTwo");
 
     // Register message handlers for byte level messages.
     remoteManager1.registerHandler(byte[].class, new ByteMessageObserver(queue1));
@@ -75,8 +94,6 @@ public final class ProtocolSerializerTest {
     final EventHandler<byte[]> sender1 = remoteManager1.getHandler(remoteManager2.getMyIdentifier(), byte[].class);
     final EventHandler<byte[]> sender2 = remoteManager2.getHandler(remoteManager1.getMyIdentifier(), byte[].class);
 
-    final ProtocolSerializer serializer = new ProtocolSerializer("org.apache.reef.wake.test.avro.message");
-
     sender1.onNext(serializer.write(new AvroTestMessage(numbers[0], strings[0]), 1));
     sender2.onNext(serializer.write(new AvroTestMessage(numbers[1], strings[1]), 2));
 
@@ -93,30 +110,8 @@ public final class ProtocolSerializerTest {
     assertEquals(strings[1], avroObserver1.getDataString());
   }
 
-  /**
-   * Build a remote manager on the local IP address with an unused port.
-   * @param identifier The identifier of the remote manager.
-   * @return A RemoteManager instance listing on the local IP address
-   *         with a unique port number.
-   */
-  private RemoteManager getTestRemoteManager(final String identifier) throws InjectionException {
-    final int port = 0;
-    final boolean order = true;
-    final int retries = 3;
-    final int timeOut = 10000;
-
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    final LocalAddressProvider localAddressProvider = injector.getInstance(LocalAddressProvider.class);
-    final TcpPortProvider tcpPortProvider = injector.getInstance(TcpPortProvider.class);
-    final RemoteManagerFactory remoteManagerFactory = injector.getInstance(RemoteManagerFactory.class);
-
-    return remoteManagerFactory.getInstance(
-    identifier, localAddressProvider.getLocalAddress(), port, new ByteCodec(),
-      new LoggingEventHandler<Throwable>(), order, retries, timeOut,
-      localAddressProvider, tcpPortProvider);
-  }
-
   private final class ByteMessageObserver implements EventHandler<RemoteMessage<byte[]>> {
+
     private final BlockingQueue<byte[]> queue;
 
     /**
@@ -138,7 +133,8 @@ public final class ProtocolSerializerTest {
   /**
    * Processes messages from the network remote manager.
    */
-  public final class AvroMessageObserver extends MultiObserverImpl<AvroMessageObserver> {
+  public final class AvroMessageObserver extends MultiObserverImpl {
+
     private int number;
     private String dataString;