You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/09 20:51:43 UTC

[2/2] samza git commit: SAMZA-1783: Add Log4j2 functionality in Samza

SAMZA-1783: Add Log4j2 functionality in Samza

vjagadish1989 Kindly review.

Author: Pawas Chhokra <pc...@pchhokra-mn3.linkedin.biz>
Author: Pawas Chhokra <pc...@pchhokra-mn2.linkedin.biz>
Author: Pawas Chhokra <pa...@gmail.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #578 from PawasChhokra/Log4j2


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1e880ea6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1e880ea6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1e880ea6

Branch: refs/heads/master
Commit: 1e880ea6302418f1382af662034f7dd0144ba9ad
Parents: 07199cb
Author: Pawas Chhokra <pc...@pchhokra-mn3.linkedin.biz>
Authored: Tue Oct 9 13:51:39 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Oct 9 13:51:39 2018 -0700

----------------------------------------------------------------------
 build.gradle                                    |  49 ++-
 .../documentation/versioned/jobs/logging.md     | 156 ++++++-
 gradle.properties                               |   2 +-
 gradle/dependency-versions.gradle               |   3 +-
 samza-azure/src/test/resources/log4j.xml        |   8 -
 samza-azure/src/test/resources/log4j2.xml       |  32 ++
 .../samza/runtime/LocalContainerRunner.java     |   3 +-
 .../apache/samza/config/Log4jSystemConfig.java  |  88 ++++
 .../samza/logging/log4j2/StreamAppender.java    | 436 +++++++++++++++++++
 .../logging/log4j2/StreamAppenderMetrics.java   |  43 ++
 .../serializers/LoggingEventJsonSerde.java      | 194 +++++++++
 .../LoggingEventJsonSerdeFactory.java           |  36 ++
 .../serializers/LoggingEventStringSerde.java    |  76 ++++
 .../LoggingEventStringSerdeFactory.java         |  32 ++
 .../samza/logging/log4j2/MockSystemAdmin.java   |  74 ++++
 .../samza/logging/log4j2/MockSystemFactory.java |  45 ++
 .../logging/log4j2/MockSystemProducer.java      |  61 +++
 .../log4j2/MockSystemProducerAppender.java      |  77 ++++
 .../logging/log4j2/TestStreamAppender.java      | 298 +++++++++++++
 .../TestLoggingEventStringSerde.java            |  52 +++
 samza-log4j2/src/test/resources/log4j2.xml      |  37 ++
 samza-rest/src/main/resources/log4j2.xml        |  40 ++
 samza-shell/src/main/bash/checkpoint-tool.sh    |   6 +-
 samza-shell/src/main/bash/kill-all.sh           |   8 +-
 .../src/main/bash/kill-yarn-job-by-name.sh      |   7 +-
 samza-shell/src/main/bash/kill-yarn-job.sh      |   6 +-
 samza-shell/src/main/bash/list-yarn-job.sh      |   6 +-
 samza-shell/src/main/bash/read-rocksdb-tool.sh  |   6 +-
 samza-shell/src/main/bash/run-app.sh            |   6 +-
 samza-shell/src/main/bash/run-class.sh          |  10 +-
 samza-shell/src/main/bash/run-config-manager.sh |   6 +-
 .../main/bash/run-coordinator-stream-writer.sh  |   6 +-
 samza-shell/src/main/bash/run-job.sh            |   6 +-
 samza-shell/src/main/bash/stat-yarn-job.sh      |   6 +-
 samza-shell/src/main/bash/state-storage-tool.sh |   6 +-
 samza-shell/src/main/bash/validate-yarn-job.sh  |   6 +-
 .../src/main/resources/log4j2-console.xml       |  35 ++
 samza-sql/src/test/resources/log4j.xml          |   9 -
 samza-sql/src/test/resources/log4j2.xml         |  35 ++
 samza-test/src/main/resources/log4j2.xml        |  41 ++
 samza-tools/src/main/resources/log4j.xml        |   9 -
 samza-tools/src/main/resources/log4j2.xml       |  32 ++
 settings.gradle                                 |   1 +
 43 files changed, 2015 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 0e77d00..26c9de5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -282,11 +282,9 @@ project(":samza-autoscaling_$scalaVersion") {
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
     compile("org.apache.hadoop:hadoop-yarn-client:$yarnVersion") {
-      exclude module: 'slf4j-log4j12'
       exclude module: 'servlet-api'
     }
     compile("org.apache.hadoop:hadoop-common:$yarnVersion") {
-      exclude module: 'slf4j-log4j12'
       exclude module: 'servlet-api'
     }
     compile "org.apache.httpcomponents:httpclient:$httpClientVersion"
@@ -346,9 +344,7 @@ project(':samza-tools') {
     compile project(':samza-sql')
     compile project(':samza-api')
     compile project(':samza-azure')
-    compile "log4j:log4j:$log4jVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
-    compile "org.slf4j:slf4j-log4j12:$slf4jVersion"
     compile "commons-cli:commons-cli:$commonsCliVersion"
     compile "org.apache.avro:avro:$avroVersion"
     compile "org.apache.commons:commons-lang3:$commonsLang3Version"
@@ -389,7 +385,6 @@ project(":samza-kafka_$scalaVersion") {
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
-    compile "log4j:log4j:$log4jVersion"
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "com.101tec:zkclient:$zkClientVersion"
     compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
@@ -426,6 +421,28 @@ project(':samza-log4j') {
 
   dependencies {
     compile "log4j:log4j:$log4jVersion"
+    compile "org.slf4j:slf4j-log4j12:1.7.7"
+    compile project(':samza-api')
+    compile project(":samza-core_$scalaVersion")
+    compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
+    testCompile "junit:junit:$junitVersion"
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+    toolVersion = "$checkstyleVersion"
+  }
+}
+
+project(':samza-log4j2') {
+  apply plugin: 'java'
+  apply plugin: 'checkstyle'
+
+  dependencies {
+    compile "org.apache.logging.log4j:log4j-1.2-api:$log4j2Version"
+    compile "org.apache.logging.log4j:log4j-api:$log4j2Version"
+    compile "org.apache.logging.log4j:log4j-core:$log4j2Version"
+    compile "org.apache.logging.log4j:log4j-slf4j-impl:$log4j2Version"
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
@@ -460,24 +477,19 @@ project(":samza-yarn_$scalaVersion") {
     compile "commons-httpclient:commons-httpclient:$commonsHttpClientVersion"
     compile "org.apache.httpcomponents:httpclient:$httpClientVersion"
     compile("org.apache.hadoop:hadoop-yarn-api:$yarnVersion") {
-      exclude module: 'slf4j-log4j12'
     }
     compile("org.apache.hadoop:hadoop-yarn-common:$yarnVersion") {
-      exclude module: 'slf4j-log4j12'
       exclude module: 'servlet-api'
     }
     compile("org.apache.hadoop:hadoop-yarn-client:$yarnVersion") {
-      exclude module: 'slf4j-log4j12'
       exclude module: 'servlet-api'
     }
     compile("org.apache.hadoop:hadoop-common:$yarnVersion") {
-      exclude module: 'slf4j-log4j12'
       exclude module: 'servlet-api'
       // Exclude because YARN's 3.4.5 ZK version is incompatible with Kafka's 3.3.4.
       exclude module: 'zookeeper'
     }
     compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
-      exclude module: 'slf4j-log4j12'
       exclude module: 'servlet-api'
     }
     compile("org.scalatra:scalatra_$scalaVersion:$scalatraVersion") {
@@ -524,8 +536,6 @@ project(":samza-shell") {
     gradleShell project(":samza-kafka_$scalaVersion")
     gradleShell project(":samza-test_$scalaVersion")
     gradleShell project(":samza-yarn_$scalaVersion")
-    gradleShell "org.slf4j:slf4j-log4j12:$slf4jVersion"
-    gradleShell "log4j:log4j:1.2.16"
   }
 
   task shellTarGz(type: Tar) {
@@ -550,7 +560,7 @@ project(":samza-shell") {
     main = 'org.apache.samza.job.JobRunner'
     classpath = configurations.gradleShell
     if (project.hasProperty('configPath')) args += ['--config-path', configPath]
-    jvmArgs = ["-Dlog4j.configuration=file:src/main/resources/log4j-console.xml"]
+    jvmArgs = ["-Dlog4j.configurationFile=file:src/main/resources/log4j2-console.xml"]
   }
 
   // Usage: ./gradlew samza-shell:checkpointTool \
@@ -561,7 +571,7 @@ project(":samza-shell") {
     classpath = configurations.gradleShell
     if (project.hasProperty('configPath')) args += ['--config-path', configPath]
     if (project.hasProperty('newOffsets')) args += ['--new-offsets', newOffsets]
-    jvmArgs = ["-Dlog4j.configuration=file:src/main/resources/log4j-console.xml"]
+    jvmArgs = ["-Dlog4j.configurationFile=file:src/main/resources/log4j2-console.xml"]
   }
 
   // Usage: ./gradlew samza-shell:kvPerformanceTest
@@ -571,7 +581,7 @@ project(":samza-shell") {
     main = 'org.apache.samza.test.performance.TestKeyValuePerformance'
     classpath = configurations.gradleShell
     if (project.hasProperty('configPath')) args += ['--config-path', configPath]
-    jvmArgs = ["-Dlog4j.configuration=file:src/main/resources/log4j-console.xml"]
+    jvmArgs = ["-Dlog4j.configurationFile=file:src/main/resources/log4j2-console.xml"]
   }
 }
 
@@ -665,11 +675,9 @@ project(":samza-hdfs_$scalaVersion") {
     compile project(":samza-yarn_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
-      exclude module: 'slf4j-log4j12'
       exclude module: 'servlet-api'
     }
     compile("org.apache.hadoop:hadoop-common:$yarnVersion") {
-      exclude module: 'slf4j-log4j12'
       exclude module: 'servlet-api'
       exclude module: 'zookeeper'
     }
@@ -696,12 +704,10 @@ project(":samza-rest") {
     compile "org.eclipse.jetty.aggregate:jetty-all:$jettyVersion"
     compile "commons-httpclient:commons-httpclient:$commonsHttpClientVersion"
     compile("org.apache.hadoop:hadoop-yarn-client:$yarnVersion") {
-      exclude module: 'slf4j-log4j12'
       exclude module: 'servlet-api'
       exclude group: 'com.sun.jersey'
     }
     runtime("org.apache.hadoop:hadoop-yarn-api:$yarnVersion") {
-      exclude module: 'slf4j-log4j12'
       exclude module: 'servlet-api'
       exclude group: 'com.sun.jersey'
     }
@@ -715,7 +721,7 @@ project(":samza-rest") {
     description 'Build a tarball containing the samza-rest component and its dependencies'
     compression = Compression.GZIP
     from(file("$projectDir/src/main/config")) { into "config/" }
-    from(file("$projectDir/src/main/resources/log4j.xml")) { into "bin/" }
+    from(file("$projectDir/src/main/resources/log4j2.xml")) { into "bin/" }
     from(file("$projectDir/src/main/bash/run-samza-rest-service.sh")) { into "bin/" }
     from(project(':samza-shell').file("src/main/bash/run-class.sh")) { into "bin/" }
     from '../LICENSE'
@@ -768,6 +774,7 @@ project(":samza-test_$scalaVersion") {
     compile project(":samza-kafka_$scalaVersion")
     compile project(":samza-sql")
     runtime project(":samza-log4j")
+    runtime project(":samza-log4j2")
     runtime project(":samza-yarn_$scalaVersion")
     runtime project(":samza-hdfs_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaLibVersion"
@@ -808,7 +815,7 @@ project(":samza-test_$scalaVersion") {
     from(project(':samza-shell').file("src/main/bash")) { into "bin/" }
     from(file("$projectDir/src/main/python/ghostface_killah.py")) { into "bin/"}
     from(project(':samza-shell').file("src/main/resources")) { into "lib/" }
-    from(project(':samza-shell').file("src/main/resources/log4j-console.xml")) { into "bin/" }
+    from(project(':samza-shell').file("src/main/resources/log4j2-console.xml")) { into "bin/" }
     from '../LICENSE'
     from '../NOTICE'
     from(configurations.runtime) { into("lib/") }

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/docs/learn/documentation/versioned/jobs/logging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/logging.md b/docs/learn/documentation/versioned/jobs/logging.md
index ac2d664..8717c28 100644
--- a/docs/learn/documentation/versioned/jobs/logging.md
+++ b/docs/learn/documentation/versioned/jobs/logging.md
@@ -19,25 +19,54 @@ title: Logging
    limitations under the License.
 -->
 
-Samza uses [SLF4J](http://www.slf4j.org/) for all of its logging. By default, Samza only depends on slf4j-api, so you must add an SLF4J runtime dependency to your Samza packages for whichever underlying logging platform you wish to use.
+Samza uses [SLF4J](http://www.slf4j.org/) for all of its logging. By default, Samza only depends on slf4j-api, so it can work with whichever underlying logging platform you wish to use. You simply need to add the SLF4J bridge corresponding to the logging implementation chosen. Samza logging has been thoroughly tested against Log4j and Log4j2. Samza provides bundled modules for each of the Log4j versions along with additional functionality.
+### Logging with Log4j
 
-### Log4j
+To use Samza with [log4j](http://logging.apache.org/log4j/1.2/), you just need to make sure the following dependencies are present in your SamzaContainer’s classpath:
+-	samza-log4j
+-	slf4j-log4j12
 
-The [hello-samza](/startup/hello-samza/{{site.version}}) project shows how to use [log4j](http://logging.apache.org/log4j/1.2/) with Samza. To turn on log4j logging, you just need to make sure slf4j-log4j12 is in your SamzaContainer's classpath. In Maven, this can be done by adding the following dependency to your Samza package project.
+In Maven, this can be done by adding the following dependencies to your Samza package project's pom.xml:
 
 {% highlight xml %}
 <dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <scope>runtime</scope>
-  <version>1.6.2</version>
+  <version>1.7.7</version>
 </dependency>
+
+<dependency>
+  <groupId>org.apache.samza</groupId>
+  <artifactId>samza-log4j</artifactId>
+  <version>0.14.0</version>
+</dependency>
+{% endhighlight %}
+
+If you're not using Maven, just make sure that both these dependencies end up in your Samza package's lib directory.
+
+Next, you need to make sure that these dependencies are also listed in your Samza project's build.gradle:
+
+{% highlight bash %}
+    compile(group: 'org.slf4j', name: 'slf4j-log4j12', version: "$SLF4J_VERSION")
+    runtime(group: 'org.apache.samza', name: 'samza-log4j', version: "$SAMZA_VERSION")
 {% endhighlight %}
 
-If you're not using Maven, just make sure that slf4j-log4j12 ends up in your Samza package's lib directory.
+Note: Please make sure that no dependencies of Log4j2 are present in the classpath while working with Log4j.
 
 #### Log4j configuration
 
+Please ensure you have log4j.xml in your [Samza package's](packaging.html) lib directory. For example, in hello-samza application, the following lines are added to src.xml to ensure log4j.xml is present in the lib directory:
+
+{% highlight xml %}
+<files>
+  <file>
+    <source>${basedir}/src/main/resources/log4j.xml</source>
+    <outputDirectory>lib</outputDirectory>
+  </file>
+</files>
+{% endhighlight %}
+
 Samza's [run-class.sh](packaging.html) script will automatically set the following setting if log4j.xml exists in your [Samza package's](packaging.html) lib directory.
 
 {% highlight bash %}
@@ -77,8 +106,75 @@ These settings are very useful if you're using a file-based appender. For exampl
 
 Setting up a file-based appender is recommended as a better alternative to using standard out. Standard out log files (see below) don't roll, and can get quite large if used for logging.
 
+### Logging with Log4j2
+
+To use Samza with [log4j2](https://logging.apache.org/log4j/2.x/), the following dependencies need to be present in SamzaContainer’s classpath:
+-	samza-log4j2
+-	log4j-slf4j-impl
+
+In Maven, this can be done by adding the following dependencies to your Samza project's pom.xml:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.logging.log4j</groupId>
+  <artifactId>log4j-slf4j-impl</artifactId>
+  <version>2.11.0</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.samza</groupId>
+  <artifactId>samza-log4j2</artifactId>
+  <version>0.14.0</version>
+</dependency>
+{% endhighlight %}
+
+If you’re not using Maven, please make sure both the above dependencies end up in your Samza package’s lib directory.
+
+Next, you need to make sure that these dependencies are also listed in your Samza project's build.gradle:
+
+{% highlight bash %}
+    compile(group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: "2.11.0")
+    runtime(group: 'org.apache.samza', name: 'samza-log4j2', version: "$SAMZA_VERSION")
+{% endhighlight %}
+
+
+#### Log4j2 configuration
+
+Please ensure you have log4j2.xml in your [Samza package's](packaging.html) lib directory. For example, in hello-samza application, the following lines are added to src.xml to ensure log4j2.xml is present in the lib directory:
+
+{% highlight xml %}
+<files>
+  <file>
+    <source>${basedir}/src/main/resources/log4j2.xml</source>
+    <outputDirectory>lib</outputDirectory>
+  </file>
+</files>
+{% endhighlight %}
+
+Samza's [run-class.sh](packaging.html) script will automatically set the following setting if log4j2.xml exists in your lib directory.
+
+{% highlight bash %}
+-Dlog4j.configurationFile=file:$base_dir/lib/log4j2.xml
+{% endhighlight %}
+
+All of the remaining system properties are set exactly as in the log4j case described above.
+
+#### Porting from Log4j to Log4j2
+
+If you are already using log4j and want to upgrade to log4j2, the following are the changes you will need to make in your job:
+-	Clean your lib directory. This will be rebuilt with new dependency JARs and xml files.
+-	Replace log4j’s dependencies with log4j2’s in your pom.xml/build.gradle as mentioned above. Please ensure that none of log4j’s dependencies remain in pom.xml/build.gradle
+-	Create a log4j2.xml to match your existing log4j.xml file. 
+-	Rebuild your application
+
+NOTE: Please ensure that your classpath does not contain dependencies for both log4j and log4j2, as this might cause the application logging to not work correctly. 
+
+
 #### Startup logger
-When using a rolling file appender, it is common for a long-running job to exceed the max file size and count. In such cases, the beginning of the logs will be lost. Since the beginning of the logs include some of the most critical information like configuration, it is important to not lose this information. To address this issue, Samza logs this critical information to a "startup logger" in addition to the normal logger. You can write these log messages to a separate, finite file by including the following snippet in your log4j.xml: 
+When using a rolling file appender, it is common for a long-running job to exceed the max file size and count. In such cases, the beginning of the logs will be lost. Since the beginning of the logs include some of the most critical information like configuration, it is important to not lose this information. To address this issue, Samza logs this critical information to a "startup logger" in addition to the normal logger. 
+
+##### Log4j:
+You can write these log messages to a separate, finite file by including the snippet below in your log4j.xml.
 
 {% highlight xml %}
 <appender name="StartupAppender" class="org.apache.log4j.RollingFileAppender">
@@ -95,8 +191,14 @@ When using a rolling file appender, it is common for a long-running job to excee
 </logger>
 {% endhighlight %}
 
+##### Log4j2:
+This can be done in a similar way for log4j2.xml using its defined syntax for xml files. 
+
+
 #### Changing log levels
 
+##### Log4j:
+
 Sometimes it's desirable to change the Log4J log level from `INFO` to `DEBUG` at runtime so that a developer can enable more logging for a Samza container that's exhibiting undesirable behavior. Samza provides a Log4j class called JmxAppender, which will allow you to dynamically modify log levels at runtime. The JmxAppender class is located in the samza-log4j package, and can be turned on by first adding a runtime dependency to the samza-log4j package:
 
 {% highlight xml %}
@@ -114,9 +216,23 @@ And then updating your log4j.xml to include the appender:
 <appender name="jmx" class="org.apache.samza.logging.log4j.JmxAppender" />
 {% endhighlight %}
 
-#### Stream Log4j Appender
+##### Log4j2:
+
+Log4j2 provides built-in support for JMX where all LoggerContexts, LoggerConfigs and Appenders are instrumented with MBeans and can be remotely monitored and controlled. This eliminates the need for a dedicated JMX appender. The steps to analyze and change the logger/appender properties at runtime are documented [here](https://logging.apache.org/log4j/2.0/manual/jmx.html).
+
+NOTE: If you use JmxAppender and are migrating from log4j to log4j2, simply remove it from your xml file. Don’t add it to your log4j2.xml file, as it doesn’t exist in the samza-log4j2 module.
 
-Samza provides a StreamAppender to publish the logs into a specific system. You can specify the system name using "task.log4j.system" and change name of log stream with param 'StreamName'. The [MDC](http://logback.qos.ch/manual/mdc.html) contains the keys "containerName", "jobName" and "jobId", which help identify the source of the log. In order to use this appender, add:
+
+#### Stream Appender
+
+Samza provides a StreamAppender to publish logs to a specific system. You can specify the system name using "task.log4j.system" and change the name of the log stream with the param 'StreamName'. The [MDC](http://logback.qos.ch/manual/mdc.html) contains the keys "containerName", "jobName" and "jobId", which help identify the source of the log. To use this appender, define the system name by specifying the config as follows:
+{% highlight xml %}
+task.log4j.system="<system-name>"
+{% endhighlight %}
+
+Also, the following needs to be added to the respective log4j.xml/log4j2.xml files:
+
+##### Log4j:
 
 {% highlight xml %}
 <appender name="StreamAppender" class="org.apache.samza.logging.log4j.StreamAppender">
@@ -134,15 +250,27 @@ and
 <appender-ref ref="StreamAppender"/>
 {% endhighlight %}
 
-to log4j.xml and define the system name by specifying the config:
+##### Log4j2:
+
+{% highlight xml %}
+<Stream name="StreamAppender" streamName="TestStreamName">
+  <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n"/>
+</Stream>
+{% endhighlight %}
+
+and
+
 {% highlight xml %}
-task.log4j.system="<system-name>"
+<AppenderRef ref="StreamAppender"/>
 {% endhighlight %}
 
-The default stream name for logger is generated using the following convention, though you can override it using the `StreamName` property in the log4j.xml as shown above.
-```java
-"__samza_%s_%s_logs" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
-```
+
+
+The default stream name for logger is generated using the following convention,
+ ```java
+ "__samza_%s_%s_logs" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+ ```
+though you can override it using the `StreamName` property in the xml files as shown above.
 
 Configuring the StreamAppender will automatically encode messages using logstash's [Log4J JSON format](https://github.com/logstash/log4j-jsonevent-layout). Samza also supports pluggable serialization for those that prefer non-JSON logging events. This can be configured the same way other stream serializers are defined:
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index f18e93b..05d068b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 group=org.apache.samza
-version=0.15.0-SNAPSHOT
+version=0.15.1-SNAPSHOT
 scalaVersion=2.11
 
 gradleVersion=2.8

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index b33ab82..a5b4f51 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -36,12 +36,13 @@
   junitVersion = "4.12"
   kafkaVersion = "0.11.0.2"
   log4jVersion = "1.2.17"
+  log4j2Version = "2.11.0"
   metricsVersion = "2.2.0"
   mockitoVersion = "1.10.19"
   powerMockVersion = "1.6.6"
   rocksdbVersion = "5.7.3"
   scalaTestVersion = "3.0.1"
-  slf4jVersion = "1.6.2"
+  slf4jVersion = "1.7.7"
   yarnVersion = "2.6.1"
   zkClientVersion = "0.8"
   zookeeperVersion = "3.4.6"

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-azure/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/resources/log4j.xml b/samza-azure/src/test/resources/log4j.xml
index 6259b48..4969cfd 100644
--- a/samza-azure/src/test/resources/log4j.xml
+++ b/samza-azure/src/test/resources/log4j.xml
@@ -13,10 +13,6 @@
 
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
 
-  @log4j.appenders.webapp@
-
-  @log4j.appenders.public_access@
-
   <appender name="console" class="org.apache.log4j.ConsoleAppender">
     <layout class="org.apache.log4j.PatternLayout">
       <param name="ConversionPattern"
@@ -24,15 +20,11 @@
     </layout>
   </appender>
 
-  @log4j.loggers.spring@
-
-  @log4j.loggers.public_access@
   <logger name="org.apache" additivity="false">
     <level value="DEBUG"/>
     <appender-ref ref="console"/>
   </logger>
 
-  @log4j.loggers.root@
   <root>
     <priority value ="DEBUG" />
     <appender-ref ref="console"/>

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-azure/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/resources/log4j2.xml b/samza-azure/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..21f81f7
--- /dev/null
+++ b/samza-azure/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  you under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+
+
+<Configuration>
+
+  <Appenders>
+    <Console name="STDOUT" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n"/>
+    </Console>
+  </Appenders>
+
+  <Loggers>
+    <Logger name="org.apache" level="debug" additivity="false">
+      <AppenderRef ref="STDOUT"/>
+    </Logger>
+
+    <Root level="debug">
+      <AppenderRef ref="STDOUT"/>
+    </Root>
+  </Loggers>
+
+</Configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index b9bb1f6..add7e69 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -22,7 +22,7 @@ package org.apache.samza.runtime;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
-import org.apache.log4j.MDC;
+import org.slf4j.MDC;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.ApplicationDescriptor;
 import org.apache.samza.application.ApplicationDescriptorUtil;
@@ -134,7 +134,6 @@ public class LocalContainerRunner {
     }
 
     container.run();
-
     if (heartbeatMonitor != null) {
       heartbeatMonitor.stop();
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
new file mode 100644
index 0000000..5824489
--- /dev/null
+++ b/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * This class contains the methods for getting properties that are needed by the
+ * StreamAppender.
+ */
+public class Log4jSystemConfig extends JavaSystemConfig {
+
+  private static final String LOCATION_ENABLED = "task.log4j.location.info.enabled";
+  private static final String TASK_LOG4J_SYSTEM = "task.log4j.system";
+
+  public Log4jSystemConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * Defines whether or not to include file location information for Log4J
+   * appender messages. File location information includes the method, line
+   * number, class, etc.
+   *
+   * @return If true, will include file location (method, line number, etc)
+   *         information in Log4J appender messages.
+   */
+  public boolean getLocationEnabled() {
+    return "true".equals(get(Log4jSystemConfig.LOCATION_ENABLED, "false"));
+  }
+
+  /**
+   * Get the log4j system name from the config.
+   * If it's not defined, throw a ConfigException
+   *
+   * @return log4j system name
+   */
+  public String getSystemName() {
+    String log4jSystem = get(TASK_LOG4J_SYSTEM, null);
+    if (log4jSystem == null) {
+      throw new ConfigException("Missing " + TASK_LOG4J_SYSTEM + " configuration. Can't figure out the system name to use.");
+    }
+    return log4jSystem;
+  }
+
+  public String getJobName() {
+    return get(JobConfig.JOB_NAME(), null);
+  }
+
+  public String getJobId() {
+    return get(JobConfig.JOB_ID(), null);
+  }
+
+  /**
+   * Get the class name according to the serde name.
+   *
+   * @param name serde name
+   * @return serde factory name, or null if there is no factory defined for the
+   *         supplied serde name.
+   */
+  public String getSerdeClass(String name) {
+    return get(String.format(SerializerConfig.SERDE_FACTORY_CLASS(), name), null);
+  }
+
+  public String getStreamSerdeName(String systemName, String streamName) {
+    StreamConfig streamConfig =  new StreamConfig(this);
+    scala.Option<String> option = streamConfig.getStreamMsgSerde(new SystemStream(systemName, streamName));
+    return option.isEmpty() ? null : option.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
new file mode 100644
index 0000000..e1d6dd3
--- /dev/null
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j2;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.message.Message;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.apache.logging.log4j.util.StringMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.Log4jSystemConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.logging.log4j2.serializers.LoggingEventJsonSerdeFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.HttpUtil;
+import org.apache.samza.util.Util;
+
+@Plugin(name = "Stream", category = "Core", elementType = "appender", printObject = true)
+public class StreamAppender extends AbstractAppender {
+
+  // System property that start() reads the container name from.
+  private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
+  // start() treats a container whose name contains this tag as the application master.
+  private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
+  // Source name used when registering with, sending to and flushing the SystemProducer.
+  private static final String SOURCE = "log4j-log";
+
+  // Hidden config for now. Will move to appropriate Config class when ready to.
+  private static final String CREATE_STREAM_ENABLED = "task.log4j.create.stream.enabled";
+
+  // Capacity of logQueue; also the denominator of the buffer-fill metric computed in append().
+  protected static final int DEFAULT_QUEUE_SIZE = 100;
+  private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Arbitrary choice
+
+  // Static, so the flag is shared by every StreamAppender instance in the JVM.
+  protected static volatile boolean systemInitialized = false;
+
+  // The fields below are populated by start() and setupSystem().
+  private Config config = null;
+  private SystemStream systemStream = null;
+  private SystemProducer systemProducer = null;
+  // Container name; used as the message key for every log event.
+  private String key = null;
+  private String streamName = null;
+  // A value <= 0 makes getPartitionCount() fall back to the job's container count.
+  private int partitionCount = 0;
+  private boolean isApplicationMaster;
+  private Serde<LogEvent> serde = null;
+  private Logger log = LogManager.getLogger(StreamAppender.class);
+  protected StreamAppenderMetrics metrics;
+
+  // Holds already-serialized events: filled by append(), drained by the transfer thread.
+  private final BlockingQueue<byte[]> logQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
+  // Seconds append() waits for free queue space before dropping events.
+  protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
+
+  // Created by startTransferThread(); stays null until setupSystem() has run.
+  private Thread transferThread;
+
+  /**
+   * Creates the appender. The first four arguments are passed straight to {@link AbstractAppender}.
+   *
+   * @param name appender name
+   * @param filter filter handed to the superclass
+   * @param layout layout handed to the superclass; subAppend() handles a null layout
+   * @param ignoreExceptions handed to the superclass
+   * @param streamName stream to write to; if null, setupSystem() derives a name from the job name and id
+   */
+  protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, String streamName) {
+    super(name, filter, layout, ignoreExceptions);
+    this.streamName = streamName;
+  }
+
+  /**
+   * Starts the appender. The container name is read from the
+   * {@code samza.container.name} system property and used as the message key.
+   * Regular containers set up the producer system immediately; the application
+   * master defers that to {@link #append(LogEvent)}.
+   *
+   * @throws SamzaException if the container name system property is not set
+   */
+  @Override
+  public void start() {
+    super.start();
+    final String containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
+    if (containerName == null) {
+      throw new SamzaException("Got null container name from system property: " + JAVA_OPTS_CONTAINER_NAME +
+          ". This is used as the key for the log appender, so can't proceed.");
+    }
+    isApplicationMaster = containerName.contains(JOB_COORDINATOR_TAG);
+    key = containerName; // the container name doubles as the key for the logs
+
+    if (isApplicationMaster) {
+      // In the AM the StreamAppender has to wait until the JobCoordinator is up.
+      systemInitialized = false;
+      return;
+    }
+    setupSystem();
+    systemInitialized = true;
+  }
+
+  /**
+   * Guard flag for {@link #append(LogEvent)}: it is set while an append is in
+   * progress and cleared when that append finishes. Events arriving while it is
+   * set are not logged. The flag is one per appender instance, not one per thread.
+   */
+  private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
+
+  /**
+   * Getter for the stream name. The value comes from the {@code streamName} plugin attribute
+   * passed to {@link #createAppender(String, Filter, Layout, boolean, String)}; if that was null,
+   * setupSystem() replaces it with a name derived from the job name and job id.
+   * @return The configured stream name, or null if none was configured and setupSystem() has not run yet.
+   */
+  public String getStreamName() {
+    return this.streamName;
+  }
+
+  /**
+   * Getter for the number of partitions to create on a new StreamAppender stream.
+   * @return The value given to {@link #setPartitionCount(int)} when it is positive;
+   *         otherwise {@link JobConfig#getContainerCount()} of the config returned by {@link #getConfig()}.
+   */
+  public int getPartitionCount() {
+    return partitionCount > 0 ? partitionCount : new JobConfig(getConfig()).getContainerCount();
+  }
+
+  /**
+   * Setter for the number of partitions to create on a new StreamAppender stream.
+   * A value of zero or less makes {@link #getPartitionCount()} fall back to the job's container count.
+   * @param partitionCount Configurable partition count.
+   */
+  public void setPartitionCount(int partitionCount) {
+    this.partitionCount = partitionCount;
+  }
+
+
+  /**
+   * Plugin factory method that builds a StreamAppender from its configuration attributes and elements.
+   *
+   * @param name value of the {@code name} attribute
+   * @param filter the {@code Filter} element
+   * @param layout the {@code Layout} element
+   * @param ignoreExceptions value of the {@code ignoreExceptions} attribute; defaults to true
+   * @param streamName value of the {@code streamName} attribute
+   * @return a new StreamAppender built from the supplied values
+   */
+  @PluginFactory
+  public static StreamAppender createAppender(
+      @PluginAttribute("name") final String name,
+      @PluginElement("Filter") final Filter filter,
+      @PluginElement("Layout") Layout layout,
+      @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
+      @PluginAttribute("streamName") String streamName) {
+    StreamAppender appender = new StreamAppender(name, filter, layout, ignoreExceptions, streamName);
+    return appender;
+  }
+
+  /**
+   * Serializes the event on the calling thread and hands it to the transfer
+   * thread through the bounded log queue. If the queue stays full for
+   * {@code queueTimeoutS} seconds, the queued events and the current one are dropped.
+   * Until the system has been set up, events are not enqueued.
+   *
+   * NOTE(review): the guard flag is shared by all threads, so an event logged by
+   * another thread while an append is in progress is also counted as recursive
+   * and dropped.
+   *
+   * @param event the log event to publish
+   */
+  @Override
+  public void append(LogEvent event) {
+    // Claim the guard atomically so that the check and the set cannot interleave with another caller.
+    if (!recursiveCall.compareAndSet(false, true)) {
+      if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here.
+        metrics.recursiveCalls.inc();
+      }
+      return;
+    }
+
+    try {
+      if (!systemInitialized) {
+        if (JobModelManager.currentJobModelManager() != null) {
+          // JobCoordinator has been instantiated
+          setupSystem();
+          systemInitialized = true;
+        } else {
+          log.trace("Waiting for the JobCoordinator to be instantiated...");
+        }
+      } else {
+        // Serialize the event before adding to the queue to leverage the caller thread
+        // and ensure that the transferThread can keep up.
+        if (!logQueue.offer(serde.toBytes(subLog(event)), queueTimeoutS, TimeUnit.SECONDS)) {
+          // Do NOT retry adding system to the queue. Dropping the event allows us to alleviate the unlikely
+          // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
+          // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
+          // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
+
+          // Scenario:
+          // T1: holds L1 and is waiting for L2
+          // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1
+
+          // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
+          // so dropping events in the StreamAppender is our best recourse.
+
+          // Drain the queue instead of dropping one message just to reduce the frequency of the warn log below.
+          int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
+          log.warn(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
+              queueTimeoutS,
+              systemStream.toString(),
+              messagesDropped));
+
+          // Emit a metric which can be monitored to ensure it doesn't happen often.
+          metrics.logMessagesDropped.inc(messagesDropped);
+        }
+        metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
+      }
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so the calling thread can still observe the interruption.
+      Thread.currentThread().interrupt();
+      System.err.println("[StreamAppender] Error sending log message:");
+      e.printStackTrace();
+    } catch (Exception e) {
+      System.err.println("[StreamAppender] Error sending log message:");
+      e.printStackTrace();
+    } finally {
+      recursiveCall.set(false);
+    }
+  }
+
+  private Message subAppend(LogEvent event) {
+    if (getLayout() == null) {
+      return new SimpleMessage(event.getMessage().getFormattedMessage());
+    } else {
+      Object obj = getLayout().toSerializable(event);
+      if (obj instanceof Message) {
+        return new SimpleMessage(((Message) obj).getFormattedMessage());
+      } else if (obj instanceof LogEvent) {
+        return new SimpleMessage(((LogEvent) obj).getMessage().getFormattedMessage());
+      } else {
+        return new SimpleMessage(obj.toString());
+      }
+    }
+  }
+
+  /**
+   * Copies an event into a new Log4jLogEvent whose message is the text produced by
+   * {@link #subAppend(LogEvent)}; the other copied fields are taken from the original event.
+   *
+   * @param event the event being appended
+   * @return the copied event
+   */
+  private LogEvent subLog(LogEvent event) {
+    Log4jLogEvent.Builder copy = Log4jLogEvent.newBuilder();
+    copy.setLevel(event.getLevel());
+    copy.setLoggerName(event.getLoggerName());
+    copy.setLoggerFqcn(event.getLoggerFqcn());
+    copy.setMessage(subAppend(event));
+    copy.setThrown(event.getThrown());
+    copy.setContextData((StringMap) event.getContextData());
+    copy.setContextStack(event.getContextStack());
+    copy.setThreadName(event.getThreadName());
+    copy.setSource(event.getSource());
+    copy.setTimeMillis(event.getTimeMillis());
+    return copy.build();
+  }
+
+  /**
+   * Stops the appender: interrupts the transfer thread and waits for it to exit,
+   * then flushes and stops the system producer.
+   */
+  @Override
+  public void stop() {
+    log.info("Shutting down the StreamAppender...");
+    // The transfer thread only exists once setupSystem() has run, which in the
+    // application master is deferred until the JobCoordinator is available.
+    if (transferThread != null) {
+      transferThread.interrupt();
+      try {
+        transferThread.join();
+      } catch (InterruptedException e) {
+        log.error("Interrupted while waiting for transfer thread to finish.", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    flushSystemProducer();
+    if (systemProducer != null) {
+      systemProducer.stop();
+    }
+  }
+
+  /**
+   * Flushes the system producer for the log source. Does nothing if the
+   * producer has not been created yet.
+   */
+  private void flushSystemProducer() {
+    if (systemProducer == null) {
+      return;
+    }
+    systemProducer.flush(SOURCE);
+  }
+
+  /**
+   * get the config for the AM or containers based on the containers' names.
+   *
+   * @return Config the config of this container
+   */
+  protected Config getConfig() {
+    Config config;
+
+    try {
+      if (isApplicationMaster) {
+        config = JobModelManager.currentJobModelManager().jobModel().getConfig();
+      } else {
+        String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+        String response = HttpUtil.read(new URL(url), 30000, new ExponentialSleepStrategy());
+        config = SamzaObjectMapper.getObjectMapper().readValue(response, JobModel.class).getConfig();
+      }
+    } catch (IOException e) {
+      throw new SamzaException("can not read the config", e);
+    }
+    // Make system producer drop producer errors for StreamAppender
+    config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERROR(), "true"));
+
+    return config;
+  }
+
+  /**
+   * Loads the config, resolves the stream name, serde and system factory,
+   * optionally creates the log stream, then starts the system producer and the
+   * transfer thread.
+   *
+   * @throws SamzaException if no system factory is configured for the log4j system
+   */
+  protected void setupSystem() {
+    config = getConfig();
+    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+
+    if (streamName == null) {
+      streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
+    }
+
+    // TODO we need the ACTUAL metrics registry, or the metrics won't get reported by the metric reporters!
+    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
+    metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
+
+    String systemName = log4jSystemConfig.getSystemName();
+    String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
+    if (systemFactoryName == null) {
+      throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use");
+    }
+    SystemFactory systemFactory = Util.getObj(systemFactoryName, SystemFactory.class);
+
+    setSerde(log4jSystemConfig, systemName, streamName);
+
+    if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
+      // Resolve the partition count once: getPartitionCount() may re-read the config on every call.
+      int streamPartitionCount = getPartitionCount();
+
+      // Explicitly create stream appender stream with the partition count the same as the number of containers.
+      System.out.println("[StreamAppender] creating stream " + streamName + " with partition count " + streamPartitionCount);
+      StreamSpec streamSpec =
+          StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, streamPartitionCount);
+
+      // SystemAdmin only needed for stream creation here.
+      SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+      systemAdmin.start();
+      try {
+        systemAdmin.createStream(streamSpec);
+      } finally {
+        // Stop the admin even when stream creation fails so it is not leaked.
+        systemAdmin.stop();
+      }
+    }
+
+    systemProducer = systemFactory.getProducer(systemName, config, metricsRegistry);
+    systemStream = new SystemStream(systemName, streamName);
+    systemProducer.register(SOURCE);
+    systemProducer.start();
+
+    log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
+        + " in " + systemName + ". Logs are partitioned by " + key);
+
+    startTransferThread();
+  }
+
+  /**
+   * Starts the daemon thread that takes serialized events off the log queue and
+   * sends them to the system producer, keyed by the container name. The thread
+   * runs until it is interrupted.
+   */
+  private void startTransferThread() {
+    // Serialize the key once, since we will use it for every event.
+    // Passing the Charset constant avoids the checked UnsupportedEncodingException of the String overload.
+    final byte[] keyBytes = key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+    Runnable transferFromQueueToSystem = () -> {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          byte[] serializedLogEvent = logQueue.take();
+
+          OutgoingMessageEnvelope outgoingMessageEnvelope =
+              new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent);
+          systemProducer.send(SOURCE, outgoingMessageEnvelope);
+
+        } catch (InterruptedException e) {
+          // Preserve the interrupted status for the loop condition.
+          Thread.currentThread().interrupt();
+        } catch (Throwable t) {
+          log.error("Error sending StreamAppender event to SystemProducer", t);
+        }
+      }
+    };
+
+    transferThread = new Thread(transferFromQueueToSystem);
+    transferThread.setDaemon(true);
+    transferThread.setName("Samza StreamAppender Producer " + transferThread.getName());
+    transferThread.start();
+  }
+
+  /**
+   * Derives the default log stream name, {@code __samza_<jobName>_<jobId>_logs},
+   * with every "-" replaced by "_".
+   *
+   * @param jobName the job name; must not be null
+   * @param jobId the job id; "1" is used when null
+   * @return the derived stream name
+   * @throws SamzaException if jobName is null
+   */
+  protected static String getStreamName(String jobName, String jobId) {
+    if (jobName == null) {
+      throw new SamzaException("job name is null. Please specify job.name");
+    }
+    String effectiveJobId = (jobId == null) ? "1" : jobId;
+    return ("__samza_" + jobName + "_" + effectiveJobId + "_logs").replace("-", "_");
+  }
+
+  /**
+   * Sets the serde for this appender. If a message serde name is configured for
+   * the stream, its factory class is looked up in the config; otherwise
+   * {@link LoggingEventJsonSerdeFactory} is used.
+   *
+   * @param log4jSystemConfig log4jSystemConfig for this appender
+   * @param systemName name of the system
+   * @param streamName name of the stream
+   * @throws SamzaException if a serde name is configured but no factory class is defined for it
+   */
+  private void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName, String streamName) {
+    String serdeName = log4jSystemConfig.getStreamSerdeName(systemName, streamName);
+    String serdeClass = (serdeName == null)
+        ? LoggingEventJsonSerdeFactory.class.getCanonicalName()
+        : log4jSystemConfig.getSerdeClass(serdeName);
+
+    if (serdeClass == null) {
+      String serdeKey = String.format(SerializerConfig.SERDE_FACTORY_CLASS(), serdeName);
+      throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " +
+          serdeKey + " property");
+    }
+
+    SerdeFactory<LogEvent> serdeFactory = Util.getObj(serdeClass, SerdeFactory.class);
+    serde = serdeFactory.getSerde(systemName, config);
+  }
+
+  /**
+   * Returns the serde that is being used for the stream appender.
+   *
+   * @return The Serde&lt;LogEvent&gt; that the appender is using, or null if it has not been set yet.
+   */
+  public Serde<LogEvent> getSerde() {
+    return serde;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
----------------------------------------------------------------------
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
new file mode 100644
index 0000000..38f613c
--- /dev/null
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.logging.log4j2;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsBase;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Metrics published by the StreamAppender: queue fill level, recursive calls and dropped messages.
+ */
+public class StreamAppenderMetrics extends MetricsBase {
+  /** The percentage of the log queue capacity that is currently filled with messages from 0 to 100. */
+  public final Gauge<Integer> bufferFillPct;
+
+  /** The number of recursive calls to the StreamAppender. These events will not be logged. */
+  public final Counter recursiveCalls;
+
+  /** The number of log messages dropped e.g. because of buffer overflow. Does not include recursive calls. */
+  public final Counter logMessagesDropped;
+
+  /**
+   * Creates the three metrics through the {@link MetricsBase} helpers.
+   *
+   * @param prefix prefix handed to {@link MetricsBase}
+   * @param registry registry handed to {@link MetricsBase}
+   */
+  public StreamAppenderMetrics(String prefix, MetricsRegistry registry) {
+    super(prefix, registry);
+    this.bufferFillPct = newGauge("buffer-fill-percent", 0);
+    this.recursiveCalls = newCounter("recursive-calls");
+    this.logMessagesDropped = newCounter("log-messages-dropped");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventJsonSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventJsonSerde.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventJsonSerde.java
new file mode 100644
index 0000000..e178394
--- /dev/null
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventJsonSerde.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j2.serializers;
+
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.logging.log4j.ThreadContext;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.util.Util;
+
+/**
+ * A JSON serde that serializes Log4J2 LogEvent objects into JSON using the
+ * standard logstash LogEvent format defined <a
+ * href="https://github.com/logstash/log4j-jsonevent-layout">here</a>.
+ */
+public class LoggingEventJsonSerde implements Serde<LogEvent> {
+  /**
+   * The JSON format version.
+   */
+  public static final int VERSION = 1;
+
+  /**
+   * The date format to use for the timestamp field. The trailing 'Z' in the
+   * pattern is a quoted literal, and no time zone is set on the formatter here.
+   * SimpleDateFormat is not safe for concurrent use without external synchronization.
+   */
+  public static final Format DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+
+  // Have to wrap rather than extend due to type collisions between
+  // Serde<LogEvent> and Serde<Object>.
+  @SuppressWarnings("rawtypes")
+  private final JsonSerde jsonSerde;
+
+  /**
+   * Defines whether to include source location data in the serialized
+   * LogEvent. This information includes the file, line, class and method that
+   * wrote the log line.
+   */
+  private final boolean includeLocationInfo;
+
+  /**
+   * Creates a serde that leaves location info out of the encoded events.
+   */
+  public LoggingEventJsonSerde() {
+    this(false);
+  }
+
+  /**
+   * Creates a serde backed by a new {@link JsonSerde}.
+   *
+   * @param includeLocationInfo
+   *          Whether to include location info in the encoded events or not.
+   */
+  @SuppressWarnings("rawtypes")
+  public LoggingEventJsonSerde(boolean includeLocationInfo) {
+    this.jsonSerde = new JsonSerde();
+    this.includeLocationInfo = includeLocationInfo;
+  }
+
+  /**
+   * Encodes the event to a map with {@link #encodeToMap(LogEvent, boolean)} and serializes that map as JSON.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public byte[] toBytes(LogEvent loggingEvent) {
+    return jsonSerde.toBytes(encodeToMap(loggingEvent, includeLocationInfo));
+  }
+
+  /**
+   * Deserializes the JSON bytes to a map and passes it to {@link #decodeFromMap(Map)},
+   * which currently always throws {@link UnsupportedOperationException}.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public LogEvent fromBytes(byte[] loggingEventMapBytes) {
+    return decodeFromMap((Map<String, Object>) jsonSerde.fromBytes(loggingEventMapBytes));
+  }
+
+  /**
+   * Encodes a LogEvent into a map using the logstash JSON format. Entries whose
+   * value is null are left out of the map.
+   *
+   * @param loggingEvent
+   *          The LogEvent to encode.
+   * @param includeLocationInfo
+   *          Whether to include source location fields in the map, or not. They
+   *          are skipped when the event carries no source location.
+   * @return A Map representing the LogEvent, which is suitable to be
+   *         serialized by a JSON encoder such as Jackson.
+   */
+  @SuppressWarnings("rawtypes")
+  public static Map<String, Object> encodeToMap(LogEvent loggingEvent, boolean includeLocationInfo) {
+    Map<String, Object> logstashEvent = new LoggingEventJsonSerde.LoggingEventMap();
+    String threadName = loggingEvent.getThreadName();
+    long timestamp = loggingEvent.getTimeMillis();
+    HashMap<String, Object> exceptionInformation = new HashMap<String, Object>();
+    Map mdc = loggingEvent.getContextData().toMap();
+    ThreadContext.ContextStack ndc = loggingEvent.getContextStack();
+
+    logstashEvent.put("@version", VERSION);
+    logstashEvent.put("@timestamp", dateFormat(timestamp));
+    logstashEvent.put("source_host", getHostname());
+    // NOTE(review): this stores the Message object itself rather than its formatted text;
+    // confirm that the JSON encoder renders it as intended.
+    logstashEvent.put("message", loggingEvent.getMessage());
+
+    if (loggingEvent.getThrown() != null) {
+      final Throwable throwableInformation = loggingEvent.getThrown();
+      if (throwableInformation.getClass().getCanonicalName() != null) {
+        exceptionInformation.put("exception_class", throwableInformation.getClass().getCanonicalName());
+      }
+      if (throwableInformation.getMessage() != null) {
+        exceptionInformation.put("exception_message", throwableInformation.getMessage());
+      }
+      // The stack trace is recorded whether or not the exception has a message.
+      exceptionInformation.put("stacktrace", ExceptionUtils.getStackTrace(throwableInformation));
+      logstashEvent.put("exception", exceptionInformation);
+    }
+
+    if (includeLocationInfo) {
+      StackTraceElement info = loggingEvent.getSource();
+      // The source location may be absent on an event, so guard against null.
+      if (info != null) {
+        logstashEvent.put("file", info.getFileName());
+        logstashEvent.put("line_number", info.getLineNumber());
+        logstashEvent.put("class", info.getClassName());
+        logstashEvent.put("method", info.getMethodName());
+      }
+    }
+
+    logstashEvent.put("logger_name", loggingEvent.getLoggerName());
+    logstashEvent.put("mdc", mdc);
+    logstashEvent.put("ndc", ndc);
+    logstashEvent.put("level", loggingEvent.getLevel().toString());
+    logstashEvent.put("thread_name", threadName);
+
+    return logstashEvent;
+  }
+
+  /**
+   * Decoding is not implemented; this method always throws.
+   *
+   * @param loggingEventMap a map holding one encoded logging event
+   *
+   * @return never returns normally
+   * @throws UnsupportedOperationException always, as decoding is not implemented yet
+   */
+  public static LogEvent decodeFromMap(Map<String, Object> loggingEventMap) {
+    throw new UnsupportedOperationException("Unable to decode LoggingEvents.");
+  }
+
+  /**
+   * Formats a timestamp with {@link #DATE_FORMAT}.
+   *
+   * @param time milliseconds since the epoch
+   * @return the formatted timestamp
+   */
+  public static String dateFormat(long time) {
+    // DATE_FORMAT is a shared SimpleDateFormat, which must not be used by several threads at once.
+    synchronized (DATE_FORMAT) {
+      return DATE_FORMAT.format(new Date(time));
+    }
+  }
+
+  /**
+   * @return The hostname to use in the hostname field of the encoded
+   *         LoggingEvents.
+   */
+  public static String getHostname() {
+    try {
+      return Util.getLocalHost().getHostName();
+    } catch (Exception e) {
+      return "unknown-host";
+    }
+  }
+
+  /**
+   * A HashMap that ignores puts of null values, so that absent fields do not
+   * appear in the encoded event. This helps to shrink over-the-wire byte
+   * payloads for encoded events.
+   */
+  @SuppressWarnings("serial")
+  public static final class LoggingEventMap extends HashMap<String, Object> {
+    /**
+     * Stores the value unless it is null. A null value leaves the map unchanged
+     * and the current mapping for the key is returned.
+     */
+    @Override
+    public Object put(String key, Object value) {
+      return value == null ? get(key) : super.put(key, value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventJsonSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventJsonSerdeFactory.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventJsonSerdeFactory.java
new file mode 100644
index 0000000..12801eb
--- /dev/null
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventJsonSerdeFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j2.serializers;
+
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.Log4jSystemConfig;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+
+
+public class LoggingEventJsonSerdeFactory implements SerdeFactory<LogEvent> {
+
+  @Override
+  public Serde<LogEvent> getSerde(String name, Config config) {
+    boolean locationInfoEnabled = new Log4jSystemConfig(config).getLocationEnabled();
+    return new LoggingEventJsonSerde(locationInfoEnabled);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventStringSerde.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventStringSerde.java
new file mode 100644
index 0000000..8946b14
--- /dev/null
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventStringSerde.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j2.serializers;
+
+import java.io.UnsupportedEncodingException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A string serde for log4j2 {@link LogEvent}s. {@link #toBytes(LogEvent)} serializes only the
+ * event's formatted message text as UTF-8 bytes. {@link #fromBytes(byte[])} creates a new
+ * {@link LogEvent} whose message is the text decoded from the bytes.
+ */
+public class LoggingEventStringSerde implements Serde<LogEvent> {
+
+  private static final String ENCODING = "UTF-8";
+  private final Logger logger = LogManager.getLogger(LoggingEventStringSerde.class);
+
+  /**
+   * Encodes the formatted message of the given event as UTF-8 bytes. Nothing else from the
+   * event (level, logger name, throwable, ...) is serialized.
+   *
+   * @param object the event to serialize; may be null
+   * @return the UTF-8 bytes of the formatted message, or null if {@code object} is null
+   * @throws SamzaException if the encoding is not supported
+   */
+  @Override
+  public byte[] toBytes(LogEvent object) {
+    if (object == null) {
+      return null;
+    }
+    try {
+      // Message#toString() is not guaranteed to be the rendered text (some Message
+      // implementations print a debug representation of pattern and arguments instead),
+      // so ask the message to format itself.
+      return object.getMessage().getFormattedMessage().getBytes(ENCODING);
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException("can not be encoded to byte[]", e);
+    }
+  }
+
+  /**
+   * Converts bytes to a {@link LogEvent}. The returned event carries the decoded text as a
+   * {@link SimpleMessage} and takes its logger name and level from this serde's own logger;
+   * marker, properties and throwable are left null.
+   *
+   * @param bytes bytes for decoding; may be null
+   * @return a new LogEvent, or null if {@code bytes} is null
+   * @throws SamzaException if the encoding is not supported
+   */
+  @Override
+  public LogEvent fromBytes(byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    }
+    String log;
+    try {
+      log = new String(bytes, ENCODING);
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException("can not decode to String", e);
+    }
+    // TODO: figure out the properties argument (currently passed as null).
+    // The logger FQCN must be the plain class name; Class#toString() would prefix it with "class ".
+    return new Log4jLogEvent(logger.getName(), null, this.getClass().getName(), logger.getLevel(),
+        new SimpleMessage(log), null, null);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventStringSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventStringSerdeFactory.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventStringSerdeFactory.java
new file mode 100644
index 0000000..420025f
--- /dev/null
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/serializers/LoggingEventStringSerdeFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j2.serializers;
+
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+
+/**
+ * {@link SerdeFactory} that produces {@link LoggingEventStringSerde} instances for log4j2
+ * {@link LogEvent}s.
+ */
+public class LoggingEventStringSerdeFactory implements SerdeFactory<LogEvent> {
+
+  /**
+   * Creates a new string serde. Neither argument is consulted.
+   *
+   * @param name serde name; not used by this factory
+   * @param config config; not used by this factory
+   * @return a new {@link LoggingEventStringSerde}
+   */
+  @Override
+  public Serde<LogEvent> getSerde(String name, Config config) {
+    final Serde<LogEvent> stringSerde = new LoggingEventStringSerde();
+    return stringSerde;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java
new file mode 100644
index 0000000..e2c1499
--- /dev/null
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j2;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.StreamValidationException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Test stub of {@link SystemAdmin}. The only behavior it has is remembering, in
+ * {@link #createdStreamName}, the physical name of the stream most recently passed to
+ * {@link #createStream(StreamSpec)}; every other method is a no-op or returns a fixed value.
+ */
+public class MockSystemAdmin implements SystemAdmin {
+  /** Physical name of the last stream given to {@link #createStream(StreamSpec)}; initially empty. */
+  public static String createdStreamName = "";
+
+  /** No-op. */
+  @Override
+  public void start() {
+    // nothing to start in the mock
+  }
+
+  /** No-op. */
+  @Override
+  public void stop() {
+    // nothing to stop in the mock
+  }
+
+  /** Not implemented by the mock; always returns null. */
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    return null;
+  }
+
+  /** Not implemented by the mock; always returns null. */
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    return null;
+  }
+
+  /** Not implemented by the mock; always returns null. */
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    return null;
+  }
+
+  /**
+   * Records the physical name of the given stream and reports success.
+   *
+   * @param streamSpec spec whose physical name is stored in {@link #createdStreamName}
+   * @return always true
+   */
+  @Override
+  public boolean createStream(StreamSpec streamSpec) {
+    final String physicalName = streamSpec.getPhysicalName();
+    createdStreamName = physicalName;
+    return true;
+  }
+
+  /** No-op; never throws. */
+  @Override
+  public void validateStream(StreamSpec streamSpec) throws StreamValidationException {
+    // every stream is accepted by the mock
+  }
+
+  /** Does nothing and always returns false. */
+  @Override
+  public boolean clearStream(StreamSpec streamSpec) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemFactory.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemFactory.java
new file mode 100644
index 0000000..ae96e6c
--- /dev/null
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j2;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+
+/**
+ * Test {@link SystemFactory} that hands out the mock producer and mock admin of this package.
+ * It provides no consumer.
+ */
+public class MockSystemFactory implements SystemFactory {
+
+  /** Consumers are not supported by the mock; always returns null. */
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    return null;
+  }
+
+  /**
+   * Creates a fresh {@link MockSystemProducer}; none of the arguments are consulted.
+   *
+   * @return a new {@link MockSystemProducer}
+   */
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    final SystemProducer producer = new MockSystemProducer();
+    return producer;
+  }
+
+  /**
+   * Creates a fresh {@link MockSystemAdmin}; none of the arguments are consulted.
+   *
+   * @return a new {@link MockSystemAdmin}
+   */
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    final SystemAdmin admin = new MockSystemAdmin();
+    return admin;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducer.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducer.java
new file mode 100644
index 0000000..2180ea1
--- /dev/null
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j2;
+
+import java.util.ArrayList;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+
+/**
+ * Test {@link SystemProducer} that keeps every sent message in the static
+ * {@link #messagesReceived} list and forwards each send to the registered {@link #listeners}.
+ */
+public class MockSystemProducer implements SystemProducer {
+  /** Payloads of all envelopes passed to {@link #send(String, OutgoingMessageEnvelope)}, in call order. */
+  public static ArrayList<Object> messagesReceived = new ArrayList<>();
+  private static Logger log = Logger.getLogger(MockSystemProducer.class);
+  /** Callbacks invoked on every send, after the message has been recorded. */
+  public static List<MockSystemProducerListener> listeners = new ArrayList<>();
+
+  /** Only logs that the producer was started. */
+  @Override
+  public void start() {
+    log.info("mock system producer is started...");
+  }
+
+  /** No-op. */
+  @Override
+  public void stop() {
+    // nothing to release
+  }
+
+  /** No-op; sources are not tracked by the mock. */
+  @Override
+  public void register(String source) {
+    // intentionally empty
+  }
+
+  /**
+   * Records the envelope's message and then notifies every listener.
+   *
+   * @param source source name, passed through to the listeners
+   * @param envelope envelope whose message is stored and which is passed to the listeners
+   */
+  @Override
+  public void send(String source, OutgoingMessageEnvelope envelope) {
+    final Object payload = envelope.getMessage();
+    messagesReceived.add(payload);
+    listeners.forEach(registered -> registered.onSend(source, envelope));
+  }
+
+  /** No-op; messages are recorded synchronously in {@code send}. */
+  @Override
+  public void flush(String source) {
+    // intentionally empty
+  }
+
+  /** Callback notified for each envelope handed to {@link MockSystemProducer#send}. */
+  public interface MockSystemProducerListener {
+    void onSend(String source, OutgoingMessageEnvelope envelope);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1e880ea6/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
new file mode 100644
index 0000000..07df10c
--- /dev/null
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j2;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+
+
+/**
+ * A mock appender that overrides {@link StreamAppender}'s {@code getConfig} method for testing
+ * purposes, because the environment variable where the config normally stays is difficult to
+ * set up in a test. The config is held in a static field that is populated by
+ * {@link #createAppender}.
+ */
+@Plugin(name = "MockSystemProducer", category = "Core", elementType = "appender", printObject = true)
+class MockSystemProducerAppender extends StreamAppender {
+  private static Config config;
+
+  /**
+   * Delegates to the {@link StreamAppender} constructor. The {@code config} argument is not
+   * used here; the static config is assigned by {@link #createAppender} before construction.
+   */
+  protected MockSystemProducerAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, Config config, String streamName) {
+    super(name, filter, layout, ignoreExceptions, streamName);
+  }
+
+  /**
+   * Plugin factory. Stores {@code testConfig} as the config served by {@link #getConfig()}, or
+   * falls back to a built-in mock-system config when none is supplied, then creates the appender.
+   *
+   * @param name appender name
+   * @param filter optional filter
+   * @param layout optional layout
+   * @param ignoreExceptions whether exceptions are ignored; defaults to true
+   * @param testConfig config to serve from {@link #getConfig()}, or null for the default
+   * @param streamName stream name passed to {@link StreamAppender}
+   * @return the new appender
+   */
+  @PluginFactory
+  public static MockSystemProducerAppender createAppender(
+      @PluginAttribute("name") final String name,
+      @PluginElement("Filter") final Filter filter,
+      @PluginElement("Layout") Layout<? extends Serializable> layout,
+      @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
+      @PluginElement("Config") final Config testConfig,
+      @PluginAttribute("streamName") String streamName) {
+    if (testConfig != null) {
+      config = testConfig;
+    } else {
+      initConfig();
+    }
+    return new MockSystemProducerAppender(name, filter, layout, ignoreExceptions, config, streamName);
+  }
+
+  /** Returns the statically held test config instead of reading it from the environment. */
+  @Override
+  protected Config getConfig() {
+    return config;
+  }
+
+  /** Installs the default config: job "log4jTest" logging to the "mock" system backed by {@link MockSystemFactory}. */
+  private static void initConfig() {
+    final Map<String, String> defaults = new HashMap<>();
+    defaults.put("job.name", "log4jTest");
+    defaults.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
+    defaults.put("task.log4j.system", "mock");
+    config = new MapConfig(defaults);
+  }
+}
+