You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/09/29 17:18:40 UTC

[samza] branch master updated: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running (#1532)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 4773574  SAMZA-2693: Make Samza log4j appenders agnostic of where they are running (#1532)
4773574 is described below

commit 477357430e5b099d81434e0363eed4ddc57f420e
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Wed Sep 29 10:18:34 2021 -0700

    SAMZA-2693: Make Samza log4j appenders agnostic of where they are running (#1532)
    
    API/usage changes:
    1. (functionality change) In worker containers, the logs for fetching the initial job model (in SamzaContainer.readJobModel) will no longer be sent to StreamAppender, because the config will not be initialized yet in the appender.
    2. (test change) MockSystemAdmin.createdStreamName was changed to MockSystemAdmin.createdStreamSpec. The stream name can be accessed with MockSystemAdmin.createdStreamSpec.getPhysicalName().
---
 build.gradle                                       |   1 +
 .../apache/samza/logging/LoggingContextHolder.java |  56 ++++
 .../apache/samza/runtime/ContainerLaunchUtil.java  |   7 +-
 .../apache/samza/runtime/LocalContainerRunner.java |   9 +-
 .../apache/samza/coordinator/JobModelManager.scala |  15 +-
 .../samza/logging/TestLoggingContextHolder.java    |  60 +++++
 .../samza/container/TestSamzaContainer.scala       |  10 +-
 .../samza/coordinator/TestJobModelManager.scala    |  16 +-
 .../apache/samza/logging/log4j/StreamAppender.java | 139 +++++-----
 .../samza/logging/log4j/MockSystemAdmin.java       |   4 +-
 .../samza/logging/log4j/TestStreamAppender.java    | 169 +++++++-----
 .../samza/logging/log4j2/StreamAppender.java       | 117 ++++-----
 .../samza/logging/log4j2/MockSystemAdmin.java      |   4 +-
 .../logging/log4j2/MockSystemProducerAppender.java |  78 ------
 .../samza/logging/log4j2/TestStreamAppender.java   | 284 ++++++++++++---------
 15 files changed, 525 insertions(+), 444 deletions(-)

diff --git a/build.gradle b/build.gradle
index 6facb18..cc05c21 100644
--- a/build.gradle
+++ b/build.gradle
@@ -449,6 +449,7 @@ project(":samza-log4j2_$scalaSuffix") {
     compile project(":samza-core_$scalaSuffix")
     compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
     testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-core:$mockitoVersion"
   }
 }
 
diff --git a/samza-core/src/main/java/org/apache/samza/logging/LoggingContextHolder.java b/samza-core/src/main/java/org/apache/samza/logging/LoggingContextHolder.java
new file mode 100644
index 0000000..e0f5801
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/logging/LoggingContextHolder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.logging;
+
+import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Holds information to be used by loggers. For example, some custom Samza log4j/log4j2 logging appenders need system
+ * configs for initialization, so this allows the configs to be passed to those appenders.
+ */
+public class LoggingContextHolder {
+  private static final Logger LOG = LoggerFactory.getLogger(LoggingContextHolder.class);
+  public static final LoggingContextHolder INSTANCE = new LoggingContextHolder();
+
+  private final AtomicReference<Config> config = new AtomicReference<>();
+
+  @VisibleForTesting
+  LoggingContextHolder() {
+  }
+
+  /**
+   * Set the config to be used by Samza loggers.
+   * Only the config used in the first call to this method will be used. After the first call, this method will do
+   * nothing.
+   */
+  public void setConfig(Config config) {
+    if (!this.config.compareAndSet(null, config)) {
+      LOG.warn("Attempted to set config, but it was already set");
+    }
+  }
+
+  public Config getConfig() {
+    return this.config.get();
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 6c04e0a..a114e7b 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -44,6 +44,7 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.logging.LoggingContextHolder;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
@@ -81,14 +82,14 @@ public class ContainerLaunchUtil {
       ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
       String jobName, String jobId, String containerId, Optional<String> execEnvContainerId,
       JobModel jobModel) {
+    Config config = jobModel.getConfig();
 
-    // populate MDC for logging
+    // logging setup: MDC, logging context
     MDC.put("containerName", "samza-container-" + containerId);
     MDC.put("jobName", jobName);
     MDC.put("jobId", jobId);
+    LoggingContextHolder.INSTANCE.setConfig(jobModel.getConfig());
 
-
-    Config config = jobModel.getConfig();
     DiagnosticsUtil.writeMetadataFile(jobName, jobId, containerId, execEnvContainerId, config);
     run(appDesc, jobName, jobId, containerId, execEnvContainerId, jobModel, config, buildExternalContext(config));
 
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 4b2cba0..087d8bd 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -31,6 +31,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.logging.LoggingContextHolder;
 import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,11 +51,9 @@ public class LocalContainerRunner {
         }));
 
     String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID);
-    log.info(String.format("Got container ID: %s", containerId));
     System.out.println(String.format("Container ID: %s", containerId));
 
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
-    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
     System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
 
     Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
@@ -62,6 +61,12 @@ public class LocalContainerRunner {
     int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
     JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
     Config config = jobModel.getConfig();
+
+    // this call is also in ContainerLaunchUtil, but adding this here allows more logs to get handled by Samza loggers
+    LoggingContextHolder.INSTANCE.setConfig(config);
+    log.info(String.format("Got container ID: %s", containerId));
+    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
+
     JobConfig jobConfig = new JobConfig(config);
     String jobName = jobConfig.getName()
         .orElseThrow(() -> new SamzaException(String.format("Config %s is missing", JobConfig.JOB_NAME)));
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 25ce582..22593a6 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -26,6 +26,7 @@ import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStrea
 import org.apache.samza.coordinator.server.{HttpServer, JobServlet, LocalityServlet}
 import org.apache.samza.coordinator.stream.messages.{SetContainerHostMapping, SetTaskContainerMapping, SetTaskModeMapping, SetTaskPartitionMapping}
 import org.apache.samza.job.model.JobModel
+import org.apache.samza.logging.LoggingContextHolder
 import org.apache.samza.metadatastore.MetadataStore
 import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
 import org.apache.samza.serializers.model.SamzaObjectMapper
@@ -40,16 +41,9 @@ import java.util.concurrent.atomic.AtomicReference
  * given a Config object.
  */
 object JobModelManager extends Logging {
-
   val SOURCE = "JobModelManager"
 
   /**
-   * a volatile value to store the current instantiated <code>JobModelManager</code>
-   */
-  @volatile var currentJobModelManager: JobModelManager = _
-  val serializedJobModelRef = new AtomicReference[Array[Byte]]
-
-  /**
    * Currently used only in the ApplicationMaster for yarn deployment model.
    * Does the following:
    * a) Reads the jobModel from coordinator stream using the job's configuration.
@@ -78,15 +72,14 @@ object JobModelManager extends Logging {
       val jobModel = jobModelHelper.newJobModel(config, changelogPartitionMapping)
       val jobModelToServe = new JobModel(jobModel.getConfig, jobModel.getContainers)
       val serializedJobModelToServe = SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModelToServe)
-      serializedJobModelRef.set(serializedJobModelToServe)
 
       val clusterManagerConfig = new ClusterManagerConfig(config)
       val server = new HttpServer(port = clusterManagerConfig.getCoordinatorUrlPort)
-      server.addServlet("/", new JobServlet(serializedJobModelRef))
+      server.addServlet("/", new JobServlet(new AtomicReference[Array[Byte]](serializedJobModelToServe)))
       server.addServlet("/locality", new LocalityServlet(localityManager))
 
-      currentJobModelManager = new JobModelManager(jobModelToServe, server)
-      currentJobModelManager
+      LoggingContextHolder.INSTANCE.setConfig(jobModel.getConfig)
+      new JobModelManager(jobModelToServe, server)
     } finally {
       systemAdmins.stop()
       // Not closing coordinatorStreamStore, since {@code ClusterBasedJobCoordinator} uses it to read container locality through {@code JobModel}.
diff --git a/samza-core/src/test/java/org/apache/samza/logging/TestLoggingContextHolder.java b/samza-core/src/test/java/org/apache/samza/logging/TestLoggingContextHolder.java
new file mode 100644
index 0000000..f2d8fa9
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/logging/TestLoggingContextHolder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.logging;
+
+import org.apache.samza.config.Config;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+
+public class TestLoggingContextHolder {
+  @Mock
+  private Config config;
+
+  private LoggingContextHolder loggingContextHolder;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    this.loggingContextHolder = new LoggingContextHolder();
+  }
+
+  @Test
+  public void testGet() {
+    assertNull(this.loggingContextHolder.getConfig());
+
+    this.loggingContextHolder.setConfig(this.config);
+    assertEquals(this.config, this.loggingContextHolder.getConfig());
+  }
+
+  @Test
+  public void testSetMultiple() {
+    this.loggingContextHolder.setConfig(this.config);
+    Config config0 = mock(Config.class);
+    this.loggingContextHolder.setConfig(config0);
+    // should still have first config
+    assertEquals(this.config, this.loggingContextHolder.getConfig());
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a86c49f..5154069 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -268,11 +268,10 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       "0" -> new ContainerModel("0", tasks),
       "1" -> new ContainerModel("1", tasks))
     val jobModel = new JobModel(config, containers)
-    def jobModelGenerator(): Array[Byte] = SamzaObjectMapper.getObjectMapper.writeValueAsBytes(jobModel)
     val server = new HttpServer
     val coordinator = new JobModelManager(jobModel, server)
-    JobModelManager.serializedJobModelRef.set(jobModelGenerator())
-    coordinator.server.addServlet("/*", new JobServlet(JobModelManager.serializedJobModelRef))
+    val serializedJobModel = SamzaObjectMapper.getObjectMapper.writeValueAsBytes(jobModel)
+    coordinator.server.addServlet("/*", new JobServlet(new AtomicReference[Array[Byte]](serializedJobModel)))
     try {
       coordinator.start
       assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString))
@@ -293,11 +292,10 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       "0" -> new ContainerModel("0", tasks),
       "1" -> new ContainerModel("1", tasks))
     val jobModel = new JobModel(config, containers)
-    def jobModelGenerator(): Array[Byte] = SamzaObjectMapper.getObjectMapper.writeValueAsBytes(jobModel)
     val server = new HttpServer
     val coordinator = new JobModelManager(jobModel, server)
-    JobModelManager.serializedJobModelRef.set(jobModelGenerator())
-    val mockJobServlet = new MockJobServlet(2, JobModelManager.serializedJobModelRef)
+    val serializedJobModel = SamzaObjectMapper.getObjectMapper.writeValueAsBytes(jobModel)
+    val mockJobServlet = new MockJobServlet(2, new AtomicReference[Array[Byte]](serializedJobModel))
     coordinator.server.addServlet("/*", mockJobServlet)
     try {
       coordinator.start
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobModelManager.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobModelManager.scala
index 95856fa..191e6e3 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobModelManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobModelManager.scala
@@ -101,28 +101,26 @@ class TestJobModelManager extends FlatSpec with PrivateMethodTester {
     // We want the mocksystemconsumer to use the same instance across runs
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
 
-    val coordinator = getTestJobModelManager(new MapConfig((config ++ otherConfigs).asJava))
+    val jobModelManager = getTestJobModelManager(new MapConfig((config ++ otherConfigs).asJava))
     val expectedJobModel = new JobModel(new MapConfig(config.asJava), containers.asJava)
 
-    // Verify that the atomicReference is initialized
-    assertNotNull(JobModelManager.serializedJobModelRef.get())
     val expectedContainerModels = new util.TreeMap[String, ContainerModel](expectedJobModel.getContainers)
-    val jobModel = SamzaObjectMapper.getObjectMapper.readValue(JobModelManager.serializedJobModelRef.get(), classOf[JobModel])
+    val jobModel = jobModelManager.jobModel
     val actualContainerModels = new util.TreeMap[String, ContainerModel](jobModel.getContainers)
     assertEquals(expectedContainerModels, actualContainerModels)
 
-    coordinator.start
-    val expectedConfig: Config = coordinator.jobModel.getConfig
+    jobModelManager.start
+    val expectedConfig: Config = jobModelManager.jobModel.getConfig
     val actualConfig: Config = new MapConfig(config.asJava)
     assertTrue(expectedConfig.entrySet().containsAll(actualConfig.entrySet()))
-    assertEquals(expectedJobModel.getContainers, coordinator.jobModel.getContainers)
+    assertEquals(expectedJobModel.getContainers, jobModelManager.jobModel.getContainers)
 
-    val response = HttpUtil.read(coordinator.server.getUrl)
+    val response = HttpUtil.read(jobModelManager.server.getUrl)
     // Verify that the JobServlet is serving the correct jobModel
     val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(response, classOf[JobModel])
     assertEquals(expectedJobModel.getContainers, jobModelFromCoordinatorUrl.getContainers)
 
-    coordinator.stop
+    jobModelManager.stop
   }
 
   @Test
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index e3d9771..6df3a21 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -19,15 +19,14 @@
 
 package org.apache.samza.logging.log4j;
 
-import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
@@ -37,24 +36,19 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.Log4jSystemConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
-import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.TaskConfig;
-import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.job.model.JobModel;
+import org.apache.samza.logging.LoggingContextHolder;
 import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.ExponentialSleepStrategy;
-import org.apache.samza.util.HttpUtil;
 import org.apache.samza.util.ReflectionUtil;
 
 /**
@@ -65,7 +59,6 @@ import org.apache.samza.util.ReflectionUtil;
 public class StreamAppender extends AppenderSkeleton {
 
   private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
-  private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
   private static final String SOURCE = "log4j-log";
 
   // Hidden config for now. Will move to appropriate Config class when ready to.
@@ -74,7 +67,7 @@ public class StreamAppender extends AppenderSkeleton {
   protected static final int DEFAULT_QUEUE_SIZE = 100;
   private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Abitrary choice
 
-  protected static volatile boolean systemInitialized = false;
+  protected volatile boolean systemInitialized = false;
 
   private Config config = null;
   private SystemStream systemStream = null;
@@ -82,7 +75,6 @@ public class StreamAppender extends AppenderSkeleton {
   private String key = null;
   private String streamName = null;
   private int partitionCount = 0;
-  private boolean isApplicationMaster = false;
   private Serde<LoggingEvent> serde = null;
   private Logger log = Logger.getLogger(StreamAppender.class);
   protected StreamAppenderMetrics metrics;
@@ -118,6 +110,7 @@ public class StreamAppender extends AppenderSkeleton {
   /**
    * Getter for the number of partitions to create on a new StreamAppender stream. See also {@link #activateOptions()} for when this is called.
    * Example: {@literal <param name="PartitionCount" value="4"/>}
+   * This needs to be called after the full Samza job config is available in {@link LoggingContextHolder}.
    * @return The configured partition count of the StreamAppender stream. If not set, returns {@link JobConfig#getContainerCount()}.
    */
   public int getPartitionCount() {
@@ -142,21 +135,11 @@ public class StreamAppender extends AppenderSkeleton {
   @Override
   public void activateOptions() {
     String containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
-    if (containerName != null) {
-      isApplicationMaster = containerName.contains(JOB_COORDINATOR_TAG);
-    } else {
+    if (containerName == null) {
       throw new SamzaException("Got null container name from system property: " + JAVA_OPTS_CONTAINER_NAME +
           ". This is used as the key for the log appender, so can't proceed.");
     }
     key = containerName; // use the container name as the key for the logs
-
-    // StreamAppender has to wait until the JobCoordinator is up when the log is in the AM
-    if (isApplicationMaster) {
-      systemInitialized = false;
-    } else {
-      setupSystem();
-      systemInitialized = true;
-    }
   }
 
   @Override
@@ -165,40 +148,21 @@ public class StreamAppender extends AppenderSkeleton {
       try {
         recursiveCall.set(true);
         if (!systemInitialized) {
-          if (JobModelManager.currentJobModelManager() != null) {
-            // JobCoordinator has been instantiated
-            setupSystem();
-            systemInitialized = true;
+          // configs are needed to set up producer system, so check that before actually initializing
+          if (readyToInitialize()) {
+            synchronized (this) {
+              if (!systemInitialized) {
+                setupSystem();
+                systemInitialized = true;
+              }
+            }
+            handleEvent(event);
           } else {
-            log.trace("Waiting for the JobCoordinator to be instantiated...");
+            // skip sending the log to the stream if initialization can't happen yet
+            log.trace("Waiting for config to become available before log can be handled");
           }
         } else {
-          // Serialize the event before adding to the queue to leverage the caller thread
-          // and ensure that the transferThread can keep up.
-          if (!logQueue.offer(serde.toBytes(subLog(event)), queueTimeoutS, TimeUnit.SECONDS)) {
-            // Do NOT retry adding to the queue. Dropping the event allows us to alleviate the unlikely
-            // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
-            // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
-            // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
-
-            // Scenario:
-            // T1: holds L1 and is waiting for L2
-            // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1
-
-            // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
-            // so dropping events in the StreamAppender is our best recourse.
-
-            // Drain the queue instead of dropping one message just to reduce the frequency of warn logs above.
-            int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
-            log.warn(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
-                queueTimeoutS,
-                systemStream.toString(),
-                messagesDropped));
-
-            // Emit a metric which can be monitored to ensure it doesn't happen often.
-            metrics.logMessagesDropped.inc(messagesDropped);
-          }
-          metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
+          handleEvent(event);
         }
       } catch (Exception e) {
         System.err.println("[StreamAppender] Error sending log message:");
@@ -211,6 +175,35 @@ public class StreamAppender extends AppenderSkeleton {
     }
   }
 
+  private void handleEvent(LoggingEvent event) throws InterruptedException {
+    // Serialize the event before adding to the queue to leverage the caller thread
+    // and ensure that the transferThread can keep up.
+    if (!logQueue.offer(serde.toBytes(subLog(event)), queueTimeoutS, TimeUnit.SECONDS)) {
+      // Do NOT retry adding to the queue. Dropping the event allows us to alleviate the unlikely
+      // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
+      // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
+      // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
+
+      // Scenario:
+      // T1: holds L1 and is waiting for L2
+      // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1
+
+      // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
+      // so dropping events in the StreamAppender is our best recourse.
+
+      // Drain the queue instead of dropping one message just to reduce the frequency of warn logs above.
+      int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
+      log.warn(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
+          queueTimeoutS,
+          systemStream.toString(),
+          messagesDropped));
+
+      // Emit a metric which can be monitored to ensure it doesn't happen often.
+      metrics.logMessagesDropped.inc(messagesDropped);
+    }
+    metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
+  }
+
   private String subAppend(LoggingEvent event) {
     if (this.layout == null) {
       return event.getRenderedMessage();
@@ -259,31 +252,23 @@ public class StreamAppender extends AppenderSkeleton {
     }
   }
 
+  @VisibleForTesting
+  boolean readyToInitialize() {
+    return LoggingContextHolder.INSTANCE.getConfig() != null;
+  }
+
   /**
-   * get the config for the AM or containers based on the containers' names.
-   *
-   * @return Config the config of this container
+   * This should only be called after verifying that the {@link LoggingContextHolder} has the config.
    */
   protected Config getConfig() {
-    Config config;
-
-    try {
-      if (isApplicationMaster) {
-        config = JobModelManager.currentJobModelManager().jobModel().getConfig();
-      } else {
-        String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
-        String response = HttpUtil.read(new URL(url), 30000, new ExponentialSleepStrategy());
-        config = SamzaObjectMapper.getObjectMapper().readValue(response, JobModel.class).getConfig();
-      }
-    } catch (IOException e) {
-      throw new SamzaException("can not read the config", e);
-    }
+    Config config = LoggingContextHolder.INSTANCE.getConfig();
     // Make system producer drop producer errors for StreamAppender
-    config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "true"));
-
-    return config;
+    return new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "true"));
   }
 
+  /**
+   * This should only be called after verifying that the {@link LoggingContextHolder} has the config.
+   */
   protected void setupSystem() {
     config = getConfig();
     Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
@@ -305,9 +290,10 @@ public class StreamAppender extends AppenderSkeleton {
     setSerde(log4jSystemConfig, systemName, streamName);
 
     if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
-      // Explicitly create stream appender stream with the partition count the same as the number of containers.
-      System.out.println("[StreamAppender] creating stream " + streamName + " with partition count " + getPartitionCount());
-      StreamSpec streamSpec = StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
+      int streamPartitionCount = getPartitionCount();
+      System.out.println(
+          "[StreamAppender] creating stream " + streamName + " with partition count " + streamPartitionCount);
+      StreamSpec streamSpec = StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, streamPartitionCount);
 
       // SystemAdmin only needed for stream creation here.
       SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
@@ -328,7 +314,6 @@ public class StreamAppender extends AppenderSkeleton {
   }
 
   private void startTransferThread() {
-
     try {
       // Serialize the key once, since we will use it for every event.
       final byte[] keyBytes = key.getBytes("UTF-8");
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemAdmin.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemAdmin.java
index 5c0e526..36d77da 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemAdmin.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemAdmin.java
@@ -29,7 +29,7 @@ import org.apache.samza.system.SystemStreamPartition;
 
 
 public class MockSystemAdmin implements SystemAdmin {
-  public static String createdStreamName = "";
+  public static StreamSpec createdStreamSpec = null;
 
   @Override
   public void start() {
@@ -58,7 +58,7 @@ public class MockSystemAdmin implements SystemAdmin {
 
   @Override
   public boolean createStream(StreamSpec streamSpec) {
-    createdStreamName = streamSpec.getPhysicalName();
+    createdStreamSpec = streamSpec;
     return true;
   }
 
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index e5c1e97..79c6856 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -19,17 +19,15 @@
 
 package org.apache.samza.logging.log4j;
 
-import static org.junit.Assert.*;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 import org.apache.samza.config.Config;
@@ -41,26 +39,30 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class TestStreamAppender {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
-  static Logger log = Logger.getLogger(TestStreamAppender.class);
+public class TestStreamAppender {
+  private static final Logger LOG = Logger.getLogger(TestStreamAppender.class);
 
   @After
   public void tearDown() {
-    log.removeAllAppenders();
+    LOG.removeAllAppenders();
     MockSystemProducer.listeners.clear();
     MockSystemProducer.messagesReceived.clear();
-    MockSystemAdmin.createdStreamName = "";
+    MockSystemAdmin.createdStreamSpec = null;
   }
 
   @Test
   public void testDefaultSerde() {
     System.setProperty("samza.container.name", "samza-container-1");
     MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
-    PatternLayout layout = new PatternLayout();
-    layout.setConversionPattern("%m");
-    systemProducerAppender.setLayout(layout);
     systemProducerAppender.activateOptions();
+    LOG.addAppender(systemProducerAppender);
+    // trigger system set up by sending a log
+    LOG.info("log message");
     assertNotNull(systemProducerAppender.getSerde());
     assertEquals(LoggingEventJsonSerde.class, systemProducerAppender.getSerde().getClass());
   }
@@ -69,7 +71,7 @@ public class TestStreamAppender {
   public void testNonDefaultSerde() {
     System.setProperty("samza.container.name", "samza-container-1");
     String streamName = StreamAppender.getStreamName("log4jTest", "1");
-    Map<String, String> map = new HashMap<String, String>();
+    Map<String, String> map = new HashMap<>();
     map.put("job.name", "log4jTest");
     map.put("job.id", "1");
     map.put("serializers.registry.log4j-string.class", LoggingEventStringSerdeFactory.class.getCanonicalName());
@@ -77,10 +79,10 @@ public class TestStreamAppender {
     map.put("systems.mock.streams." + streamName + ".samza.msg.serde", "log4j-string");
     map.put("task.log4j.system", "mock");
     MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(new MapConfig(map));
-    PatternLayout layout = new PatternLayout();
-    layout.setConversionPattern("%m");
-    systemProducerAppender.setLayout(layout);
     systemProducerAppender.activateOptions();
+    LOG.addAppender(systemProducerAppender);
+    // trigger system set up by sending a log
+    LOG.info("log message");
     assertNotNull(systemProducerAppender.getSerde());
     assertEquals(LoggingEventStringSerde.class, systemProducerAppender.getSerde().getClass());
   }
@@ -94,30 +96,36 @@ public class TestStreamAppender {
     layout.setConversionPattern("%m");
     systemProducerAppender.setLayout(layout);
     systemProducerAppender.activateOptions();
-    log.addAppender(systemProducerAppender);
+    LOG.addAppender(systemProducerAppender);
 
     List<String> messages = Lists.newArrayList("testing1", "testing2");
     logAndVerifyMessages(messages);
   }
 
   @Test
-  public void testSystemProducerAppenderInAM() throws InterruptedException {
+  public void testSystemProducerAppenderNotInitialized() throws InterruptedException {
     System.setProperty("samza.container.name", "samza-job-coordinator");
 
-    MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
+    // add a counter to make sure that the initial message doesn't get produced
+    AtomicInteger numMessagesProduced = new AtomicInteger(0);
+    MockSystemProducer.listeners.add((source, envelope) -> numMessagesProduced.incrementAndGet());
+
+    MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(baseConfig(), false);
     PatternLayout layout = new PatternLayout();
     layout.setConversionPattern("%m");
     systemProducerAppender.setLayout(layout);
     systemProducerAppender.activateOptions();
-    log.addAppender(systemProducerAppender);
+    LOG.addAppender(systemProducerAppender);
 
-    log.info("no-received"); // System isn't initialized yet, so this message should be dropped
+    LOG.info("no-received"); // System isn't initialized yet, so this message should be dropped
 
+    // explicitly trigger initialization to test that new messages do get sent to the stream
     systemProducerAppender.setupSystem();
-    MockSystemProducerAppender.systemInitialized = true;
+    systemProducerAppender.systemInitialized = true;
 
     List<String> messages = Lists.newArrayList("testing3", "testing4");
     logAndVerifyMessages(messages);
+    assertEquals(messages.size(), numMessagesProduced.get());
   }
 
   @Test
@@ -125,13 +133,12 @@ public class TestStreamAppender {
     System.setProperty("samza.container.name", "samza-container-1");
 
     MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
-    PatternLayout layout = new PatternLayout();
-    layout.setConversionPattern("%m");
-    systemProducerAppender.setLayout(layout);
-    systemProducerAppender.activateOptions(); // setupSystem() called inside here.
-    log.addAppender(systemProducerAppender);
+    systemProducerAppender.activateOptions();
+    LOG.addAppender(systemProducerAppender);
+    // trigger system set up by sending a log
+    LOG.info("log message");
 
-    Assert.assertEquals("", MockSystemAdmin.createdStreamName);
+    Assert.assertNull(MockSystemAdmin.createdStreamSpec);
   }
 
   @Test
@@ -146,32 +153,59 @@ public class TestStreamAppender {
         "task.log4j.system", "mock"));
 
     MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(mapConfig);
-    PatternLayout layout = new PatternLayout();
-    layout.setConversionPattern("%m");
-    systemProducerAppender.setLayout(layout);
-    systemProducerAppender.activateOptions(); // setupSystem() called inside here.
-    log.addAppender(systemProducerAppender);
+    systemProducerAppender.activateOptions();
+    LOG.addAppender(systemProducerAppender);
+    // trigger system set up by sending a log
+    LOG.info("log message");
 
-    Assert.assertEquals("__samza_log4jTest_1_logs", MockSystemAdmin.createdStreamName);
+    Assert.assertEquals("__samza_log4jTest_1_logs", MockSystemAdmin.createdStreamSpec.getPhysicalName());
+    // job.container.count defaults to 1
+    Assert.assertEquals(1, MockSystemAdmin.createdStreamSpec.getPartitionCount());
   }
 
   @Test
-  public void testDefaultPartitionCount() {
-    MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
-    Assert.assertEquals(1, systemProducerAppender.getPartitionCount()); // job.container.count defaults to 1
+  public void testStreamCreationUpSetupWithJobContainerCountConfigured() {
+    System.setProperty("samza.container.name", "samza-container-1");
 
-    Map<String, String> map = new HashMap<>();
-    map.put("job.name", "log4jTest");
-    map.put("job.id", "1");
-    map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
-    map.put("task.log4j.system", "mock");
-    map.put("job.container.count", "4");
-    systemProducerAppender = new MockSystemProducerAppender(new MapConfig(map));
-    Assert.assertEquals(4, systemProducerAppender.getPartitionCount());
+    MapConfig mapConfig = new MapConfig(new ImmutableMap.Builder<String, String>()
+        .put("task.log4j.create.stream.enabled", "true") // Enable explicit stream creation
+        .put("job.name", "log4jTest")
+        .put("job.id", "1")
+        .put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName())
+        .put("task.log4j.system", "mock")
+        .put("job.container.count", "4")
+        .build());
+
+    MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(mapConfig);
+    systemProducerAppender.activateOptions();
+    LOG.addAppender(systemProducerAppender);
+    // trigger system set up by sending a log
+    LOG.info("log message");
+
+    Assert.assertEquals("__samza_log4jTest_1_logs", MockSystemAdmin.createdStreamSpec.getPhysicalName());
+    Assert.assertEquals(4, MockSystemAdmin.createdStreamSpec.getPartitionCount());
+  }
+
+  @Test
+  public void testStreamCreationUpSetupWithPartitionCountConfigured() {
+    System.setProperty("samza.container.name", "samza-container-1");
+
+    MapConfig mapConfig = new MapConfig(ImmutableMap.of(
+        "task.log4j.create.stream.enabled", "true", // Enable explicit stream creation
+        "job.name", "log4jTest",
+        "job.id", "1",
+        "systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName(),
+        "task.log4j.system", "mock"));
 
-    systemProducerAppender = new MockSystemProducerAppender();
+    MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(mapConfig);
     systemProducerAppender.setPartitionCount(8);
-    Assert.assertEquals(8, systemProducerAppender.getPartitionCount());
+    systemProducerAppender.activateOptions();
+    LOG.addAppender(systemProducerAppender);
+    // trigger system set up by sending a log
+    LOG.info("log message");
+
+    Assert.assertEquals("__samza_log4jTest_1_logs", MockSystemAdmin.createdStreamSpec.getPhysicalName());
+    Assert.assertEquals(8, MockSystemAdmin.createdStreamSpec.getPartitionCount());
   }
 
   @Test
@@ -183,7 +217,7 @@ public class TestStreamAppender {
     layout.setConversionPattern("%m");
     systemProducerAppender.setLayout(layout);
     systemProducerAppender.activateOptions();
-    log.addAppender(systemProducerAppender);
+    LOG.addAppender(systemProducerAppender);
 
     List<String> messages = Lists.newArrayList("testing5", "testing6", "testing7");
 
@@ -197,7 +231,7 @@ public class TestStreamAppender {
     });
 
     // Log the messages
-    messages.forEach((message) -> log.info(message));
+    messages.forEach(LOG::info);
 
     // Wait for messages
     assertTrue("Thread did not send all messages. Count: " + allMessagesSent.getCount(),
@@ -214,7 +248,7 @@ public class TestStreamAppender {
     layout.setConversionPattern("%m");
     systemProducerAppender.setLayout(layout);
     systemProducerAppender.activateOptions();
-    log.addAppender(systemProducerAppender);
+    LOG.addAppender(systemProducerAppender);
 
     int extraMessageCount = 5;
     int expectedMessagesSent = extraMessageCount - 1; // -1 because when the queue is drained there is one additional message that couldn't be added
@@ -236,7 +270,7 @@ public class TestStreamAppender {
     });
 
     // Log the messages. This is where the timeout will happen!
-    messages.forEach((message) -> log.info(message));
+    messages.forEach(LOG::info);
 
     assertEquals(messages.size() - expectedMessagesSent, systemProducerAppender.metrics.logMessagesDropped.getCount());
 
@@ -255,7 +289,7 @@ public class TestStreamAppender {
     MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown());
 
     // Log the messages
-    messages.forEach((message) -> log.info(message));
+    messages.forEach(LOG::info);
 
     // Wait for messages
     assertTrue("Timeout while waiting for StreamAppender to send all messages. Count: " + allMessagesSent.getCount(),
@@ -273,24 +307,37 @@ public class TestStreamAppender {
     return String.format("\"message\":\"%s\"", message);
   }
 
+  private static Config baseConfig() {
+    Map<String, String> map = new HashMap<>();
+    map.put("job.name", "log4jTest");
+    map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
+    map.put("task.log4j.system", "mock");
+    return new MapConfig(map);
+  }
+
   /**
-   * a mock class which overrides the getConfig method in SystemProducerAppener
-   * for testing purpose. Because the environment variable where the config
-   * stays is difficult to test.
+   * Mock class which overrides config-related methods in {@link StreamAppender} for testing.
    */
-  class MockSystemProducerAppender extends StreamAppender {
+  private static class MockSystemProducerAppender extends StreamAppender {
     private final Config config;
+    private final boolean readyToInitialize;
 
     public MockSystemProducerAppender() {
-      Map<String, String> map = new HashMap<String, String>();
-      map.put("job.name", "log4jTest");
-      map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
-      map.put("task.log4j.system", "mock");
-      config = new MapConfig(map);
+      this(baseConfig(), true);
     }
 
     public MockSystemProducerAppender(Config config) {
+      this(config, true);
+    }
+
+    public MockSystemProducerAppender(Config config, boolean readyToInitialize) {
       this.config = config;
+      this.readyToInitialize = readyToInitialize;
+    }
+
+    @Override
+    boolean readyToInitialize() {
+      return this.readyToInitialize;
     }
 
     @Override
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index f82a576..aab712a 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -19,11 +19,8 @@
 
 package org.apache.samza.logging.log4j2;
 
-import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -46,27 +43,20 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.Log4jSystemConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.SerializerConfig;
-import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.job.model.JobModel;
+import org.apache.samza.logging.LoggingContextHolder;
 import org.apache.samza.logging.log4j2.serializers.LoggingEventJsonSerdeFactory;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.ExponentialSleepStrategy;
-import org.apache.samza.util.HttpUtil;
 import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.ReflectionUtil;
 
@@ -74,7 +64,6 @@ import org.apache.samza.util.ReflectionUtil;
 public class StreamAppender extends AbstractAppender {
 
   private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
-  private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
   private static final String SOURCE = "log4j-log";
 
   // Hidden config for now. Will move to appropriate Config class when ready to.
@@ -89,13 +78,13 @@ public class StreamAppender extends AbstractAppender {
   private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
   private String containerName = null;
   private int partitionCount = 0;
-  private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
 
-  private Thread transferThread;
+  private volatile Thread transferThread;
   private Config config = null;
   private String streamName = null;
   private final boolean usingAsyncLogger;
+  private final LoggingContextHolder loggingContextHolder;
 
   /**
    * used to detect if this thread is called recursively
@@ -103,24 +92,35 @@ public class StreamAppender extends AbstractAppender {
   private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
   protected static final int DEFAULT_QUEUE_SIZE = 100;
-  protected static volatile boolean systemInitialized = false;
+  protected volatile boolean systemInitialized = false;
   protected StreamAppenderMetrics metrics;
   protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
 
+  /**
+   * Constructor is protected so that this class can be extended.
+   */
   protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
       boolean usingAsyncLogger, String streamName) {
+    this(name, filter, layout, ignoreExceptions, usingAsyncLogger, streamName, LoggingContextHolder.INSTANCE);
+  }
+
+  /**
+   * Constructor is protected so that this class can be extended.
+   * @param loggingContextHolder included so that this can be injected for testing purposes in child classes
+   */
+  protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
+      boolean usingAsyncLogger, String streamName, LoggingContextHolder loggingContextHolder) {
     super(name, filter, layout, ignoreExceptions);
     this.streamName = streamName;
     this.usingAsyncLogger = usingAsyncLogger;
+    this.loggingContextHolder = loggingContextHolder;
   }
 
   @Override
   public void start() {
     super.start();
     containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
-    if (containerName != null) {
-      isApplicationMaster = containerName.contains(JOB_COORDINATOR_TAG);
-    } else {
+    if (containerName == null) {
       throw new SamzaException("Got null container name from system property: " + JAVA_OPTS_CONTAINER_NAME +
           ". This is used as the key for the log appender, so can't proceed.");
     }
@@ -132,14 +132,6 @@ public class StreamAppender extends AbstractAppender {
       throw new SamzaException(
           String.format("Container name: %s could not be encoded to bytes. %s cannot proceed.", key, getName()), e);
     }
-
-    // StreamAppender has to wait until the JobCoordinator is up when the log is in the AM
-    if (isApplicationMaster) {
-      systemInitialized = false;
-    } else {
-      setupSystem();
-      systemInitialized = true;
-    }
   }
 
   /**
@@ -152,11 +144,11 @@ public class StreamAppender extends AbstractAppender {
   }
 
   /**
-   * Getter for the Config parameter.
+   * This should only be called after verifying that the {@link LoggingContextHolder} has the config.
    */
   protected Config getConfig() {
     if (config == null) {
-      config = fetchConfig();
+      config = this.loggingContextHolder.getConfig();
     }
     return this.config;
   }
@@ -164,6 +156,7 @@ public class StreamAppender extends AbstractAppender {
   /**
    * Getter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, boolean, String)} for when this is called.
    * Example: {@literal <param name="PartitionCount" value="4"/>}
+   * This needs to be called after the appender is initialized with the full Samza job config in {@link #setupSystem()}.
    * @return The configured partition count of the StreamAppender stream. If not set, returns {@link JobConfig#getContainerCount()}.
    */
   public int getPartitionCount() {
@@ -200,15 +193,20 @@ public class StreamAppender extends AbstractAppender {
       try {
         recursiveCall.set(true);
         if (!systemInitialized) {
-          if (JobModelManager.currentJobModelManager() != null) {
-            // JobCoordinator has been instantiated
-            setupSystem();
-            systemInitialized = true;
+          // configs are needed to set up producer system, so check that before actually initializing
+          if (this.loggingContextHolder.getConfig() != null) {
+            synchronized (this) {
+              if (!systemInitialized) {
+                setupSystem();
+                systemInitialized = true;
+              }
+            }
+            handleEvent(event);
           } else {
-            System.out.println("Waiting for the JobCoordinator to be instantiated...");
+            // skip sending the log to the stream if initialization can't happen yet
+            System.out.println("Waiting for config to become available before log can be handled");
           }
         } else {
-          // handle event based on if async or sync logger is being used
           handleEvent(event);
         }
       } catch (Exception e) {
@@ -302,16 +300,18 @@ public class StreamAppender extends AbstractAppender {
   @Override
   public void stop() {
     System.out.println(String.format("Shutting down the %s...", getName()));
-    transferThread.interrupt();
-    try {
-      transferThread.join();
-    } catch (InterruptedException e) {
-      System.err.println("Interrupted while waiting for transfer thread to finish." + e);
-      Thread.currentThread().interrupt();
+    if (transferThread != null) {
+      transferThread.interrupt();
+      try {
+        transferThread.join();
+      } catch (InterruptedException e) {
+        System.err.println("Interrupted while waiting for transfer thread to finish." + e);
+        Thread.currentThread().interrupt();
+      }
     }
 
     flushSystemProducer();
-    if (systemProducer !=  null) {
+    if (systemProducer != null) {
       systemProducer.stop();
     }
   }
@@ -325,31 +325,6 @@ public class StreamAppender extends AbstractAppender {
     }
   }
 
-  /**
-   * get the config for the AM or containers based on the containers' names.
-   *
-   * @return Config the config of this container
-   */
-  private Config fetchConfig() {
-    Config config;
-
-    try {
-      if (isApplicationMaster) {
-        config = JobModelManager.currentJobModelManager().jobModel().getConfig();
-      } else {
-        String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
-        String response = HttpUtil.read(new URL(url), 30000, new ExponentialSleepStrategy());
-        config = SamzaObjectMapper.getObjectMapper().readValue(response, JobModel.class).getConfig();
-      }
-    } catch (IOException e) {
-      throw new SamzaException("can not read the config", e);
-    }
-    // Make system producer drop producer errors for StreamAppender
-    config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "true"));
-
-    return config;
-  }
-
   protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
     return new Log4jSystemConfig(config);
   }
@@ -360,10 +335,9 @@ public class StreamAppender extends AbstractAppender {
 
   protected void setupStream(SystemFactory systemFactory, String systemName) {
     if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
-      // Explicitly create stream appender stream with the partition count the same as the number of containers.
-      System.out.println(String.format("[%s] creating stream ", getName()) + streamName + " with partition count " + getPartitionCount());
-      StreamSpec streamSpec =
-          StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
+      int streamPartitionCount = getPartitionCount();
+      System.out.println(String.format("[%s] creating stream ", getName()) + streamName + " with partition count " + streamPartitionCount);
+      StreamSpec streamSpec = StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, streamPartitionCount);
 
       // SystemAdmin only needed for stream creation here.
       SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
@@ -373,6 +347,9 @@ public class StreamAppender extends AbstractAppender {
     }
   }
 
+  /**
+   * This should only be called after verifying that the {@link LoggingContextHolder} has the config.
+   */
   protected void setupSystem() {
     config = getConfig();
     Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(config);
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java
index e2c1499..be99ad3 100644
--- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java
@@ -29,7 +29,7 @@ import org.apache.samza.system.SystemStreamPartition;
 
 
 public class MockSystemAdmin implements SystemAdmin {
-  public static String createdStreamName = "";
+  public static StreamSpec createdStreamSpec = null;
 
   @Override
   public void start() {
@@ -58,7 +58,7 @@ public class MockSystemAdmin implements SystemAdmin {
 
   @Override
   public boolean createStream(StreamSpec streamSpec) {
-    createdStreamName = streamSpec.getPhysicalName();
+    createdStreamSpec = streamSpec;
     return true;
   }
 
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
deleted file mode 100644
index ce6f081..0000000
--- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.logging.log4j2;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.logging.log4j.core.Filter;
-import org.apache.logging.log4j.core.Layout;
-import org.apache.logging.log4j.core.config.plugins.Plugin;
-import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
-import org.apache.logging.log4j.core.config.plugins.PluginElement;
-import org.apache.logging.log4j.core.config.plugins.PluginFactory;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-
-
-/**
- * a mock class which overrides the getConfig method in SystemProducerAppender
- * for testing purpose. Because the environment variable where the config
- * stays is difficult to test.
- */
-@Plugin(name = "MockSystemProducer", category = "Core", elementType = "appender", printObject = true)
-class MockSystemProducerAppender extends StreamAppender {
-  private static Config config;
-
-  protected MockSystemProducerAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, final boolean usingAsyncLogger, Config config, String streamName) {
-    super(name, filter, layout, ignoreExceptions, usingAsyncLogger, streamName);
-  }
-
-  @PluginFactory
-  public static MockSystemProducerAppender createAppender(
-      @PluginAttribute("name") final String name,
-      @PluginElement("Filter") final Filter filter,
-      @PluginElement("Layout") Layout<? extends Serializable> layout,
-      @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
-      @PluginAttribute(value = "usingAsyncLogger", defaultBoolean = false) final boolean usingAsyncLogger,
-      @PluginElement("Config") final Config testConfig,
-      @PluginAttribute("streamName") String streamName) {
-    if (testConfig == null) {
-      initConfig();
-    } else {
-      config = testConfig;
-    }
-    return new MockSystemProducerAppender(name, filter, layout, ignoreExceptions, usingAsyncLogger, config, streamName);
-  }
-
-  @Override
-  protected Config getConfig() {
-    return config;
-  }
-
-  private static void initConfig() {
-    Map<String, String> map = new HashMap<String, String>();
-    map.put("job.name", "log4jTest");
-    map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
-    map.put("task.log4j.system", "mock");
-    config = new MapConfig(map);
-  }
-}
-
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
index 88f9405..74437ba 100644
--- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
@@ -19,8 +19,6 @@
 
 package org.apache.samza.logging.log4j2;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -28,6 +26,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.core.Appender;
@@ -42,90 +43,89 @@ import org.apache.logging.log4j.core.config.builder.api.RootLoggerComponentBuild
 import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
 import org.apache.logging.log4j.core.config.plugins.Plugin;
 import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.logging.LoggingContextHolder;
 import org.apache.samza.logging.log4j2.serializers.LoggingEventJsonSerde;
 import org.apache.samza.logging.log4j2.serializers.LoggingEventStringSerde;
 import org.apache.samza.logging.log4j2.serializers.LoggingEventStringSerdeFactory;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
 
-import static org.junit.Assert.*;
 
 public class TestStreamAppender {
+  private static final Logger LOG = (Logger) LogManager.getLogger(TestStreamAppender.class);
+
+  @Mock
+  private LoggingContextHolder loggingContextHolder;
 
-  static Logger log = (Logger) LogManager.getLogger(TestStreamAppender.class);
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(this.loggingContextHolder.getConfig()).thenReturn(baseConfig());
+  }
 
   @After
   public void tearDown() {
     removeAllAppenders();
     MockSystemProducer.listeners.clear();
     MockSystemProducer.messagesReceived.clear();
-    MockSystemAdmin.createdStreamName = "";
+    MockSystemAdmin.createdStreamSpec = null;
   }
 
   @Test
   public void testDefaultSerde() {
     System.setProperty("samza.container.name", "samza-container-1");
-    PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender =
-        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
-    systemProducerAppender.start();
-    assertNotNull(systemProducerAppender.getSerde());
-    assertEquals(LoggingEventJsonSerde.class, systemProducerAppender.getSerde().getClass());
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, null, false, false, null, this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
+    // trigger system set up by sending a log
+    LOG.info("log message");
+    assertEquals(LoggingEventJsonSerde.class, streamAppender.getSerde().getClass());
+    streamAppender.stop();
   }
 
   @Test
   public void testNonDefaultSerde() {
     System.setProperty("samza.container.name", "samza-container-1");
-    Map<String, String> map = new HashMap<String, String>();
-    map.put("job.name", "log4jTest");
-    map.put("job.id", "1");
-    map.put("serializers.registry.log4j-string.class", LoggingEventStringSerdeFactory.class.getCanonicalName());
-    map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
-    map.put("systems.mock.streams.__samza_log4jTest_1_logs.samza.msg.serde", "log4j-string");
-    map.put("task.log4j.system", "mock");
-    PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, false, new MapConfig(map), null);
-    systemProducerAppender.start();
-    assertNotNull(systemProducerAppender.getSerde());
-    assertEquals(LoggingEventStringSerde.class, systemProducerAppender.getSerde().getClass());
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("job.name", "log4jTest");
+    configMap.put("job.id", "1");
+    configMap.put("serializers.registry.log4j-string.class", LoggingEventStringSerdeFactory.class.getCanonicalName());
+    configMap.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
+    configMap.put("systems.mock.streams.__samza_log4jTest_1_logs.samza.msg.serde", "log4j-string");
+    configMap.put("task.log4j.system", "mock");
+    when(this.loggingContextHolder.getConfig()).thenReturn(new MapConfig(configMap));
+
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, null, false, false, null, this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
+    // trigger system set up by sending a log
+    LOG.info("log message");
+    assertEquals(LoggingEventStringSerde.class, streamAppender.getSerde().getClass());
+    streamAppender.stop();
   }
 
   @Test
-  public void testDefaultStreamName() {
+  public void testSystemProducerAppenderAppend() throws InterruptedException {
     System.setProperty("samza.container.name", "samza-container-1");
-    PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
-    systemProducerAppender.start();
-    log.addAppender(systemProducerAppender);
-    Assert.assertEquals("__samza_log4jTest_1_logs", systemProducerAppender.getStreamName());
-  }
 
-  @Test
-  public void testCustomStreamName() {
-    System.setProperty("samza.container.name", "samza-container-1");
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, "test-stream-name");
-    systemProducerAppender.start();
-    log.addAppender(systemProducerAppender);
-    Assert.assertEquals("test-stream-name", systemProducerAppender.getStreamName());
-  }
-
-  @Test
-  public void testSystemProducerAppenderInContainer() throws InterruptedException {
-    System.setProperty("samza.container.name", "samza-container-1");
-
-    PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender =
-        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
-    systemProducerAppender.start();
-
-    log.addAppender(systemProducerAppender);
-    log.setLevel(Level.INFO);
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, layout, false, false, null, this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
     List<String> messages = Lists.newArrayList("testing1", "testing2");
     logAndVerifyMessages(messages);
-    systemProducerAppender.stop();
+    streamAppender.stop();
   }
 
   @Test
@@ -135,19 +135,14 @@ public class TestStreamAppender {
     ConfigurationFactory.setConfigurationFactory(new AsyncLoggerConfigurationFactory());
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender =
-        MockSystemProducerAppender.createAppender("testName", null, layout, false, true, null, null);
-    systemProducerAppender.setupSystem();
-    systemProducerAppender.systemInitialized = true;
-    systemProducerAppender.start();
-    log.addAppender(systemProducerAppender);
-    log.setLevel(Level.INFO);
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, layout, false, true, null, this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
     List<String> messages = Lists.newArrayList("testing1", "testing2");
     logAndVerifyMessages(messages);
-    systemProducerAppender.stop();
+    streamAppender.stop();
   }
 
-
   @Plugin(name = "AsyncLoggerConfigurationFactory", category = ConfigurationFactory.CATEGORY)
   @Order(50)
   public static class AsyncLoggerConfigurationFactory extends ConfigurationFactory {
@@ -177,41 +172,46 @@ public class TestStreamAppender {
   }
 
   @Test
-  public void testSystemProducerAppenderInAM() throws InterruptedException {
+  public void testSystemProducerAppenderNotInitialized() throws InterruptedException {
     System.setProperty("samza.container.name", "samza-job-coordinator");
 
+    when(this.loggingContextHolder.getConfig()).thenReturn(null);
+    // add a counter to make sure that the initial message doesn't get produced
+    AtomicInteger numMessagesProduced = new AtomicInteger(0);
+    MockSystemProducer.listeners.add((source, envelope) -> numMessagesProduced.incrementAndGet());
+
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender =
-        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
-    systemProducerAppender.start();
-    log.addAppender(systemProducerAppender);
-    log.setLevel(Level.INFO);
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, layout, false, false, null, this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
 
-    log.info("no-received"); // System isn't initialized yet, so this message should be dropped
+    LOG.info("no-received"); // System isn't initialized yet, so this message should be dropped
 
-    systemProducerAppender.setupSystem();
-    MockSystemProducerAppender.systemInitialized = true;
+    // make config available so messages now get sent to the stream
+    when(this.loggingContextHolder.getConfig()).thenReturn(baseConfig());
 
     List<String> messages = Lists.newArrayList("testing3", "testing4");
     logAndVerifyMessages(messages);
-    systemProducerAppender.stop();
+    streamAppender.stop();
+    assertEquals(messages.size(), numMessagesProduced.get());
   }
 
   @Test
   public void testNoStreamCreationUponSetupByDefault() {
     System.setProperty("samza.container.name", "samza-container-1");
 
-    PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender =
-        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
-    systemProducerAppender.start();
-    log.addAppender(systemProducerAppender);
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, null, false, false, null, this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
+    // trigger system set up by sending a log
+    LOG.info("log message");
+    streamAppender.stop();
 
-    Assert.assertEquals("", MockSystemAdmin.createdStreamName);
+    Assert.assertNull(MockSystemAdmin.createdStreamSpec);
   }
 
   @Test
-  public void testStreamCreationUponSetupWhenEnabled() {
+  public void testStreamCreationDefaultStreamName() {
     System.setProperty("samza.container.name", "samza-container-1");
 
     MapConfig mapConfig = new MapConfig(ImmutableMap.of(
@@ -220,49 +220,77 @@ public class TestStreamAppender {
         "job.id", "1",
         "systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName(),
         "task.log4j.system", "mock"));
+    when(this.loggingContextHolder.getConfig()).thenReturn(mapConfig);
+
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, null, false, false, null, this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
+    // trigger system set up by sending a log
+    LOG.info("log message");
+    streamAppender.stop();
+
+    Assert.assertEquals("__samza_log4jTest_1_logs", MockSystemAdmin.createdStreamSpec.getPhysicalName());
+    // job.container.count defaults to 1
+    Assert.assertEquals(1, MockSystemAdmin.createdStreamSpec.getPartitionCount());
+  }
 
-    PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender =
-        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, mapConfig, null);
-    systemProducerAppender.start();
-    log.addAppender(systemProducerAppender);
+  @Test
+  public void testStreamCreationCustomStreamName() {
+    System.setProperty("samza.container.name", "samza-container-1");
+
+    MapConfig mapConfig = new MapConfig(ImmutableMap.of(
+        "task.log4j.create.stream.enabled", "true", // Enable explicit stream creation
+        "job.name", "log4jTest",
+        "job.id", "1",
+        "systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName(),
+        "task.log4j.system", "mock"));
+    when(this.loggingContextHolder.getConfig()).thenReturn(mapConfig);
+
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, null, false, false, "test-stream-name", this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
+    // trigger system set up by sending a log
+    LOG.info("log message");
+    streamAppender.stop();
+
+    Assert.assertEquals("test-stream-name", MockSystemAdmin.createdStreamSpec.getPhysicalName());
+    // job.container.count defaults to 1
+    Assert.assertEquals(1, MockSystemAdmin.createdStreamSpec.getPartitionCount());
 
-    Assert.assertEquals("__samza_log4jTest_1_logs", MockSystemAdmin.createdStreamName);
   }
 
   @Test
-  public void testDefaultPartitionCount() {
+  public void testStreamCreationUpSetupWithJobContainerCountConfigured() throws InterruptedException {
     System.setProperty("samza.container.name", "samza-container-1");
-    MockSystemProducerAppender systemProducerAppender =
-        MockSystemProducerAppender.createAppender("testName", null, null, false, false, null, null);
-    Assert.assertEquals(1, systemProducerAppender.getPartitionCount()); // job.container.count defaults to 1
 
-    Map<String, String> map = new HashMap<>();
-    map.put("job.name", "log4jTest");
-    map.put("job.id", "1");
-    map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
-    map.put("task.log4j.system", "mock");
-    map.put("job.container.count", "4");
-    systemProducerAppender =
-        MockSystemProducerAppender.createAppender("testName", null, null, false, false, new MapConfig(map), null);
-    Assert.assertEquals(4, systemProducerAppender.getPartitionCount());
-
-    systemProducerAppender =
-        MockSystemProducerAppender.createAppender("testName", null, null, false, false, null, null);
-    systemProducerAppender.setPartitionCount(8);
-    Assert.assertEquals(8, systemProducerAppender.getPartitionCount());
+    MapConfig mapConfig = new MapConfig(new ImmutableMap.Builder<String, String>()
+        .put("task.log4j.create.stream.enabled", "true") // Enable explicit stream creation
+        .put("job.name", "log4jTest")
+        .put("job.id", "1")
+        .put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName())
+        .put("task.log4j.system", "mock")
+        .put("job.container.count", "4")
+        .build());
+    when(this.loggingContextHolder.getConfig()).thenReturn(mapConfig);
+
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, null, false, false, null, this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
+    // trigger system set up by sending a log
+    LOG.info("log message");
+    streamAppender.stop();
+
+    Assert.assertEquals("__samza_log4jTest_1_logs", MockSystemAdmin.createdStreamSpec.getPhysicalName());
+    Assert.assertEquals(4, MockSystemAdmin.createdStreamSpec.getPartitionCount());
   }
 
   @Test
   public void testExceptionsDoNotKillTransferThread() throws InterruptedException {
     System.setProperty("samza.container.name", "samza-container-1");
 
-    PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender =
-        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
-    systemProducerAppender.start();
-    log.addAppender(systemProducerAppender);
-    log.setLevel(Level.INFO);
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, null, false, false, null, this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
 
     List<String> messages = Lists.newArrayList("testing5", "testing6", "testing7");
 
@@ -276,25 +304,22 @@ public class TestStreamAppender {
     });
 
     // Log the messages
-    messages.forEach((message) -> log.info(message));
+    messages.forEach(LOG::info);
 
     // Wait for messages
     assertTrue("Thread did not send all messages. Count: " + allMessagesSent.getCount(),
         allMessagesSent.await(60, TimeUnit.SECONDS));
-    systemProducerAppender.stop();
+    streamAppender.stop();
   }
 
   @Test
   public void testQueueTimeout() throws InterruptedException {
     System.setProperty("samza.container.name", "samza-container-1");
 
-    PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender =
-        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
-    systemProducerAppender.queueTimeoutS = 1;
-    systemProducerAppender.start();
-    log.addAppender(systemProducerAppender);
-    log.setLevel(Level.INFO);
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, null, false, false, null, this.loggingContextHolder);
+    streamAppender.queueTimeoutS = 1;
+    startAndAttachAppender(streamAppender);
 
     int extraMessageCount = 5;
     int expectedMessagesSent = extraMessageCount - 1; // -1 because when the queue is drained there is one additional message that couldn't be added
@@ -316,9 +341,9 @@ public class TestStreamAppender {
     });
 
     // Log the messages. This is where the timeout will happen!
-    messages.forEach((message) -> log.info(message));
+    messages.forEach(LOG::info);
 
-    assertEquals(messages.size() - expectedMessagesSent, systemProducerAppender.metrics.logMessagesDropped.getCount());
+    assertEquals(messages.size() - expectedMessagesSent, streamAppender.metrics.logMessagesDropped.getCount());
 
     // Allow all the rest of the messages to send.
     waitForTimeout.countDown();
@@ -327,7 +352,7 @@ public class TestStreamAppender {
     assertTrue("Thread did not send all messages. Count: " + allMessagesSent.getCount(),
         allMessagesSent.await(60, TimeUnit.SECONDS));
     assertEquals(expectedMessagesSent, MockSystemProducer.messagesReceived.size());
-    systemProducerAppender.stop();
+    streamAppender.stop();
   }
 
   private void logAndVerifyMessages(List<String> messages) throws InterruptedException {
@@ -336,7 +361,7 @@ public class TestStreamAppender {
     MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown());
 
     // Log the messages
-    messages.forEach((message) -> log.info(message));
+    messages.forEach(LOG::info);
 
     // Wait for messages
     assertTrue("Timeout while waiting for StreamAppender to send all messages. Count: " + allMessagesSent.getCount(),
@@ -345,20 +370,33 @@ public class TestStreamAppender {
     // Verify
     assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
     for (int i = 0; i < messages.size(); i++) {
-      HashMap<String, String> messageMap = new HashMap<String, String>();
       assertTrue("Message mismatch at index " + i,
           new String((byte[]) MockSystemProducer.messagesReceived.get(i)).contains(asJsonMessageSegment(messages.get(i))));
     }
   }
 
+  private static Config baseConfig() {
+    Map<String, String> map = new HashMap<>();
+    map.put("job.name", "log4jTest");
+    map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
+    map.put("task.log4j.system", "mock");
+    return new MapConfig(map);
+  }
+
+  private static void startAndAttachAppender(StreamAppender streamAppender) {
+    streamAppender.start();
+    LOG.addAppender(streamAppender);
+    LOG.setLevel(Level.INFO);
+  }
+
   private String asJsonMessageSegment(String message) {
     return String.format("\"message\":\"%s\"", message);
   }
 
   private void removeAllAppenders() {
-    Map<String, Appender> allAppenders = log.getAppenders();
+    Map<String, Appender> allAppenders = LOG.getAppenders();
     for (String name: allAppenders.keySet()) {
-      log.removeAppender(allAppenders.get(name));
+      LOG.removeAppender(allAppenders.get(name));
     }
   }
 }