You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2017/05/17 16:08:47 UTC

reef git commit: [REEF-1801] Improve logging when sending remote events in Wake

Repository: reef
Updated Branches:
  refs/heads/master c4710a886 -> 4729037be


[REEF-1801] Improve logging when sending remote events in Wake

Summary of changes:
   * Replace `ex.printStackTrace()` calls with proper log messages
   * Minor refactoring for readability and better performance in Wake identifier factory and around
   * Better logging in Wake
   * Other minor refactoring in Wake: implement some `.toString()` etc.

JIRA:
  [REEF-1801](https://issues.apache.org/jira/browse/REEF-1801)

Pull Request:
  This closes #1306


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4729037b
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4729037b
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4729037b

Branch: refs/heads/master
Commit: 4729037be79fc7d5f2f02194cda433497022798c
Parents: c4710a8
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Tue May 16 14:03:45 2017 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Wed May 17 09:06:27 2017 -0700

----------------------------------------------------------------------
 .../common/driver/client/ClientConnection.java  |  2 +-
 .../wake/impl/DefaultIdentifierFactory.java     | 36 ++++++++-----
 .../remote/impl/RemoteSenderEventHandler.java   | 54 ++++++++++----------
 .../transport/netty/LoggingLinkListener.java    |  7 +--
 .../wake/remote/transport/netty/NettyLink.java  | 18 +++----
 5 files changed, 61 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
index 7be8e07..bce2fbe 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
@@ -54,7 +54,7 @@ public final class ClientConnection {
    * @param status
    */
   public synchronized void send(final ReefServiceProtos.JobStatusProto status) {
-    LOG.log(Level.FINEST, "Sending:\n" + status);
+    LOG.log(Level.FINEST, "Sending to client: status={0}", status.getState());
     this.jobStatusHandler.onNext(status);
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
index 951fe5a..47e1ffd 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
@@ -28,10 +28,12 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * Default remote identifier factory that creates a specific remote identifier
- * from a string representation
+ * from a string representation.
  * <p>
  * A string representation is broken into two parts type and type-specific details separated by "://"
  * A remote identifier implementation should implement a constructor that accepts a string.
@@ -39,7 +41,9 @@ import java.util.Map;
  */
 public class DefaultIdentifierFactory implements IdentifierFactory {
 
-  // map between type and remote identifier class
+  private static final Logger LOG = Logger.getLogger(DefaultIdentifierFactory.class.getName());
+
+  /** Map between type and remote identifier class. */
   private final Map<String, Class<? extends Identifier>> typeToClazzMap;
 
   /**
@@ -60,6 +64,8 @@ public class DefaultIdentifierFactory implements IdentifierFactory {
     this.typeToClazzMap = typeToClazzMap;
   }
 
+  private static final Class<?>[] CONSTRUCTOR_ARG_TYPES = {String.class};
+
   /**
    * Creates a new remote identifier instance.
    *
@@ -69,24 +75,28 @@ public class DefaultIdentifierFactory implements IdentifierFactory {
    */
   @Override
   public Identifier getNewInstance(final String str) {
+
     final int index = str.indexOf("://");
     if (index < 0) {
-      throw new RemoteRuntimeException("Invalid name " + str);
+      throw new RemoteRuntimeException("Invalid remote identifier name: " + str);
     }
+
     final String type = str.substring(0, index);
     final Class<? extends Identifier> clazz = typeToClazzMap.get(type);
-    final Class<?>[] argTypes = {String.class};
-    final Constructor<? extends Identifier> constructor;
+
     try {
-      constructor = clazz.getDeclaredConstructor(argTypes);
-      final Object[] args = new Object[1];
-      args[0] = str.substring(index + 3);
-      return constructor.newInstance(args);
-    } catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException |
-             IllegalArgumentException | InvocationTargetException e) {
-      e.printStackTrace();
+
+      final Constructor<? extends Identifier> constructor = clazz.getDeclaredConstructor(CONSTRUCTOR_ARG_TYPES);
+      final Object[] args = new Object[] {str.substring(index + 3)};
+
+      final Identifier instance = constructor.newInstance(args);
+      LOG.log(Level.FINER, "Created new identifier: {0} for {1}", new Object[] {instance, str});
+      return instance;
+
+    } catch (final NoSuchMethodException | SecurityException | InstantiationException
+        | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      LOG.log(Level.SEVERE, "Cannot create new identifier for: " + str, e);
       throw new RemoteRuntimeException(e);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
index 8e158f9..bdd0879 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
@@ -40,10 +40,11 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
 
   private static final Logger LOG = Logger.getLogger(RemoteSenderEventHandler.class.getName());
 
+  private final BlockingQueue<RemoteEvent<T>> queue = new LinkedBlockingQueue<>();
+  private final AtomicReference<Link<byte[]>> linkRef = new AtomicReference<>();
+
   private final RemoteEventEncoder<T> encoder;
   private final Transport transport;
-  private final BlockingQueue<RemoteEvent<T>> queue;
-  private final AtomicReference<Link<byte[]>> linkRef;
   private final ExecutorService executor;
 
   /**
@@ -57,12 +58,15 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
     this.encoder = new RemoteEventEncoder<>(encoder);
     this.transport = transport;
     this.executor = executor;
-    this.linkRef = new AtomicReference<>();
-    this.queue = new LinkedBlockingQueue<>();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("RemoteSenderEventHandler: { transport: %s encoder: %s}", this.transport, this.encoder);
   }
 
   void setLink(final Link<byte[]> link) {
-    LOG.log(Level.FINEST, "thread {0} link {1}", new Object[]{Thread.currentThread(), link});
+    LOG.log(Level.FINEST, "thread {0} set link {1}", new Object[] {Thread.currentThread(), link});
     linkRef.compareAndSet(null, link);
     consumeQueue();
   }
@@ -71,12 +75,12 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
     try {
       RemoteEvent<T> event;
       while ((event = queue.poll(0, TimeUnit.MICROSECONDS)) != null) {
-        LOG.log(Level.FINEST, "{0}", event);
+        LOG.log(Level.FINEST, "Event: {0}", event);
         linkRef.get().write(encoder.encode(event));
       }
-    } catch (final InterruptedException e) {
-      e.printStackTrace();
-      throw new RemoteRuntimeException(e);
+    } catch (final InterruptedException ex) {
+      LOG.log(Level.SEVERE, "Interrupted", ex);
+      throw new RemoteRuntimeException(ex);
     }
   }
 
@@ -89,6 +93,9 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
   @Override
   public void onNext(final RemoteEvent<T> value) {
     try {
+
+      LOG.log(Level.FINEST, "Link: {0} event: {1}", new Object[] {linkRef, value});
+
       if (linkRef.get() == null) {
         queue.add(value);
 
@@ -101,36 +108,30 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
 
         final ConnectFutureTask<Link<byte[]>> cf = new ConnectFutureTask<>(
             new ConnectCallable(transport, value.localAddress(), value.remoteAddress()),
-            new ConnectEventHandler<T>(this));
+            new ConnectEventHandler<>(this));
         executor.submit(cf);
+
       } else {
         // encode and write bytes
         // consumeQueue();
-
-        if (LOG.isLoggable(Level.FINEST)) {
-          LOG.log(Level.FINEST, "Send an event from " + linkRef.get().getLocalAddress() + " to " +
-              linkRef.get().getRemoteAddress() + " value " + value);
-        }
+        LOG.log(Level.FINEST, "Send: {0} event: {1}", new Object[] {linkRef, value});
         linkRef.get().write(encoder.encode(value));
       }
-    } catch (final RemoteRuntimeException ex2) {
-      ex2.printStackTrace();
-      throw ex2;
+
+    } catch (final RemoteRuntimeException ex) {
+      LOG.log(Level.SEVERE, "Remote Exception", ex);
+      throw ex;
     }
   }
-
-
 }
 
 class ConnectCallable implements Callable<Link<byte[]>> {
 
   private final Transport transport;
-  private final SocketAddress localAddress;
   private final SocketAddress remoteAddress;
 
   ConnectCallable(final Transport transport, final SocketAddress localAddress, final SocketAddress remoteAddress) {
     this.transport = transport;
-    this.localAddress = localAddress;
     this.remoteAddress = remoteAddress;
   }
 
@@ -144,6 +145,8 @@ class ConnectCallable implements Callable<Link<byte[]>> {
 
 class ConnectEventHandler<T> implements EventHandler<ConnectFutureTask<Link<byte[]>>> {
 
+  private static final Logger LOG = Logger.getLogger(ConnectEventHandler.class.getName());
+
   private final RemoteSenderEventHandler<T> handler;
 
   ConnectEventHandler(final RemoteSenderEventHandler<T> handler) {
@@ -154,10 +157,9 @@ class ConnectEventHandler<T> implements EventHandler<ConnectFutureTask<Link<byte
   public void onNext(final ConnectFutureTask<Link<byte[]>> value) {
     try {
       handler.setLink(value.get());
-    } catch (InterruptedException | ExecutionException e) {
-      e.printStackTrace();
-      throw new RemoteRuntimeException(e);
+    } catch (final InterruptedException | ExecutionException ex) {
+      LOG.log(Level.SEVERE, "Execution Exception", ex);
+      throw new RemoteRuntimeException(ex);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
index 8cd8daf..72ae838 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
@@ -38,9 +38,7 @@ public class LoggingLinkListener<T> implements LinkListener<T> {
    */
   @Override
   public void onSuccess(final T message) {
-    if (LOG.isLoggable(Level.FINEST)) {
-      LOG.log(Level.FINEST, "The message is successfully sent : {0}", new Object[]{message});
-    }
+    LOG.log(Level.FINEST, "Message successfully sent: {0}", message);
   }
 
   /**
@@ -49,8 +47,7 @@ public class LoggingLinkListener<T> implements LinkListener<T> {
   @Override
   public void onException(final Throwable cause, final SocketAddress remoteAddress, final T message) {
     if (LOG.isLoggable(Level.FINEST)) {
-      LOG.log(Level.FINEST, "The message to {0} is failed to be sent. message : {1}, cause : {2}",
-          new Object[]{remoteAddress, message, cause});
+      LOG.log(Level.FINEST, "Error sending message " + message + " to " + remoteAddress, cause);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
index bde515b..3c51f2a 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
@@ -39,7 +39,9 @@ import java.util.logging.Logger;
 public class NettyLink<T> implements Link<T> {
 
   public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
+
   private static final Logger LOG = Logger.getLogger(NettyLink.class.getName());
+
   private final Channel channel;
   private final Encoder<? super T> encoder;
   private final LinkListener<? super T> listener;
@@ -61,14 +63,12 @@ public class NettyLink<T> implements Link<T> {
    * @param encoder  the encoder
    * @param listener the link listener
    */
-  public NettyLink(final Channel channel,
-                   final Encoder<? super T> encoder, final LinkListener<? super T> listener) {
+  public NettyLink(final Channel channel, final Encoder<? super T> encoder, final LinkListener<? super T> listener) {
     this.channel = channel;
     this.encoder = encoder;
     this.listener = listener;
   }
 
-
   /**
    * Writes the message to this link.
    *
@@ -76,14 +76,10 @@ public class NettyLink<T> implements Link<T> {
    */
   @Override
   public void write(final T message) {
-    LOG.log(Level.FINEST, "write {0} {1}", new Object[]{channel, message});
-    final byte[] allData = encoder.encode(message);
-    // byte[] -> ByteBuf
+    LOG.log(Level.FINEST, "write {0} :: {1}", new Object[] {channel, message});
+    final ChannelFuture future = channel.writeAndFlush(Unpooled.wrappedBuffer(encoder.encode(message)));
     if (listener !=  null) {
-      channel.writeAndFlush(Unpooled.wrappedBuffer(allData))
-          .addListener(new NettyChannelFutureListener<>(message, listener));
-    } else {
-      channel.writeAndFlush(Unpooled.wrappedBuffer(allData));
+      future.addListener(new NettyChannelFutureListener<>(message, listener));
     }
   }
 
@@ -109,7 +105,7 @@ public class NettyLink<T> implements Link<T> {
 
   @Override
   public String toString() {
-    return "localAddr: " + getLocalAddress() + " remoteAddr: " + getRemoteAddress();
+    return "NettyLink: " + channel; // Channel has good .toString() implementation
   }
 }