You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/03/13 22:44:41 UTC

samza git commit: SAMZA-597; support JSON encoding for log4j stream appender

Repository: samza
Updated Branches:
  refs/heads/master 769844e53 -> 23c4b3915


SAMZA-597; support JSON encoding for log4j stream appender


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/23c4b391
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/23c4b391
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/23c4b391

Branch: refs/heads/master
Commit: 23c4b391545e68fa005cedc8db9b01d5270000d6
Parents: 769844e
Author: Chris Riccomini <cr...@apache.org>
Authored: Fri Mar 13 14:40:34 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Mar 13 14:40:34 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../versioned/jobs/configuration-table.html     |  13 ++
 .../documentation/versioned/jobs/logging.md     |  13 +-
 .../apache/samza/config/Log4jSystemConfig.java  |  67 +++----
 .../samza/logging/log4j/StreamAppender.java     |  36 ++--
 .../serializers/LoggingEventJsonSerde.java      | 192 +++++++++++++++++++
 .../LoggingEventJsonSerdeFactory.java           |  38 ++++
 .../samza/config/TestLog4jSystemConfig.java     |  22 ++-
 .../samza/logging/log4j/TestStreamAppender.java |  54 +++++-
 9 files changed, 367 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/23c4b391/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 516c83e..87334fa 100644
