You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/10/12 00:19:01 UTC
[samza] branch master updated: SAMZA-2700: StaticResourceJobCoordinator does not set config for LoggingContextHolder (#1544)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0423e04 SAMZA-2700: StaticResourceJobCoordinator does not set config for LoggingContextHolder (#1544)
0423e04 is described below
commit 0423e04d7574dab117fb97e31cac35bd968ae814
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Mon Oct 11 17:18:14 2021 -0700
SAMZA-2700: StaticResourceJobCoordinator does not set config for LoggingContextHolder (#1544)
API changes: N/A
---
.../staticresource/StaticResourceJobCoordinator.java | 10 ++++++++++
.../staticresource/TestStaticResourceJobCoordinator.java | 6 ++++++
2 files changed, 16 insertions(+)
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
index b9fe86f..9595f90 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
@@ -35,6 +35,7 @@ import org.apache.samza.job.JobMetadataChange;
import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.logging.LoggingContextHolder;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.storage.ChangelogStreamManager;
@@ -90,6 +91,7 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
this.startpointManager.ifPresent(StartpointManager::start);
try {
JobModel jobModel = newJobModel();
+ doSetLoggingContextConfig(jobModel.getConfig());
JobCoordinatorMetadata newMetadata =
this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
@@ -135,6 +137,14 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
}
/**
+ * This is a helper method so that we can verify it is called in testing.
+ */
+ @VisibleForTesting
+ void doSetLoggingContextConfig(Config config) {
+ LoggingContextHolder.INSTANCE.setConfig(config);
+ }
+
+ /**
* Run set up steps so that workers can begin processing:
* 1. Persist job coordinator metadata
* 2. Publish new job model on coordinator-to-worker communication channel
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
index 4cbfe3b..30517d2 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
@@ -54,6 +54,7 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -112,6 +113,7 @@ public class TestStaticResourceJobCoordinator {
this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.startpointManager,
this.changelogStreamManager, this.metrics, this.systemAdmins, this.config));
this.staticResourceJobCoordinator.setListener(this.jobCoordinatorListener);
+ doNothing().when(this.staticResourceJobCoordinator).doSetLoggingContextConfig(any());
}
@Test
@@ -125,6 +127,7 @@ public class TestStaticResourceJobCoordinator {
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
verifyStartLifecycle();
+ verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata, SINGLE_SSP_FANOUT);
verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
}
@@ -139,6 +142,7 @@ public class TestStaticResourceJobCoordinator {
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
verifyStartLifecycle();
+ verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, null, null);
verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
}
@@ -154,6 +158,7 @@ public class TestStaticResourceJobCoordinator {
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
verifyStartLifecycle();
+ verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata, SINGLE_SSP_FANOUT);
verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
}
@@ -188,6 +193,7 @@ public class TestStaticResourceJobCoordinator {
this.staticResourceJobCoordinator.start();
assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
verify(this.systemAdmins).start();
+ verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata, null);
}