You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/11/17 17:42:19 UTC

[GitHub] [samza] mynameborat commented on a change in pull request #1553: SAMZA-2704: Wire in diagnostics reporter for Kubernetes job coordinator

mynameborat commented on a change in pull request #1553:
URL: https://github.com/apache/samza/pull/1553#discussion_r751481543



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
##########
@@ -234,13 +262,33 @@ private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata ne
     }
   }
 
+  /**
+   * Wrapper around {@link MetadataResourceUtil} constructor so it can be stubbed during testing.
+   */
   @VisibleForTesting
   MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
     return new MetadataResourceUtil(jobModel, this.metrics, this.config);
   }
 
+  /**
+   * Wrapper around {@link DiagnosticsUtil#buildDiagnosticsManager} so it can be stubbed during testing.
+   */
+  @VisibleForTesting
+  Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName,
+      String jobId, JobModel jobModel, String containerId, Optional<String> execEnvContainerId, Config config) {
+    return DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config);
+  }
+
   private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
     JobCoordinatorMetadata previousMetadata = this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
     return this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata);
   }
+
+  private static void quietlyStop(DiagnosticsManager diagnosticsManager) {
+    try {
+      diagnosticsManager.stop();

Review comment:
       Since you are making some changes related to diagnostics manager, I noticed the following within the `stop()` method
   
   ```
   scheduler.shutdown();
   
       // Allow any scheduled publishes to finish, and block for termination
       scheduler.awaitTermination(terminationDuration.toMillis(), TimeUnit.MILLISECONDS);
   
       if (!scheduler.isTerminated()) {
         LOG.warn("Unable to terminate scheduler");
         scheduler.shutdownNow();
       }
       this.systemProducer.stop();
       
       ``
       
       It seems like the `systemProducer` can linger in the event Interruption occurs due to `awaitTermination`. Would you mind fixing this as well to ensure `systemProducer.stop()` is invoked as part of finally?

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
##########
@@ -212,6 +225,21 @@ void doSetLoggingContextConfig(Config config) {
     LoggingContextHolder.INSTANCE.setConfig(config);
   }
 
+  /**
+   * Sets up {@link DiagnosticsManager} for the job coordinator.
+   */
+  private void setUpDiagnostics(JobModel jobModel) {
+    JobConfig jobConfig = new JobConfig(this.config);
+    String jobName = jobConfig.getName().orElseThrow(() -> new ConfigException("Missing job name"));
+    // TODO SAMZA-2705: construct execEnvContainerId for diagnostics
+    Optional<DiagnosticsManager> diagnosticsManager = buildDiagnosticsManager(jobName, jobConfig.getJobId(), jobModel,
+        CoordinationConstants.JOB_COORDINATOR_CONTAINER_NAME, Optional.empty(), config);
+    if (diagnosticsManager.isPresent()) {
+      diagnosticsManager.get().start();
+      this.currentDiagnosticsManager = diagnosticsManager;

Review comment:
       why not start the diagnosticsManager along the `coordinatorCommunication` after your setup diagnostics?
   It will be more readable and relatable since we anyways `quietlyStop` if present as part of the `start()` method above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org