You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/11/17 20:45:21 UTC

[samza] branch master updated: SAMZA-2704: Wire in diagnostics reporter for Kubernetes job coordinator (#1553)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f9c0e2b  SAMZA-2704: Wire in diagnostics reporter for Kubernetes job coordinator (#1553)
f9c0e2b is described below

commit f9c0e2bb165f3d3cb93820d95a4a3f27920519b0
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Wed Nov 17 12:45:12 2021 -0800

    SAMZA-2704: Wire in diagnostics reporter for Kubernetes job coordinator (#1553)
    
    API changes: N/A
---
 .../clustermanager/JobCoordinatorLaunchUtil.java   |  8 +--
 .../samza/coordinator/CoordinationConstants.java   |  5 ++
 .../StaticResourceJobCoordinator.java              | 57 +++++++++++++++++++---
 .../samza/diagnostics/DiagnosticsManager.java      | 19 ++++----
 .../TestStaticResourceJobCoordinator.java          | 49 +++++++++++++++++--
 5 files changed, 116 insertions(+), 22 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index 053b913..837f2c6 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -33,6 +33,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.coordinator.NoProcessorJobCoordinatorListener;
@@ -55,7 +56,6 @@ import org.slf4j.LoggerFactory;
  */
 public class JobCoordinatorLaunchUtil {
   private static final Logger LOG = LoggerFactory.getLogger(JobCoordinatorLaunchUtil.class);
-  private static final String JOB_COORDINATOR_SOURCE_NAME = "JobCoordinator";
   /**
    * There is no processor associated with this job coordinator, so adding a placeholder value.
    */
@@ -115,9 +115,11 @@ public class JobCoordinatorLaunchUtil {
         jobCoordinatorFactory.getJobCoordinator(JOB_COORDINATOR_PROCESSOR_ID_PLACEHOLDER, finalConfig, metrics,
             metadataStore);
     Map<String, MetricsReporter> metricsReporters =
-        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_SOURCE_NAME);
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig),
+            CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME);
     metricsReporters.values()
-        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_SOURCE_NAME, metrics));
+        .forEach(
+          metricsReporter -> metricsReporter.register(CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, metrics));
     metricsReporters.values().forEach(MetricsReporter::start);
     CountDownLatch waitForShutdownLatch = new CountDownLatch(1);
     jobCoordinator.setListener(new NoProcessorJobCoordinatorListener(waitForShutdownLatch));
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
index 22268a8..64524df 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
@@ -36,4 +36,9 @@ public final class CoordinationConstants {
   private static final String YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT = YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID + "=" + "%s";
   public static final String YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT = YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT + "?" +
       YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT;
+
+  /**
+   * Container name to use for job coordinator in components like metrics and diagnostics.
+   */
+  public static final String JOB_COORDINATOR_CONTAINER_NAME = "JobCoordinator";
 }
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
index 9f4810e..25038e9 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
@@ -25,6 +25,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.JobModelHelper;
@@ -37,6 +40,7 @@ import org.apache.samza.coordinator.StreamRegexMonitorFactory;
 import org.apache.samza.coordinator.communication.CoordinatorCommunication;
 import org.apache.samza.coordinator.communication.JobInfoServingContext;
 import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.JobCoordinatorMetadata;
 import org.apache.samza.job.JobMetadataChange;
 import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
@@ -47,6 +51,7 @@ import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.DiagnosticsUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,21 +80,27 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
   private final String processorId;
   private final Config config;
 
-  private Optional<JobCoordinatorListener> jobCoordinatorListener = Optional.empty();
+  private volatile Optional<JobCoordinatorListener> jobCoordinatorListener = Optional.empty();
+
+  /**
+   * {@link DiagnosticsManager} constructed using a {@link JobModel}, so this can only be constructed within
+   * {@link #start} after the job model is built.
+   */
+  private volatile Optional<DiagnosticsManager> currentDiagnosticsManager = Optional.empty();
 
   /**
    * Job model is calculated during {@link #start()}, so it is not immediately available.
    */
-  private Optional<JobModel> currentJobModel = Optional.empty();
+  private volatile Optional<JobModel> currentJobModel = Optional.empty();
   /**
    * {@link JobModelMonitors} depend on job model, so they are only available after {@link #start()}.
    */
-  private Optional<JobModelMonitors> currentJobModelMonitors = Optional.empty();
+  private volatile Optional<JobModelMonitors> currentJobModelMonitors = Optional.empty();
   /**
    * Keeps track of if the job coordinator has completed all preparation for running the job, including
    * publishing a new job model and starting the job model monitors.
    */
