You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/07/30 08:55:17 UTC

git commit: (TWILL-92) Add runnable name to LogEntry

Repository: incubator-twill
Updated Branches:
  refs/heads/master 11804cd2d -> 35a0820ae


(TWILL-92) Add runnable name to LogEntry

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/35a0820a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/35a0820a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/35a0820a

Branch: refs/heads/master
Commit: 35a0820aeb9bb888078fdf71b6afb7eb05c68274
Parents: 11804cd
Author: Sree <sr...@continuuity.com>
Authored: Tue Jul 29 18:24:52 2014 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jul 29 23:44:04 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/twill/api/logging/LogEntry.java  |  5 +++++
 .../twill/internal/json/ILoggingEventSerializer.java |  6 ++++--
 .../apache/twill/internal/json/LogEntryDecoder.java  |  6 ++++++
 .../apache/twill/internal/logging/KafkaAppender.java | 15 ++++++++++++---
 .../java/org/apache/twill/internal/ServiceMain.java  | 13 +++++++++++++
 .../internal/appmaster/ApplicationMasterMain.java    |  5 +++++
 .../twill/internal/container/TwillContainerMain.java |  5 +++++
 .../org/apache/twill/yarn/LogHandlerTestRun.java     | 12 ++++++++++++
 8 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35a0820a/twill-api/src/main/java/org/apache/twill/api/logging/LogEntry.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/logging/LogEntry.java b/twill-api/src/main/java/org/apache/twill/api/logging/LogEntry.java
