You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/10/12 00:19:01 UTC

[samza] branch master updated: SAMZA-2700: StaticResourceJobCoordinator does not set config for LoggingContextHolder (#1544)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 0423e04  SAMZA-2700: StaticResourceJobCoordinator does not set config for LoggingContextHolder (#1544)
0423e04 is described below

commit 0423e04d7574dab117fb97e31cac35bd968ae814
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Mon Oct 11 17:18:14 2021 -0700

    SAMZA-2700: StaticResourceJobCoordinator does not set config for LoggingContextHolder (#1544)
    
    API changes: N/A
---
 .../staticresource/StaticResourceJobCoordinator.java           | 10 ++++++++++
 .../staticresource/TestStaticResourceJobCoordinator.java       |  6 ++++++
 2 files changed, 16 insertions(+)

diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
index b9fe86f..9595f90 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
@@ -35,6 +35,7 @@ import org.apache.samza.job.JobMetadataChange;
 import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.logging.LoggingContextHolder;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.storage.ChangelogStreamManager;
@@ -90,6 +91,7 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
     this.startpointManager.ifPresent(StartpointManager::start);
     try {
       JobModel jobModel = newJobModel();
+      doSetLoggingContextConfig(jobModel.getConfig());
       JobCoordinatorMetadata newMetadata =
           this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
       Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
@@ -135,6 +137,14 @@ public class StaticResourceJobCoordinator implements JobCoordinator {
   }
 
   /**
+   * This is a helper method so that we can verify it is called in testing.
+   */
+  @VisibleForTesting
+  void doSetLoggingContextConfig(Config config) {
+    LoggingContextHolder.INSTANCE.setConfig(config);
+  }
+
+  /**
    * Run set up steps so that workers can begin processing:
    * 1. Persist job coordinator metadata
    * 2. Publish new job model on coordinator-to-worker communication channel
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
index 4cbfe3b..30517d2 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/staticresource/TestStaticResourceJobCoordinator.java
@@ -54,6 +54,7 @@ import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -112,6 +113,7 @@ public class TestStaticResourceJobCoordinator {
             this.coordinatorCommunication, this.jobCoordinatorMetadataManager, this.startpointManager,
             this.changelogStreamManager, this.metrics, this.systemAdmins, this.config));
     this.staticResourceJobCoordinator.setListener(this.jobCoordinatorListener);
+    doNothing().when(this.staticResourceJobCoordinator).doSetLoggingContextConfig(any());
   }
 
   @Test
@@ -125,6 +127,7 @@ public class TestStaticResourceJobCoordinator {
     this.staticResourceJobCoordinator.start();
     assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
     verifyStartLifecycle();
+    verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
     verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata, SINGLE_SSP_FANOUT);
     verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
   }
@@ -139,6 +142,7 @@ public class TestStaticResourceJobCoordinator {
     this.staticResourceJobCoordinator.start();
     assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
     verifyStartLifecycle();
+    verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
     verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, null, null);
     verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
   }
@@ -154,6 +158,7 @@ public class TestStaticResourceJobCoordinator {
     this.staticResourceJobCoordinator.start();
     assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
     verifyStartLifecycle();
+    verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
     verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata, SINGLE_SSP_FANOUT);
     verify(this.jobCoordinatorListener).onNewJobModel(PROCESSOR_ID, jobModel);
   }
@@ -188,6 +193,7 @@ public class TestStaticResourceJobCoordinator {
     this.staticResourceJobCoordinator.start();
     assertEquals(jobModel, this.staticResourceJobCoordinator.getJobModel());
     verify(this.systemAdmins).start();
+    verify(this.staticResourceJobCoordinator).doSetLoggingContextConfig(jobModelConfig);
     verifyPrepareWorkerExecution(jobModel, metadataResourceUtil, newMetadata, null);
   }