-  private AtomicBoolean jobPreparationComplete = new AtomicBoolean(false);
+  private final AtomicBoolean jobPreparationComplete = new AtomicBoolean(false);
 
   StaticResourceJobCoordinator(String processorId, JobModelHelper jobModelHelper,
       JobInfoServingContext jobModelServingContext, CoordinatorCommunication coordinatorCommunication,
@@ -123,6 +134,7 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
       doSetLoggingContextConfig(jobModel.getConfig());
       // monitors should be created right after job model is calculated (see jobModelMonitors() for more details)
       JobModelMonitors jobModelMonitors = jobModelMonitors(jobModel);
+      Optional<DiagnosticsManager> diagnosticsManager = diagnosticsManager(jobModel);
       JobCoordinatorMetadata newMetadata =
           this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
       Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
@@ -139,10 +151,14 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
         this.jobRestartSignal.restartJob();
       } else {
         prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
-        this.coordinatorCommunication.start();
+        // save components that depend on job model in order to manage lifecycle or access later
+        this.currentDiagnosticsManager = diagnosticsManager;
+        this.currentJobModelMonitors = Optional.of(jobModelMonitors);
         this.currentJobModel = Optional.of(jobModel);
+        // lifecycle: start components
+        this.coordinatorCommunication.start();
         this.jobCoordinatorListener.ifPresent(listener -> listener.onNewJobModel(this.processorId, jobModel));
-        this.currentJobModelMonitors = Optional.of(jobModelMonitors);
+        this.currentDiagnosticsManager.ifPresent(DiagnosticsManager::start);
         jobModelMonitors.start();
         this.jobPreparationComplete.set(true);
       }
@@ -157,6 +173,7 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
     try {
       this.jobCoordinatorListener.ifPresent(JobCoordinatorListener::onJobModelExpired);
       if (this.jobPreparationComplete.get()) {
+        this.currentDiagnosticsManager.ifPresent(StaticResourceJobCoordinator::quietlyStop);
         this.currentJobModelMonitors.ifPresent(JobModelMonitors::stop);
         this.coordinatorCommunication.stop();
       }
@@ -212,6 +229,14 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
     LoggingContextHolder.INSTANCE.setConfig(config);
   }
 
+  private Optional<DiagnosticsManager> diagnosticsManager(JobModel jobModel) {
+    JobConfig jobConfig = new JobConfig(this.config);
+    String jobName = jobConfig.getName().orElseThrow(() -> new ConfigException("Missing job name"));
+    // TODO SAMZA-2705: construct execEnvContainerId for diagnostics
+    return buildDiagnosticsManager(jobName, jobConfig.getJobId(), jobModel,
+        CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
+  }
+
   /**
    * Run set up steps so that workers can begin processing:
    * 1. Persist job coordinator metadata
@@ -234,13 +259,41 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
     }
   }
 
+  /**
+   * Wrapper around {@link MetadataResourceUtil} constructor so it can be stubbed during testing.
+   */
   @VisibleForTesting
   MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
     return new MetadataResourceUtil(jobModel, this.metrics, this.config);
   }
 
+  /**
+   * Wrapper around {@link DiagnosticsUtil#buildDiagnosticsManager} so it can be stubbed during testing.
+   */
+  @VisibleForTesting
+  Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName,
+      String jobId, JobModel jobModel, String containerId, Optional<String> execEnvContainerId, Config config) {
+    return DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config);
+  }
+
   private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
     JobCoordinatorMetadata previousMetadata = this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
     return this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata);
   }