--- a/build.gradle
+++ b/build.gradle
@@ -190,6 +190,7 @@ project(':samza-log4j') {
     compile "log4j:log4j:$log4jVersion"
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
+    compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     testCompile "junit:junit:$junitVersion"
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/23c4b391/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index ec12874..e091460 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -388,6 +388,19 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="task-log4j-location-info-enabled">task.log4j.location.info.enabled</td>
+                    <td class="default">false</td>
+                    <td class="description">
+                        Defines whether or not to include log4j's LocationInfo data in Log4j StreamAppender messages. LocationInfo includes
+                        information such as the file, class, and line that wrote a log message. This setting is only active if the Log4j 
+                        stream appender is being used. (See <a href="logging.html#stream-log4j-appender">Stream Log4j Appender</a>)
+                        <dl>
+                            <dt>Example: <code>task.log4j.location.info.enabled=true</code></dt>
+                        </dl>
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" id="task-poll-interval-ms">task.poll.interval.ms</td>
                     <td class="default"></td>
                     <td class="description">

http://git-wip-us.apache.org/repos/asf/samza/blob/23c4b391/docs/learn/documentation/versioned/jobs/logging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/logging.md b/docs/learn/documentation/versioned/jobs/logging.md
index af2fd0e..1d13d15 100644
--- a/docs/learn/documentation/versioned/jobs/logging.md
+++ b/docs/learn/documentation/versioned/jobs/logging.md
@@ -97,7 +97,7 @@ And then updating your log4j.xml to include the appender:
 
 #### Stream Log4j Appender
 
-Samza provides a StreamAppender to publish the logs into a specific system. You can specify the system name using "task.log4j.system". If there is only one system in the config, Samza will use that system for the log publishing. Also we have the [MDC|http://logback.qos.ch/manual/mdc.html] keys "containerName", "jobName" and "jobId", which help identify the source of the log. In order to use this appender, simply add:
+Samza provides a StreamAppender to publish the logs into a specific system. You can specify the system name using "task.log4j.system". If there is only one system in the config, Samza will use that system for the log publishing. Also we have the [MDC](http://logback.qos.ch/manual/mdc.html) keys "containerName", "jobName" and "jobId", which help identify the source of the log. In order to use this appender, simply add:
 
 {% highlight xml %}
 <appender name="StreamAppender" class="org.apache.samza.logging.log4j.StreamAppender">
@@ -111,7 +111,16 @@ and add:
 
 {% highlight xml %}
 <appender-ref ref="StreamAppender"/>
-{% endhighlight %}.
+{% endhighlight %}
+
+By default, the StreamAppender encodes messages using logstash's [Log4J JSON format](https://github.com/logstash/log4j-jsonevent-layout). Samza also supports pluggable serialization for those who prefer non-JSON logging events. A different serializer is configured the same way as other stream serializers, by registering it and assigning it to the job's log stream:
+
+{% highlight jproperties %}
+serializers.registry.log4j-string.class=org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory
+systems.mock.streams.__samza_<jobname>_<jobid>_logs.samza.msg.serde=log4j-string
+{% endhighlight %}
+
+The StreamAppender will always send messages to a job's log stream keyed by the container name.
 
 ### Log Directory
 

http://git-wip-us.apache.org/repos/asf/samza/blob/23c4b391/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 107ddf0..d5e24f2 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -20,16 +20,16 @@
 package org.apache.samza.config;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
-import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
-
 /**
  * This class contains the methods for getting properties that are needed by the
  * StreamAppender.
  */
 public class Log4jSystemConfig {
 
+  private static final String LOCATION_ENABLED = "task.log4j.location.info.enabled";
   private static final String TASK_LOG4J_SYSTEM = "task.log4j.system";
   private static final String SYSTEM_PREFIX = "systems.";
   private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
@@ -41,30 +41,42 @@ public class Log4jSystemConfig {
   }
 
   /**
+   * Defines whether or not to include file location information for Log4J
+   * appender messages. File location information includes the method, line
+   * number, class, etc.
+   * 
+   * @return If true, will include file location (method, line number, etc)
+   *         information in Log4J appender messages.
+   */
+  public boolean getLocationEnabled() {
+    return "true".equals(config.get(Log4jSystemConfig.LOCATION_ENABLED, "false"));
+  }
+
+  /**
    * Get the log4j system name from the config. If it's not defined, try to
    * guess the system name if there is only one system is defined.
    *
    * @return log4j system name
    */
   public String getSystemName() {
-    String log4jSystem = getValue(TASK_LOG4J_SYSTEM);
+    String log4jSystem = config.get(TASK_LOG4J_SYSTEM, null);
     if (log4jSystem == null) {
-      ArrayList<String> systemNames = getSystemNames();
+      List<String> systemNames = getSystemNames();
       if (systemNames.size() == 1) {
         log4jSystem = systemNames.get(0);
       } else {
-        throw new ConfigException("Missing task.log4j.system configuration, and more than 1 systems were found.");
+        throw new ConfigException("Missing " + TASK_LOG4J_SYSTEM + " configuration, and more than 1 systems were found.");
       }
     }
     return log4jSystem;
   }
 
   public String getJobName() {
-    return getValue(JobConfig.JOB_NAME());
+    return config.get(JobConfig.JOB_NAME(), null);
   }
 
   public String getJobId() {
-    return getValue(JobConfig.JOB_ID());
+    return config.get(JobConfig.JOB_ID(), null);
   }
 
   public String getSystemFactory(String name) {
@@ -72,47 +84,24 @@ public class Log4jSystemConfig {
       return null;
     }
     String systemFactory = String.format(SystemConfig.SYSTEM_FACTORY(), name);
-    return getValue(systemFactory);
+    return config.get(systemFactory, null);
   }
 
   /**
-   * get the class name according to the serde name. If the serde name is "log4j" and
-   * the serde class is not configured, will use the default {@link LoggingEventStringSerdeFactory}
+   * Get the class name according to the serde name.
    * 
-   * @param name serde name
-   * @return serde factory name
+   * @param name
+   *          serde name
+   * @return serde factory name, or null if there is no factory defined for the
+   *         supplied serde name.
    */
   public String getSerdeClass(String name) {
-    String className = getValue(String.format(SerializerConfig.SERDE(), name));
-    if (className == null && name.equals("log4j")) {
-      className = LoggingEventStringSerdeFactory.class.getCanonicalName();
-    }
-    return className;
-  }
-
-  public String getSystemSerdeName(String name) {
-    String systemSerdeNameConfig = String.format(SystemConfig.MSG_SERDE(), name);
-    return getValue(systemSerdeNameConfig);
+    return config.get(String.format(SerializerConfig.SERDE(), name), null);
   }
 
   public String getStreamSerdeName(String systemName, String streamName) {
     String streamSerdeNameConfig = String.format(StreamConfig.MSG_SERDE(), systemName, streamName);
-    return getValue(streamSerdeNameConfig);
-  }
-
-  /**
-   * A helper method to get the value from the config. If the config does not
-   * contain the key, return null.
-   *
-   * @param key key name
-   * @return value of the key in the config
-   */
-  protected String getValue(String key) {
-    if (config.containsKey(key)) {
-      return config.get(key);
-    } else {
-      return null;
-    }
+    return config.get(streamSerdeNameConfig, null);
   }
 
   /**
@@ -120,7 +109,7 @@ public class Log4jSystemConfig {
    * 
    * @return A list system names
    */
-  protected ArrayList<String> getSystemNames() {
+  protected List<String> getSystemNames() {
     Config subConf = config.subset(SYSTEM_PREFIX, true);
     ArrayList<String> systemNames = new ArrayList<String>();
     for (Map.Entry<String, String> entry : subConf.entrySet()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/23c4b391/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 4ef3551..d3f25c0 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -29,10 +29,10 @@ import org.apache.log4j.spi.LoggingEvent;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.Log4jSystemConfig;
+import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.config.StreamConfig;
-import org.apache.samza.config.SystemConfig;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
@@ -83,7 +83,11 @@ public class StreamAppender extends AppenderSkeleton {
   @Override
   public void activateOptions() {
     String containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
-    isApplicationMaster = containerName.contains(APPLICATION_MASTER_TAG);
+    if (containerName != null) {
+      isApplicationMaster = containerName.contains(APPLICATION_MASTER_TAG);
+    } else {
+      throw new SamzaException("Got null container name from system property: " + JAVA_OPTS_CONTAINER_NAME + ". This is used as the key for the log appender, so can't proceed.");
+    }
     key = containerName; // use the container name as the key for the logs
     config = getConfig();
     SystemFactory systemFactory = null;
@@ -185,7 +189,7 @@ public class StreamAppender extends AppenderSkeleton {
     return config;
   }
 
-  private String getStreamName(String jobName, String jobId) {
+  public static String getStreamName(String jobName, String jobId) {
     if (jobName == null) {
       throw new SamzaException("job name is null. Please specify job.name");
     }
@@ -205,24 +209,28 @@ public class StreamAppender extends AppenderSkeleton {
    * @param streamName name of the stream
    */
   private void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName, String streamName) {
-    String serdeClass = null;
+    String serdeClass = LoggingEventJsonSerdeFactory.class.getCanonicalName();;
     String serdeName = log4jSystemConfig.getStreamSerdeName(systemName, streamName);
 
-    if (serdeName == null) {
-      serdeName = log4jSystemConfig.getSystemSerdeName(systemName);
+    if (serdeName != null) {
+      serdeClass = log4jSystemConfig.getSerdeClass(serdeName);
     }
 
-    if (serdeName == null) {
-      throw new SamzaException("Missing serde name. Please specify the " + StreamConfig.MSG_SERDE() + " or " + SystemConfig.MSG_SERDE() + " property.");
-    }
-
-    serdeClass = log4jSystemConfig.getSerdeClass(serdeName);
-
     if (serdeClass != null) {
       SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>> getObj(serdeClass);
       serde = serdeFactory.getSerde(systemName, config);
     } else {
-      throw new SamzaException("Can not find serializers class. Please specify serializers.registry.s%.class property");
+      String serdeKey = String.format(SerializerConfig.SERDE(), serdeName);
+      throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " + serdeKey + " property");
     }
   }
+
+  /**
+   * Returns the serde that is being used for the stream appender.
+   * 
+   * @return The Serde&lt;LoggingEvent&gt; that the appender is using.
+   */
+  public Serde<LoggingEvent> getSerde() {
+    return serde;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/23c4b391/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
new file mode 100644
index 0000000..fd1c476
--- /dev/null
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.log4j.spi.LocationInfo;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.log4j.spi.ThrowableInformation;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A JSON serde that serializes Log4J LoggingEvent objects into JSON using the
+ * standard logstash LoggingEvent format defined <a
+ * href="https://github.com/logstash/log4j-jsonevent-layout">here</a>.
+ * Deserialization is not supported.
+ */
+public class LoggingEventJsonSerde implements Serde<LoggingEvent> {
+  /**
+   * The JSON format version.
+   */
+  public static final int VERSION = 1;
+
+  /**
+   * The date format to use for the timestamp field. It is pinned to UTC so
+   * that the literal 'Z' suffix in the pattern is accurate. SimpleDateFormat
+   * is not thread-safe, so code that uses this instance directly must
+   * synchronize on it, as {@link #dateFormat(long)} does.
+   */
+  public static final Format DATE_FORMAT = newUtcDateFormat();
+
+  // Have to wrap rather than extend due to type collisions between
+  // Serde<LoggingEvent> and Serde<Object>.
+  private final JsonSerde jsonSerde;
+
+  /**
+   * Defines whether to include LocationInfo data in the serialized
+   * LoggingEvent. This information includes the file, line, and class that
+   * wrote the log line.
+   */
+  private final boolean includeLocationInfo;
+
+  /**
+   * Constructs the serde without location info.
+   */
+  public LoggingEventJsonSerde() {
+    this(false);
+  }
+
+  /**
+   * Constructs the serde.
+   * 
+   * @param includeLocationInfo
+   *          Whether to include location info in the logging event or not.
+   */
+  public LoggingEventJsonSerde(boolean includeLocationInfo) {
+    this.includeLocationInfo = includeLocationInfo;
+    this.jsonSerde = new JsonSerde();
+  }
+
+  @Override
+  public byte[] toBytes(LoggingEvent loggingEvent) {
+    Map<String, Object> loggingEventMap = encodeToMap(loggingEvent, includeLocationInfo);
+    return jsonSerde.toBytes(loggingEventMap);
+  }
+
+  /**
+   * Decoding is not supported: after parsing the JSON, this delegates to
+   * {@link #decodeFromMap(Map)}, which throws UnsupportedOperationException.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public LoggingEvent fromBytes(byte[] loggingEventMapBytes) {
+    Map<String, Object> loggingEventMap = (Map<String, Object>) jsonSerde.fromBytes(loggingEventMapBytes);
+    return decodeFromMap(loggingEventMap);
+  }
+
+  /**
+   * Encodes a LoggingEvent into a HashMap using the logstash JSON format.
+   * 
+   * @param loggingEvent
+   *          The LoggingEvent to encode.
+   * @param includeLocationInfo
+   *          Whether to include LocationInfo in the map, or not.
+   * @return A Map representing the LoggingEvent, which is suitable to be
+   *         serialized by a JSON encoder such as Jackson. Null values are
+   *         omitted.
+   */
+  @SuppressWarnings("rawtypes")
+  public static Map<String, Object> encodeToMap(LoggingEvent loggingEvent, boolean includeLocationInfo) {
+    Map<String, Object> logstashEvent = new LoggingEventMap();
+    String threadName = loggingEvent.getThreadName();
+    long timestamp = loggingEvent.getTimeStamp();
+    Map mdc = loggingEvent.getProperties();
+    String ndc = loggingEvent.getNDC();
+
+    logstashEvent.put("@version", VERSION);
+    logstashEvent.put("@timestamp", dateFormat(timestamp));
+    logstashEvent.put("source_host", getHostname());
+    logstashEvent.put("message", loggingEvent.getRenderedMessage());
+
+    ThrowableInformation throwableInformation = loggingEvent.getThrowableInformation();
+    if (throwableInformation != null) {
+      Map<String, Object> exceptionInformation = new HashMap<String, Object>();
+      // getThrowable() may be null when the event only carries a string
+      // representation of the exception, so guard before dereferencing it.
+      Throwable throwable = throwableInformation.getThrowable();
+      if (throwable != null) {
+        String exceptionClass = throwable.getClass().getCanonicalName();
+        if (exceptionClass != null) {
+          exceptionInformation.put("exception_class", exceptionClass);
+        }
+        if (throwable.getMessage() != null) {
+          exceptionInformation.put("exception_message", throwable.getMessage());
+        }
+      }
+      String[] stackTraceLines = throwableInformation.getThrowableStrRep();
+      if (stackTraceLines != null) {
+        StringBuilder stackTrace = new StringBuilder();
+        for (String line : stackTraceLines) {
+          stackTrace.append(line);
+          stackTrace.append("\n");
+        }
+        // Store a String rather than the StringBuilder so the map only holds
+        // plain JSON-friendly values.
+        exceptionInformation.put("stacktrace", stackTrace.toString());
+      }
+      logstashEvent.put("exception", exceptionInformation);
+    }
+
+    if (includeLocationInfo) {
+      LocationInfo info = loggingEvent.getLocationInformation();
+      logstashEvent.put("file", info.getFileName());
+      logstashEvent.put("line_number", info.getLineNumber());
+      logstashEvent.put("class", info.getClassName());
+      logstashEvent.put("method", info.getMethodName());
+    }
+
+    logstashEvent.put("logger_name", loggingEvent.getLoggerName());
+    logstashEvent.put("mdc", mdc);
+    logstashEvent.put("ndc", ndc);
+    logstashEvent.put("level", loggingEvent.getLevel().toString());
+    logstashEvent.put("thread_name", threadName);
+
+    return logstashEvent;
+  }
+
+  /**
+   * This method is not currently implemented.
+   * 
+   * @throws UnsupportedOperationException
+   *           Always.
+   */
+  public static LoggingEvent decodeFromMap(Map<String, Object> loggingEventMap) {
+    throw new UnsupportedOperationException("Unable to decode LoggingEvents.");
+  }
+
+  /**
+   * Formats a timestamp for the "@timestamp" field.
+   * 
+   * @param time
+   *          Milliseconds since the epoch.
+   * @return The timestamp as an ISO-8601 string in UTC, e.g.
+   *         2015-03-13T21:40:34.000Z.
+   */
+  public static String dateFormat(long time) {
+    // SimpleDateFormat keeps mutable state while formatting, so access to the
+    // shared instance must be serialized.
+    synchronized (DATE_FORMAT) {
+      return DATE_FORMAT.format(new Date(time));
+    }
+  }
+
+  /**
+   * @return A new formatter for the "@timestamp" field, pinned to UTC.
+   */
+  private static SimpleDateFormat newUtcDateFormat() {
+    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+    format.setTimeZone(TimeZone.getTimeZone("UTC"));
+    return format;
+  }
+
+  /**
+   * @return The hostname to use in the source_host field of the encoded
+   *         LoggingEvents, or "unknown-host" if it cannot be resolved.
+   */
+  public static String getHostname() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      return "unknown-host";
+    }
+  }
+
+  /**
+   * A helper class that only puts non-null values into the encoded LoggingEvent
+   * map. This helps to shrink over-the-wire byte payloads for encoded
+   * LoggingEvents.
+   */
+  @SuppressWarnings("serial")
+  public static final class LoggingEventMap extends HashMap<String, Object> {
+    /**
+     * Stores the mapping only if value is non-null; otherwise leaves the map
+     * unchanged and returns the current value for key.
+     */
+    @Override
+    public Object put(String key, Object value) {
+      if (value == null) {
+        return get(key);
+      } else {
+        return super.put(key, value);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23c4b391/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerdeFactory.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerdeFactory.java
new file mode 100644
index 0000000..9fdc8ae
--- /dev/null
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerdeFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.Log4jSystemConfig;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+
+/**
+ * A factory that returns a Serde suitable for encoding Log4J LoggingEvents as
+ * JSON-encoded maps.
+ */
+public class LoggingEventJsonSerdeFactory implements SerdeFactory<LoggingEvent> {
+  /**
+   * Creates a {@link LoggingEventJsonSerde}. LocationInfo is included in the
+   * encoded events only when task.log4j.location.info.enabled is "true".
+   *
+   * @param name
+   *          The serde name; not used by this factory.
+   * @param config
+   *          The job config, read for the location-info setting.
+   * @return A new LoggingEventJsonSerde.
+   */
+  @Override
+  public Serde<LoggingEvent> getSerde(String name, Config config) {
+    boolean locationInfoEnabled = new Log4jSystemConfig(config).getLocationEnabled();
+    return new LoggingEventJsonSerde(locationInfoEnabled);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/23c4b391/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
index 16ccb45..6314a3e 100644
--- a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
+++ b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
@@ -19,12 +19,12 @@
 
 package org.apache.samza.config;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -35,14 +35,13 @@ public class TestLog4jSystemConfig {
   public ExpectedException exception = ExpectedException.none();
 
   @Test
-  public void testGetSystemNamesAndGetValue() {
+  public void testGetSystemNames() {
     Map<String, String> map = new HashMap<String, String>();
     map.put("systems.system1.samza.factory","1");
     map.put("systems.system2.samza.factory","2");
     Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map));
 
     assertEquals(2, log4jSystemConfig.getSystemNames().size());
-    assertEquals("1", log4jSystemConfig.getValue("systems.system1.samza.factory"));
   }
 
   @Test
@@ -71,20 +70,27 @@ public class TestLog4jSystemConfig {
     Map<String, String> map = new HashMap<String, String>();
     Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map));
 
-    // get the default serde
-    assertEquals(LoggingEventStringSerdeFactory.class.getCanonicalName(), log4jSystemConfig.getSerdeClass("log4j"));
     // get null
     assertNull(log4jSystemConfig.getSerdeClass("otherName"));
+
+    // get serde
+    map.put("serializers.registry.log4j.class", "someClass");
+    log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map));
+    assertEquals("someClass", log4jSystemConfig.getSerdeClass("log4j"));
   }
 
   @Test
   public void testGetSerdeName() {
     Map<String, String> map = new HashMap<String, String>();
-    map.put("systems.mockSystem.streams.mockStream.samza.msg.serde", "streamSerde");
     map.put("systems.mockSystem.samza.msg.serde", "systemSerde");
     Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map));
 
+    // no stream serde
+    assertNull(log4jSystemConfig.getStreamSerdeName("mockSystem", "mockStream"));
+
+    // stream serde
+    map.put("systems.mockSystem.streams.mockStream.samza.msg.serde", "streamSerde");
+    log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map));
     assertEquals("streamSerde", log4jSystemConfig.getStreamSerdeName("mockSystem", "mockStream"));
-    assertEquals("systemSerde", log4jSystemConfig.getSystemSerdeName("mockSystem"));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/23c4b391/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index 3e4ddc9..3e81240 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -28,6 +28,8 @@ import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde;
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde;
 import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
 import org.junit.Test;
 
@@ -36,6 +38,37 @@ public class TestStreamAppender {
   static Logger log = Logger.getLogger(TestStreamAppender.class);
 
   @Test
+  public void testDefaultSerde() {
+    System.setProperty("samza.container.name", "samza-container-1");
+    MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
+    PatternLayout layout = new PatternLayout();
+    layout.setConversionPattern("%m");
+    systemProducerAppender.setLayout(layout);
+    systemProducerAppender.activateOptions();
+    assertNotNull(systemProducerAppender.getSerde());
+    assertEquals(LoggingEventJsonSerde.class, systemProducerAppender.getSerde().getClass());
+  }
+
+  @Test
+  public void testNonDefaultSerde() {
+    System.setProperty("samza.container.name", "samza-container-1");
+    String streamName = StreamAppender.getStreamName("log4jTest", "1");
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("job.name", "log4jTest");
+    map.put("job.id", "1");
+    map.put("serializers.registry.log4j-string.class", LoggingEventStringSerdeFactory.class.getCanonicalName());
+    map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
+    map.put("systems.mock.streams." + streamName + ".samza.msg.serde", "log4j-string");
+    MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(new MapConfig(map));
+    PatternLayout layout = new PatternLayout();
+    layout.setConversionPattern("%m");
+    systemProducerAppender.setLayout(layout);
+    systemProducerAppender.activateOptions();
+    assertNotNull(systemProducerAppender.getSerde());
+    assertEquals(LoggingEventStringSerde.class, systemProducerAppender.getSerde().getClass());
+  }
+
+  @Test
   public void testSystemProducerAppender() {
     System.setProperty("samza.container.name", "samza-container-1");
 
@@ -51,8 +84,8 @@ public class TestStreamAppender {
     systemProducerAppender.flushSystemProducer();
 
     assertEquals(2, MockSystemProducer.messagesReceived.size());
-    assertEquals("testing", new String((byte[])MockSystemProducer.messagesReceived.get(0)));
-    assertEquals("testing2", new String((byte[])MockSystemProducer.messagesReceived.get(1)));
+    assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(0)).contains("\"message\":\"testing\""));
+    assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(1)).contains("\"message\":\"testing2\""));
   }
 
   /**
@@ -61,13 +94,22 @@ public class TestStreamAppender {
    * stays is difficult to test.
    */
   class MockSystemProducerAppender extends StreamAppender {
-    @Override
-    protected Config getConfig() {
+    private final Config config;
+
+    public MockSystemProducerAppender() {
       Map<String, String> map = new HashMap<String, String>();
       map.put("job.name", "log4jTest");
       map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
-      map.put("systems.mock.samza.msg.serde", "log4j");
-      return new MapConfig(map);
+      config = new MapConfig(map);
+    }
+
+    public MockSystemProducerAppender(Config config) {
+      this.config = config;
+    }
+
+    @Override
+    protected Config getConfig() {
+      return config;
     }
   }
 }
\ No newline at end of file