You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/09/16 16:25:53 UTC

[GitHub] [samza] mynameborat commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

mynameborat commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r709379895



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactory;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactoryContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  private final StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory;
+  private final StreamRegexMonitorFactory streamRegexMonitorFactory;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final JobRestartSignal jobRestartSignal;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);

Review comment:
       I noticed some places have ClusterType as Kubernetes (maybe not in this PR). Should this be `Kubernetes` instead of being broadly classified as `NON_YARN`? What are the benefits of modeling this more generically? It seems to give the impression that standalone can fall into this bucket too.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Contains components which can be used to build a {@link CoordinatorCommunication}.
+ * For example, provides access to job model and handling for worker states.
+ */
+public class CoordinatorCommunicationContext {
+  private final JobModelProvider jobModelProvider;
+  private final Config config;
+  private final MetricsRegistry metricsRegistry;
+
+  public CoordinatorCommunicationContext(JobModelProvider jobModelProvider, Config config,
+      MetricsRegistry metricsRegistry) {
+    this.config = config;
+    this.jobModelProvider = jobModelProvider;
+    this.metricsRegistry = metricsRegistry;

Review comment:
       Is the pattern of having a provider instead of a direct job model because the job model may be absent? Or is it tied to the fact that the job model can change within the lifecycle, and hence the provider?
   
   If the latter, what about config? Are we treating it as immutable within the lifecycle of the JC?

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);
+  }
+

Review comment:
       Can we use a job coordinator factory instead and just have a common interface for job coordinators that exposes a `run` method? I saw your note on consolidation, although this seems like an even higher layer that can be consolidated, so that changes to this class are minimized whenever we iterate on `StaticResourceJobCoordinator`.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+/**
+ * Utility class for managing multiple monitors.
+ */
+public class JobModelMonitors {
+  private final StreamPartitionCountMonitor streamPartitionCountMonitor;
+  private final StreamRegexMonitor streamRegexMonitor;
+
+  public JobModelMonitors(StreamPartitionCountMonitor streamPartitionCountMonitor,
+      StreamRegexMonitor streamRegexMonitor) {
+    this.streamPartitionCountMonitor = streamPartitionCountMonitor;
+    this.streamRegexMonitor = streamRegexMonitor;
+  }
+
+  public void start() {
+    if (this.streamPartitionCountMonitor != null) {
+      this.streamPartitionCountMonitor.start();
+    }
+
+    if (this.streamRegexMonitor != null) {
+      this.streamRegexMonitor.start();
+    }
+  }
+
+  public void stop() {
+    if (this.streamPartitionCountMonitor != null) {
+      this.streamPartitionCountMonitor.stop();
+    }
+
+    if (this.streamRegexMonitor != null) {
+      this.streamRegexMonitor.stop();
+    }
+  }

Review comment:
       We have an implicit assumption that `start` has been invoked on the monitors whenever the monitors are non-null. Do we need an explicit signal to ensure `start` has indeed been called and completed?
   
   If not, are we relying on the underlying monitor contracts to treat `stop` on an incomplete `start` as a no-op?

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not to going execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();

Review comment:
       Shouldn't this handle the null scenario? Why not use `Optional` instead?

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+/**
+ * Utility class for managing multiple monitors.
+ */
+public class JobModelMonitors {
+  private final StreamPartitionCountMonitor streamPartitionCountMonitor;
+  private final StreamRegexMonitor streamRegexMonitor;
+
+  public JobModelMonitors(StreamPartitionCountMonitor streamPartitionCountMonitor,
+      StreamRegexMonitor streamRegexMonitor) {
+    this.streamPartitionCountMonitor = streamPartitionCountMonitor;
+    this.streamRegexMonitor = streamRegexMonitor;

Review comment:
       It will be ideal to have this take non-null monitors or no-op monitors in case of monitors being absent. 
   
   I noticed some of the creation flows in the previous code have optionals being created from these factories. It seems a bit inconsistent to have factories returning optionals but these entities accepting `null`.
   
   I also don't like optionals being passed in through construction, as it's an anti-pattern. Hence a no-op seemed cleaner, or you can have the instance variable store an optional, which isn't terrible.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not to going execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();

Review comment:
       Same as above: this needs to handle null or be converted to an optional. I'd prefer the latter so that people are forced to know this can be absent.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not to going execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);
+    }
+  }
+
+  private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
+    JobCoordinatorMetadata previousMetadata = this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    return this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata);
+  }
+
+  private void waitForShutdownQuietly() {
+    try {
+      waitForShutdown();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting to shutdown", e);
+    }
+  }

Review comment:
       What is the purpose of this wrapper around `waitForShutdown`? What happens if it throws an exception during shutdown?

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(context.getConfig());
+    JobModelHttpServlet jobModelHttpServlet = new JobModelHttpServlet(context.getJobModelProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);

Review comment:
       What about the locality servlet?

##########
File path: samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
##########
@@ -33,6 +33,8 @@
   public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName();
   private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory";
   private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory";
+  public static final String USE_STATIC_RESOURCE_JOB_COORDINATOR =

Review comment:
       Referring to the earlier comment, can we use the factory instead of having a boolean here?

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not to going execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);

Review comment:
       Right now, `signalShutdown` seems to set the boolean and count down the latch. If so, why do we need a timeout here? Can we just await forever, since there isn't anything within `signalShutdown` that could stall the shutdown sequence and would need to be timeboxed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org