+
+  /**
+   * Stops the given {@link DiagnosticsManager}, logging instead of propagating failures so that the remaining
+   * components of the job coordinator can still be stopped afterwards.
+   */
+  private static void quietlyStop(DiagnosticsManager diagnosticsManager) {
+    try {
+      diagnosticsManager.stop();
+    } catch (InterruptedException e) {
+      LOG.error("Exception while stopping diagnostics manager", e);
+      // restore the interrupt status instead of swallowing it, so callers can still observe the interruption
+      Thread.currentThread().interrupt();
+    } catch (RuntimeException e) {
+      LOG.error("Exception while stopping diagnostics manager", e);
+    }
+  }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index 93ca566..4389fcc 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -175,16 +175,17 @@ public class DiagnosticsManager {
   }
 
   public void stop() throws InterruptedException {
-    scheduler.shutdown();
-
-    // Allow any scheduled publishes to finish, and block for termination
-    scheduler.awaitTermination(terminationDuration.toMillis(), TimeUnit.MILLISECONDS);
-
-    if (!scheduler.isTerminated()) {
-      LOG.warn("Unable to terminate scheduler");
-      scheduler.shutdownNow();
+    try {
+      scheduler.shutdown();
+      // Allow any scheduled publishes to finish, and block for termination
+      scheduler.awaitTermination(terminationDuration.toMillis(), TimeUnit.MILLISECONDS);
+    } finally {
+      if (!scheduler.isTerminated()) {
+        LOG.warn("Unable to terminate scheduler");
+        scheduler.shutdownNow();
+      }
+      this.systemProducer.stop();
     }
-    this.systemProducer.stop();
   }
 
   public void addExceptionEvent(DiagnosticsExceptionEvent diagnosticsExceptionEvent) {
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
index 03d5776..9386883 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.samza.coordinator.staticresource;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -28,7 +29,10 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.JobModelHelper;
 import org.apache.samza.coordinator.MetadataResourceUtil;
@@ -39,6 +43,7 @@ import org.apache.samza.coordinator.StreamRegexMonitorFactory;
 import org.apache.samza.coordinator.communication.CoordinatorCommunication;
 import org.apache.samza.coordinator.communication.JobInfoServingContext;
 import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.JobCoordinatorMetadata;
 import org.apache.samza.job.JobMetadataChange;
 import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
@@ -79,6 +84,8 @@ import static org.mockito.Mockito.when;
  * actions (trigger callbacks which will change the job model, trigger shutdown) to check the coordination flow.
  */
 public class TestStaticResourceJobCoordinator {
+  private static final String JOB_NAME = "my-samza-job";
+  private static final String JOB_ID = "123";
   private static final String PROCESSOR_ID = "samza-job-coordinator";
   private static final SystemStream SYSTEM_STREAM = new SystemStream("system", "stream");
   private static final TaskName TASK_NAME = new TaskName("Partition " + 0);
@@ -114,16 +121,18 @@ public class TestStaticResourceJobCoordinator {
   @Mock
   private SystemAdmins systemAdmins;
   @Mock
-  private Config config;
-  @Mock
   private JobCoordinatorListener jobCoordinatorListener;
+  @Mock
+  private DiagnosticsManager diagnosticsManager;
 
+  private Config config;
   private StaticResourceJobCoordinator staticResourceJobCoordinator;
 
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
     when(this.changelogStreamManager.readPartitionMapping()).thenReturn(this.changelogPartitionMapping);
+    this.config = config();
     this.staticResourceJobCoordinator =
         spy(new StaticResourceJobCoordinator(PROCESSOR_ID, this.jobModelHelper, this.jobModelServingContext,
             this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.streamPartitionCountMonitorFactory,
@@ -141,11 +150,13 @@ public class TestStaticResourceJobCoordinator {
     StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig);
     JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig,
         ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+    setUpDiagnosticsManager(jobModel);
     MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
     this.staticResourceJobCoordinator.start();
     assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
     verifyStartLifecycle();
     verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+    verify(this.diagnosticsManager).start();
     verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor,
         streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT);
     verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
@@ -158,11 +169,13 @@ public class TestStaticResourceJobCoordinator {
     StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig);
     StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig);
     setupJobCoordinatorMetadata(jobModel, jobModelConfig, ImmutableSet.of(), true);
+    setUpDiagnosticsManager(jobModel);
     MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
     this.staticResourceJobCoordinator.start();
     assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
     verifyStartLifecycle();
     verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+    verify(this.diagnosticsManager).start();
     verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor,
         streamRegexMonitor, null, null);
     verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
@@ -175,6 +188,7 @@ public class TestStaticResourceJobCoordinator {
     StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig);
     StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig);
     setupJobCoordinatorMetadata(jobModel, jobModelConfig, ImmutableSet.of(JobMetadataChange.JOB_MODEL), true);
+    setUpDiagnosticsManager(jobModel);
     this.staticResourceJobCoordinator.start();
     verifyStartLifecycle();
     verify(this.jobRestartSignal).restartJob();
@@ -190,11 +204,13 @@ public class TestStaticResourceJobCoordinator {
     StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig);
     JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig,
         ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT, JobMetadataChange.JOB_MODEL), true);
