Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/09/16 19:25:17 UTC

[GitHub] [samza] cameronlee314 commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

cameronlee314 commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r710411418



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(context.getConfig());
+    JobModelHttpServlet jobModelHttpServlet = new JobModelHttpServlet(context.getJobModelProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);

Review comment:
       If the job coordinator isn't responsible for resource management, then I didn't think it should be responsible for exposing locality information. I also couldn't find anything that used the locality endpoint. Therefore, I left it out.

##########
File path: samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
##########
@@ -33,6 +33,8 @@
   public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName();
   private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory";
   private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory";
+  public static final String USE_STATIC_RESOURCE_JOB_COORDINATOR =

Review comment:
       It doesn't seem like `ClusterBasedJobCoordinator` and `StaticResourceJobCoordinator` fit with the standalone job coordinators, so do you see it being useful to add a pluggable interface for this non-standalone job coordination case? If not, then I'm not sure that we should tie the classes together.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);
+  }
+
+  /**
+   * This is a separate method so it can be stubbed in tests, since adding a real shutdown hook will cause the hook to
+   * be added to the test suite JVM.
+   */
+  @VisibleForTesting
+  static void addShutdownHook(StaticResourceJobCoordinator staticResourceJobCoordinator) {

Review comment:
       `ClusterBasedJobCoordinator` doesn't currently have a clean way to trigger a shutdown from another thread. There should be a shutdown hook for `ClusterBasedJobCoordinator` (https://issues.apache.org/jira/browse/SAMZA-2692), but that is out of scope for this PR anyway.
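   For illustration only, here is a minimal sketch of what "triggering a shutdown from another thread" via a JVM shutdown hook could look like. `Stoppable`, `buildShutdownHook`, and the hook's thread name are hypothetical stand-ins, not the PR's actual code; the only real API used is `Runtime.addShutdownHook`. Building the hook thread separately from registering it is one possible way to exercise the hook in a test without registering it on the test JVM.

```java
public class ShutdownHookSketch {
  /** Hypothetical stand-in for a coordinator that can be asked to stop from another thread. */
  interface Stoppable {
    void signalShutdown();
  }

  /** Builds the hook thread without registering it, so a test can run it directly. */
  static Thread buildShutdownHook(Stoppable coordinator) {
    return new Thread(coordinator::signalShutdown, "coordinator-shutdown-hook");
  }

  /** Registers the hook with the JVM; it runs when the JVM begins shutting down. */
  static void addShutdownHook(Stoppable coordinator) {
    Runtime.getRuntime().addShutdownHook(buildShutdownHook(coordinator));
  }

  public static void main(String[] args) {
    // On JVM exit, the hook thread calls signalShutdown() on the stand-in coordinator.
    addShutdownHook(() -> System.out.println("shutdown signalled"));
  }
}
```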

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(context.getConfig());
+    JobModelHttpServlet jobModelHttpServlet = new JobModelHttpServlet(context.getJobModelProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);
+    return new HttpCoordinatorCommunication(httpServer);

Review comment:
       The scope of this object is to provide communication between JC and workers, and a readiness probe does not fall under that scope.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactory;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactoryContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  private final StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory;
+  private final StreamRegexMonitorFactory streamRegexMonitorFactory;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final JobRestartSignal jobRestartSignal;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);

Review comment:
       `JobCoordinatorMetadataManager` ideally should be cluster-agnostic. There is actually a specific flow for YARN, and everything else can actually be applied to the NON_YARN type, including standalone, so it should be ok that this is more generic.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);
+  }
+

Review comment:
       Are you referring to the `JobCoordinator` interface specifically, or some new general interface with just a `run` method?
   
   1. I did try to fit this into the `JobCoordinator` interface, but it felt forced, since `JobCoordinator` is more tied to `StreamProcessor`.
   2. It doesn't seem that useful to tie this to an interface now, since job coordination doesn't really seem to be a layer that is pluggable. We would just be directly using the concrete classes anyways, so I'm not sure if there is a benefit to having a very basic interface with just a `run` method.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);

Review comment:
       The `MetricsReporter`s do not belong to `StaticResourceJobCoordinator`, and they are not needed by `StaticResourceJobCoordinator`. Therefore, they do not need to be inside `StaticResourceJobCoordinator`. Since `StaticResourceJobCoordinator` is not creating the `MetricsReporter`s, it should also not need to manage the lifecycle of the reporters.
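   As a rough sketch of that ownership rule (whoever creates the reporters also starts and stops them), the following uses a hypothetical `Reporter` interface and a plain `Runnable` in place of the coordinator; neither is Samza's real type. Unlike the diff above, this sketch stops the reporters in a `finally` block, which is an assumption about desirable behavior rather than what the PR does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ReporterLifecycleSketch {
  /** Hypothetical stand-in for a metrics reporter with a start/stop lifecycle. */
  interface Reporter {
    void start();
    void stop();
  }

  /** The caller owns the reporters: it starts them, runs the coordinator, then stops them. */
  static void runWithReporters(List<Reporter> reporters, Runnable coordinator) {
    reporters.forEach(Reporter::start);
    try {
      coordinator.run();
    } finally {
      // Stop reporters even if the coordinator throws.
      reporters.forEach(Reporter::stop);
    }
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    Reporter reporter = new Reporter() {
      @Override public void start() { events.add("reporter.start"); }
      @Override public void stop() { events.add("reporter.stop"); }
    };
    runWithReporters(Collections.singletonList(reporter), () -> events.add("coordinator.run"));
    System.out.println(events); // prints [reporter.start, coordinator.run, reporter.stop]
  }
}
```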

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -19,27 +19,36 @@
 package org.apache.samza.clustermanager;
 
 import java.util.List;
+import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.coordinator.StaticResourceJobCoordinator;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.MetricsReporterLoader;
 
 
 /**
  * Util class to launch and run {@link ClusterBasedJobCoordinator}.
  * This util is being used by both high/low and beam API Samza jobs.
  */
 public class JobCoordinatorLaunchUtil {
+  private static final String JOB_COORDINATOR_CONTAINER_NAME = "JobCoordinator";

Review comment:
       I chose a confusing variable name here. In the existing job coordinator, the value used for constructing metrics is "ApplicationMaster", and the variable names used for that (e.g. `SamzaAppMasterMetrics.sourceName`, `ContainerProcessManager.METRICS_SOURCE_NAME`) use the term "source". I will update the variable.
   "ApplicationMaster" doesn't match "samza-job-coordinator", so this variable shouldn't need to either.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not going to execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);

Review comment:
       We can just await forever, but I wanted to put this timeout in as a defensive measure against a future bug in which the shutdown flag gets set but the latch isn't released. That's what I was trying to describe with the comment. Do you think it is not necessary to have this defensive timeout?
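   To make the failure mode concrete, here is a small self-contained sketch of the same wait loop. The class name, the `signalShutdownWithoutLatch` method (which simulates the hypothetical future bug), and the short poll interval are assumptions for illustration; the PR itself uses a 15-second timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DefensiveShutdownWait {
  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
  private final long pollMillis;

  public DefensiveShutdownWait(long pollMillis) {
    this.pollMillis = pollMillis;
  }

  /** Normal path: set the flag and release the latch. */
  public void signalShutdown() {
    if (shouldShutdown.compareAndSet(false, true)) {
      shutdownLatch.countDown();
    }
  }

  /** Simulates the hypothetical bug: the flag is set but the latch is never released. */
  public void signalShutdownWithoutLatch() {
    shouldShutdown.set(true);
  }

  /** Returns once the latch is released or, after at most one poll interval, once the flag is seen. */
  public void waitForShutdown() throws InterruptedException {
    boolean latchReleased = false;
    while (!latchReleased && !shouldShutdown.get()) {
      // await returns false on timeout, so the loop re-checks the flag periodically.
      latchReleased = shutdownLatch.await(pollMillis, TimeUnit.MILLISECONDS);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    DefensiveShutdownWait waiter = new DefensiveShutdownWait(50);
    new Thread(waiter::signalShutdownWithoutLatch).start();
    waiter.waitForShutdown(); // an untimed await() would block forever here
    System.out.println("returned despite unreleased latch");
  }
}
```

   With an untimed `await()`, the buggy path would hang; with the timeout, the cost is one wake-up per poll interval while the coordinator is idle.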

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This overlaps in some ways with the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,

Review comment:
       I think constructors should only assign instance variables; anything a constructor needs should be passed into it. This `build` method actually constructs other objects right now, so it's more of a helper factory method than a constructor. It could have been moved to an actual factory class, but that didn't seem useful enough to do now.
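
A minimal sketch of the split described above, assuming hypothetical classes (`NamespacedStore`, `SketchCoordinator`) that only mirror the shape of the real code: the constructor does nothing but assign, and a static `build` method is the single place that constructs collaborators.

```java
/** Hypothetical collaborator standing in for objects such as a namespaced store. */
class NamespacedStore {
  final String namespace;

  NamespacedStore(String namespace) {
    this.namespace = namespace;
  }
}

/** Hypothetical coordinator; the names only mirror the shape discussed in this comment. */
class SketchCoordinator {
  final NamespacedStore metadataStore;
  final NamespacedStore changelogStore;

  // Constructor: assignments only, so tests can pass in fakes directly.
  SketchCoordinator(NamespacedStore metadataStore, NamespacedStore changelogStore) {
    this.metadataStore = metadataStore;
    this.changelogStore = changelogStore;
  }

  // Static factory: the one place that knows how to construct the collaborators.
  static SketchCoordinator build(String jobName) {
    NamespacedStore metadataStore = new NamespacedStore(jobName + "/metadata");
    NamespacedStore changelogStore = new NamespacedStore(jobName + "/changelog");
    return new SketchCoordinator(metadataStore, changelogStore);
  }
}
```

Production code would call `SketchCoordinator.build("my-job")`, while a unit test can call the constructor with hand-built stores and skip the wiring entirely.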

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Contains components which can be used to build a {@link CoordinatorCommunication}.
+ * For example, provides access to job model and handling for worker states.
+ */
+public class CoordinatorCommunicationContext {
+  private final JobModelProvider jobModelProvider;
+  private final Config config;
+  private final MetricsRegistry metricsRegistry;
+
+  public CoordinatorCommunicationContext(JobModelProvider jobModelProvider, Config config,
+      MetricsRegistry metricsRegistry) {
+    this.config = config;
+    this.jobModelProvider = jobModelProvider;
+    this.metricsRegistry = metricsRegistry;

Review comment:
       1. This gives flexibility to change the job model within the lifecycle of the JC. In the current impl, though, the job model won't change within the lifecycle of the JC.
   2. It is easier to extend `JobModelProvider` to provide references to other objects. Right now we only need a reference to the serialized job model, but if other communication impls need a reference to the deserialized job model in the future, it is easier to update the `JobModelProvider`.
   3. It decouples the construction of the communication layer from the calculation of the job model.
   
   Regarding config: My intention for the `config` in `CoordinatorCommunicationContext` was for it to be the config used to build the `CoordinatorCommunication`, and I was thinking that part would be immutable (or more generally, that any config used by the JC is immutable). `JobModelProvider` provides the `JobModel`, which holds the `Config` to be communicated to workers, and I suppose that might change when the job model changes. I see how it is confusing to have multiple `Config`s. I'll see if I can make this clearer.
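
The three points above can be sketched as follows. This is an illustration under stated assumptions, not the PR's API: `SketchJobModelProvider`, `SketchServingContext`, `SketchCommunication`, and their method names are hypothetical, and the job model is represented as a plain string. The communication layer is built from a provider before any job model exists, and it reads whatever the provider holds at request time.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical provider interface; the method name is an assumption, not the PR's API. */
interface SketchJobModelProvider {
  Optional<String> getSerializedJobModel();
}

/** Hypothetical holder the coordinator writes to once the job model has been calculated. */
class SketchServingContext implements SketchJobModelProvider {
  private final AtomicReference<String> serializedJobModel = new AtomicReference<>();

  void setSerializedJobModel(String serialized) {
    serializedJobModel.set(serialized);
  }

  @Override
  public Optional<String> getSerializedJobModel() {
    return Optional.ofNullable(serializedJobModel.get());
  }
}

/** Hypothetical communication layer: built from a provider, not from a job model. */
class SketchCommunication {
  private final SketchJobModelProvider provider;

  SketchCommunication(SketchJobModelProvider provider) {
    this.provider = provider;
  }

  /** What a worker request would see at the moment it arrives. */
  String handleRequest() {
    return provider.getSerializedJobModel().orElse("job model not ready");
  }
}
```

Because `SketchCommunication` depends only on the interface, adding another accessor to the provider later (for example a deserialized form) would not change how the communication layer is constructed.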

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This overlaps in some ways with the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not going to execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    // startpointManager is null when startpoints are not enabled, so guard start/stop like fanOut is guarded
+    if (this.startpointManager != null) {
+      this.startpointManager.start();
+    }
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      if (this.startpointManager != null) {
+        this.startpointManager.stop();
+      }
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);
+    }
+  }
+
+  private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
+    JobCoordinatorMetadata previousMetadata = this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    return this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata);
+  }
+
+  private void waitForShutdownQuietly() {
+    try {
+      waitForShutdown();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting to shutdown", e);
+    }
+  }

Review comment:
       The caller of `waitForShutdown` doesn't need a try-catch block. `waitForShutdown` doesn't actually do shutdown logic; it just waits on a latch for the shutdown signal. Maybe `waitForShutdownSignal` is a better name for this method.
   If waiting on the latch is interrupted, it just stops waiting, as if the shutdown had been signalled.
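
A minimal sketch of that behaviour, assuming a hypothetical `ShutdownSignal` class: the wait method absorbs the interruption itself, so the caller needs no try-catch and simply proceeds to its cleanup. Restoring the interrupt flag and returning a boolean are conventions added for this sketch; neither is something the comment above asks for.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical helper; only illustrates waiting on a latch for a shutdown signal. */
class ShutdownSignal {
  private final CountDownLatch latch = new CountDownLatch(1);

  /** Releases any thread blocked in waitForShutdownSignal(). */
  void signal() {
    latch.countDown();
  }

  /**
   * Blocks until signalled or interrupted. Either way the caller just moves on to cleanup.
   * Returns true if the signal arrived and false if the wait was interrupted.
   */
  boolean waitForShutdownSignal() {
    try {
      latch.await();
      return true;
    } catch (InterruptedException e) {
      // Keep the interrupt visible to code further up the stack (a common convention).
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```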



