Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/09/10 02:05:16 UTC

[GitHub] [samza] cameronlee314 opened a new pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

cameronlee314 opened a new pull request #1529:
URL: https://github.com/apache/samza/pull/1529


   Feature: Adding a job coordinator which does not do resource management. For Samza on YARN, the `ClusterBasedJobCoordinator` does resource management. However, for Samza on Kubernetes, it is not necessary to have a job coordinator which does resource management, because Kubernetes controllers can take care of resource management.
   
   Changes:
   1. Added a `StaticResourceJobCoordinator` which handles responsibilities like job model calculation, communicating job model to workers, monitoring input streams which may cause the job model to change, and startpoint fanout.
   2. Added a new abstraction layer, `CoordinatorCommunication`, for communication between the job coordinator and workers. Before, the only communication option was an HTTP endpoint. The new abstraction layer allows us to start decoupling the coordination from the HTTP endpoint. This PR doesn't expose an option to plug in a custom communication layer yet, but there is an interface to start working from.
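   To illustrate the kind of decoupling described in item 2, here is a minimal sketch. Only the `start()` and `stop()` calls on `CoordinatorCommunication` are visible in this PR's diff; `SketchCommunicationFactory`, `InMemoryCoordinatorCommunication`, `fetchJobModel`, and the use of a plain string as the job model are hypothetical stand-ins and should not be read as Samza's actual API.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Small driver showing how a coordinator might use the channel without knowing its transport. */
class CommunicationSketch {
  public static void main(String[] args) {
    AtomicReference<String> jobModel = new AtomicReference<>("job-model-v1");
    SketchCommunicationFactory factory = InMemoryCoordinatorCommunication::new;
    CoordinatorCommunication communication = factory.create(jobModel::get);
    communication.start();
    // The cast exists only so this demo can play the worker's role.
    System.out.println(((InMemoryCoordinatorCommunication) communication).fetchJobModel());
    communication.stop();
  }
}

/** Stand-in for the coordinator-to-worker channel; only start/stop are taken from the PR diff. */
interface CoordinatorCommunication {
  void start();
  void stop();
}

/** Hypothetical factory: builds a channel from a supplier of the current (serialized) job model. */
interface SketchCommunicationFactory {
  CoordinatorCommunication create(Supplier<String> jobModelProvider);
}

/** Illustrative non-HTTP implementation: a worker in the same JVM reads the job model directly. */
class InMemoryCoordinatorCommunication implements CoordinatorCommunication {
  private final Supplier<String> jobModelProvider;
  private volatile boolean running = false;

  InMemoryCoordinatorCommunication(Supplier<String> jobModelProvider) {
    this.jobModelProvider = jobModelProvider;
  }

  @Override
  public void start() {
    running = true;
  }

  @Override
  public void stop() {
    running = false;
  }

  /** What a worker would call instead of issuing an HTTP GET. */
  String fetchJobModel() {
    if (!running) {
      throw new IllegalStateException("communication channel is not running");
    }
    return jobModelProvider.get();
  }
}
```

   The point of the sketch is that the coordinator only depends on `start()` and `stop()`, so the transport behind the interface can change without touching the coordinator.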
   
   Testing:
   1. Added unit tests
   2. Tested a Samza job using this new coordinator in Kubernetes
   
   API changes (all changes are backwards compatible):
   1. Set the config `job.coordinator.use.static.resource.job.coordinator` to `true` in order to use the new coordinator.
   2. Set the config `job.coordinator.restart.signal.factory` to define how to restart the Samza job when an input stream changes in a way that changes the job model. This plug-in depends on where the Samza job is running (e.g. Kubernetes). Currently, there is only a no-op implementation of this restart signal.
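   As a rough illustration of the two configs listed above, the sketch below builds them as a plain map and reads the flag back. The config keys are taken from this description; the factory class name `com.example.NoOpRestartSignalFactory` is a made-up placeholder, and defaulting the flag to `false` when unset is an assumption based on the backwards-compatibility note.

```java
import java.util.HashMap;
import java.util.Map;

class StaticCoordinatorConfigSketch {
  static final String USE_STATIC_RESOURCE_JOB_COORDINATOR =
      "job.coordinator.use.static.resource.job.coordinator";
  static final String RESTART_SIGNAL_FACTORY = "job.coordinator.restart.signal.factory";

  /** Assumption: an unset flag keeps the existing coordinator, as backwards compatibility implies. */
  static boolean useStaticResourceJobCoordinator(Map<String, String> config) {
    return Boolean.parseBoolean(config.getOrDefault(USE_STATIC_RESOURCE_JOB_COORDINATOR, "false"));
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(USE_STATIC_RESOURCE_JOB_COORDINATOR, "true");
    // Hypothetical class name; the PR only states that a no-op implementation exists.
    config.put(RESTART_SIGNAL_FACTORY, "com.example.NoOpRestartSignalFactory");
    System.out.println("use static coordinator: " + useStaticResourceJobCoordinator(config));
  }
}
```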
   
   Note: This PR reuses components like `JobModelHelper`, `JobCoordinatorMetadataManager`, `StreamPartitionCountMonitorFactory`, and `StartpointManager` across `ClusterBasedJobCoordinator` and `StaticResourceJobCoordinator`. I considered consolidating `ClusterBasedJobCoordinator` and `StaticResourceJobCoordinator` even further to share code which encapsulates usage of several of the above components, but it's not yet clear how to fit the new and old flows together cleanly. There might be further divergence between the coordinators as we iterate on `StaticResourceJobCoordinator`, so I felt it would be easier to leave them more decoupled for now. Also, keeping them decoupled reduces the risk that a change will impact the existing `ClusterBasedJobCoordinator`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] cameronlee314 commented on pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on pull request #1529:
URL: https://github.com/apache/samza/pull/1529#issuecomment-921094175


   > This looks like a significant change, do we want a SEP for it?
   
   @kw2542 That's a good point about having a SEP. Do you think it would be reasonable to get some implementation checked in and tried out in Kubernetes a little more before posting a SEP? I do already have some context to put into a SEP now if you think that would be better.
   Does anyone else have thoughts on posting the SEP now vs. later?





[GitHub] [samza] mynameborat commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712268786



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(context.getConfig());
+    JobModelHttpServlet jobModelHttpServlet = new JobModelHttpServlet(context.getJobModelProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);

Review comment:
       I see locality as job metadata that happens to be tied to the physical resource the job is running on. Ideally, all of this should be served by the metadata store, and I see the `JobCoordinator` servlet as an interim read path for all metadata.
   
   I wouldn't assume that tools/clients read job metadata from multiple systems, and hence the RM controller wouldn't be responsible for exposing this information.







[GitHub] [samza] cameronlee314 commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712562642



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not going to execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);
+    }
+  }
+
+  private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
+    JobCoordinatorMetadata previousMetadata = this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    return this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata);
+  }
+
+  private void waitForShutdownQuietly() {
+    try {
+      waitForShutdown();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting to shutdown", e);
+    }
+  }

Review comment:
       Code was changed so that there is no longer a `waitForShutdown` in the job coordinator.
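
       For readers following the quoted diff, the latch-plus-flag wait that this comment refers to can be exercised in isolation. This is a minimal sketch using only `java.util.concurrent`; the class name `ShutdownWaiter`, the boolean return value of `signalShutdown`, and the shortened poll interval are illustrative and are not part of Samza.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownWaiter {
  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);

  /** Idempotent: only the first caller releases the latch. Returns true for that first call. */
  boolean signalShutdown() {
    if (shouldShutdown.compareAndSet(false, true)) {
      shutdownLatch.countDown();
      return true;
    }
    return false;
  }

  /** Blocks until shutdown is signalled, re-checking the flag after every timed-out await. */
  void waitForShutdown(long pollMillis) throws InterruptedException {
    boolean latchReleased = false;
    while (!latchReleased && !shouldShutdown.get()) {
      // The timeout is defensive: if the latch were never counted down, the flag is still re-read.
      latchReleased = shutdownLatch.await(pollMillis, TimeUnit.MILLISECONDS);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ShutdownWaiter waiter = new ShutdownWaiter();
    Thread signaller = new Thread(waiter::signalShutdown);
    signaller.start();
    waiter.waitForShutdown(100);
    signaller.join();
    System.out.println("shutdown observed");
  }
}
```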







[GitHub] [samza] cameronlee314 commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712559510



##########
File path: samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
##########
@@ -77,4 +79,8 @@ public String getJobCoordinatorFactoryClassName() {
       return jobCoordinatorFactoryClassName;
     }
   }
+
+  public boolean getUseStaticResourceJobCoordinator() {

Review comment:
       This method was removed since we will be using the `job.coordinator.factory` config to specify the job coordinator to use.
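
       The general idea of selecting a coordinator through a factory class name in config, rather than through a boolean flag, can be sketched as follows. Only the config key `job.coordinator.factory` comes from this thread; the `Coordinator` and `CoordinatorFactory` interfaces and `StaticResourceFactory` are hypothetical stand-ins and are not Samza's actual interfaces.

```java
import java.util.Map;

class FactorySelectionSketch {
  static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";

  /** Hypothetical stand-ins used only for this illustration. */
  interface Coordinator {
    String name();
  }

  interface CoordinatorFactory {
    Coordinator create();
  }

  public static class StaticResourceFactory implements CoordinatorFactory {
    @Override
    public Coordinator create() {
      return () -> "static-resource";
    }
  }

  /** Loads the factory named in config by reflection and asks it for a coordinator. */
  static Coordinator load(Map<String, String> config) throws ReflectiveOperationException {
    String className = config.get(JOB_COORDINATOR_FACTORY);
    if (className == null) {
      throw new IllegalArgumentException("Missing config: " + JOB_COORDINATOR_FACTORY);
    }
    CoordinatorFactory factory = Class.forName(className)
        .asSubclass(CoordinatorFactory.class)
        .getDeclaredConstructor()
        .newInstance();
    return factory.create();
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    Map<String, String> config = Map.of(JOB_COORDINATOR_FACTORY, StaticResourceFactory.class.getName());
    System.out.println(load(config).name());
  }
}
```

       With this shape, adding a new coordinator type means adding a factory class and pointing the config at it, with no new boolean getter on the config class.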







[GitHub] [samza] alnzng commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
alnzng commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r711349016



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -19,27 +19,36 @@
 package org.apache.samza.clustermanager;
 
 import java.util.List;
+import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.coordinator.StaticResourceJobCoordinator;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.MetricsReporterLoader;
 
 
 /**
  * Util class to launch and run {@link ClusterBasedJobCoordinator}.
  * This util is being used by both high/low and beam API Samza jobs.
  */
 public class JobCoordinatorLaunchUtil {
+  private static final String JOB_COORDINATOR_CONTAINER_NAME = "JobCoordinator";

Review comment:
       Looks like the logging function requires this variable `samza.container.name`: https://github.com/apache/samza/blob/master/docs/learn/documentation/versioned/jobs/logging.md
    
   Can you confirm if we still need the variable?







[GitHub] [samza] cameronlee314 commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712603393



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(context.getConfig());
+    JobModelHttpServlet jobModelHttpServlet = new JobModelHttpServlet(context.getJobModelProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);

Review comment:
       The locality servlet is only used by `ApplicationMasterWebServlet`, which is YARN-specific, so it is not necessary with the new job coordinator.







[GitHub] [samza] cameronlee314 commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712561798



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+/**
+ * Utility class for managing multiple monitors.
+ */
+public class JobModelMonitors {
+  private final StreamPartitionCountMonitor streamPartitionCountMonitor;
+  private final StreamRegexMonitor streamRegexMonitor;
+
+  public JobModelMonitors(StreamPartitionCountMonitor streamPartitionCountMonitor,
+      StreamRegexMonitor streamRegexMonitor) {
+    this.streamPartitionCountMonitor = streamPartitionCountMonitor;
+    this.streamRegexMonitor = streamRegexMonitor;

Review comment:
       This class is no longer in this PR. I will consider this comment for a follow-up PR.







[GitHub] [samza] cameronlee314 commented on pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on pull request #1529:
URL: https://github.com/apache/samza/pull/1529#issuecomment-926204082


   > > Minor comment. Looks good to me!
   > 
   > There are some test failures. Doesn't seem related to your change although I'd just make sure if that is the case.
   
   This error has been happening on some other PRs as well, so it is very unlikely to be related to this change. It is being handled separately.





[GitHub] [samza] alnzng commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
alnzng commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r709584553



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,

Review comment:
       Out of curiosity, any specific reason you prefer to use a static build method to initialize instead of using a constructor?

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);

Review comment:
       Any specific reason why we don't put metric reporter related logic inside `StaticResourceJobCoordinator`?

##########
File path: samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
##########
@@ -77,4 +79,8 @@ public String getJobCoordinatorFactoryClassName() {
       return jobCoordinatorFactoryClassName;
     }
   }
+
+  public boolean getUseStaticResourceJobCoordinator() {

Review comment:
       Do you think a method name like `isStaticResourceJobCoordinatorUsed` or `isStaticResourceJobCoordinatorEnabled` would be more readable?

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);
+  }
+
+  /**
+   * This is a separate method so it can be stubbed in tests, since adding a real shutdown hook will cause the hook to
+   * be added to the test suite JVM.
+   */
+  @VisibleForTesting
+  static void addShutdownHook(StaticResourceJobCoordinator staticResourceJobCoordinator) {

Review comment:
       Any problem if we apply this shutdown hook function to `ClusterBasedJobCoordinator` as well?
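
   For illustration, here is a minimal sketch of how such a hook could be kept coordinator-agnostic so that both coordinators could share it. The `Stoppable` interface, `signalShutdown`, and `ShutdownHookSketch` are hypothetical names, not taken from this PR; the sketch only assumes that each coordinator exposes some way to request shutdown. Building the hook thread separately from registering it keeps the JVM-wide side effect in one place, which is the same reason the PR isolates `addShutdownHook` for stubbing in tests.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical minimal view of a coordinator that can be asked to shut down. */
interface Stoppable {
  void signalShutdown();
}

final class ShutdownHookSketch {
  private ShutdownHookSketch() { }

  /** Builds the hook thread without registering it, so tests can run it directly. */
  static Thread newHookThread(Stoppable coordinator) {
    return new Thread(coordinator::signalShutdown, "coordinator-shutdown-hook");
  }

  /** Registers the hook with the JVM; only this call has a global side effect. */
  static void addShutdownHook(Stoppable coordinator) {
    Runtime.getRuntime().addShutdownHook(newHookThread(coordinator));
  }

  public static void main(String[] args) {
    AtomicBoolean stopped = new AtomicBoolean(false);
    Thread hook = newHookThread(() -> stopped.set(true));
    hook.run(); // run synchronously instead of registering with the JVM
    System.out.println("stopped = " + stopped.get());
  }
}
```

   With this shape, either coordinator could be passed in as long as it can be adapted to `Stoppable`; whether `ClusterBasedJobCoordinator` can be adapted that way is an open question here, not something the sketch establishes.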

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(context.getConfig());
+    JobModelHttpServlet jobModelHttpServlet = new JobModelHttpServlet(context.getJobModelProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);
+    return new HttpCoordinatorCommunication(httpServer);

Review comment:
       What are your thoughts/plans for the job coordinator readiness probe support? Will we leverage this embedded HTTP server?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] cameronlee314 commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712559961



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+/**
+ * Utility class for managing multiple monitors.
+ */
+public class JobModelMonitors {
+  private final StreamPartitionCountMonitor streamPartitionCountMonitor;
+  private final StreamRegexMonitor streamRegexMonitor;
+
+  public JobModelMonitors(StreamPartitionCountMonitor streamPartitionCountMonitor,
+      StreamRegexMonitor streamRegexMonitor) {
+    this.streamPartitionCountMonitor = streamPartitionCountMonitor;
+    this.streamRegexMonitor = streamRegexMonitor;
+  }
+
+  public void start() {
+    if (this.streamPartitionCountMonitor != null) {
+      this.streamPartitionCountMonitor.start();
+    }
+
+    if (this.streamRegexMonitor != null) {
+      this.streamRegexMonitor.start();
+    }
+  }
+
+  public void stop() {
+    if (this.streamPartitionCountMonitor != null) {
+      this.streamPartitionCountMonitor.stop();
+    }
+
+    if (this.streamRegexMonitor != null) {
+      this.streamRegexMonitor.stop();
+    }
+  }

Review comment:
       This part of the implementation will be in a follow-up PR.







[GitHub] [samza] alnzng commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
alnzng commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r711193613



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,

Review comment:
       To me, one limitation of providing only static factory methods is that a class without a public or protected constructor cannot be subclassed. Although there is a protected constructor in this class, I think it is intended for testing only.
   If we don't expect this class to have subclasses, then I would prefer to declare it as a `final` class.
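
   As a sketch of what I mean (all names here are hypothetical and the config key is only illustrative; this is not the code in the PR): a `final` class can keep a static `build` method as its entry point while a package-private constructor remains available to tests in the same package.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for a coordinator that is built through a static factory. */
final class FactoryBuiltCoordinator {
  private final String jobName;

  // Package-private: tests in the same package can inject dependencies directly,
  // and the final modifier makes it explicit that no subclasses are expected.
  FactoryBuiltCoordinator(String jobName) {
    this.jobName = jobName;
  }

  /** Static factory: derives the dependencies, then delegates to the constructor. */
  static FactoryBuiltCoordinator build(Map<String, String> config) {
    String jobName = config.getOrDefault("job.name", "unnamed-job");
    return new FactoryBuiltCoordinator(jobName);
  }

  String getJobName() {
    return jobName;
  }

  public static void main(String[] args) {
    FactoryBuiltCoordinator coordinator = build(Collections.singletonMap("job.name", "my-job"));
    System.out.println(coordinator.getJobName());
  }
}
```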

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(context.getConfig());
+    JobModelHttpServlet jobModelHttpServlet = new JobModelHttpServlet(context.getJobModelProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);
+    return new HttpCoordinatorCommunication(httpServer);

Review comment:
       Actually, my point is that if we want to use this HTTP server to support the readiness probe, then probably we need to think if the class name is generic enough.
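
   To make the idea concrete, here is a rough sketch of a readiness endpoint served from an embedded HTTP server. It deliberately uses the JDK's built-in `com.sun.net.httpserver.HttpServer` rather than Samza's `HttpServer` or Jetty, and the `/ready` path, the class name, and the 200/503 convention are assumptions for illustration only, not something this PR defines.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical readiness endpoint; uses the JDK HTTP server, not Samza's HttpServer. */
final class ReadinessProbeSketch {
  private final HttpServer server;
  private final AtomicBoolean ready = new AtomicBoolean(false);

  ReadinessProbeSketch() {
    try {
      // Port 0 asks the OS for any free port on the loopback interface.
      server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    server.createContext("/ready", exchange -> {
      // 200 once the coordinator has marked itself ready, 503 before that.
      exchange.sendResponseHeaders(ready.get() ? 200 : 503, -1); // -1: no body
      exchange.close();
    });
  }

  void start() { server.start(); }

  void stop() { server.stop(0); }

  void markReady() { ready.set(true); }

  int port() { return server.getAddress().getPort(); }

  /** Issues a GET against the readiness path and returns the HTTP status code. */
  static int statusOf(int port) {
    try {
      HttpURLConnection connection =
          (HttpURLConnection) new URL("http://127.0.0.1:" + port + "/ready").openConnection();
      try {
        return connection.getResponseCode();
      } finally {
        connection.disconnect();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    ReadinessProbeSketch probe = new ReadinessProbeSketch();
    probe.start();
    try {
      System.out.println("before ready: " + statusOf(probe.port()));
      probe.markReady();
      System.out.println("after ready: " + statusOf(probe.port()));
    } finally {
      probe.stop();
    }
  }
}
```

   If the same server ends up hosting both the job model servlet and a probe like this, a name tied to "coordinator to worker communication" may be too narrow, which is the naming concern above.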

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);

Review comment:
       Can you confirm why we don't need to add metrics reporter around `ClusterBasedJobCoordinator`? This inconsistent code path could confuse other people.







[GitHub] [samza] alnzng commented on pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
alnzng commented on pull request #1529:
URL: https://github.com/apache/samza/pull/1529#issuecomment-926136815


   LGTM





[GitHub] [samza] cameronlee314 commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r715196019



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/NoProcessorJobCoordinatorListener.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.job.model.JobModel;
+
+
+/**
+ * {@link JobCoordinatorListener} for a {@link JobCoordinator} which does not run alongside a processor.
+ */
+public class NoProcessorJobCoordinatorListener implements JobCoordinatorListener {
+  private final CountDownLatch waitForShutdownLatch;
+
+  public NoProcessorJobCoordinatorListener(CountDownLatch waitForShutdownLatch) {
+    this.waitForShutdownLatch = waitForShutdownLatch;
+  }
+
+  @Override
+  public void onJobModelExpired() {
+    // nothing to do
+  }
+
+  @Override
+  public void onNewJobModel(String processorId, JobModel jobModel) {
+    // nothing to do
+  }
+
+  @Override
+  public void onCoordinatorStop() {
+    this.waitForShutdownLatch.countDown();
+  }
+
+  @Override
+  public void onCoordinatorFailure(Throwable t) {
+    this.waitForShutdownLatch.countDown();

Review comment:
       There isn't currently anywhere to bubble this up to, but good suggestion about logging.
   If there is a use case for bubbling up in the future, we can add it in at that point.
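
   A rough sketch of what the logging could look like (not the final code). `java.util.logging` is used only to keep the sketch self-contained, whereas the Samza classes in this PR use SLF4J. The class name is hypothetical, it does not implement `JobCoordinatorListener` so that it compiles on its own, and remembering the first failure is an optional extra that would only matter if a bubbling-up use case appears later.

```java
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical listener variant: logs and remembers the failure before releasing the latch. */
final class FailureLoggingListenerSketch {
  private static final Logger LOG = Logger.getLogger(FailureLoggingListenerSketch.class.getName());

  private final CountDownLatch waitForShutdownLatch;
  private final AtomicReference<Throwable> failure = new AtomicReference<>();

  FailureLoggingListenerSketch(CountDownLatch waitForShutdownLatch) {
    this.waitForShutdownLatch = waitForShutdownLatch;
  }

  void onCoordinatorStop() {
    waitForShutdownLatch.countDown();
  }

  void onCoordinatorFailure(Throwable t) {
    LOG.log(Level.SEVERE, "Job coordinator failed; releasing shutdown latch", t);
    failure.compareAndSet(null, t); // keep only the first failure
    waitForShutdownLatch.countDown();
  }

  /** Lets the thread that was waiting on the latch decide how to exit. */
  Optional<Throwable> getFailure() {
    return Optional.ofNullable(failure.get());
  }

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    FailureLoggingListenerSketch listener = new FailureLoggingListenerSketch(latch);
    listener.onCoordinatorFailure(new IllegalStateException("simulated failure"));
    latch.await(); // returns immediately because the failure released the latch
    System.out.println("failure recorded: " + listener.getFailure().isPresent());
  }
}
```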







[GitHub] [samza] mynameborat commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r709379895



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactory;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactoryContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  private final StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory;
+  private final StreamRegexMonitorFactory streamRegexMonitorFactory;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final JobRestartSignal jobRestartSignal;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);

Review comment:
       I noticed some places have ClusterType as Kubernetes (maybe not in this PR). Should this be `Kubernetes` instead of the broader `NON_YARN`? What are the benefits of modeling this more generically? It suggests that standalone could fall into this bucket too.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Contains components which can be used to build a {@link CoordinatorCommunication}.
+ * For example, provides access to job model and handling for worker states.
+ */
+public class CoordinatorCommunicationContext {
+  private final JobModelProvider jobModelProvider;
+  private final Config config;
+  private final MetricsRegistry metricsRegistry;
+
+  public CoordinatorCommunicationContext(JobModelProvider jobModelProvider, Config config,
+      MetricsRegistry metricsRegistry) {
+    this.config = config;
+    this.jobModelProvider = jobModelProvider;
+    this.metricsRegistry = metricsRegistry;

Review comment:
       Is the pattern of having a provider instead of a direct job model used because the job model is absent at this point? Or is it tied to the fact that the job model can change within the lifecycle, hence the provider?
   
   If the latter, what about config? Are we treating it as immutable within the lifecycle of the JC?

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);
+  }
+

Review comment:
       Can we use a job coordinator factory instead, with a common interface for job coordinators that exposes a `run` method? I saw your note on consolidation, but this seems like an even higher layer that can be consolidated, so that changes to this class are minimized while `StaticResourceJobCoordinator` is iterated on.
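   
   A minimal sketch of this suggestion, under stated assumptions: `RunnableJobCoordinator`, `RunnableJobCoordinatorFactory`, and the two stub classes are hypothetical names introduced only for illustration (they are not existing Samza types), and a plain `Map` stands in for `Config`. Only the config key comes from the PR description.

```java
import java.util.Map;

// Hypothetical common interface for illustration; not an existing Samza type.
interface RunnableJobCoordinator {
  void run();
}

// Stand-ins for the two real coordinators, so the sketch compiles on its own.
final class ClusterBasedStub implements RunnableJobCoordinator {
  @Override
  public void run() {
    System.out.println("running cluster-based coordinator");
  }
}

final class StaticResourceStub implements RunnableJobCoordinator {
  @Override
  public void run() {
    System.out.println("running static-resource coordinator");
  }
}

// Hypothetical factory: the launcher would depend only on this and on the interface.
final class RunnableJobCoordinatorFactory {
  // Config key taken from the PR description.
  static final String USE_STATIC_CONFIG = "job.coordinator.use.static.resource.job.coordinator";

  private RunnableJobCoordinatorFactory() {
  }

  // A plain Map stands in for Samza's Config in this sketch.
  static RunnableJobCoordinator create(Map<String, String> config) {
    boolean useStatic = Boolean.parseBoolean(config.getOrDefault(USE_STATIC_CONFIG, "false"));
    return useStatic ? new StaticResourceStub() : new ClusterBasedStub();
  }
}
```

   With something along these lines, the launcher body could reduce to `RunnableJobCoordinatorFactory.create(config).run()`, and the coordinator-specific wiring (shutdown hook, metrics reporters) would move behind the interface.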

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+/**
+ * Utility class for managing multiple monitors.
+ */
+public class JobModelMonitors {
+  private final StreamPartitionCountMonitor streamPartitionCountMonitor;
+  private final StreamRegexMonitor streamRegexMonitor;
+
+  public JobModelMonitors(StreamPartitionCountMonitor streamPartitionCountMonitor,
+      StreamRegexMonitor streamRegexMonitor) {
+    this.streamPartitionCountMonitor = streamPartitionCountMonitor;
+    this.streamRegexMonitor = streamRegexMonitor;
+  }
+
+  public void start() {
+    if (this.streamPartitionCountMonitor != null) {
+      this.streamPartitionCountMonitor.start();
+    }
+
+    if (this.streamRegexMonitor != null) {
+      this.streamRegexMonitor.start();
+    }
+  }
+
+  public void stop() {
+    if (this.streamPartitionCountMonitor != null) {
+      this.streamPartitionCountMonitor.stop();
+    }
+
+    if (this.streamRegexMonitor != null) {
+      this.streamRegexMonitor.stop();
+    }
+  }

Review comment:
       `stop` implicitly assumes that `start` has already been invoked on every non-null monitor. Do we need an explicit signal to ensure `start` has indeed been called and completed?
   
   If not, are we relying on the underlying monitor contracts to treat `stop` after an incomplete `start` as a no-op?
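   
   One way to make that signal explicit, as a rough sketch only: `StartStoppable`, `GuardedMonitors`, and `CountingMonitor` are hypothetical names for illustration, and the real monitors do not necessarily share such an interface.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical minimal lifecycle contract, assumed for this sketch.
interface StartStoppable {
  void start();

  void stop();
}

// Tracks whether start() completed, so stop() without a completed start() is a no-op.
final class GuardedMonitors {
  private final List<StartStoppable> monitors;
  private boolean started = false;

  GuardedMonitors(StartStoppable... monitors) {
    this.monitors = Arrays.asList(monitors);
  }

  synchronized void start() {
    if (started) {
      return;
    }
    for (StartStoppable monitor : monitors) {
      monitor.start();
    }
    // Set only after every start() call returned normally.
    started = true;
  }

  synchronized void stop() {
    if (!started) {
      return;
    }
    for (StartStoppable monitor : monitors) {
      monitor.stop();
    }
    started = false;
  }
}

// Small recording monitor used only to exercise the sketch.
final class CountingMonitor implements StartStoppable {
  int starts = 0;
  int stops = 0;

  @Override
  public void start() {
    starts++;
  }

  @Override
  public void stop() {
    stops++;
  }
}
```

   A known gap in this sketch: if one `start()` throws part-way through, monitors that were already started are not stopped, so a real version would need to decide how to unwind that case.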

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not to going execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();

Review comment:
       Shouldn't this handle the null scenario? Why not use `Optional` instead?
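   
   A minimal sketch of the `Optional` variant, assuming a nullable constructor argument that is wrapped once and stored as an `Optional` field. `StartpointLifecycle` and `OptionalFieldCoordinator` are hypothetical stand-ins, not the actual Samza classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for StartpointManager, reduced to its lifecycle methods.
interface StartpointLifecycle {
  void start();

  void stop();
}

final class OptionalFieldCoordinator {
  private final Optional<StartpointLifecycle> startpointManager;
  // Records what run() did; present only so the sketch is observable.
  final List<String> events = new ArrayList<>();

  // Accepts a nullable argument (startpoints disabled) and wraps it once.
  OptionalFieldCoordinator(StartpointLifecycle startpointManagerOrNull) {
    this.startpointManager = Optional.ofNullable(startpointManagerOrNull);
  }

  void run() {
    // No NullPointerException when startpoints are disabled.
    startpointManager.ifPresent(StartpointLifecycle::start);
    try {
      events.add("coordinate");
    } finally {
      startpointManager.ifPresent(StartpointLifecycle::stop);
    }
  }
}
```

   Every use site then has to go through `ifPresent` (or similar), which makes the "may be absent" case visible where the field is used.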

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/JobModelMonitors.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+/**
+ * Utility class for managing multiple monitors.
+ */
+public class JobModelMonitors {
+  private final StreamPartitionCountMonitor streamPartitionCountMonitor;
+  private final StreamRegexMonitor streamRegexMonitor;
+
+  public JobModelMonitors(StreamPartitionCountMonitor streamPartitionCountMonitor,
+      StreamRegexMonitor streamRegexMonitor) {
+    this.streamPartitionCountMonitor = streamPartitionCountMonitor;
+    this.streamRegexMonitor = streamRegexMonitor;

Review comment:
       Ideally this would take non-null monitors, with no-op monitors standing in when a monitor is absent.
   
   I noticed that some of the earlier creation flows have these factories returning `Optional`. It seems a bit inconsistent for the factories to return `Optional` while these entities accept `null`.
   
   I also don't like `Optional` being passed in through constructors, as it's an anti-pattern. Hence a no-op seemed cleaner; alternatively, the instance variable could store an `Optional`, which isn't terrible.
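   
   A rough sketch of the no-op option, assuming the two monitors could be placed behind a shared interface. `StreamMonitor`, `NoOpStreamMonitor`, and `NonNullJobModelMonitors` are hypothetical names for illustration; the PR's monitors are separate concrete classes.

```java
import java.util.Objects;

// Hypothetical shared lifecycle interface, assumed for this sketch.
interface StreamMonitor {
  void start();

  void stop();
}

// Used in place of null when a monitor is not needed.
final class NoOpStreamMonitor implements StreamMonitor {
  static final NoOpStreamMonitor INSTANCE = new NoOpStreamMonitor();

  private NoOpStreamMonitor() {
  }

  @Override
  public void start() {
    // intentionally empty
  }

  @Override
  public void stop() {
    // intentionally empty
  }
}

// Same shape as JobModelMonitors, but null is rejected at construction time.
final class NonNullJobModelMonitors {
  private final StreamMonitor partitionCountMonitor;
  private final StreamMonitor regexMonitor;

  NonNullJobModelMonitors(StreamMonitor partitionCountMonitor, StreamMonitor regexMonitor) {
    this.partitionCountMonitor = Objects.requireNonNull(partitionCountMonitor, "partitionCountMonitor");
    this.regexMonitor = Objects.requireNonNull(regexMonitor, "regexMonitor");
  }

  void start() {
    partitionCountMonitor.start();
    regexMonitor.start();
  }

  void stop() {
    partitionCountMonitor.stop();
    regexMonitor.stop();
  }
}
```

   The null checks in `start` and `stop` disappear, and a caller whose factory returned an empty `Optional` could pass `NoOpStreamMonitor.INSTANCE` via `orElse`.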

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not to going execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();

Review comment:
       Same as above: this needs to handle null or be converted to `Optional`. I'd prefer the latter, so that people are forced to know this can be absent.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not to going execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);
+    }
+  }
+
+  private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
+    JobCoordinatorMetadata previousMetadata = this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    return this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata);
+  }
+
+  private void waitForShutdownQuietly() {
+    try {
+      waitForShutdown();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting to shutdown", e);
+    }
+  }

Review comment:
       What is the purpose of this wrapper around `waitForShutdown`? What happens if it throws an exception during shutdown?
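
   For context on the question above: the wrapper in the diff catches `InterruptedException` and logs it, so `run()` still reaches its `finally` block. A minimal sketch of one common refinement, restoring the thread's interrupt flag so callers can still observe the interruption. This is an illustration, not what the PR does; `QuietWaitSketch` and `awaitQuietly` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;

final class QuietWaitSketch {
  /**
   * Hypothetical helper: waits on the latch without propagating InterruptedException.
   * Returns true if the latch was released, false if the wait was interrupted.
   */
  static boolean awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
      return true;
    } catch (InterruptedException e) {
      // Restore the interrupt flag instead of silently dropping the interruption.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

   With this shape, cleanup code after the wait still runs, and code further up the stack can check `Thread.currentThread().isInterrupted()`.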

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(context.getConfig());
+    JobModelHttpServlet jobModelHttpServlet = new JobModelHttpServlet(context.getJobModelProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);

Review comment:
       What about the locality servlet?

##########
File path: samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
##########
@@ -33,6 +33,8 @@
   public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName();
   private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory";
   private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory";
+  public static final String USE_STATIC_RESOURCE_JOB_COORDINATOR =

Review comment:
       Referring to my earlier comment, can we use the factory instead of having a boolean here?
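
   A minimal sketch of the factory-based alternative raised above, in which the coordinator implementation is chosen by a factory class name in config rather than a boolean flag. Everything here is hypothetical and for illustration only: the `Coordinator` and `CoordinatorFactory` interfaces, the two factory classes, and the `example.coordinator.factory` key are not Samza APIs or configs.

```java
import java.util.Map;

final class FactorySelectionSketch {
  /** Hypothetical stand-in for a coordinator; not a Samza interface. */
  interface Coordinator {
    String describe();
  }

  /** Hypothetical factory interface; implementations need a public no-arg constructor. */
  interface CoordinatorFactory {
    Coordinator create(Map<String, String> config);
  }

  public static final class ClusterBasedFactory implements CoordinatorFactory {
    public ClusterBasedFactory() { }

    @Override
    public Coordinator create(Map<String, String> config) {
      return () -> "cluster-based";
    }
  }

  public static final class StaticResourceFactory implements CoordinatorFactory {
    public StaticResourceFactory() { }

    @Override
    public Coordinator create(Map<String, String> config) {
      return () -> "static-resource";
    }
  }

  // Hypothetical config key, used only for this illustration.
  static final String FACTORY_KEY = "example.coordinator.factory";

  /** Instantiates the configured factory by class name, defaulting to the cluster-based one. */
  static Coordinator build(Map<String, String> config) {
    String factoryClass = config.getOrDefault(FACTORY_KEY, ClusterBasedFactory.class.getName());
    try {
      CoordinatorFactory factory =
          (CoordinatorFactory) Class.forName(factoryClass).getDeclaredConstructor().newInstance();
      return factory.create(config);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate factory " + factoryClass, e);
    }
  }
}
```

   The trade-off is that a class-name config allows additional coordinator implementations without new flags, at the cost of reflection and a less discoverable setting.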

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not going to execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);

Review comment:
       Right now, `signalShutdown` seems to only set the boolean and count down the latch. If so, why do we need a timeout here? Can we just await indefinitely, since nothing within `signalShutdown` can stall the shutdown sequence and need to be timeboxed?
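
   A minimal sketch of the untimed variant suggested above, assuming `signalShutdown` is the only place that releases the latch. `ShutdownLatchSketch` is a hypothetical, stripped-down stand-in for the coordinator and keeps only the two fields involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShutdownLatchSketch {
  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);

  /** Sets the flag once and releases any thread blocked in waitForShutdown(). */
  void signalShutdown() {
    if (shouldShutdown.compareAndSet(false, true)) {
      shutdownLatch.countDown();
    }
  }

  /** Blocks until signalShutdown() has been called; no timeout or polling loop. */
  void waitForShutdown() throws InterruptedException {
    shutdownLatch.await();
  }

  boolean isShutdownSignalled() {
    return shouldShutdown.get();
  }
}
```

   Because the flag and the latch are updated together in `signalShutdown`, an untimed `await()` returns as soon as shutdown is signalled, including when the signal arrives before the wait begins.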




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] cameronlee314 commented on pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on pull request #1529:
URL: https://github.com/apache/samza/pull/1529#issuecomment-923400184


   > > This looks like a significant change, do we want a SEP for it?
   > 
   > @kw2542 That's a good point about having a SEP. Do you think it would be reasonable to get some implementation checked in and tried out in Kubernetes a little more before posting a SEP? I do already have some context to put into a SEP now if you think that would be better.
   > Does anyone else have thoughts on posting the SEP now vs. later?
   
   Synced with @mynameborat about this. We will update the existing SEP-20 for Samza on Kubernetes to include this new job coordination flow.





[GitHub] [samza] cameronlee314 merged pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
cameronlee314 merged pull request #1529:
URL: https://github.com/apache/samza/pull/1529


   





[GitHub] [samza] alnzng commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
alnzng commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r709611830



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -19,27 +19,36 @@
 package org.apache.samza.clustermanager;
 
 import java.util.List;
+import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.coordinator.StaticResourceJobCoordinator;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.MetricsReporterLoader;
 
 
 /**
  * Util class to launch and run {@link ClusterBasedJobCoordinator}.
  * This util is being used by both high/low and beam API Samza jobs.
  */
 public class JobCoordinatorLaunchUtil {
+  private static final String JOB_COORDINATOR_CONTAINER_NAME = "JobCoordinator";

Review comment:
       When starting the job coordinator JVM process, we explicitly pass the `-Dsamza.container.name=samza-job-coordinator` JVM parameter in the `run-jc.sh` script. 
   
   This looks inconsistent with the name used here. Can you confirm whether we should be using the same container name?
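
   One way the two names could be kept in sync, sketched under the assumption that the JVM property quoted above is the intended source of truth: read `samza.container.name` at runtime and fall back to the value the script passes. `ContainerNameSketch` and `resolveContainerName` are hypothetical names, and whether the property should drive this constant is exactly the open question in the comment.

```java
final class ContainerNameSketch {
  // Property name and value as quoted in the review comment (passed by run-jc.sh).
  static final String CONTAINER_NAME_PROPERTY = "samza.container.name";
  static final String DEFAULT_CONTAINER_NAME = "samza-job-coordinator";

  /** Returns the JVM-provided container name, or the script's default if the property is unset. */
  static String resolveContainerName() {
    return System.getProperty(CONTAINER_NAME_PROPERTY, DEFAULT_CONTAINER_NAME);
  }
}
```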







[GitHub] [samza] mynameborat commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712291374



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not going to execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);

Review comment:
       I agree with defensive programming in general. However, my concerns are as follows:
   - Why 15 seconds?
   - At what point should this be configurable, or should it never be configurable? If the latter, how are we accounting for this in the liveness bounds of the overall process?
   - As the code evolves and the shutdown sequence changes, will this need to increase? If so, what is the guideline?
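
One way to frame the questions above is to treat the poll interval as an explicit, validated parameter rather than a hard-coded 15 seconds. The sketch below is illustrative only: `ShutdownWaiter` and its `pollInterval` parameter are hypothetical names that do not appear in the PR, and no Samza config key is assumed.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper (not part of the PR): waits for a shutdown signal with a configurable poll interval. */
final class ShutdownWaiter {
  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
  private final Duration pollInterval;

  ShutdownWaiter(Duration pollInterval) {
    // Reject non-positive intervals so the wait loop cannot degrade into a busy spin.
    if (pollInterval.isZero() || pollInterval.isNegative()) {
      throw new IllegalArgumentException("pollInterval must be positive");
    }
    this.pollInterval = pollInterval;
  }

  /** Sets the shutdown flag once and releases any thread blocked in waitForShutdown(). */
  void signalShutdown() {
    if (shouldShutdown.compareAndSet(false, true)) {
      shutdownLatch.countDown();
    }
  }

  /**
   * Blocks until shutdown is signalled or the calling thread is interrupted.
   * Returns the number of polls that timed out, which a caller could log or export as a metric.
   */
  int waitForShutdown() {
    int timedOutPolls = 0;
    boolean latchReleased = false;
    try {
      while (!latchReleased && !shouldShutdown.get()) {
        // The timeout is only a defensive re-check of the flag; it does not bound total shutdown time.
        latchReleased = shutdownLatch.await(pollInterval.toNanos(), TimeUnit.NANOSECONDS);
        if (!latchReleased) {
          timedOutPolls++;
        }
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag and stop waiting.
      Thread.currentThread().interrupt();
    }
    return timedOutPolls;
  }
}
```

In this shape the interval only controls how often the flag is re-checked, so it would not need to grow as shutdown steps are added; whether that property holds for the real coordinator depends on code outside this hunk.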




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat commented on pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
mynameborat commented on pull request #1529:
URL: https://github.com/apache/samza/pull/1529#issuecomment-926160680


   > Minor comment. Looks good to me!
   
   There are some test failures. They don't seem related to your change, but it would be good to confirm that.





[GitHub] [samza] mynameborat commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r715139672



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/NoProcessorJobCoordinatorListener.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.job.model.JobModel;
+
+
+/**
+ * {@link JobCoordinatorListener} for a {@link JobCoordinator} which does not run alongside a processor.
+ */
+public class NoProcessorJobCoordinatorListener implements JobCoordinatorListener {
+  private final CountDownLatch waitForShutdownLatch;
+
+  public NoProcessorJobCoordinatorListener(CountDownLatch waitForShutdownLatch) {
+    this.waitForShutdownLatch = waitForShutdownLatch;
+  }
+
+  @Override
+  public void onJobModelExpired() {
+    // nothing to do
+  }
+
+  @Override
+  public void onNewJobModel(String processorId, JobModel jobModel) {
+    // nothing to do
+  }
+
+  @Override
+  public void onCoordinatorStop() {
+    this.waitForShutdownLatch.countDown();
+  }
+
+  @Override
+  public void onCoordinatorFailure(Throwable t) {
+    this.waitForShutdownLatch.countDown();

Review comment:
       Would be good to bubble up the error. If not, can we log the error here at the very least?
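
A minimal sketch of what logging and surfacing the failure could look like. It is deliberately self-contained: `FailureRecordingListener` and `getFailure()` are hypothetical names, the class does not implement Samza's `JobCoordinatorListener`, and `java.util.logging` stands in for the slf4j logger that Samza classes normally use.

```java
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical listener (not part of the PR) that logs a failure and keeps it for the waiting thread. */
final class FailureRecordingListener {
  private static final Logger LOG = Logger.getLogger(FailureRecordingListener.class.getName());

  private final CountDownLatch waitForShutdownLatch;
  private final AtomicReference<Throwable> failure = new AtomicReference<>();

  FailureRecordingListener(CountDownLatch waitForShutdownLatch) {
    this.waitForShutdownLatch = waitForShutdownLatch;
  }

  void onCoordinatorStop() {
    this.waitForShutdownLatch.countDown();
  }

  void onCoordinatorFailure(Throwable t) {
    // Log first so the error is visible even if nobody reads getFailure().
    LOG.log(Level.SEVERE, "Job coordinator failed", t);
    // Keep only the first failure; later ones are still logged above.
    this.failure.compareAndSet(null, t);
    this.waitForShutdownLatch.countDown();
  }

  /** The first recorded failure, if any; the thread released by the latch can rethrow it. */
  Optional<Throwable> getFailure() {
    return Optional.ofNullable(this.failure.get());
  }
}
```

The thread that was blocked on the latch could then check `getFailure()` and rethrow, which is one way to bubble the error up without throwing from the callback itself.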







[GitHub] [samza] cameronlee314 commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r710411418



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(context.getConfig());
+    JobModelHttpServlet jobModelHttpServlet = new JobModelHttpServlet(context.getJobModelProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);

Review comment:
       If the job coordinator isn't responsible for resource management, then I didn't think it should be responsible for exposing locality information. I also couldn't find anything that used the locality endpoint. Therefore, I left it out.

##########
File path: samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
##########
@@ -33,6 +33,8 @@
   public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName();
   private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory";
   private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory";
+  public static final String USE_STATIC_RESOURCE_JOB_COORDINATOR =

Review comment:
       It doesn't seem like `ClusterBasedJobCoordinator` and `StaticResourceJobCoordinator` fit with the standalone job coordinators, so do you see it being useful to add a pluggable interface for this non-standalone job coordination case? If not, then I'm not sure that we should tie the classes together.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);
+  }
+
+  /**
+   * This is a separate method so it can be stubbed in tests, since adding a real shutdown hook will cause the hook to
+   * added to the test suite JVM.
+   */
+  @VisibleForTesting
+  static void addShutdownHook(StaticResourceJobCoordinator staticResourceJobCoordinator) {

Review comment:
       `ClusterBasedJobCoordinator` doesn't currently have a clean way to trigger a shutdown from another thread. There should be a shutdown hook for `ClusterBasedJobCoordinator` (https://issues.apache.org/jira/browse/SAMZA-2692), but that is out of scope for this PR anyway.
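
The body of `addShutdownHook` is not shown in this hunk, so the following is only a sketch of the general pattern, assuming the coordinator exposes a thread-safe `signalShutdown()` as `StaticResourceJobCoordinator` does. `ShutdownSignallable` and `ShutdownHooks` are hypothetical names introduced for illustration. Building the hook thread separately from registering it is what lets a test exercise the hook without touching the test JVM's real shutdown hooks.

```java
/** Hypothetical abstraction over anything that can be asked to shut down from another thread. */
interface ShutdownSignallable {
  void signalShutdown();
}

/** Hypothetical helper (not part of the PR) showing hook creation kept separate from registration. */
final class ShutdownHooks {
  private ShutdownHooks() {
  }

  /** Builds the hook thread without registering it, so tests can invoke it directly. */
  static Thread newShutdownHook(ShutdownSignallable target) {
    return new Thread(target::signalShutdown, "coordinator-shutdown-hook");
  }

  /** Registers the hook with the JVM; returns it so a caller could remove it later if needed. */
  static Thread register(ShutdownSignallable target) {
    Thread hook = newShutdownHook(target);
    Runtime.getRuntime().addShutdownHook(hook);
    return hook;
  }
}
```

A coordinator without a thread-safe shutdown entry point cannot be plugged into this pattern, which matches the limitation described for `ClusterBasedJobCoordinator` above.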

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(context.getConfig());
+    JobModelHttpServlet jobModelHttpServlet = new JobModelHttpServlet(context.getJobModelProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);
+    return new HttpCoordinatorCommunication(httpServer);

Review comment:
       The scope of this object is to provide communication between JC and workers, and a readiness probe does not fall under that scope.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactory;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactoryContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  private final StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory;
+  private final StreamRegexMonitorFactory streamRegexMonitorFactory;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final JobRestartSignal jobRestartSignal;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);

Review comment:
       `JobCoordinatorMetadataManager` should ideally be cluster-agnostic. There is a specific flow for YARN, and everything else, including standalone, can use the `NON_YARN` type, so it should be OK that this is more generic.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);
+  }
+

Review comment:
       Are you referring to the `JobCoordinator` interface specifically, or some new general interface with just a `run` method?
   
   1. I did try to fit this into the `JobCoordinator` interface, but it felt forced, since `JobCoordinator` is more tied to `StreamProcessor`.
   2. It doesn't seem that useful to tie this to an interface now, since job coordination doesn't really seem to be a layer that is pluggable. We would just be directly using the concrete classes anyways, so I'm not sure if there is a benefit to having a very basic interface with just a `run` method.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);

Review comment:
       The `MetricsReporter`s do not belong to `StaticResourceJobCoordinator`, and they are not needed by `StaticResourceJobCoordinator`. Therefore, they do not need to be inside `StaticResourceJobCoordinator`. Since `StaticResourceJobCoordinator` does not create the `MetricsReporter`s, it should not need to manage their lifecycle either.
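
The ownership rule described above (whoever creates the reporters starts and stops them) can be sketched as follows. This is a possible variation rather than the PR's code: `Reporter` is a hypothetical stand-in for Samza's `MetricsReporter`, `ReporterLifecycle` is an invented name, and the `try`/`finally` is an addition that the quoted diff does not have, shown here so the reporters are stopped even when the coordinator's `run()` throws.

```java
import java.util.List;

/** Hypothetical stand-in for a metrics reporter with a start/stop lifecycle. */
interface Reporter {
  void start();

  void stop();
}

/** Hypothetical launcher-side helper (not part of the PR): the caller owns the reporter lifecycle. */
final class ReporterLifecycle {
  private ReporterLifecycle() {
  }

  static void runWithReporters(List<? extends Reporter> reporters, Runnable coordinatorRun) {
    reporters.forEach(Reporter::start);
    try {
      // The coordinator itself never sees the reporters.
      coordinatorRun.run();
    } finally {
      // Runs on both normal return and exception.
      reporters.forEach(Reporter::stop);
    }
  }
}
```

The coordinator is passed in as a plain `Runnable`, which keeps it unaware of the reporters in the same way the review comment describes.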

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -19,27 +19,36 @@
 package org.apache.samza.clustermanager;
 
 import java.util.List;
+import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.coordinator.StaticResourceJobCoordinator;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.MetricsReporterLoader;
 
 
 /**
  * Util class to launch and run {@link ClusterBasedJobCoordinator}.
  * This util is being used by both high/low and beam API Samza jobs.
  */
 public class JobCoordinatorLaunchUtil {
+  private static final String JOB_COORDINATOR_CONTAINER_NAME = "JobCoordinator";

Review comment:
       I chose a confusing variable name here. In the existing job coordinator, the value used for constructing metrics is "ApplicationMaster", and the variable names used for that (e.g. `SamzaAppMasterMetrics.sourceName`, `ContainerProcessManager.METRICS_SOURCE_NAME`) use the term "source". I will update the variable.
   "ApplicationMaster" doesn't match "samza-job-coordinator", so this variable shouldn't need to either.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not going to execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);

Review comment:
       We could simply await forever, but I added this timeout as a defensive measure against a future bug in which the shutdown flag gets set but the latch isn't released. That's what I was trying to describe with the code comment. Do you think this defensive timeout is unnecessary?
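
A minimal sketch of the defensive wait discussed in this comment, assuming a stand-alone class rather than the PR's coordinator. `DefensiveShutdownWaiter`, `setFlagOnly`, and the `pollMillis` parameter are hypothetical names introduced only for illustration; `setFlagOnly` simulates the hypothetical future bug in which the flag is set but the latch is never released.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-alone class; not part of the PR.
class DefensiveShutdownWaiter {
  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
  private final long pollMillis;

  DefensiveShutdownWaiter(long pollMillis) {
    this.pollMillis = pollMillis;
  }

  // Normal path: set the flag and release the latch together.
  void signalShutdown() {
    if (shouldShutdown.compareAndSet(false, true)) {
      shutdownLatch.countDown();
    }
  }

  // Simulated bug: the flag is set, but the latch is never released.
  void setFlagOnly() {
    shouldShutdown.set(true);
  }

  // Returns true if the latch was observed as released, false if the loop
  // exited only because the flag was seen on a periodic re-check.
  boolean waitForShutdownSignal() throws InterruptedException {
    boolean latchReleased = false;
    while (!latchReleased && !shouldShutdown.get()) {
      // A bounded await means the flag is re-checked at least every pollMillis.
      latchReleased = shutdownLatch.await(pollMillis, TimeUnit.MILLISECONDS);
    }
    return latchReleased;
  }
}
```

With an unbounded `await()`, a caller of `setFlagOnly()` would leave the waiter blocked forever; with the bounded await, the loop notices the flag on its next pass.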

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,

Review comment:
       I think constructors should only have assignments to instance variables; anything a constructor needs should be passed into it. This `build` method actually constructs other objects right now, so it's more of a helper factory method than a constructor. It could be moved to an actual factory class, but that didn't seem useful enough to do now.
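
The split described here can be sketched as follows. This is only an illustration under assumed names: `SketchCoordinator` and `MetadataWriter` are hypothetical types, not Samza classes, and the `job.name` lookup merely stands in for whatever config the real `build` method reads.

```java
import java.util.Map;

// Hypothetical collaborator, standing in for objects such as a metadata manager.
interface MetadataWriter {
  void write(String key, String value);
}

// Hypothetical coordinator; the names here are illustrative and are not Samza classes.
final class SketchCoordinator {
  private final String jobName;
  private final MetadataWriter writer;

  // The constructor only assigns its arguments, so tests can pass in fakes.
  SketchCoordinator(String jobName, MetadataWriter writer) {
    this.jobName = jobName;
    this.writer = writer;
  }

  // Factory method: the one place that builds real collaborators from config.
  static SketchCoordinator build(Map<String, String> config) {
    String jobName = config.getOrDefault("job.name", "unnamed-job");
    MetadataWriter writer = (key, value) -> System.out.println(key + "=" + value);
    return new SketchCoordinator(jobName, writer);
  }

  void run() {
    writer.write("started", jobName);
  }

  String jobName() {
    return jobName;
  }
}
```

A unit test can call the constructor directly with a recording `MetadataWriter`, while production code goes through `build`; this is roughly the role `@VisibleForTesting` plays on the package-private constructor in the diff.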

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Contains components which can be used to build a {@link CoordinatorCommunication}.
+ * For example, provides access to job model and handling for worker states.
+ */
+public class CoordinatorCommunicationContext {
+  private final JobModelProvider jobModelProvider;
+  private final Config config;
+  private final MetricsRegistry metricsRegistry;
+
+  public CoordinatorCommunicationContext(JobModelProvider jobModelProvider, Config config,
+      MetricsRegistry metricsRegistry) {
+    this.config = config;
+    this.jobModelProvider = jobModelProvider;
+    this.metricsRegistry = metricsRegistry;

Review comment:
       1. This gives flexibility to change the job model within the lifecycle of the JC. In the current implementation, though, the job model won't change within the lifecycle of the JC.
   2. It is easier to extend `JobModelProvider` to provide references to other objects. Right now we only need a reference to the serialized job model, but if other communication implementations in the future need a reference to the deserialized job model, then it is easier to update the `JobModelProvider`.
   3. It decouples the construction of the communication layer from the calculation of the job model.
   
   Regarding config: I intended the `config` in `CoordinatorCommunicationContext` to be the config used to build the `CoordinatorCommunication`, and I was thinking that part would be immutable (or more generally, any config that is used by the JC is immutable). `JobModelProvider` provides the `JobModel`, which has the `Config` that is to be communicated to workers, and I suppose that might be mutable when the job model changes. I see how it is confusing to have multiple `Config`s. I'll see if I can make this clearer.
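
The three points above can be illustrated with a small sketch. Every name in it (`SerializedModelProvider`, `ModelServingContext`, `SketchCommunication`) is hypothetical and chosen for this example; the actual `JobModelProvider` and `JobModelServingContext` in the PR may expose different methods.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical provider interface: the communication layer sees only this.
interface SerializedModelProvider {
  byte[] serializedModel();
}

// Hypothetical mutable holder that a coordinator updates after each job model calculation.
final class ModelServingContext implements SerializedModelProvider {
  private final AtomicReference<byte[]> current = new AtomicReference<>();

  void setModel(String modelJson) {
    current.set(modelJson.getBytes(StandardCharsets.UTF_8));
  }

  @Override
  public byte[] serializedModel() {
    byte[] model = current.get();
    if (model == null) {
      throw new IllegalStateException("Job model has not been set yet");
    }
    return model;
  }
}

// Hypothetical communication endpoint, constructed before any job model exists.
final class SketchCommunication {
  private final SerializedModelProvider provider;

  SketchCommunication(SerializedModelProvider provider) {
    this.provider = provider;
  }

  // Reads through the provider on every request, so it serves the latest model.
  String handleRequest() {
    return new String(provider.serializedModel(), StandardCharsets.UTF_8);
  }
}
```

Because `SketchCommunication` holds the provider rather than a model, it can be built first and still pick up a model that is set (or replaced) later, which mirrors points 1 and 3.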

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not going to execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    this.startpointManager.start();
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      this.startpointManager.stop();
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);
+    }
+  }
+
+  private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
+    JobCoordinatorMetadata previousMetadata = this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
+    return this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata);
+  }
+
+  private void waitForShutdownQuietly() {
+    try {
+      waitForShutdown();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting to shutdown", e);
+    }
+  }

Review comment:
       The caller of `waitForShutdown` doesn't need a try-catch block. `waitForShutdown` doesn't actually do shutdown logic; it just waits on a latch for the shutdown signal. Maybe `waitForShutdownSignal` is a better name for this method.
   If waiting on the latch is interrupted, then it will just stop waiting as if it did get shut down.
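
A minimal sketch of the renamed method, under the assumption that an interrupt should simply end the wait. `ShutdownSignalWaiter` is a hypothetical class, and restoring the interrupt flag in the catch block is a common Java convention added here; the PR's version only logs a warning.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-alone class; not part of the PR.
class ShutdownSignalWaiter {
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);

  void signalShutdown() {
    shutdownLatch.countDown();
  }

  // Blocks until the shutdown signal arrives. Returns true if signalled,
  // false if the wait ended because the thread was interrupted.
  boolean waitForShutdownSignal() {
    try {
      shutdownLatch.await();
      return true;
    } catch (InterruptedException e) {
      // Assumption: restore the flag so callers can still observe the interrupt.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Either way the caller falls through to its cleanup code, so an interrupted wait behaves like a received shutdown signal and no try-catch is needed at the call site.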







[GitHub] [samza] cameronlee314 commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712393785



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,

Review comment:
       I will be adding a factory class due to some other comments as well.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,232 @@
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);
+    ChangelogStreamManager changelogStreamManager =
+        new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetChangelogMapping.TYPE));
+    StartpointManager startpointManager =
+        jobConfig.getStartpointEnabled() ? new StartpointManager(metadataStore) : null;
+    SystemAdmins systemAdmins = new SystemAdmins(config, StaticResourceJobCoordinator.class.getSimpleName());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    JobModelHelper jobModelHelper = buildJobModelHelper(metadataStore, streamMetadataCache);
+    return new StaticResourceJobCoordinator(jobModelHelper, jobModelServingContext, coordinatorCommunication,
+        jobCoordinatorMetadataManager, startpointManager, changelogStreamManager, metrics, systemAdmins, config);
+  }
+
+  @VisibleForTesting
+  StaticResourceJobCoordinator(JobModelHelper jobModelHelper, JobModelServingContext jobModelServingContext,
+      CoordinatorCommunication coordinatorCommunication, JobCoordinatorMetadataManager jobCoordinatorMetadataManager,
+      StartpointManager startpointManager, ChangelogStreamManager changelogStreamManager, MetricsRegistry metrics,
+      SystemAdmins systemAdmins, Config config) {
+    this.jobModelHelper = jobModelHelper;
+    this.jobModelServingContext = jobModelServingContext;
+    this.coordinatorCommunication = coordinatorCommunication;
+    this.jobCoordinatorMetadataManager = jobCoordinatorMetadataManager;
+    this.startpointManager = startpointManager;
+    this.changelogStreamManager = changelogStreamManager;
+    this.metrics = metrics;
+    this.systemAdmins = systemAdmins;
+    this.config = config;
+  }
+
+  /**
+   * Run the coordinator.
+   */
+  public void run() {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.warn("Already running; not going to execute run() again");
+      return;
+    }
+    LOG.info("Starting job coordinator");
+    this.systemAdmins.start();
+    // startpointManager is null when startpoints are not enabled
+    if (this.startpointManager != null) {
+      this.startpointManager.start();
+    }
+    try {
+      JobModel jobModel = newJobModel();
+      JobCoordinatorMetadata newMetadata =
+          this.jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
+      Set<JobMetadataChange> jobMetadataChanges = checkForMetadataChanges(newMetadata);
+      prepareWorkerExecution(jobModel, newMetadata, jobMetadataChanges);
+      this.coordinatorCommunication.start();
+      waitForShutdownQuietly();
+    } catch (Exception e) {
+      LOG.error("Error while running job coordinator; exiting", e);
+      throw new SamzaException("Error while running job coordinator", e);
+    } finally {
+      this.coordinatorCommunication.stop();
+      if (this.startpointManager != null) {
+        this.startpointManager.stop();
+      }
+      this.systemAdmins.stop();
+    }
+  }
+
+  /**
+   * Set shutdown flag for coordinator and release any threads waiting for the shutdown.
+   */
+  public void signalShutdown() {
+    if (this.shouldShutdown.compareAndSet(false, true)) {
+      LOG.info("Shutdown signalled");
+      this.shutdownLatch.countDown();
+    }
+  }
+
+  private static JobModelHelper buildJobModelHelper(MetadataStore metadataStore,
+      StreamMetadataCache streamMetadataCache) {
+    LocalityManager localityManager =
+        new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE));
+    TaskAssignmentManager taskAssignmentManager =
+        new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE),
+            new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE));
+    TaskPartitionAssignmentManager taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE));
+    return new JobModelHelper(localityManager, taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE);
+  }
+
+  private JobModel newJobModel() {
+    return this.jobModelHelper.newJobModel(this.config, this.changelogStreamManager.readPartitionMapping());
+  }
+
+  /**
+   * Run set up steps so that workers can begin processing:
+   * 1. Persist job coordinator metadata
+   * 2. Publish new job model on coordinator-to-worker communication channel
+   * 3. Create metadata resources
+   * 4. Handle startpoints
+   */
+  private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata newMetadata,
+      Set<JobMetadataChange> jobMetadataChanges) throws IOException {
+    if (!jobMetadataChanges.isEmpty()) {
+      this.jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
+    }
+    this.jobModelServingContext.setJobModel(jobModel);
+
+    metadataResourceUtil(jobModel).createResources();
+
+    // the fan out trigger logic comes from ClusterBasedJobCoordinator, in which a new job model can trigger a fan out
+    if (this.startpointManager != null && !jobMetadataChanges.isEmpty()) {
+      startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+    }
+  }
+
+  @VisibleForTesting
+  MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
+    return new MetadataResourceUtil(jobModel, this.metrics, this.config);
+  }
+
+  private void waitForShutdown() throws InterruptedException {
+    LOG.info("Waiting for coordinator to be signalled for shutdown");
+    boolean latchReleased = false;
+    while (!latchReleased && !this.shouldShutdown.get()) {
+      /*
+       * Using a timeout as a defensive measure in case we are waiting for a shutdown but the latch is not triggered
+       * for some reason.
+       */
+      latchReleased = this.shutdownLatch.await(15, TimeUnit.SECONDS);

Review comment:
       The 15-second timeout was arbitrary. It shouldn't need to be configurable or changed, because it is not expected to ever take effect: I would always expect the shutdown flag to be set along with the latch being released, so the timeout value shouldn't matter.
       Based on your questions, I'm going to remove this timeout, since it causes more confusion than it is worth and isn't helpful right now.
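
A minimal sketch of the timeout-free wait described above, assuming the shutdown flag is always set before the latch is released. `ShutdownGate` and its method names are hypothetical stand-ins for illustration and are not the PR's actual code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the coordinator's shutdown handling; not the PR's class. */
final class ShutdownGate {
  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);

  /** Sets the flag first, then releases the latch, so a thread woken by the latch observes the flag. */
  void signalShutdown() {
    if (shouldShutdown.compareAndSet(false, true)) {
      shutdownLatch.countDown();
    }
  }

  /**
   * Blocks with no timeout until signalShutdown() is called.
   * Returns true if shutdown was signalled, false if the waiting thread was interrupted first.
   */
  boolean awaitShutdownQuietly() {
    try {
      shutdownLatch.await();
      return true;
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers can still react to it.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  boolean isShutdownSignalled() {
    return shouldShutdown.get();
  }
}
```

Because the latch is released in the same code path that sets the flag, a polling loop with a timeout adds nothing in this sketch; the single `await()` is sufficient.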

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactory;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactoryContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  private final StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory;
+  private final StreamRegexMonitorFactory streamRegexMonitorFactory;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final JobRestartSignal jobRestartSignal;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);

Review comment:
       Config rewriting in open source is currently agnostic of cluster type, from what I can tell. Certain implementations of config rewriters may care about cluster type, but I think it would be good to keep the core Samza engine agnostic of it. The implementations that need the cluster type will need to determine it in their own way, so that the Samza engine can stay decoupled.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);
+  }
+

Review comment:
       Ok, I will try fitting it into the `JobCoordinator` interface. The javadocs seem to tie `JobCoordinator` to `StreamProcessor`, and some of the listener methods seemed to be for more dynamic job model handling, but I see where it could potentially be helpful to start merging. I would prefer to not refactor the `ClusterBasedJobCoordinator` flow here in order to fit it into `JobCoordinator` also, so would it be ok to still keep that one as a separate case?

##########
File path: samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
##########
@@ -33,6 +33,8 @@
   public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName();
   private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory";
   private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory";
+  public static final String USE_STATIC_RESOURCE_JOB_COORDINATOR =

Review comment:
       As mentioned in the other comment, I will try to fit `StaticResourceJobCoordinator` into the `JobCoordinator` interface. Then, we could use a factory method.
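
A rough sketch of what factory-based selection could look like in place of a boolean flag. Every name here (`Coordinator`, `CoordinatorFactory`, `CoordinatorSelector`, and the `example.coordinator.factory` key) is hypothetical and chosen for illustration; none of it is Samza's `JobCoordinator` API or an existing config key.

```java
import java.util.Map;

/** Hypothetical coordinator abstraction; not Samza's JobCoordinator interface. */
interface Coordinator {
  String name();
}

/** Hypothetical factory abstraction that builds a coordinator from config. */
interface CoordinatorFactory {
  Coordinator create(Map<String, String> config);
}

final class CoordinatorSelector {
  // Hypothetical config key, used for illustration only.
  static final String FACTORY_KEY = "example.coordinator.factory";

  /** Looks up the factory named in config (or the default) and builds the coordinator with it. */
  static Coordinator select(Map<String, String> config, Map<String, CoordinatorFactory> registry,
      String defaultFactory) {
    String factoryName = config.getOrDefault(FACTORY_KEY, defaultFactory);
    CoordinatorFactory factory = registry.get(factoryName);
    if (factory == null) {
      throw new IllegalArgumentException("Unknown coordinator factory: " + factoryName);
    }
    return factory.create(config);
  }
}
```

With this shape, the launcher would not need an if/else per coordinator type; adding a coordinator means registering another factory.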

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -19,27 +19,36 @@
 package org.apache.samza.clustermanager;
 
 import java.util.List;
+import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.coordinator.StaticResourceJobCoordinator;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.DiagnosticsUtil;
+import org.apache.samza.util.MetricsReporterLoader;
 
 
 /**
  * Util class to launch and run {@link ClusterBasedJobCoordinator}.
  * This util is being used by both high/low and beam API Samza jobs.
  */
 public class JobCoordinatorLaunchUtil {
+  private static final String JOB_COORDINATOR_CONTAINER_NAME = "JobCoordinator";

Review comment:
       Yes, it still needs `samza.container.name`. Just to be clear, `samza.container.name` is not associated with this variable.

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/HttpCoordinatorToWorkerCommunicationFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+
+/**
+ * Implements HTTP-based communication between the coordinator and workers.
+ */
+public class HttpCoordinatorToWorkerCommunicationFactory implements CoordinatorToWorkerCommunicationFactory {
+  @Override
+  public CoordinatorCommunication coordinatorCommunication(CoordinatorCommunicationContext context) {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(context.getConfig());
+    JobModelHttpServlet jobModelHttpServlet = new JobModelHttpServlet(context.getJobModelProvider(),
+        new JobModelHttpServlet.Metrics(context.getMetricsRegistry()));
+    HttpServer httpServer = new HttpServer("/", clusterManagerConfig.getCoordinatorUrlPort(), null,
+        new ServletHolder(DefaultServlet.class));
+    httpServer.addServlet("/", jobModelHttpServlet);
+    return new HttpCoordinatorCommunication(httpServer);

Review comment:
       The communication between JC and workers shouldn't be coupled with a readiness probe. The functionality that each component provides is different, and the access pattern to each component is also different.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);

Review comment:
       `MetricsReporter`s are added inside `ContainerProcessManager`, which is inside `ClusterBasedJobCoordinator`, so we don't need to add metrics reporters around `ClusterBasedJobCoordinator` now, although I feel it would have been better if they had been added around it in the first place. I was trying to avoid changing too much of `ClusterBasedJobCoordinator` in this PR because that would make the PR even larger.
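
A small sketch of wrapping a coordinator run with the reporter lifecycle, as discussed above. `Reporter` and `ReporterLifecycle` are hypothetical, simplified stand-ins and are not Samza's `MetricsReporter`. The try/finally is an assumption of this sketch, so that reporters are stopped even when the run throws.

```java
import java.util.List;

/** Hypothetical minimal reporter with a start/stop lifecycle. */
interface Reporter {
  void start();
  void stop();
}

final class ReporterLifecycle {
  /** Starts all reporters, runs the coordinator body, and stops the reporters even if the body throws. */
  static void runWithReporters(List<Reporter> reporters, Runnable coordinatorBody) {
    reporters.forEach(Reporter::start);
    try {
      coordinatorBody.run();
    } finally {
      reporters.forEach(Reporter::stop);
    }
  }
}
```

Keeping the lifecycle outside the coordinator means any coordinator implementation can be wrapped the same way without knowing about reporters.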




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712272025



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/StaticResourceJobCoordinator.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.coordinator.communication.CoordinatorCommunication;
+import org.apache.samza.coordinator.communication.CoordinatorCommunicationContext;
+import org.apache.samza.coordinator.communication.HttpCoordinatorToWorkerCommunicationFactory;
+import org.apache.samza.coordinator.communication.JobModelServingContext;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignal;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactory;
+import org.apache.samza.coordinator.lifecycle.JobRestartSignalFactoryContext;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.JobModelUtil;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.StartpointManager;
+import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles coordination with the workers for a Samza job. This includes generating the job model, managing metadata,
+ * and coordinating with the workers when the job model changes.
+ * This does certain similar things as the older ClusterBasedJobCoordinator, but one notable difference is that this
+ * coordinator does no management of execution resources. It relies on an external component to manage those resources
+ * for a Samza job.
+ */
+public class StaticResourceJobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticResourceJobCoordinator.class);
+
+  private final JobModelHelper jobModelHelper;
+  private final JobModelServingContext jobModelServingContext;
+  private final CoordinatorCommunication coordinatorCommunication;
+  private final JobCoordinatorMetadataManager jobCoordinatorMetadataManager;
+  private final StreamPartitionCountMonitorFactory streamPartitionCountMonitorFactory;
+  private final StreamRegexMonitorFactory streamRegexMonitorFactory;
+  /**
+   * This can be null if startpoints are not enabled.
+   */
+  private final StartpointManager startpointManager;
+  private final ChangelogStreamManager changelogStreamManager;
+  private final JobRestartSignal jobRestartSignal;
+  private final MetricsRegistry metrics;
+  private final SystemAdmins systemAdmins;
+  private final Config config;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final AtomicBoolean shouldShutdown = new AtomicBoolean(false);
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+  public static StaticResourceJobCoordinator build(MetricsRegistry metrics, MetadataStore metadataStore,
+      Config config) {
+    JobModelServingContext jobModelServingContext = new JobModelServingContext();
+    JobConfig jobConfig = new JobConfig(config);
+    CoordinatorCommunicationContext context =
+        new CoordinatorCommunicationContext(jobModelServingContext, config, metrics);
+    CoordinatorCommunication coordinatorCommunication =
+        new HttpCoordinatorToWorkerCommunicationFactory().coordinatorCommunication(context);
+    JobCoordinatorMetadataManager jobCoordinatorMetadataManager = new JobCoordinatorMetadataManager(
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, SetJobCoordinatorMetadataMessage.TYPE),
+        JobCoordinatorMetadataManager.ClusterType.NON_YARN, metrics);

Review comment:
       Fair enough, although it seems we need this distinction in config rewriting/expansion. Wouldn't it be better, for implementation consistency, to have one common `ClusterType` and then make `JobCoordinatorMetadataManager` treat all types the same, with the exception of YARN?
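
To illustrate the suggestion above, here is a minimal sketch of one shared cluster-type enum where only YARN gets special handling. Everything in it is an assumption made for illustration: the `KUBERNETES` and `STANDALONE` values, the `metadataStrategy` helper, and the strategy strings are hypothetical and are not part of the PR or of `JobCoordinatorMetadataManager`.

```java
final class ClusterTypeSketch {
  /** Hypothetical shared enum; the PR itself only references YARN and NON_YARN. */
  enum ClusterType { YARN, KUBERNETES, STANDALONE }

  /**
   * Hypothetical helper: returns a label for how metadata would be handled.
   * Every type other than YARN falls through to the same generic path.
   */
  static String metadataStrategy(ClusterType type) {
    switch (type) {
      case YARN:
        return "yarn-specific";
      default:
        return "generic";
    }
  }

  public static void main(String[] args) {
    // Print the strategy chosen for each hypothetical cluster type.
    for (ClusterType type : ClusterType.values()) {
      System.out.println(type + " -> " + metadataStrategy(type));
    }
  }
}
```

With this shape, callers such as config rewriting could still distinguish individual cluster types, while the metadata handling only branches on YARN.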




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712263815



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
##########
@@ -76,11 +85,37 @@ public static void run(SamzaApplication app, Config config) {
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
 
-    ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
-        metrics,
-        metadataStore,
-        finalConfig);
-    jc.run();
+    if (new JobCoordinatorConfig(finalConfig).getUseStaticResourceJobCoordinator()) {
+      runStaticResourceJobCoordinator(metrics, metadataStore, finalConfig);
+    } else {
+      ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(metrics, metadataStore, finalConfig);
+      jc.run();
+    }
+  }
+
+  private static void runStaticResourceJobCoordinator(MetricsRegistryMap metrics, MetadataStore metadataStore,
+      Config finalConfig) {
+    StaticResourceJobCoordinator staticResourceJobCoordinator =
+        StaticResourceJobCoordinator.build(metrics, metadataStore, finalConfig);
+    addShutdownHook(staticResourceJobCoordinator);
+    Map<String, MetricsReporter> metricsReporters =
+        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(finalConfig), JOB_COORDINATOR_CONTAINER_NAME);
+    metricsReporters.values()
+        .forEach(metricsReporter -> metricsReporter.register(JOB_COORDINATOR_CONTAINER_NAME, metrics));
+    metricsReporters.values().forEach(MetricsReporter::start);
+    staticResourceJobCoordinator.run();
+    metricsReporters.values().forEach(MetricsReporter::stop);
+  }
+

Review comment:
       The intent behind introducing `JobCoordinator` was for it to be used with `ClusterBasedJobCoordinator` as well. It isn't tied to `StreamProcessor`: start, stop, job model, and listeners are agnostic to whether a `StreamProcessor` is used. I wouldn't consider the deprecated `getProcessorId` a tie to `StreamProcessor`.
   
   The divergence here, with each job coordinator not following a fixed contract, takes us further away from consolidating the job coordinator flows and from sharing future feature work across these job coordinators.
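
As a rough sketch of what a fixed contract could look like, the example below has both coordinator flavors implement one small interface so that the launcher only picks an implementation. All names here (`Coordinator`, `StaticResource`, `ClusterBased`, `select`, `runOnce`) are hypothetical simplifications for illustration; they are not the actual `JobCoordinator` interface or the classes in this PR.

```java
import java.util.ArrayList;
import java.util.List;

final class CoordinatorContractSketch {
  /** Hypothetical, simplified contract shared by both coordinator flavors. */
  interface Coordinator {
    void start();
    void stop();
  }

  /** Stand-in for a coordinator that leaves resource management to an external system. */
  static final class StaticResource implements Coordinator {
    private final List<String> events;
    StaticResource(List<String> events) { this.events = events; }
    @Override public void start() { events.add("static:start"); }
    @Override public void stop() { events.add("static:stop"); }
  }

  /** Stand-in for a coordinator that also manages execution resources. */
  static final class ClusterBased implements Coordinator {
    private final List<String> events;
    ClusterBased(List<String> events) { this.events = events; }
    @Override public void start() { events.add("cluster:start"); }
    @Override public void stop() { events.add("cluster:stop"); }
  }

  /** Picks an implementation from a boolean flag, as a config switch might. */
  static Coordinator select(boolean useStaticResource, List<String> events) {
    return useStaticResource ? new StaticResource(events) : new ClusterBased(events);
  }

  /** Runs one start/stop cycle through the shared contract and returns the recorded events. */
  static List<String> runOnce(boolean useStaticResource) {
    List<String> events = new ArrayList<>();
    Coordinator coordinator = select(useStaticResource, events);
    coordinator.start();
    coordinator.stop();
    return events;
  }

  public static void main(String[] args) {
    System.out.println(runOnce(true));
    System.out.println(runOnce(false));
  }
}
```

The point of the sketch is that the calling code in `runOnce` is identical for both flavors, so lifecycle features added at the contract level would apply to either coordinator.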
   







[GitHub] [samza] mynameborat commented on a change in pull request #1529: SAMZA-2685: Add job coordinator which does not do resource management

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1529:
URL: https://github.com/apache/samza/pull/1529#discussion_r712291787



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/communication/CoordinatorCommunicationContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.communication;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Contains components which can be used to build a {@link CoordinatorCommunication}.
+ * For example, provides access to job model and handling for worker states.
+ */
+public class CoordinatorCommunicationContext {
+  private final JobModelProvider jobModelProvider;
+  private final Config config;
+  private final MetricsRegistry metricsRegistry;
+
+  public CoordinatorCommunicationContext(JobModelProvider jobModelProvider, Config config,
+      MetricsRegistry metricsRegistry) {
+    this.config = config;
+    this.jobModelProvider = jobModelProvider;
+    this.metricsRegistry = metricsRegistry;

Review comment:
       sure 👍 