index eb1bdf8..c840c45 100644
--- a/twill-api/src/main/java/org/apache/twill/api/logging/LogEntry.java
+++ b/twill-api/src/main/java/org/apache/twill/api/logging/LogEntry.java
@@ -85,6 +85,11 @@ public interface LogEntry {
   String getMessage();
 
   /**
+   * Returns the runnable name.
+   */
+  String getRunnableName();
+
+  /**
    * Returns the {@link Throwable} information emitted with the log.
    *
    * @return A {@link LogThrowable} or {@code null} if {@link Throwable} information is not available.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35a0820a/twill-core/src/main/java/org/apache/twill/internal/json/ILoggingEventSerializer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ILoggingEventSerializer.java b/twill-core/src/main/java/org/apache/twill/internal/json/ILoggingEventSerializer.java
index a3c7add..3c0c475 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/ILoggingEventSerializer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ILoggingEventSerializer.java
@@ -41,9 +41,11 @@ public final class ILoggingEventSerializer implements JsonSerializer<ILoggingEve
   private final FileOfCallerConverter fileConverter = new FileOfCallerConverter();
   private final LineOfCallerConverter lineConverter = new LineOfCallerConverter();
   private final String hostname;
+  private final String runnableName;
 
-  public ILoggingEventSerializer(String hostname) {
+  public ILoggingEventSerializer(String hostname, String runnableName) {
     this.hostname = hostname;
+    this.runnableName = runnableName;
   }
 
   @Override
@@ -59,7 +61,7 @@ public final class ILoggingEventSerializer implements JsonSerializer<ILoggingEve
     json.addProperty("line", lineConverter.convert(event));
     json.addProperty("thread", event.getThreadName());
     json.addProperty("message", event.getFormattedMessage());
-
+    json.addProperty("runnableName", runnableName);
     if (event.getThrowableProxy() == null) {
       json.add("throwable", JsonNull.INSTANCE);
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35a0820a/twill-core/src/main/java/org/apache/twill/internal/json/LogEntryDecoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/LogEntryDecoder.java b/twill-core/src/main/java/org/apache/twill/internal/json/LogEntryDecoder.java
index b047648..e143f04 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/LogEntryDecoder.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/LogEntryDecoder.java
@@ -52,6 +52,7 @@ public final class LogEntryDecoder implements JsonDeserializer<LogEntry> {
     final String line = JsonUtils.getAsString(jsonObj, "line");
     final String thread = JsonUtils.getAsString(jsonObj, "thread");
     final String message = JsonUtils.getAsString(jsonObj, "message");
+    final String runnableName = JsonUtils.getAsString(jsonObj, "runnableName");
     final LogThrowable logThrowable = context.deserialize(jsonObj.get("throwable"), LogThrowable.class);
 
     return new LogEntry() {
@@ -110,6 +111,11 @@ public final class LogEntryDecoder implements JsonDeserializer<LogEntry> {
       }
 
       @Override
+      public String getRunnableName() {
+        return runnableName;
+      }
+
+      @Override
       public LogThrowable getThrowable() {
         return logThrowable;
       }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35a0820a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
index f344773..174c25d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
@@ -79,6 +79,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
   private KafkaClientService kafkaClient;
   private String zkConnectStr;
   private String hostname;
+  private String runnableName;
   private String topic;
   private Queue<String> buffer;
   private int flushLimit = 20;
@@ -109,6 +110,14 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
   }
 
   /**
+   * Sets the runnableName.
+   */
+  @SuppressWarnings("unused")
+  public void setRunnableName(String runnableName) {
+    this.runnableName = runnableName;
+  }
+
+  /**
    * Sets the topic name for publishing logs. Called by slf4j.
    */
   @SuppressWarnings("unused")
@@ -136,7 +145,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
   public void start() {
     Preconditions.checkNotNull(zkConnectStr);
 
-    eventConverter = new LogEventConverter(hostname);
+    eventConverter = new LogEventConverter(hostname, runnableName);
     scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("kafka-logger"));
 
     zkClientService = ZKClientServices.delegate(
@@ -286,11 +295,11 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
 
     private final Gson gson;
 
-    private LogEventConverter(String hostname) {
+    private LogEventConverter(String hostname, String runnableName) {
       gson = new GsonBuilder()
         .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
         .registerTypeAdapter(LogThrowable.class, new LogThrowableCodec())
-        .registerTypeAdapter(ILoggingEvent.class, new ILoggingEventSerializer(hostname))
+        .registerTypeAdapter(ILoggingEvent.class, new ILoggingEventSerializer(hostname, runnableName))
         .create();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35a0820a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index 6f04ed7..ae86f42 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -104,6 +104,8 @@ public abstract class ServiceMain {
 
   protected abstract String getKafkaZKConnect();
 
+  protected abstract String getRunnableName();
+
   /**
    * Returns the {@link Location} for the application based on the env {@link EnvKeys#TWILL_APP_DIR}.
    */
@@ -177,6 +179,7 @@ public abstract class ServiceMain {
       "        <topic>" + Constants.LOG_TOPIC + "</topic>\n" +
       "        <hostname>" + getHostname() + "</hostname>\n" +
       "        <zookeeper>" + getKafkaZKConnect() + "</zookeeper>\n" +
+      appendRunnable() +
       "    </appender>\n" +
       "    <logger name=\"org.apache.twill.internal.logging\" additivity=\"false\" />\n" +
       "    <root level=\"" + rootLevel + "\">\n" +
@@ -185,6 +188,16 @@ public abstract class ServiceMain {
       "</configuration>";
   }
 
+
+  private String appendRunnable() {
+    // RunnableName for AM is null, so append runnable name to log config only if the name is not null.
+    if (getRunnableName() == null) {
+     return "";
+    } else {
+      return "        <runnableName>" + getRunnableName() + "</runnableName>\n";
+    }
+  }
+
   private String getLoggerLevel(Logger logger) {
     if (logger instanceof ch.qos.logback.classic.Logger) {
       return ((ch.qos.logback.classic.Logger) logger).getLevel().toString();

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35a0820a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index a3619d7..914f13f 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -100,4 +100,9 @@ public final class ApplicationMasterMain extends ServiceMain {
   protected String getKafkaZKConnect() {
     return kafkaZKConnect;
   }
+
+  @Override
+  protected String getRunnableName() {
+    return System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35a0820a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 1e2241e..5c9aa45 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -187,4 +187,9 @@ public final class TwillContainerMain extends ServiceMain {
   protected String getKafkaZKConnect() {
     return System.getenv(EnvKeys.TWILL_LOG_KAFKA_ZK);
   }
+
+  @Override
+  protected String getRunnableName() {
+    return System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35a0820a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
index 7e6d78c..683b452 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
@@ -45,11 +45,19 @@ public class LogHandlerTestRun extends BaseYarnTest {
   public void testLogHandler() throws ExecutionException, InterruptedException {
     final CountDownLatch latch = new CountDownLatch(3);
     final Queue<LogThrowable> throwables = new ConcurrentLinkedQueue<LogThrowable>();
+    final Queue<String> runnables = new ConcurrentLinkedQueue<String>();
 
     LogHandler logHandler = new LogHandler() {
       @Override
       public void onLog(LogEntry logEntry) {
         // Would expect logs from AM and the runnable.
+        if (logEntry.getSourceClassName().contains("LogHandlerTestRun")) {
+          runnables.add(logEntry.getRunnableName());
+        }
+        // Runnable name for AM should be null
+        if (logEntry.getSourceClassName().contains("ApplicationMasterService")) {
+          Assert.assertNull(logEntry.getRunnableName());
+        }
         if (logEntry.getMessage().startsWith("Starting runnable " + LogRunnable.class.getSimpleName())) {
           latch.countDown();
         } else if (logEntry.getMessage().equals("Running")) {
@@ -73,6 +81,10 @@ public class LogHandlerTestRun extends BaseYarnTest {
       controller.stopAndWait();
     }
 
+    // Verify the runnable names
+    Assert.assertEquals(2, runnables.size());
+    Assert.assertArrayEquals(new String[] {"LogRunnable", "LogRunnable"}, runnables.toArray());
+
     // Verify the log throwable
     Assert.assertEquals(1, throwables.size());