You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/11/17 20:45:21 UTC
[samza] branch master updated: SAMZA-2704: Wire in diagnostics reporter for Kubernetes job coordinator (#1553)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f9c0e2b SAMZA-2704: Wire in diagnostics reporter for Kubernetes job coordinator (#1553)
f9c0e2b is described below
commit f9c0e2bb165f3d3cb93820d95a4a3f27920519b0
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Wed Nov 17 12:45:12 2021 -0800
SAMZA-2704: Wire in diagnostics reporter for Kubernetes job coordinator (#1553)
API changes: N/A
---
.../clustermanager/JobCoordinatorLaunchUtil.java | 8 +--
.../samza/coordinator/CoordinationConstants.java | 5 ++
.../StaticResourceJobCoordinator.java | 57 +++++++++++++++++++---
.../samza/diagnostics/DiagnosticsManager.java | 19 ++++----
.../TestStaticResourceJobCoordinator.java | 49 +++++++++++++++++--
5 files changed, 116 insertions(+), 22 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index 053b913..837f2c6 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -33,6 +33,7 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.coordinator.NoProcessorJobCoordinatorListener;
@@ -55,7 +56,6 @@ import org.slf4j.LoggerFactory;
*/
public class JobCoordinatorLaunchUtil {
private static final Logger LOG = LoggerFactory.getLogger(JobCoordinatorLaunchUtil.class);
- private static final String JOB_COORDINATOR_SOURCE_NAME = "JobCoordinator";
/**
* There is no processor associated with this job coordinator, so adding a placeholder value.
*/
@@ -115,9 +115,11 @@ public class JobCoordinatorLaunchUtil {
jobCoordinatorFactory.getJobCoordinator(JOB_COORDINATOR_PROCESSOR_ID_PLACEHOLDER, finalConfig, metrics,
metadataStore);
Map<String, MetricsReporter> metricsReporters =
- MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_SOURCE_NAME);
+ MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig),
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME);
metricsReporters.values()
- .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_SOURCE_NAME, metrics));
+ .forEach(
+ metricsReporter -> metricsReporter.register(CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, metrics));
metricsReporters.values().forEach(MetricsReporter::start);
CountDownLatch waitForShutdownLatch = new CountDownLatch(1);
jobCoordinator.setListener(new NoProcessorJobCoordinatorListener(waitForShutdownLatch));
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
index 22268a8..64524df 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
@@ -36,4 +36,9 @@ public final class CoordinationConstants {
private static final String YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT = YARN_EXECUTION_ENVIRONMENT_CONTAINER_ID + "=" + "%s";
public static final String YARN_CONTAINER_HEARTBEAT_ENDPOINT_FORMAT = YARN_CONTAINER_HEARTBEAT_SERVLET_FORMAT + "?" +
YARN_CONTAINER_EXECUTION_ID_PARAM_FORMAT;
+
+ /**
+ * Container name to use for job coordinator in components like metrics and diagnostics.
+ */
+ public static final String JOB_COORDINATOR_CONTAINER_NAME = "JobCoordinator";
}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
index 9f4810e..25038e9 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
@@ -25,6 +25,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelHelper;
@@ -37,6 +40,7 @@ import org.apache.samza.coordinator.StreamRegexMonitorFactory;
import org.apache.samza.coordinator.communication.CoordinatorCommunication;
import org.apache.samza.coordinator.communication.JobInfoServingContext;
import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.JobCoordinatorMetadata;
import org.apache.samza.job.JobMetadataChange;
import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
@@ -47,6 +51,7 @@ import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.storage.ChangelogStreamManager;
import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.DiagnosticsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,21 +80,27 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
private final String processorId;
private final Config config;
- private Optional<JobCoordinatorListener> jobCoordinatorListener = Optional.empty();
+ private volatile Optional<JobCoordinatorListener> jobCoordinatorListener = Optional.empty();
+
+ /**
+ * {@link DiagnosticsManager} constructed using a {@link JobModel}, so this can only be constructed within
+ * {@link #start} after the job model is built.
+ */
+ private volatile Optional<DiagnosticsManager> currentDiagnosticsManager = Optional.empty();
/**
* Job model is calculated during {@link #start()}, so it is not immediately available.
*/
- private Optional<JobModel> currentJobModel = Optional.empty();
+ private volatile Optional<JobModel> currentJobModel = Optional.empty();
/**
* {@link JobModelMonitors} depend on job model, so they are only available after {@link #start()}.
*/
- private Optional<JobModelMonitors> currentJobModelMonitors = Optional.empty();
+ private volatile Optional<JobModelMonitors> currentJobModelMonitors = Optional.empty();
/**
* Keeps track of if the job coordinator has completed all preparation for running the job, including
* publishing a new job model and starting the job model monitors.
*/
- private AtomicBoolean jobPreparationComplete = new AtomicBoolean(false);
+ private final AtomicBoolean jobPreparationComplete = new AtomicBoolean(false);
StaticResourceJobCoordinator(String processorId, JobModelHelper jobModelHelper,
JobInfoServingContext jobModelServingContext, CoordinatorCommunication coordinatorCommunication,
@@ -123,6 +134,7 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
doSetLoggingContextConfig(jobModel.getConfig());
// monitors should be created right after job model is calculated (see jobModelMonitors() for more details)
JobModelMonitors jobModelMonitors = jobModelMonitors(jobModel);
+ Optional<DiagnosticsManager> diagnosticsManager = diagnosticsManager(jobModel);
JobCoordinatorMetadata newMetadata =
this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
@@ -139,10 +151,14 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
this.jobRestartSignal.restartJob();
} else {
prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
- this.coordinatorCommunication.start();
+ // save components that depend on job model in order to manage lifecycle or access later
+ this.currentDiagnosticsManager = diagnosticsManager;
+ this.currentJobModelMonitors = Optional.of(jobModelMonitors);
this.currentJobModel = Optional.of(jobModel);
+ // lifecycle: start components
+ this.coordinatorCommunication.start();
this.jobCoordinatorListener.ifPresent(listener -> listener.onNewJobModel(this.processorId, jobModel));
- this.currentJobModelMonitors = Optional.of(jobModelMonitors);
+ this.currentDiagnosticsManager.ifPresent(DiagnosticsManager::start);
jobModelMonitors.start();
this.jobPreparationComplete.set(true);
}
@@ -157,6 +173,7 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
try {
this.jobCoordinatorListener.ifPresent(JobCoordinatorListener::onJobModelExpired);
if (this.jobPreparationComplete.get()) {
+ this.currentDiagnosticsManager.ifPresent(StaticResourceJobCoordinator::quietlyStop);
this.currentJobModelMonitors.ifPresent(JobModelMonitors::stop);
this.coordinatorCommunication.stop();
}
@@ -212,6 +229,14 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
LoggingContextHolder.INSTANCE.setConfig(config);
}
+ private Optional<DiagnosticsManager> diagnosticsManager(JobModel jobModel) {
+ JobConfig jobConfig = new JobConfig(this.config);
+ String jobName = jobConfig.getName().orElseThrow(() -> new ConfigException("Missing job name"));
+ // TODO SAMZA-2705: construct execEnvContainerId for diagnostics
+ return buildDiagnosticsManager(jobName, jobConfig.getJobId(), jobModel,
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
+ }
+
/**
* Run set up steps so that workers can begin processing:
* 1. Persist job coordinator metadata
@@ -234,13 +259,33 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
}
}
+ /**
+ * Wrapper around {@link MetadataResourceUtil} constructor so it can be stubbed during testing.
+ */
@VisibleForTesting
MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
return new MetadataResourceUtil(jobModel, this.metrics, this.config);
}
+ /**
+ * Wrapper around {@link DiagnosticsUtil#buildDiagnosticsManager} so it can be stubbed during testing.
+ */
+ @VisibleForTesting
+ Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName,
+ String jobId, JobModel jobModel, String containerId, Optional<String> execEnvContainerId, Config config) {
+ return DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config);
+ }
+
private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
JobCoordinatorMetadata previousMetadata = this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
return this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata);
}
+
+ private static void quietlyStop(DiagnosticsManager diagnosticsManager) {
+ try {
+ diagnosticsManager.stop();
+ } catch (InterruptedException e) {
+ LOG.error("Exception while stopping diagnostics manager", e);
+ }
+ }
}
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index 93ca566..4389fcc 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -175,16 +175,17 @@ public class DiagnosticsManager {
}
public void stop() throws InterruptedException {
- scheduler.shutdown();
-
- // Allow any scheduled publishes to finish, and block for termination
- scheduler.awaitTermination(terminationDuration.toMillis(), TimeUnit.MILLISECONDS);
-
- if (!scheduler.isTerminated()) {
- LOG.warn("Unable to terminate scheduler");
- scheduler.shutdownNow();
+ try {
+ scheduler.shutdown();
+ // Allow any scheduled publishes to finish, and block for termination
+ scheduler.awaitTermination(terminationDuration.toMillis(), TimeUnit.MILLISECONDS);
+ } finally {
+ if (!scheduler.isTerminated()) {
+ LOG.warn("Unable to terminate scheduler");
+ scheduler.shutdownNow();
+ }
+ this.systemProducer.stop();
}
- this.systemProducer.stop();
}
public void addExceptionEvent(DiagnosticsExceptionEvent diagnosticsExceptionEvent) {
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
index 03d5776..9386883 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.samza.coordinator.staticresource;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -28,7 +29,10 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelHelper;
import org.apache.samza.coordinator.MetadataResourceUtil;
@@ -39,6 +43,7 @@ import org.apache.samza.coordinator.StreamRegexMonitorFactory;
import org.apache.samza.coordinator.communication.CoordinatorCommunication;
import org.apache.samza.coordinator.communication.JobInfoServingContext;
import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.JobCoordinatorMetadata;
import org.apache.samza.job.JobMetadataChange;
import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
@@ -79,6 +84,8 @@ import static org.mockito.Mockito.when;
* actions (trigger callbacks which will change the job model, trigger shutdown) to check the coordination flow.
*/
public class TestStaticResourceJobCoordinator {
+ private static final String JOB_NAME = "my-samza-job";
+ private static final String JOB_ID = "123";
private static final String PROCESSOR_ID = "samza-job-coordinator";
private static final SystemStream SYSTEM_STREAM = new SystemStream("system", "stream");
private static final TaskName TASK_NAME = new TaskName("Partition " + 0);
@@ -114,16 +121,18 @@ public class TestStaticResourceJobCoordinator {
@Mock
private SystemAdmins systemAdmins;
@Mock
- private Config config;
- @Mock
private JobCoordinatorListener jobCoordinatorListener;
+ @Mock
+ private DiagnosticsManager diagnosticsManager;
+ private Config config;
private StaticResourceJobCoordinator staticResourceJobCoordinator;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
when(this.changelogStreamManager.readPartitionMapping()).thenReturn(this.changelogPartitionMapping);
+ this.config = config();
this.staticResourceJobCoordinator =
spy(new StaticResourceJobCoordinator(PROCESSOR_ID, this.jobModelHelper, this.jobModelServingContext,
this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.streamPartitionCountMonitorFactory,
@@ -141,11 +150,13 @@ public class TestStaticResourceJobCoordinator {
StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig);
JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig,
ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+ setUpDiagnosticsManager(jobModel);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
verifyStartLifecycle();
verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+ verify(this.diagnosticsManager).start();
verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor,
streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT);
verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
@@ -158,11 +169,13 @@ public class TestStaticResourceJobCoordinator {
StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig);
StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig);
setupJobCoordinatorMetadata(jobModel, jobModelConfig, ImmutableSet.of(), true);
+ setUpDiagnosticsManager(jobModel);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
verifyStartLifecycle();
verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+ verify(this.diagnosticsManager).start();
verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor,
streamRegexMonitor, null, null);
verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
@@ -175,6 +188,7 @@ public class TestStaticResourceJobCoordinator {
StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig);
StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig);
setupJobCoordinatorMetadata(jobModel, jobModelConfig, ImmutableSet.of(JobMetadataChange.JOB_MODEL), true);
+ setUpDiagnosticsManager(jobModel);
this.staticResourceJobCoordinator.start();
verifyStartLifecycle();
verify(this.jobRestartSignal).restartJob();
@@ -190,11 +204,13 @@ public class TestStaticResourceJobCoordinator {
StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig);
JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig,
ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT, JobMetadataChange.JOB_MODEL), true);
+ setUpDiagnosticsManager(jobModel);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
verifyStartLifecycle();
verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+ verify(this.diagnosticsManager).start();
verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor,
streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT);
verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
@@ -216,6 +232,9 @@ public class TestStaticResourceJobCoordinator {
when(this.streamRegexMonitorFactory.build(any(), any(), any())).thenReturn(Optional.empty());
JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig,
ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+ doReturn(Optional.empty()).when(this.staticResourceJobCoordinator)
+ .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel,
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
@@ -227,13 +246,14 @@ public class TestStaticResourceJobCoordinator {
}
@Test
- public void testStopAfterStart() {
+ public void testStopAfterStart() throws InterruptedException {
Config jobModelConfig = mock(Config.class);
JobModel jobModel = setupJobModel(jobModelConfig);
StreamPartitionCountMonitor streamPartitionCountMonitor = setupStreamPartitionCountMonitor(jobModelConfig);
StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig);
setupJobCoordinatorMetadata(jobModel, jobModelConfig,
ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+ setUpDiagnosticsManager(jobModel);
metadataResourceUtil(jobModel);
// call start in order to set up monitors
this.staticResourceJobCoordinator.start();
@@ -241,6 +261,7 @@ public class TestStaticResourceJobCoordinator {
this.staticResourceJobCoordinator.stop();
verify(this.jobCoordinatorListener).onJobModelExpired();
+ verify(this.diagnosticsManager).stop();
verify(streamPartitionCountMonitor).stop();
verify(streamRegexMonitor).stop();
verify(this.coordinatorCommunication).stop();
@@ -263,6 +284,9 @@ public class TestStaticResourceJobCoordinator {
when(this.streamRegexMonitorFactory.build(any(), any(), any())).thenReturn(Optional.empty());
setupJobCoordinatorMetadata(jobModel, jobModelConfig,
ImmutableSet.copyOf(Arrays.asList(JobMetadataChange.values())), false);
+ doReturn(Optional.empty()).when(this.staticResourceJobCoordinator)
+ .buildDiagnosticsManager(JOB_NAME, JOB_ID, jobModel,
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
metadataResourceUtil(jobModel);
// call start in order to set up monitors
this.staticResourceJobCoordinator.start();
@@ -299,10 +323,12 @@ public class TestStaticResourceJobCoordinator {
StreamRegexMonitor streamRegexMonitor = setupStreamRegexMonitor(jobModel, jobModelConfig);
JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig,
ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT, JobMetadataChange.JOB_MODEL), true);
+ setUpDiagnosticsManager(jobModel);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
verifyStartLifecycle();
verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+ verify(this.diagnosticsManager).start();
verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor,
streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT);
// call the callback from the monitor
@@ -322,10 +348,12 @@ public class TestStaticResourceJobCoordinator {
callbackArgumentCaptor.capture())).thenReturn(Optional.of(streamRegexMonitor));
JobCoordinatorMetadata newMetadata = setupJobCoordinatorMetadata(jobModel, jobModelConfig,
ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT, JobMetadataChange.JOB_MODEL), true);
+ setUpDiagnosticsManager(jobModel);
MetadataResourceUtil metadataResourceUtil = metadataResourceUtil(jobModel);
this.staticResourceJobCoordinator.start();
verifyStartLifecycle();
verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
+ verify(this.diagnosticsManager).start();
verifyPrepareWorkerExecutionAndMonitor(jobModel, metadataResourceUtil, streamPartitionCountMonitor,
streamRegexMonitor, newMetadata, SINGLE_SSP_FANOUT);
// call the callback from the monitor
@@ -393,6 +421,12 @@ public class TestStaticResourceJobCoordinator {
return metadataResourceUtil;
}
+ private void setUpDiagnosticsManager(JobModel expectedJobModel) {
+ doReturn(Optional.of(this.diagnosticsManager)).when(this.staticResourceJobCoordinator)
+ .buildDiagnosticsManager(JOB_NAME, JOB_ID, expectedJobModel,
+ CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), this.config);
+ }
+
private void verifyStartLifecycle() {
verify(this.systemAdmins).start();
verify(this.startpointManager).start();
@@ -437,6 +471,13 @@ public class TestStaticResourceJobCoordinator {
verify(this.staticResourceJobCoordinator, never()).metadataResourceUtil(any());
verify(this.startpointManager, never()).fanOut(any());
verifyZeroInteractions(this.jobModelServingContext, this.coordinatorCommunication, streamPartitionCountMonitor,
- streamRegexMonitor, this.jobCoordinatorListener);
+ streamRegexMonitor, this.jobCoordinatorListener, this.diagnosticsManager);
+ }
+
+ private static Config config() {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put(JobConfig.JOB_NAME, JOB_NAME);
+ configMap.put(JobConfig.JOB_ID, JOB_ID);
+ return new MapConfig(configMap);
}
}
\ No newline at end of file