+    setUpDiagnosticsManager(jobModel);
     MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
     this.staticResourceJobCoordinator.start();
     assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
     verifyStartLifecycle();
     verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+    verify(this.diagnosticsManager).start();
     verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor,
         streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT);
     verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
@@ -216,6 +232,9 @@ public class TestStaticResourceJobCoordinator {
     when(this.streamRegexMonitorFactory.build(any(), any(), any())).thenReturn(Optional.empty());
     JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig,
         ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+    doReturn(Optional.empty()).when(this.staticResourceJobCoordinator)
+        .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel,
+            CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
     MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
     this.staticResourceJobCoordinator.start();
     assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
@@ -227,13 +246,14 @@ public class TestStaticResourceJobCoordinator {
   }
 
   @Test
-  public void testStopAfterStart() {
+  public void testStopAfterStart() throws InterruptedException {
     Config jobModelConfig = mock(Config.class);
     JobModel jobModel = setupJobModel(jobModelConfig);
     StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig);
     StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig);
     setupJobCoordinatorMetadata(jobModel, jobModelConfig,
         ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+    setUpDiagnosticsManager(jobModel);
     metadataResourceUtil(jobModel);
     // call start in order to set up monitors
     this.staticResourceJobCoordinator.start();
@@ -241,6 +261,7 @@ public class TestStaticResourceJobCoordinator {
     this.staticResourceJobCoordinator.stop();
 
     verify(this.jobCoordinatorListener).onJobModelExpired();
+    verify(this.diagnosticsManager).stop();
     verify(streamPartitionCountMonitor).stop();
     verify(streamRegexMonitor).stop();
     verify(this.coordinatorCommunication).stop();
@@ -263,6 +284,9 @@ public class TestStaticResourceJobCoordinator {
     when(this.streamRegexMonitorFactory.build(any(), any(), any())).thenReturn(Optional.empty());
     setupJobCoordinatorMetadata(jobModel, jobModelConfig,
         ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+    doReturn(Optional.empty()).when(this.staticResourceJobCoordinator)
+        .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel,
+            CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
     metadataResourceUtil(jobModel);
     // call start in order to set up monitors
     this.staticResourceJobCoordinator.start();
@@ -299,10 +323,12 @@ public class TestStaticResourceJobCoordinator {
     StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig);
     JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig,
         ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT, JobMetadataChange.JOB_MODEL), true);
+    setUpDiagnosticsManager(jobModel);
     MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
     this.staticResourceJobCoordinator.start();
     verifyStartLifecycle();
     verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+    verify(this.diagnosticsManager).start();
     verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor,
         streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT);
     // call the callback from the monitor
@@ -322,10 +348,12 @@ public class TestStaticResourceJobCoordinator {
         callbackArgumentCaptor.capture())).thenReturn(Optional.of(streamRegexMonitor));
     JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig,
         ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT, JobMetadataChange.JOB_MODEL), true);
+    setUpDiagnosticsManager(jobModel);
     MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
     this.staticResourceJobCoordinator.start();
     verifyStartLifecycle();
     verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+    verify(this.diagnosticsManager).start();
     verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor,
         streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT);
     // call the callback from the monitor
@@ -393,6 +421,12 @@ public class TestStaticResourceJobCoordinator {
     return metadataResourceUtil;
   }
 
+  private void setUpDiagnosticsManager(JobModel expectedJobModel) {
+    doReturn(Optional.of(this.diagnosticsManager)).when(this.staticResourceJobCoordinator)
+        .buildDiagnosticsManager(JOB_NAME, JOB_ID, expectedJobModel,
+            CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
+  }
+
   private void verifyStartLifecycle() {
     verify(this.systemAdmins).start();
     verify(this.startpointManager).start();
@@ -437,6 +471,13 @@ public class TestStaticResourceJobCoordinator {
     verify(this.staticResourceJobCoordinator, never()).metadataResourceUtil(any());
     verify(this.startpointManager, never()).fanOut(any());
     verifyZeroInteractions(this.jobModelServingContext, this.coordinatorCommunication, streamPartitionCountMonitor,
-        streamRegexMonitor, this.jobCoordinatorListener);
+        streamRegexMonitor, this.jobCoordinatorListener, this.diagnosticsManager);
+  }
+
+  private static Config config() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME, JOB_NAME);
+    configMap.put(JobConfig.JOB_ID, JOB_ID);
+    return new MapConfig(configMap);
   }
 }
\ No newline at end of file