You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2017/05/17 16:08:47 UTC
reef git commit: [REEF-1801] Improve logging when sending remote events in Wake
Repository: reef
Updated Branches:
refs/heads/master c4710a886 -> 4729037be
[REEF-1801] Improve logging when sending remote events in Wake
Summary of changes:
* Replace `ex.printStackTrace()` calls with proper log messages
* Minor refactoring for readability and better performance in the Wake identifier factory and surrounding code
* Better logging in Wake
* Other minor refactoring in Wake, e.g. implementing or improving `.toString()` methods
JIRA:
[REEF-1801](https://issues.apache.org/jira/browse/REEF-1801)
Pull Request:
This closes #1306
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4729037b
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4729037b
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4729037b
Branch: refs/heads/master
Commit: 4729037be79fc7d5f2f02194cda433497022798c
Parents: c4710a8
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Tue May 16 14:03:45 2017 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Wed May 17 09:06:27 2017 -0700
----------------------------------------------------------------------
.../common/driver/client/ClientConnection.java | 2 +-
.../wake/impl/DefaultIdentifierFactory.java | 36 ++++++++-----
.../remote/impl/RemoteSenderEventHandler.java | 54 ++++++++++----------
.../transport/netty/LoggingLinkListener.java | 7 +--
.../wake/remote/transport/netty/NettyLink.java | 18 +++----
5 files changed, 61 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
index 7be8e07..bce2fbe 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
@@ -54,7 +54,7 @@ public final class ClientConnection {
* @param status
*/
public synchronized void send(final ReefServiceProtos.JobStatusProto status) {
- LOG.log(Level.FINEST, "Sending:\n" + status);
+ LOG.log(Level.FINEST, "Sending to client: status={0}", status.getState());
this.jobStatusHandler.onNext(status);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
index 951fe5a..47e1ffd 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java
@@ -28,10 +28,12 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* Default remote identifier factory that creates a specific remote identifier
- * from a string representation
+ * from a string representation.
* <p>
* A string representation is broken into two parts type and type-specific details separated by "://"
* A remote identifier implementation should implement a constructor that accepts a string.
@@ -39,7 +41,9 @@ import java.util.Map;
*/
public class DefaultIdentifierFactory implements IdentifierFactory {
- // map between type and remote identifier class
+ private static final Logger LOG = Logger.getLogger(DefaultIdentifierFactory.class.getName());
+
+ /** Map between type and remote identifier class. */
private final Map<String, Class<? extends Identifier>> typeToClazzMap;
/**
@@ -60,6 +64,8 @@ public class DefaultIdentifierFactory implements IdentifierFactory {
this.typeToClazzMap = typeToClazzMap;
}
+ private static final Class<?>[] CONSTRUCTOR_ARG_TYPES = {String.class};
+
/**
* Creates a new remote identifier instance.
*
@@ -69,24 +75,28 @@ public class DefaultIdentifierFactory implements IdentifierFactory {
*/
@Override
public Identifier getNewInstance(final String str) {
+
final int index = str.indexOf("://");
if (index < 0) {
- throw new RemoteRuntimeException("Invalid name " + str);
+ throw new RemoteRuntimeException("Invalid remote identifier name: " + str);
}
+
final String type = str.substring(0, index);
final Class<? extends Identifier> clazz = typeToClazzMap.get(type);
- final Class<?>[] argTypes = {String.class};
- final Constructor<? extends Identifier> constructor;
+
try {
- constructor = clazz.getDeclaredConstructor(argTypes);
- final Object[] args = new Object[1];
- args[0] = str.substring(index + 3);
- return constructor.newInstance(args);
- } catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException |
- IllegalArgumentException | InvocationTargetException e) {
- e.printStackTrace();
+
+ final Constructor<? extends Identifier> constructor = clazz.getDeclaredConstructor(CONSTRUCTOR_ARG_TYPES);
+ final Object[] args = new Object[] {str.substring(index + 3)};
+
+ final Identifier instance = constructor.newInstance(args);
+ LOG.log(Level.FINER, "Created new identifier: {0} for {1}", new Object[] {instance, str});
+ return instance;
+
+ } catch (final NoSuchMethodException | SecurityException | InstantiationException
+ | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ LOG.log(Level.SEVERE, "Cannot create new identifier for: " + str, e);
throw new RemoteRuntimeException(e);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
index 8e158f9..bdd0879 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
@@ -40,10 +40,11 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
private static final Logger LOG = Logger.getLogger(RemoteSenderEventHandler.class.getName());
+ private final BlockingQueue<RemoteEvent<T>> queue = new LinkedBlockingQueue<>();
+ private final AtomicReference<Link<byte[]>> linkRef = new AtomicReference<>();
+
private final RemoteEventEncoder<T> encoder;
private final Transport transport;
- private final BlockingQueue<RemoteEvent<T>> queue;
- private final AtomicReference<Link<byte[]>> linkRef;
private final ExecutorService executor;
/**
@@ -57,12 +58,15 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
this.encoder = new RemoteEventEncoder<>(encoder);
this.transport = transport;
this.executor = executor;
- this.linkRef = new AtomicReference<>();
- this.queue = new LinkedBlockingQueue<>();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("RemoteSenderEventHandler: { transport: %s encoder: %s}", this.transport, this.encoder);
}
void setLink(final Link<byte[]> link) {
- LOG.log(Level.FINEST, "thread {0} link {1}", new Object[]{Thread.currentThread(), link});
+ LOG.log(Level.FINEST, "thread {0} set link {1}", new Object[] {Thread.currentThread(), link});
linkRef.compareAndSet(null, link);
consumeQueue();
}
@@ -71,12 +75,12 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
try {
RemoteEvent<T> event;
while ((event = queue.poll(0, TimeUnit.MICROSECONDS)) != null) {
- LOG.log(Level.FINEST, "{0}", event);
+ LOG.log(Level.FINEST, "Event: {0}", event);
linkRef.get().write(encoder.encode(event));
}
- } catch (final InterruptedException e) {
- e.printStackTrace();
- throw new RemoteRuntimeException(e);
+ } catch (final InterruptedException ex) {
+ LOG.log(Level.SEVERE, "Interrupted", ex);
+ throw new RemoteRuntimeException(ex);
}
}
@@ -89,6 +93,9 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
@Override
public void onNext(final RemoteEvent<T> value) {
try {
+
+ LOG.log(Level.FINEST, "Link: {0} event: {1}", new Object[] {linkRef, value});
+
if (linkRef.get() == null) {
queue.add(value);
@@ -101,36 +108,30 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> {
final ConnectFutureTask<Link<byte[]>> cf = new ConnectFutureTask<>(
new ConnectCallable(transport, value.localAddress(), value.remoteAddress()),
- new ConnectEventHandler<T>(this));
+ new ConnectEventHandler<>(this));
executor.submit(cf);
+
} else {
// encode and write bytes
// consumeQueue();
-
- if (LOG.isLoggable(Level.FINEST)) {
- LOG.log(Level.FINEST, "Send an event from " + linkRef.get().getLocalAddress() + " to " +
- linkRef.get().getRemoteAddress() + " value " + value);
- }
+ LOG.log(Level.FINEST, "Send: {0} event: {1}", new Object[] {linkRef, value});
linkRef.get().write(encoder.encode(value));
}
- } catch (final RemoteRuntimeException ex2) {
- ex2.printStackTrace();
- throw ex2;
+
+ } catch (final RemoteRuntimeException ex) {
+ LOG.log(Level.SEVERE, "Remote Exception", ex);
+ throw ex;
}
}
-
-
}
class ConnectCallable implements Callable<Link<byte[]>> {
private final Transport transport;
- private final SocketAddress localAddress;
private final SocketAddress remoteAddress;
ConnectCallable(final Transport transport, final SocketAddress localAddress, final SocketAddress remoteAddress) {
this.transport = transport;
- this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
}
@@ -144,6 +145,8 @@ class ConnectCallable implements Callable<Link<byte[]>> {
class ConnectEventHandler<T> implements EventHandler<ConnectFutureTask<Link<byte[]>>> {
+ private static final Logger LOG = Logger.getLogger(ConnectEventHandler.class.getName());
+
private final RemoteSenderEventHandler<T> handler;
ConnectEventHandler(final RemoteSenderEventHandler<T> handler) {
@@ -154,10 +157,9 @@ class ConnectEventHandler<T> implements EventHandler<ConnectFutureTask<Link<byte
public void onNext(final ConnectFutureTask<Link<byte[]>> value) {
try {
handler.setLink(value.get());
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- throw new RemoteRuntimeException(e);
+ } catch (final InterruptedException | ExecutionException ex) {
+ LOG.log(Level.SEVERE, "Execution Exception", ex);
+ throw new RemoteRuntimeException(ex);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
index 8cd8daf..72ae838 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
@@ -38,9 +38,7 @@ public class LoggingLinkListener<T> implements LinkListener<T> {
*/
@Override
public void onSuccess(final T message) {
- if (LOG.isLoggable(Level.FINEST)) {
- LOG.log(Level.FINEST, "The message is successfully sent : {0}", new Object[]{message});
- }
+ LOG.log(Level.FINEST, "Message successfully sent: {0}", message);
}
/**
@@ -49,8 +47,7 @@ public class LoggingLinkListener<T> implements LinkListener<T> {
@Override
public void onException(final Throwable cause, final SocketAddress remoteAddress, final T message) {
if (LOG.isLoggable(Level.FINEST)) {
- LOG.log(Level.FINEST, "The message to {0} is failed to be sent. message : {1}, cause : {2}",
- new Object[]{remoteAddress, message, cause});
+ LOG.log(Level.FINEST, "Error sending message " + message + " to " + remoteAddress, cause);
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
index bde515b..3c51f2a 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
@@ -39,7 +39,9 @@ import java.util.logging.Logger;
public class NettyLink<T> implements Link<T> {
public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
+
private static final Logger LOG = Logger.getLogger(NettyLink.class.getName());
+
private final Channel channel;
private final Encoder<? super T> encoder;
private final LinkListener<? super T> listener;
@@ -61,14 +63,12 @@ public class NettyLink<T> implements Link<T> {
* @param encoder the encoder
* @param listener the link listener
*/
- public NettyLink(final Channel channel,
- final Encoder<? super T> encoder, final LinkListener<? super T> listener) {
+ public NettyLink(final Channel channel, final Encoder<? super T> encoder, final LinkListener<? super T> listener) {
this.channel = channel;
this.encoder = encoder;
this.listener = listener;
}
-
/**
* Writes the message to this link.
*
@@ -76,14 +76,10 @@ public class NettyLink<T> implements Link<T> {
*/
@Override
public void write(final T message) {
- LOG.log(Level.FINEST, "write {0} {1}", new Object[]{channel, message});
- final byte[] allData = encoder.encode(message);
- // byte[] -> ByteBuf
+ LOG.log(Level.FINEST, "write {0} :: {1}", new Object[] {channel, message});
+ final ChannelFuture future = channel.writeAndFlush(Unpooled.wrappedBuffer(encoder.encode(message)));
if (listener != null) {
- channel.writeAndFlush(Unpooled.wrappedBuffer(allData))
- .addListener(new NettyChannelFutureListener<>(message, listener));
- } else {
- channel.writeAndFlush(Unpooled.wrappedBuffer(allData));
+ future.addListener(new NettyChannelFutureListener<>(message, listener));
}
}
@@ -109,7 +105,7 @@ public class NettyLink<T> implements Link<T> {
@Override
public String toString() {
- return "localAddr: " + getLocalAddress() + " remoteAddr: " + getRemoteAddress();
+ return "NettyLink: " + channel; // Channel has good .toString() implementation
}
}