Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/08/21 23:03:10 UTC

[GitHub] [samza] mynameborat opened a new pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

mynameborat opened a new pull request #1421:
URL: https://github.com/apache/samza/pull/1421


   *Issues*
   Currently, locality information is part of the job model. The job model is typically immutable and fixed within the lifecycle of an application attempt. Locality information, on the other hand, is dynamic and changes whenever containers move. This difference makes it complicated to program against, model, or define semantics around these models when building features. Removing this dependency has the following implications:
   
   1. Enables us to move JobModel to public APIs and expose it in JobContext
   2. Enables us to cache and serve serialized JobModel from the AM servlet to reduce AM overhead (memory, open connections, num threads) during container startup, esp. for jobs with a large number of containers (See: https://github.com/apache/samza/pull/1241)
   3. Removes tech debt: models should be immutable, and should not update themselves.
   4. Removes tech debt: makes the current container location a first-class concept for container scheduling / placement, and for tools like dashboard, samza-rest, auto-scaling, diagnostics, etc.
   
   *Changes*
   
   1. Separated locality information out of the job model into `LocalityModel`
   2. Introduced an endpoint in AM to serve locality information
   3. Added Json MixIns for locality models (LocalityModel & HostLocality) 
   
   *Tests*
   
   1. Added tests for new servlet
   2. Modified existing tests to reflect the refactor
   3. Deployed the new servlet and verified the locality information is accessible
   
   *API Changes*: 
   
   1. Introduced new models for locality. 
   2. The previous job model endpoint will no longer serve locality information, i.e., tools that rely on it will need to switch to the new endpoint; refer to the usage instructions for details.
   
   *Upgrade Instructions*: None. Refer to the API changes & the usage instructions below to upgrade your tooling if applicable.
   *Usage Instructions*: Locality information is now served by the AM endpoint under the `locality` sub-page. Tooling should now hit `http://<am-endpoint>/locality` instead of `http://<am-endpoint>`.
   The endpoint supports two types of queries:
   
   1. Querying for locality information of the entire job. This can be done by hitting `http://<am-endpoint>/locality`. A sample response looks like the following:
   
   ```
   {
     host-localities: {
       0: {
         id: "0",
         host: "bkumaras-ld2",
         jmx-url: "",
         jmx-tunneling-url: ""
       }
     }
   }
   ```
   
   2. Querying for the locality information of a specific processor. This can be done by specifying the `processorId` in the request, e.g. `GET <am-endpoint>/locality?processorId=x`. A sample response looks like the following:
   ```
   {
     id: "0",
     host: "mynameborat-host",
     jmx-url: "",
     jmx-tunneling-url: ""
   }
   ```
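
For tooling authors, the two queries described above could be issued along the following lines. This is a minimal sketch, not code from this PR: the class `LocalityEndpointClient` and its method names are hypothetical, the AM endpoint is assumed to be a plain `http://host:port` base URL, a 200 status is assumed to indicate success, and the JDK 11+ `java.net.http` client is used purely for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for tooling; not part of Samza.
class LocalityEndpointClient {
  private final String amEndpoint; // assumed form: "http://am-host:port"

  LocalityEndpointClient(String amEndpoint) {
    // Drop a trailing slash so the paths built below do not contain "//".
    this.amEndpoint = amEndpoint.endsWith("/")
        ? amEndpoint.substring(0, amEndpoint.length() - 1)
        : amEndpoint;
  }

  // URI for the locality of the entire job.
  URI jobLocalityUri() {
    return URI.create(amEndpoint + "/locality");
  }

  // URI for the locality of a single processor.
  URI processorLocalityUri(String processorId) {
    String encoded = URLEncoder.encode(processorId, StandardCharsets.UTF_8);
    return URI.create(amEndpoint + "/locality?processorId=" + encoded);
  }

  // Issues a GET and returns the raw JSON body; parsing is left to the caller.
  String fetch(URI uri) throws IOException, InterruptedException {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    if (response.statusCode() != 200) {
      // Assumption: anything other than 200 is treated as a failure.
      throw new IOException("Unexpected status " + response.statusCode() + " from " + uri);
    }
    return response.body();
  }
}
```

A caller would typically construct the client once per AM endpoint and call `fetch(client.jobLocalityUri())` or `fetch(client.processorLocalityUri("0"))`.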


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat merged pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat merged pull request #1421:
URL: https://github.com/apache/samza/pull/1421


   





[GitHub] [samza] rmatharu commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477014279



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.server;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+/**
+ * A servlet for locality information of a job. The servlet is hosted alongside of the {@link JobServlet} which hosts
+ * job model and configuration. Historically, locality information was part of job model but we extracted the locality
+ * as job model is static within the lifecycle of an application attempt while locality changes in the event of container
+ * movements.
+ *
+ * This separation enables us to achieve performance benefits by caching job model when it is served by the AM as it
+ * can incur significant penalty in the job start time for jobs with large number of containers.
+ */

Review comment:
       Do we know which URL this would be exposed under, e.g.
   http://<AM HOST>:<RPC Port>/abc ?
   
   It might be useful to add it here in case we want analytics or tools to ping it for some reason.







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r476913697



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -174,6 +176,7 @@
   private final MetadataStore metadataStore;
 
   private final SystemAdmins systemAdmins;
+  private final LocalityManager localityManager;

Review comment:
       In the first pass, I didn't have a local instance for locality manager but ended up introducing one to manage the lifecycle along with the other components. 
   
   What do you think about doing it as part of the consolidation refactor work? We currently have `ContainerPlacementMetadataStore` in the job coordinator. When we move that into CM, we can also move the locality manager, all in one go.
   
   







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r476728011



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -80,19 +82,23 @@
 
   private final Optional<StandbyContainerManager> standbyContainerManager;
 
+  private final LocalityManager localityManager;

Review comment:
       1. `SamzaApplicationState` is a holder class to share state between components and is not supposed to hold on to any manager classes. We have a TODO to remove JobModelManager from it. It would be counterproductive to introduce another component into the holder class. 
   2. I am not a huge fan of static for the following reasons:
   
   - It makes testing harder and forces the use of PowerMock for component testing everywhere.
   - Utility classes that handle components with a lifecycle are harder to reason about.
   - Static methods are harder to evolve/extend without breaking the signature & modifying the callers, and they force their dependencies to be either passed as arguments or defined as static variables within the class. 
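
To illustrate the testing argument, here is a minimal sketch of constructor injection. Everything in it is hypothetical and simplified: `LocalitySource` and `PlacementComponent` are stand-ins invented for this example, and Samza's real `LocalityManager` and `ContainerManager` have different APIs. The point is only that an instance dependency can be swapped for a plain fake in a test, with no static mocking involved.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a locality source; not Samza's LocalityManager API.
interface LocalitySource {
  Optional<String> lastSeenHost(String processorId);
}

// Hypothetical component that receives its dependency through the constructor
// instead of reaching for a static utility or a shared state holder.
class PlacementComponent {
  private final LocalitySource localitySource;

  PlacementComponent(LocalitySource localitySource) {
    this.localitySource = localitySource;
  }

  // True when a previous host is known for the given processor.
  boolean hasLastSeenHost(String processorId) {
    return localitySource.lastSeenHost(processorId).isPresent();
  }
}

class PlacementComponentExample {
  public static void main(String[] args) {
    // A test can pass a simple in-memory fake as a lambda.
    Map<String, String> hosts = Map.of("0", "host-1");
    PlacementComponent component =
        new PlacementComponent(id -> Optional.ofNullable(hosts.get(id)));
    System.out.println(component.hasLastSeenHost("0")); // true
    System.out.println(component.hasLastSeenHost("1")); // false
  }
}
```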







[GitHub] [samza] rmatharu commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477013673



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -80,19 +82,23 @@
 
   private final Optional<StandbyContainerManager> standbyContainerManager;
 
+  private final LocalityManager localityManager;

Review comment:
       +1, using state to fetch locality information was a bad pattern.
   Makes writing tests super weird.







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r475061741



##########
File path: samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
##########
@@ -39,34 +36,14 @@
  * </p>
  */
 public class JobModel {
-  private static final String EMPTY_STRING = "";
   private final Config config;
   private final Map<String, ContainerModel> containers;
 
-  private final LocalityManager localityManager;
-  private final Map<String, String> localityMappings;
-
   public int maxChangeLogStreamPartitions;
 
   public JobModel(Config config, Map<String, ContainerModel> containers) {
-    this(config, containers, null);
-  }
-
-  public JobModel(Config config, Map<String, ContainerModel> containers, LocalityManager localityManager) {

Review comment:
       Not a big change. I kept it as a separate commit so that if we decide to leave it out, we can revert it.







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477895944



##########
File path: samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
##########
@@ -167,15 +167,18 @@ object JobModelManager extends Logging {
     */
   def getProcessorLocality(config: Config, localityManager: LocalityManager) = {

Review comment:
       Summarizing the points from our offline discussion: this is a helper method within the `JobModelManager` that massages the locality information and decorates it with its own terminology, `ANY_HOST`, for the absence of locality. The helper method's name could give the impression that it just fetches locality information, but it actually does more than that. It also goes over the list of all available containers and populates `ANY_HOST` for the ones that don't have locality. The `LocalityManager` is oblivious to this information, as it treats the coordinator stream as the only source of truth, and the stream only knows of containers that have locality data.
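
The defaulting behavior described above can be sketched as follows. This is an illustration under stated assumptions, not the actual Scala helper: the class and method names are hypothetical, and the string value used for `ANY_HOST` is assumed, since the comment only names the constant.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of filling in a placeholder for processors without locality.
class ProcessorLocalityDefaults {
  // Assumed placeholder value; the discussion only refers to the name ANY_HOST.
  static final String ANY_HOST = "ANY_HOST";

  // knownHosts holds only processors that have locality data (as in the coordinator stream);
  // allProcessorIds is the full list of containers in the job.
  static Map<String, String> withDefaults(List<String> allProcessorIds, Map<String, String> knownHosts) {
    Map<String, String> result = new LinkedHashMap<>();
    for (String processorId : allProcessorIds) {
      result.put(processorId, knownHosts.getOrDefault(processorId, ANY_HOST));
    }
    return result;
  }
}
```

For example, with processors `"0"` and `"1"` and a known host only for `"0"`, the result maps `"1"` to the placeholder, which is information the locality source itself never stored.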

##########
File path: samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
##########
@@ -558,14 +548,19 @@ public Void answer(InvocationOnMock invocation) {
   public void testContainerPlacementsForJobRunningInDegradedState() throws Exception {
     // Set failure after retries to false to enable job running in degraded state
     config = new MapConfig(configVals, getConfigWithHostAffinityAndRetries(true, 1, false));
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2")));
+    state = new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false));
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);

Review comment:
       Extracted the locality manager into an instance variable to reuse it across the tests.







[GitHub] [samza] rmatharu commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477013006



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+
+/**
+ * A data model to represent the processor locality information. The locality information refers to the whereabouts
+ * of the physical execution of container.
+ * Fields such as <i>jmxUrl</i> and <i>jmxTunnelingUrl</i> exist for backward compatibility reasons as they were
+ * historically stored under the same name space as locality and surfaced within the framework through the locality
+ * manager.
+ */
+public class ProcessorLocality {

Review comment:
       Would it make sense to add a timestamp field here, to know when the information was last updated?
   Timestamps could come in handy when, e.g., debugging race conditions.







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477019652



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.server;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+/**
+ * A servlet for locality information of a job. The servlet is hosted alongside of the {@link JobServlet} which hosts
+ * job model and configuration. Historically, locality information was part of job model but we extracted the locality
+ * as job model is static within the lifecycle of an application attempt while locality changes in the event of container
+ * movements.
+ *
+ * This separation enables us to achieve performance benefits by caching job model when it is served by the AM as it
+ * can incur significant penalty in the job start time for jobs with large number of containers.
+ */

Review comment:
       There isn't a direct URL per se. I added a general comment to identify the whereabouts of the server and port and the path of the actual servlet. 







[GitHub] [samza] rmatharu commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477013006



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+
+/**
+ * A data model to represent the processor locality information. The locality information refers to the whereabouts
+ * of the physical execution of container.
+ * Fields such as <i>jmxUrl</i> and <i>jmxTunnelingUrl</i> exist for backward compatibility reasons as they were
+ * historically stored under the same name space as locality and surfaced within the framework through the locality
+ * manager.
+ */
+public class ProcessorLocality {

Review comment:
       Would it make sense to add a timestamp field here, to know when the information was last updated?
   Timestamps could come in handy when, e.g., debugging race conditions.
   
   Similar thoughts on version for this.







[GitHub] [samza] Sanil15 commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
Sanil15 commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477814080



##########
File path: samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
##########
@@ -53,28 +56,26 @@ public LocalityManager(MetadataStore metadataStore) {
   }
 
   /**
-   * Method to allow read container locality information from the {@link MetadataStore}.
-   * This method is used in {@link org.apache.samza.coordinator.JobModelManager}.
+   * Fetch the container locality information from the {@link MetadataStore}.
    *
-   * @return the map of containerId: (hostname)
+   * @return the {@code LocalityModel} for the job
    */
-  public Map<String, Map<String, String>> readContainerLocality() {

Review comment:
       Doing the same thing more than two times is justification enough for a helper IMO and not overkill. Anyway, I will leave this up to your discretion.







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r476728011



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -80,19 +82,23 @@
 
   private final Optional<StandbyContainerManager> standbyContainerManager;
 
+  private final LocalityManager localityManager;

Review comment:
       1. `SamzaApplicationState` is a holder class to share state between components and is not supposed to hold on to any manager classes. We have a TODO to remove JobModelManager from it. It would be counterproductive to introduce another component into the holder class. 
   2. I am not a huge fan of static for the following reasons:
   
   - It makes testing harder and forces the use of PowerMock for component testing everywhere.
   - Utility classes that handle components with a lifecycle are harder to reason about.
   - Static methods are harder to evolve without breaking the signature & modifying the callers.







[GitHub] [samza] rmatharu commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477009078



##########
File path: samza-api/src/main/java/org/apache/samza/context/JobContext.java
##########
@@ -46,4 +47,9 @@
    * @return the id for this job
    */
   String getJobId();
+
+  /*
+   * Returns the job model for this job

Review comment:
       {@link JobModel}







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477797973



##########
File path: samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
##########
@@ -53,28 +56,26 @@ public LocalityManager(MetadataStore metadataStore) {
   }
 
   /**
-   * Method to allow read container locality information from the {@link MetadataStore}.
-   * This method is used in {@link org.apache.samza.coordinator.JobModelManager}.
+   * Fetch the container locality information from the {@link MetadataStore}.
    *
-   * @return the map of containerId: (hostname)
+   * @return the {@code LocalityModel} for the job
    */
-  public Map<String, Map<String, String>> readContainerLocality() {

Review comment:
       Our logic diverges on how the lack of locality is handled, e.g. within `LocalityManager` vs `ContainerProcessorManager`. I see your point in the case of complex logic being shared across components. However, introducing a helper method just to unwrap the model seems like overkill at the moment.
   Also, why only host? What is special about that attribute that it gets a helper wrapper method, and why not one for the JMX or JMX tunneling URL? This goes back to my point about it being redundant without much ROI, at the cost of readability, because I'd need to go down one layer further to understand what `readLastSeenHostForContainer` does in its unwrapping logic.
   
   I'd prefer to start simple and not over-architect, and to extract/refactor as we see the need, as long as the right foundations are in place for the code to evolve and extend.  
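
For context, the unwrapping under discussion amounts to roughly one expression at the call site. The sketch below uses hypothetical, simplified stand-ins (`SketchLocalityModel`, `SketchProcessorLocality`) rather than the real classes from this PR, and it assumes a lookup that returns `null` when a processor has no locality entry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for a per-processor locality entry.
class SketchProcessorLocality {
  private final String id;
  private final String host;

  SketchProcessorLocality(String id, String host) {
    this.id = id;
    this.host = host;
  }

  String id() {
    return id;
  }

  String host() {
    return host;
  }
}

// Hypothetical, simplified stand-in for the job-wide locality model.
class SketchLocalityModel {
  private final Map<String, SketchProcessorLocality> localities;

  SketchLocalityModel(Map<String, SketchProcessorLocality> localities) {
    this.localities = new HashMap<>(localities);
  }

  // Assumed behavior: returns null when the processor has no locality entry.
  SketchProcessorLocality getProcessorLocality(String processorId) {
    return localities.get(processorId);
  }
}

class UnwrapSketch {
  // The kind of one-line unwrapping that a dedicated helper method would wrap.
  static Optional<String> lastSeenHost(SketchLocalityModel model, String processorId) {
    return Optional.ofNullable(model.getProcessorLocality(processorId))
        .map(SketchProcessorLocality::host);
  }
}
```

Whether this deserves its own helper is exactly the judgment call debated in this thread; the sketch only shows how little logic is involved.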







[GitHub] [samza] Sanil15 commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
Sanil15 commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477875176



##########
File path: samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
##########
@@ -167,15 +167,18 @@ object JobModelManager extends Logging {
     */
   def getProcessorLocality(config: Config, localityManager: LocalityManager) = {

Review comment:
       Discussed offline with Bharath. JobModelManager dumping locality info when LocalityManager is more or less independent does not make sense to me; please cut a JIRA here and add a TODO so we can track this cleanup. 







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477015346



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. With this information, samza achieves (best effort) affinity
+ * i.e. place the container on the host in which it was running before. By doing this, stateful applications can minimize
+ * the bootstrap time of their state by leveraging the local copy.
+ *
+ * It is suffice to have only {@link ProcessorLocality} model and use it within locality manager. However, this abstraction
+ * enables us extend locality beyond container. e.g. It is useful to track task locality to enable heterogeneous containers
+ * or fine grained execution model.
+ *
+ * In YARN deployment model, processors are interchangeably used for container and <i>processorId</i>refers to
+ * logical container id.
+ */
+public class LocalityModel {

Review comment:
       Are you referring to the version of the data model or the versioning of the actual data?
   If it is the former, our only metadata store today (Kafka) versions the format under `CoordinatorStreamMessage`, which should handle format evolution. If it is the latter, we don't have universal versioning of metadata (job model, configuration, locality) yet. We will likely add versioning to all the metadata at once as part of the work on metadata abstraction.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] prateekm commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r475757265



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.server;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.HostLocality;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+/**
+ * A servlet for locality information of a job. The servlet is hosted alongside of the {@link JobServlet} which hosts
+ * job model & configuration. Historically, locality information was part of job model but we extracted the locality
+ * as job model is static within the lifecycle of an application attempt while locality changes in the event of container
+ * movements.
+ *
+ * This separation enables us to achieve performance benefits by caching job model when it is served by the AM as it
+ * can incur significant penalty in the job start time for jobs with large number of containers.
+ */
+public class LocalityServlet extends HttpServlet {
+  private static final String PROCESSOR_ID_PARAM = "processorId";
+  private final ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+  private final LocalityManager localityManager;
+
+  public LocalityServlet(LocalityManager localityManager) {
+    this.localityManager = localityManager;
+  }
+
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+    response.setContentType("application/json");
+    response.setStatus(HttpServletResponse.SC_OK);
+    LocalityModel localityModel = localityManager.readLocality();
+
+    if (request.getParameterMap().size() == 1) {
+      String processorId = request.getParameter(PROCESSOR_ID_PARAM);
+      HostLocality hostLocality = Optional.ofNullable(localityModel.getHostLocality(processorId))
+          .orElse(new HostLocality(processorId, ""));

Review comment:
       Thanks, let's keep it compatible for now.







[GitHub] [samza] rmatharu commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477011621



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. With this information, samza achieves (best effort) affinity
+ * i.e. place the container on the host in which it was running before. By doing this, stateful applications can minimize
+ * the bootstrap time of their state by leveraging the local copy.
+ *
+ * It is suffice to have only {@link ProcessorLocality} model and use it within locality manager. However, this abstraction
+ * enables us extend locality beyond container. e.g. It is useful to track task locality to enable heterogeneous containers
+ * or fine grained execution model.
+ *
+ * In YARN deployment model, processors are interchangeably used for container and <i>processorId</i>refers to
+ * logical container id.
+ */
+public class LocalityModel {
+  /*
+   * A collection of processor locality keyed by processorId.
+   */
+  private Map<String, ProcessorLocality> processorLocalities;
+
+  /**
+   * Construct locality model for the job from the input map of processor localities.
+   * @param processorLocalities host locality information for the job keyed by processor id
+   */
+  public LocalityModel(Map<String, ProcessorLocality> processorLocalities) {
+    this.processorLocalities = processorLocalities;
+  }
+
+  /*
+   * Returns a {@link Map} of {@link ProcessorLocality} keyed by processors id.
+   */
+  public Map<String, ProcessorLocality> getProcessorLocalities() {
+    return processorLocalities;

Review comment:
       Should we return `new HashMap<>(processorLocalities)` so that the caller can't modify the original map held by this class?
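
A minimal sketch of the defensive-copy idea raised here, not the code that landed in the PR. The class names `ProcessorLocalitySketch` and `LocalityModelSketch` are hypothetical stand-ins, and the sketch swaps the suggested copy-per-call for a single copy in the constructor wrapped in a read-only view, which gives the same protection without allocating on every getter call.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProcessorLocality; only the fields the sketch needs.
final class ProcessorLocalitySketch {
  private final String id;
  private final String host;

  ProcessorLocalitySketch(String id, String host) {
    this.id = id;
    this.host = host;
  }

  String id() {
    return id;
  }

  String host() {
    return host;
  }
}

// Hypothetical variant of LocalityModel that guards its internal map.
final class LocalityModelSketch {
  private final Map<String, ProcessorLocalitySketch> processorLocalities;

  LocalityModelSketch(Map<String, ProcessorLocalitySketch> processorLocalities) {
    // Copy on the way in so later changes to the caller's map are not visible here,
    // then wrap the copy so callers of the getter cannot mutate it either.
    this.processorLocalities = Collections.unmodifiableMap(new HashMap<>(processorLocalities));
  }

  Map<String, ProcessorLocalitySketch> getProcessorLocalities() {
    // Read-only view: put/remove on the returned map throw UnsupportedOperationException.
    return processorLocalities;
  }
}
```

With this shape, a caller that needs a mutable map has to make its own copy explicitly, which keeps the model itself immutable.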







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477012461



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. With this information, samza achieves (best effort) affinity
+ * i.e. place the container on the host in which it was running before. By doing this, stateful applications can minimize
+ * the bootstrap time of their state by leveraging the local copy.
+ *
+ * It is suffice to have only {@link ProcessorLocality} model and use it within locality manager. However, this abstraction
+ * enables us extend locality beyond container. e.g. It is useful to track task locality to enable heterogeneous containers
+ * or fine grained execution model.
+ *
+ * In YARN deployment model, processors are interchangeably used for container and <i>processorId</i>refers to
+ * logical container id.
+ */
+public class LocalityModel {
+  /*
+   * A collection of processor locality keyed by processorId.
+   */
+  private Map<String, ProcessorLocality> processorLocalities;
+
+  /**
+   * Construct locality model for the job from the input map of processor localities.
+   * @param processorLocalities host locality information for the job keyed by processor id
+   */
+  public LocalityModel(Map<String, ProcessorLocality> processorLocalities) {
+    this.processorLocalities = processorLocalities;
+  }
+
+  /*
+   * Returns a {@link Map} of {@link ProcessorLocality} keyed by processors id.
+   */
+  public Map<String, ProcessorLocality> getProcessorLocalities() {
+    return processorLocalities;

Review comment:
       Good catch 👍 







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r475014768



##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.server;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.HostLocality;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+/**
+ * A servlet for locality information of a job. The servlet is hosted alongside of the {@link JobServlet} which hosts
+ * job model & configuration. Historically, locality information was part of job model but we extracted the locality
+ * as job model is static within the lifecycle of an application attempt while locality changes in the event of container
+ * movements.
+ *
+ * This separation enables us to achieve performance benefits by caching job model when it is served by the AM as it
+ * can incur significant penalty in the job start time for jobs with large number of containers.
+ */
+public class LocalityServlet extends HttpServlet {
+  private static final String PROCESSOR_ID_PARAM = "processorId";
+  private final ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+  private final LocalityManager localityManager;
+
+  public LocalityServlet(LocalityManager localityManager) {
+    this.localityManager = localityManager;
+  }
+
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+    response.setContentType("application/json");
+    response.setStatus(HttpServletResponse.SC_OK);
+    LocalityModel localityModel = localityManager.readLocality();
+
+    if (request.getParameterMap().size() == 1) {
+      String processorId = request.getParameter(PROCESSOR_ID_PARAM);
+      HostLocality hostLocality = Optional.ofNullable(localityModel.getHostLocality(processorId))
+          .orElse(new HostLocality(processorId, ""));

Review comment:
       I did it to keep compatibility, since the prior version read directly from the `JobModelManager`, which would return an empty string if the locality doesn't exist.
   We can break that if we want. Thoughts?
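
The compatibility behaviour described here can be sketched as follows. This is an illustration under assumptions, not the servlet code: `HostLocalitySketch` and `LocalityLookupSketch` are hypothetical names, and the sketch uses `orElseGet` rather than the `orElse` in the diff so the fallback object is only created when the lookup misses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for HostLocality.
final class HostLocalitySketch {
  private final String id;
  private final String host;

  HostLocalitySketch(String id, String host) {
    this.id = id;
    this.host = host;
  }

  String id() {
    return id;
  }

  String host() {
    return host;
  }
}

// Hypothetical lookup that mirrors the "empty host when unknown" contract.
final class LocalityLookupSketch {
  private final Map<String, HostLocalitySketch> localities;

  LocalityLookupSketch(Map<String, HostLocalitySketch> localities) {
    this.localities = new HashMap<>(localities);
  }

  HostLocalitySketch lookup(String processorId) {
    // Unknown processors get a locality with an empty host instead of null,
    // matching what callers of the older endpoint would have seen.
    return Optional.ofNullable(localities.get(processorId))
        .orElseGet(() -> new HostLocalitySketch(processorId, ""));
  }
}
```

Breaking compatibility would instead mean surfacing the miss to the caller, for example through an empty `Optional` or an HTTP 404 from the servlet.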







[GitHub] [samza] Sanil15 commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
Sanil15 commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r476597594



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/ContainerLocality.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+
+/**
+ * A data model to represent the container locality information. The locality information refers to the whereabouts
+ * of the physical execution of container.
+ * Fields such as <i>jmxUrl</i> and <i>jmxTunnelingUrl</i> exist for backward compatibility reasons as they were
+ * historically stored under the same name space as locality and surfaced within the framework through the locality
+ * manager.
+ */
+public class ContainerLocality {
+  /* Container identifier */
+  private String id;

Review comment:
       Is this the physical id of the container? For example, in YARN it will be container_0000_99... If so, let's explicitly call it containerId and add a doc stating that it is the physical **containerId** of a container.
   
   Also, don't you need the logical id (called processorId here...)?

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -174,6 +176,7 @@
   private final MetadataStore metadataStore;
 
   private final SystemAdmins systemAdmins;
+  private final LocalityManager localityManager;

Review comment:
       Here is another thought:
   LocalityManager and ContainerPlacementMetadataStore are both present in ClusterBasedJobCoordinator because they both need a handle on the metadata store.
   
   Would it be cleaner to move this to ContainerProcessManager and then eventually to ContainerManager (once all the in-progress refactors have landed)?
   
   We can just pass the metadata store from here and then instantiate ContainerPlacementMetadataStore and LocalityManager in ContainerManager.
   

##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. With this information, samza achieves (best effort) affinity
+ * i.e. place the container on the host in which it was running before. By doing this, stateful applications can minimize
+ * the bootstrap time of their state by leveraging the local copy.
+ *
+ * It is suffice to have only {@link ContainerLocality} model and use it within locality manager. However, this abstraction
+ * enables us extend locality beyond container. e.g. It is useful to track task locality to enable heterogeneous containers
+ * or fine grained execution model.
+ */
+public class LocalityModel {
+  private Map<String, ContainerLocality> containerLocalities;

Review comment:
       Let's add docs here for the key of this map. I think it's the logical processor id, e.g. 1, 2, 3.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -80,19 +82,23 @@
 
   private final Optional<StandbyContainerManager> standbyContainerManager;
 
+  private final LocalityManager localityManager;

Review comment:
       Now we are passing this LocalityManager everywhere (ContainerManager, CPM, StandbyContainerManager) just to get the host on which the container was last seen.
   
   I think we can make it cleaner by either maintaining that in SamzaApplicationState or having a static util in LocalityManager to fetch the last seen host.

##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. With this information, samza achieves (best effort) affinity
+ * i.e. place the container on the host in which it was running before. By doing this, stateful applications can minimize
+ * the bootstrap time of their state by leveraging the local copy.
+ *
+ * It is suffice to have only {@link ContainerLocality} model and use it within locality manager. However, this abstraction
+ * enables us extend locality beyond container. e.g. It is useful to track task locality to enable heterogeneous containers
+ * or fine grained execution model.
+ */
+public class LocalityModel {
+  private Map<String, ContainerLocality> containerLocalities;
+
+  /**
+   * Construct locality model for the job from the input map of container localities.
+   * @param containerLocalities host locality information for the job keyed by container id
+   */
+  public LocalityModel(Map<String, ContainerLocality> containerLocalities) {
+    this.containerLocalities = containerLocalities;
+  }
+
+  /*
+   * Returns a {@link Map} of {@link ContainerLocality} keyed by container id.
+   */
+  public Map<String, ContainerLocality> getContainerLocalities() {
+    return containerLocalities;
+  }
+
+  /*
+   * Returns the {@link ContainerLocality} for the given container id.
+   */
+  public ContainerLocality getContainerLocality(String id) {

Review comment:
       s/id/processorId
   
   Let's explicitly call it processorId.

##########
File path: samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerLocalityMixIn.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.serializers.model;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * A mix-in Jackson class to convert {@link org.apache.samza.job.model.ContainerLocality} to/from JSON
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public abstract class JsonContainerLocalityMixIn {
+  @JsonCreator
+  public JsonContainerLocalityMixIn(@JsonProperty("id") String id, @JsonProperty("host") String host,
+      @JsonProperty("jmx-url") String jmxUrl, @JsonProperty("jmx-tunneling-url") String jmxTunnelingUrl) {
+  }
+
+  @JsonProperty("id")

Review comment:
       Same here: let's be specific about whether this is the processorId or the containerId.







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r476718374



##########
File path: samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
##########
@@ -53,28 +56,26 @@ public LocalityManager(MetadataStore metadataStore) {
   }
 
   /**
-   * Method to allow read container locality information from the {@link MetadataStore}.
-   * This method is used in {@link org.apache.samza.coordinator.JobModelManager}.
+   * Fetch the container locality information from the {@link MetadataStore}.
    *
-   * @return the map of containerId: (hostname)
+   * @return the {@code LocalityModel} for the job
    */
-  public Map<String, Map<String, String>> readContainerLocality() {

Review comment:
       We got rid of this code, and the new method returns a model. We need a way to return the locality of all containers or of a specific container, and we already have methods that expose both.
   
   Exposing helper methods here for each attribute of `ContainerLocality`, like `host` or `jmx`, seems redundant.







[GitHub] [samza] rmatharu commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477013146



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+
+/**
+ * A data model to represent the processor locality information. The locality information refers to the whereabouts
+ * of the physical execution of container.
+ * Fields such as <i>jmxUrl</i> and <i>jmxTunnelingUrl</i> exist for backward compatibility reasons as they were
+ * historically stored under the same name space as locality and surfaced within the framework through the locality
+ * manager.
+ */
+public class ProcessorLocality {
+  /* Processor identifier. In YARN deployment model, this corresponds to the logical container id */
+  private String id;
+  /* Host on which the processor is currently placed */
+  private String host;
+  private String jmxUrl;
+  /* JMX tunneling URL for debugging */
+  private String jmxTunnelingUrl;
+
+  public ProcessorLocality(String id, String host) {
+    this(id, host, "", "");
+  }
+
+  public ProcessorLocality(String id, String host, String jmxUrl, String jmxTunnelingUrl) {
+    this.id = id;
+    this.host = host;
+    this.jmxUrl = jmxUrl;
+    this.jmxTunnelingUrl = jmxTunnelingUrl;
+  }
+
+  public String id() {
+    return id;
+  }
+
+  public String host() {
+    return host;

Review comment:
       I long for the day Samza code uses Lombok :-)







[GitHub] [samza] Sanil15 commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
Sanil15 commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r476628944



##########
File path: samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
##########
@@ -53,28 +56,26 @@ public LocalityManager(MetadataStore metadataStore) {
   }
 
   /**
-   * Method to allow read container locality information from the {@link MetadataStore}.
-   * This method is used in {@link org.apache.samza.coordinator.JobModelManager}.
+   * Fetch the container locality information from the {@link MetadataStore}.
    *
-   * @return the map of containerId: (hostname)
+   * @return the {@code LocalityModel} for the job
    */
-  public Map<String, Map<String, String>> readContainerLocality() {

Review comment:
       Can't we have a helper here, something like `readLastSeenHostForContainer(String processorId)`,
   
   rather than reading the locality map everywhere and then doing a get for the last seen locality?
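
For illustration only, the suggested helper could look roughly like the sketch below. Everything here is an assumption: `LastSeenHostSketch` is a hypothetical class that holds a plain processorId-to-host map in place of the real `LocalityManager` and `MetadataStore`, and the `Optional<String>` return type is a choice made for the sketch, not something proposed in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical holder of the last known host per logical processor id.
final class LastSeenHostSketch {
  private final Map<String, String> hostByProcessorId;

  LastSeenHostSketch(Map<String, String> hostByProcessorId) {
    this.hostByProcessorId = new HashMap<>(hostByProcessorId);
  }

  // Returns the last seen host for the processor, or empty when no host
  // (or only a blank placeholder) has been recorded for it.
  Optional<String> readLastSeenHostForContainer(String processorId) {
    return Optional.ofNullable(hostByProcessorId.get(processorId))
        .filter(host -> !host.isEmpty());
  }
}
```

Call sites would then ask for one host directly instead of fetching the whole locality map and indexing into it.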







[GitHub] [samza] rmatharu commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477011426



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. With this information, samza achieves (best effort) affinity
+ * i.e. place the container on the host in which it was running before. By doing this, stateful applications can minimize
+ * the bootstrap time of their state by leveraging the local copy.
+ *
+ * It is suffice to have only {@link ProcessorLocality} model and use it within locality manager. However, this abstraction
+ * enables us extend locality beyond container. e.g. It is useful to track task locality to enable heterogeneous containers
+ * or fine grained execution model.
+ *
+ * In YARN deployment model, processors are interchangeably used for container and <i>processorId</i>refers to
+ * logical container id.
+ */
+public class LocalityModel {

Review comment:
       Would it make sense to add a version field (e.g., an `int`) to the model, so that when someone changes it later, downstream code can handle the change based on the version number?
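
To make the suggestion concrete, here is a rough sketch of a versioned model, under stated assumptions: `VersionedLocalityModel`, `CURRENT_VERSION` and `hostFor` are hypothetical names for illustration and are not part of this PR. The model is reduced to a processorId-to-host map to keep the example short.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class VersionedLocalitySketch {
  // Hypothetical: the only model version this reader understands.
  static final int CURRENT_VERSION = 1;

  // Hypothetical stand-in for LocalityModel with an added int version field.
  static final class VersionedLocalityModel {
    private final int version;
    private final Map<String, String> hostByProcessorId;

    VersionedLocalityModel(int version, Map<String, String> hostByProcessorId) {
      this.version = version;
      this.hostByProcessorId = Collections.unmodifiableMap(new HashMap<>(hostByProcessorId));
    }

    int getVersion() {
      return version;
    }

    Map<String, String> getHosts() {
      return hostByProcessorId;
    }
  }

  // Downstream code checks the version before interpreting the payload.
  static String hostFor(VersionedLocalityModel model, String processorId) {
    if (model.getVersion() != CURRENT_VERSION) {
      throw new IllegalArgumentException("Unsupported locality model version: " + model.getVersion());
    }
    return model.getHosts().get(processorId);
  }

  public static void main(String[] args) {
    Map<String, String> hosts = new HashMap<>();
    hosts.put("0", "host-1");
    System.out.println(hostFor(new VersionedLocalityModel(CURRENT_VERSION, hosts), "0"));
  }
}
```

Rejecting unknown versions outright is only one option; a reader could also fall back to the fields it does understand.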







[GitHub] [samza] Sanil15 commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
Sanil15 commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r476625264



##########
File path: samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
##########
@@ -158,23 +159,24 @@ public void validateJmxMetrics() throws Exception {
     CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(config, metricsRegistry);
     coordinatorStreamStore.init();
     try {
-      Config configFromCoordinatorStream = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
-      ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamStore);
-      JobModelManager jobModelManager =
-          JobModelManager.apply(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(),
-              coordinatorStreamStore, metricsRegistry);
+      LocalityManager localityManager =
+          new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE));
       validator.init(config);
-      Map<String, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
-      for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {
-        String containerId = entry.getKey();
-        String jmxUrl = entry.getValue();
-        log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
-        JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
-        jmxMetrics.connect();
-        validator.validate(jmxMetrics);
-        jmxMetrics.close();
-        log.info("validate container " + containerId + " successfully");
+      LocalityModel localityModel = localityManager.readLocality();
+
+      for (ContainerLocality containerLocality : localityModel.getContainerLocalities().values()) {
+        String containerId = containerLocality.id();
+        String jmxUrl = containerLocality.jmxTunnelingUrl();
+        if (StringUtils.isNotBlank(jmxUrl)) {

Review comment:
       In Samza, JMX is enabled or disabled through a config. You should only emit JMX metrics here if that config is enabled.
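
A minimal sketch of the gating being asked for, with loud assumptions: the key `job.jmx.enabled` and its default of `true` are assumptions to verify against `JobConfig`, and `jmxUrlsToValidate` is a hypothetical helper operating on plain maps rather than on the real `Config` and `LocalityModel` types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class JmxGateSketch {
  // Assumed config key; confirm the real name and default in JobConfig.
  static final String JMX_ENABLED_KEY = "job.jmx.enabled";

  static boolean isJmxEnabled(Map<String, String> config) {
    // Assumption: JMX counts as enabled when the key is absent.
    return Boolean.parseBoolean(config.getOrDefault(JMX_ENABLED_KEY, "true"));
  }

  // Returns the JMX URLs worth validating: none when JMX is disabled,
  // otherwise every non-blank URL in the locality information.
  static List<String> jmxUrlsToValidate(Map<String, String> config, Map<String, String> jmxUrlByProcessorId) {
    List<String> urls = new ArrayList<>();
    if (!isJmxEnabled(config)) {
      return urls;
    }
    for (String url : jmxUrlByProcessorId.values()) {
      if (url != null && !url.trim().isEmpty()) {
        urls.add(url);
      }
    }
    return urls;
  }

  public static void main(String[] args) {
    Map<String, String> urls = new LinkedHashMap<>();
    urls.put("0", "service:jmx:rmi:///jndi/rmi://host-1:9999/jmxrmi");
    urls.put("1", "");
    Map<String, String> config = new HashMap<>();
    config.put(JMX_ENABLED_KEY, "false");
    System.out.println(jmxUrlsToValidate(config, urls));
  }
}
```

With this shape, the blank-URL check in the diff stays as a second guard, and the config check decides whether the loop runs at all.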







[GitHub] [samza] mynameborat commented on pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on pull request #1421:
URL: https://github.com/apache/samza/pull/1421#issuecomment-680376765


   > Did a first pass, let's resolve those then will take a look at tests closely
   
   Thanks for the review. Addressed the feedback around `processorId` & `containerId` as discussed offline. Renamed `ContainerLocality` to `ProcessorLocality` for clarity and consistency.





[GitHub] [samza] Sanil15 commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
Sanil15 commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477611350



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -174,6 +176,7 @@
   private final MetadataStore metadataStore;
 
   private final SystemAdmins systemAdmins;
+  private final LocalityManager localityManager;

Review comment:
       Sure, we can take this up as part of the consolidation refactor. Please add a TODO in the code so we do not forget it.







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r475013098



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -494,7 +498,9 @@ private String getSourceHostForContainer(ContainerPlacementRequestMessage reques
           processorId, currentResource.getContainerId(), currentResource.getHost(), requestMessage);
       sourceHost = currentResource.getHost();
     } else {
-      sourceHost = samzaApplicationState.jobModelManager.jobModel().getContainerToHostValue(processorId, SetContainerHostMapping.HOST_KEY);
+      sourceHost = Optional.ofNullable(localityManager.readLocality().getHostLocality(processorId))

Review comment:
       No. `readLocality` always returns a `LocalityModel`, an empty one if nothing is stored. As for `localityManager`, we always inject one. I still added a precondition check in CPM & CM.
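
For readers following along, a small sketch of the kind of constructor precondition mentioned above. It uses `java.util.Objects.requireNonNull` as a stand-in for whatever precondition utility the PR actually uses, and `LocalityReader` and `ContainerManagerLike` are hypothetical names, not classes from this change.

```java
import java.util.Objects;

final class PreconditionSketch {
  // Hypothetical minimal view of a locality manager.
  interface LocalityReader {
    String hostFor(String processorId);
  }

  // Hypothetical consumer that refuses to be built without a locality reader,
  // so later lookups never need a null check on the collaborator itself.
  static final class ContainerManagerLike {
    private final LocalityReader localityReader;

    ContainerManagerLike(LocalityReader localityReader) {
      this.localityReader = Objects.requireNonNull(localityReader, "Locality manager cannot be null");
    }

    String sourceHost(String processorId) {
      return localityReader.hostFor(processorId);
    }
  }

  public static void main(String[] args) {
    ContainerManagerLike manager = new ContainerManagerLike(processorId -> "host-1");
    System.out.println(manager.sourceHost("0"));
  }
}
```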







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477015857



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+
+/**
+ * A data model to represent the processor locality information. The locality information refers to the whereabouts
+ * of the physical execution of container.
+ * Fields such as <i>jmxUrl</i> and <i>jmxTunnelingUrl</i> exist for backward compatibility reasons as they were
+ * historically stored under the same name space as locality and surfaced within the framework through the locality
+ * manager.
+ */
+public class ProcessorLocality {
+  /* Processor identifier. In YARN deployment model, this corresponds to the logical container id */
+  private String id;
+  /* Host on which the processor is currently placed */
+  private String host;
+  private String jmxUrl;
+  /* JMX tunneling URL for debugging */
+  private String jmxTunnelingUrl;
+
+  public ProcessorLocality(String id, String host) {
+    this(id, host, "", "");
+  }
+
+  public ProcessorLocality(String id, String host, String jmxUrl, String jmxTunnelingUrl) {
+    this.id = id;
+    this.host = host;
+    this.jmxUrl = jmxUrl;
+    this.jmxTunnelingUrl = jmxTunnelingUrl;
+  }
+
+  public String id() {
+    return id;
+  }
+
+  public String host() {
+    return host;

Review comment:
       :-)







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477015346



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. With this information, samza achieves (best effort) affinity
+ * i.e. place the container on the host in which it was running before. By doing this, stateful applications can minimize
+ * the bootstrap time of their state by leveraging the local copy.
+ *
+ * It is suffice to have only {@link ProcessorLocality} model and use it within locality manager. However, this abstraction
+ * enables us extend locality beyond container. e.g. It is useful to track task locality to enable heterogeneous containers
+ * or fine grained execution model.
+ *
+ * In YARN deployment model, processors are interchangeably used for container and <i>processorId</i>refers to
+ * logical container id.
+ */
+public class LocalityModel {

Review comment:
       Are you referring to the data model version or the versioning of the actual data?
   If it is the former, our only metadata store today (Kafka) versions the format under `CoordinatorStreamMessage`, which should handle evolution when fields are added. If it is the latter, we don't have universal versioning of metadata (job model, configuration, locality) yet. We will likely add versioning to all the metadata at once as part of the work on the metadata abstraction.







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r476728302



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/ContainerLocality.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+
+/**
+ * A data model to represent the container locality information. The locality information refers to the whereabouts
+ * of the physical execution of container.
+ * Fields such as <i>jmxUrl</i> and <i>jmxTunnelingUrl</i> exist for backward compatibility reasons as they were
+ * historically stored under the same name space as locality and surfaced within the framework through the locality
+ * manager.
+ */
+public class ContainerLocality {
+  /* Container identifier */
+  private String id;

Review comment:
       Logical id. I updated the variable name to clarify it.







[GitHub] [samza] prateekm commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r475756770



##########
File path: samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
##########
@@ -39,34 +36,14 @@
  * </p>
  */
 public class JobModel {
-  private static final String EMPTY_STRING = "";
   private final Config config;
   private final Map<String, ContainerModel> containers;
 
-  private final LocalityManager localityManager;
-  private final Map<String, String> localityMappings;
-
   public int maxChangeLogStreamPartitions;
 
   public JobModel(Config config, Map<String, ContainerModel> containers) {
-    this(config, containers, null);
-  }
-
-  public JobModel(Config config, Map<String, ContainerModel> containers, LocalityManager localityManager) {

Review comment:
       Sounds good, thanks.







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r476728126



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. With this information, samza achieves (best effort) affinity
+ * i.e. place the container on the host in which it was running before. By doing this, stateful applications can minimize
+ * the bootstrap time of their state by leveraging the local copy.
+ *
+ * It is suffice to have only {@link ContainerLocality} model and use it within locality manager. However, this abstraction
+ * enables us extend locality beyond container. e.g. It is useful to track task locality to enable heterogeneous containers
+ * or fine grained execution model.
+ */
+public class LocalityModel {
+  private Map<String, ContainerLocality> containerLocalities;
+
+  /**
+   * Construct locality model for the job from the input map of container localities.
+   * @param containerLocalities host locality information for the job keyed by container id
+   */
+  public LocalityModel(Map<String, ContainerLocality> containerLocalities) {
+    this.containerLocalities = containerLocalities;
+  }
+
+  /*
+   * Returns a {@link Map} of {@link ContainerLocality} keyed by container id.
+   */
+  public Map<String, ContainerLocality> getContainerLocalities() {
+    return containerLocalities;
+  }
+
+  /*
+   * Returns the {@link ContainerLocality} for the given container id.
+   */
+  public ContainerLocality getContainerLocality(String id) {

Review comment:
       done 👍 







[GitHub] [samza] Sanil15 commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
Sanil15 commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477612586



##########
File path: samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
##########
@@ -53,28 +56,26 @@ public LocalityManager(MetadataStore metadataStore) {
   }
 
   /**
-   * Method to allow read container locality information from the {@link MetadataStore}.
-   * This method is used in {@link org.apache.samza.coordinator.JobModelManager}.
+   * Fetch the container locality information from the {@link MetadataStore}.
    *
-   * @return the map of containerId: (hostname)
+   * @return the {@code LocalityModel} for the job
    */
-  public Map<String, Map<String, String>> readContainerLocality() {

Review comment:
       ```
   Optional.ofNullable(localityManager.readLocality().getProcessorLocality(processorId))
             .map(ProcessorLocality::host)
             .orElse(null);
   ```
   
   is redundant, and I was referring to adding a helper method that returns this, so that if tomorrow a third place needs the last seen host, we would not repeat the same thing.
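
A sketch of such a helper, assuming simplified stand-ins for the PR's `ProcessorLocality` and `LocalityModel` (only `host()` and `getProcessorLocality` are mirrored from the snippet above). The name `readLastSeenHost` and the choice to return an `Optional` instead of `null` are illustrative, not something the PR settles.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class LastSeenHostSketch {
  // Stand-in for the PR's ProcessorLocality, trimmed to id and host.
  static final class ProcessorLocality {
    private final String id;
    private final String host;

    ProcessorLocality(String id, String host) {
      this.id = id;
      this.host = host;
    }

    String id() {
      return id;
    }

    String host() {
      return host;
    }
  }

  // Stand-in for the PR's LocalityModel, keyed by processorId.
  static final class LocalityModel {
    private final Map<String, ProcessorLocality> processorLocalities;

    LocalityModel(Map<String, ProcessorLocality> processorLocalities) {
      this.processorLocalities = Collections.unmodifiableMap(new HashMap<>(processorLocalities));
    }

    ProcessorLocality getProcessorLocality(String processorId) {
      return processorLocalities.get(processorId);
    }
  }

  // Hypothetical helper: one place that knows how to go from a processorId
  // to its last seen host; empty when the processor has no recorded locality.
  static Optional<String> readLastSeenHost(LocalityModel model, String processorId) {
    return Optional.ofNullable(model.getProcessorLocality(processorId))
        .map(ProcessorLocality::host);
  }

  public static void main(String[] args) {
    Map<String, ProcessorLocality> localities = new HashMap<>();
    localities.put("0", new ProcessorLocality("0", "host-1"));
    LocalityModel model = new LocalityModel(localities);
    System.out.println(readLastSeenHost(model, "0").orElse("unknown"));
    System.out.println(readLastSeenHost(model, "7").orElse("unknown"));
  }
}
```

Call sites that need the current `null` behaviour can still write `readLastSeenHost(model, id).orElse(null)`.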







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477012461



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. With this information, samza achieves (best effort) affinity
+ * i.e. place the container on the host in which it was running before. By doing this, stateful applications can minimize
+ * the bootstrap time of their state by leveraging the local copy.
+ *
+ * It is suffice to have only {@link ProcessorLocality} model and use it within locality manager. However, this abstraction
+ * enables us extend locality beyond container. e.g. It is useful to track task locality to enable heterogeneous containers
+ * or fine grained execution model.
+ *
+ * In YARN deployment model, processors are interchangeably used for container and <i>processorId</i>refers to
+ * logical container id.
+ */
+public class LocalityModel {
+  /*
+   * A collection of processor locality keyed by processorId.
+   */
+  private Map<String, ProcessorLocality> processorLocalities;
+
+  /**
+   * Construct locality model for the job from the input map of processor localities.
+   * @param processorLocalities host locality information for the job keyed by processor id
+   */
+  public LocalityModel(Map<String, ProcessorLocality> processorLocalities) {
+    this.processorLocalities = processorLocalities;
+  }
+
+  /*
+   * Returns a {@link Map} of {@link ProcessorLocality} keyed by processors id.
+   */
+  public Map<String, ProcessorLocality> getProcessorLocalities() {
+    return processorLocalities;

Review comment:
       Good catch 👍 The wrapper class is immutable; I will make the instance variable hold an immutable copy created in the constructor, to avoid creating multiple copies of `HashMap` on each invocation.
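
Roughly what the constructor-side copy could look like. This is a sketch with a plain `Map<String, String>` in place of `Map<String, ProcessorLocality>`, and `ImmutableLocalitySketch` is a hypothetical name; the actual change in the PR may differ.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class ImmutableLocalitySketch {
  private final Map<String, String> hostByProcessorId;

  ImmutableLocalitySketch(Map<String, String> hostByProcessorId) {
    // Copy once at construction, then expose only a read-only view,
    // so the getter can return the same instance on every call.
    this.hostByProcessorId = Collections.unmodifiableMap(new HashMap<>(hostByProcessorId));
  }

  Map<String, String> getHosts() {
    return hostByProcessorId;
  }

  public static void main(String[] args) {
    Map<String, String> input = new HashMap<>();
    input.put("0", "host-1");
    ImmutableLocalitySketch model = new ImmutableLocalitySketch(input);

    // Later changes to the caller's map do not leak into the model.
    input.put("1", "host-2");
    System.out.println(model.getHosts().size());

    try {
      model.getHosts().put("2", "host-3");
    } catch (UnsupportedOperationException e) {
      System.out.println("read-only view");
    }
  }
}
```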







[GitHub] [samza] rmatharu commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
rmatharu commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477010734



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. With this information, samza achieves (best effort) affinity
+ * i.e. place the container on the host in which it was running before. By doing this, stateful applications can minimize
+ * the bootstrap time of their state by leveraging the local copy.

Review comment:
       Super Nit: 
   "...This locality information is used to place a container, if possible, on the host that it was last seen on...." 
   







[GitHub] [samza] prateekm commented on pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
prateekm commented on pull request #1421:
URL: https://github.com/apache/samza/pull/1421#issuecomment-678554494


   Looks pretty good at a high level. Left some minor comments, but let's get a second pair of eyes on this for a detailed review. Thanks for this!





[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r475019596



##########
File path: samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
##########
@@ -466,11 +464,6 @@ class JobModelManager(
       debug("Stopping HTTP server.")
       server.stop
       info("Stopped HTTP server.")
-      if (localityManager != null) {
-        info("Stopping localityManager")
-        localityManager.close()

Review comment:
       Good catch. `close` doesn't do much, since the metadata store it uses gets closed inside the job coordinator. For completeness, I moved this to the job coordinator.







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477016647



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+
+/**
+ * A data model to represent the processor locality information. The locality information refers to the whereabouts
+ * of the physical execution of container.
+ * Fields such as <i>jmxUrl</i> and <i>jmxTunnelingUrl</i> exist for backward compatibility reasons as they were
+ * historically stored under the same name space as locality and surfaced within the framework through the locality
+ * manager.
+ */
+public class ProcessorLocality {

Review comment:
       This reflects the data model persisted in Kafka. Introducing a timestamp would be an evolution of the current locality model and would require changes to the serde. I agree it would be useful, although it becomes more useful once we have some form of versioning for the data.
   
   We should track timestamp as one of the fields to add to the model when we tackle versioning for metadata.







[GitHub] [samza] Sanil15 commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
Sanil15 commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477768099



##########
File path: samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
##########
@@ -672,18 +667,23 @@ public void testAlwaysMoveToAnyHostForHostAffinityDisabled() throws Exception {
     Map<String, String> conf = new HashMap<>();
     conf.putAll(getConfigWithHostAffinityAndRetries(false, 1, true));
     SamzaApplicationState state =
-        new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2")));
+        new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);

Review comment:
       Redundant; isn't this already done in setup?

##########
File path: samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
##########
@@ -558,14 +548,19 @@ public Void answer(InvocationOnMock invocation) {
   public void testContainerPlacementsForJobRunningInDegradedState() throws Exception {
     // Set failure after retries to false to enable job running in degraded state
     config = new MapConfig(configVals, getConfigWithHostAffinityAndRetries(true, 1, false));
-    state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2")));
+    state = new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
-    containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false));
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);

Review comment:
       Redundant; isn't this already done in setup?

##########
File path: samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
##########
@@ -167,15 +167,18 @@ object JobModelManager extends Logging {
     */
   def getProcessorLocality(config: Config, localityManager: LocalityManager) = {

Review comment:
       Why do we need this here? Can't the caller use LocalityManager directly, since we are already making them independent?

##########
File path: samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
##########
@@ -807,16 +807,21 @@ public Void answer(InvocationOnMock invocation) {
   @Test(expected = NullPointerException.class)
   public void testBadControlRequestRejected() throws Exception {
     SamzaApplicationState state =
-        new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2")));
+        new SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 2, this.server));
     ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);

Review comment:
       Redundant; isn't this already done in setup?







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r476719396



##########
File path: samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
##########
@@ -158,23 +159,24 @@ public void validateJmxMetrics() throws Exception {
     CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(config, metricsRegistry);
     coordinatorStreamStore.init();
     try {
-      Config configFromCoordinatorStream = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
-      ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamStore);
-      JobModelManager jobModelManager =
-          JobModelManager.apply(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(),
-              coordinatorStreamStore, metricsRegistry);
+      LocalityManager localityManager =
+          new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE));
       validator.init(config);
-      Map<String, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
-      for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {
-        String containerId = entry.getKey();
-        String jmxUrl = entry.getValue();
-        log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
-        JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
-        jmxMetrics.connect();
-        validator.validate(jmxMetrics);
-        jmxMetrics.close();
-        log.info("validate container " + containerId + " successfully");
+      LocalityModel localityModel = localityManager.readLocality();
+
+      for (ContainerLocality containerLocality : localityModel.getContainerLocalities().values()) {
+        String containerId = containerLocality.id();
+        String jmxUrl = containerLocality.jmxTunnelingUrl();
+        if (StringUtils.isNotBlank(jmxUrl)) {

Review comment:
       This is a CLI tool, and I kept the old semantics for how it uses the locality information. I'd prefer to keep it as is, since fixing how the YARN job validation tool uses JMX is outside this PR's scope.

##########
File path: samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerLocalityMixIn.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.serializers.model;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * A mix-in Jackson class to convert {@link org.apache.samza.job.model.ContainerLocality} to/from JSON
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public abstract class JsonContainerLocalityMixIn {
+  @JsonCreator
+  public JsonContainerLocalityMixIn(@JsonProperty("id") String id, @JsonProperty("host") String host,
+      @JsonProperty("jmx-url") String jmxUrl, @JsonProperty("jmx-tunneling-url") String jmxTunnelingUrl) {
+  }
+
+  @JsonProperty("id")

Review comment:
       done







[GitHub] [samza] mynameborat commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r477015346



##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The locality information refers to the
+ * whereabouts of the physical execution of a samza container. With this information, samza achieves (best effort) affinity
+ * i.e. place the container on the host in which it was running before. By doing this, stateful applications can minimize
+ * the bootstrap time of their state by leveraging the local copy.
+ *
+ * It is suffice to have only {@link ProcessorLocality} model and use it within locality manager. However, this abstraction
+ * enables us extend locality beyond container. e.g. It is useful to track task locality to enable heterogeneous containers
+ * or fine grained execution model.
+ *
+ * In YARN deployment model, processors are interchangeably used for container and <i>processorId</i>refers to
+ * logical container id.
+ */
+public class LocalityModel {

Review comment:
       Are you referring to the version of the data model or the versioning of the actual data?
   If it is the former, our only current metadata store (Kafka) versions the format under `CoordinatorStreamMessage`, which should handle evolution when fields are added. If it is the latter, we don't yet have universal versioning of metadata (job model, configuration, locality). We will likely add versioning to all metadata as part of the work on the metadata abstraction.







[GitHub] [samza] prateekm commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r475007343



##########
File path: samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
##########
@@ -39,34 +36,14 @@
  * </p>
  */
 public class JobModel {
-  private static final String EMPTY_STRING = "";
   private final Config config;
   private final Map<String, ContainerModel> containers;
 
-  private final LocalityManager localityManager;
-  private final Map<String, String> localityMappings;
-
   public int maxChangeLogStreamPartitions;
 
   public JobModel(Config config, Map<String, ContainerModel> containers) {
-    this(config, containers, null);
-  }
-
-  public JobModel(Config config, Map<String, ContainerModel> containers, LocalityManager localityManager) {

Review comment:
       If it's not too much extra work, does it make sense to move JobModel to samza-api and expose it in JobContext as part of this PR?

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -494,7 +498,9 @@ private String getSourceHostForContainer(ContainerPlacementRequestMessage reques
           processorId, currentResource.getContainerId(), currentResource.getHost(), requestMessage);
       sourceHost = currentResource.getHost();
     } else {
-      sourceHost = samzaApplicationState.jobModelManager.jobModel().getContainerToHostValue(processorId, SetContainerHostMapping.HOST_KEY);
+      sourceHost = Optional.ofNullable(localityManager.readLocality().getHostLocality(processorId))

Review comment:
       Can localityManager or readLocality results be null?
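
The null cases raised here can be illustrated with a minimal sketch. It assumes that the model returned by `readLocality()` and the per-processor entry may each be null. `SketchHostLocality`, `SketchLocalityModel`, `SourceHostResolver` and the `ANY_HOST` fallback are hypothetical stand-ins for illustration, not the classes in this PR.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the per-processor locality entry.
final class SketchHostLocality {
  private final String id;
  private final String host;

  SketchHostLocality(String id, String host) {
    this.id = id;
    this.host = host;
  }

  String id() {
    return id;
  }

  String host() {
    return host;
  }
}

// Hypothetical stand-in for the locality model.
final class SketchLocalityModel {
  private final Map<String, SketchHostLocality> localities;

  SketchLocalityModel(Map<String, SketchHostLocality> localities) {
    this.localities = Collections.unmodifiableMap(localities);
  }

  // Returns null when the processor has no recorded locality.
  SketchHostLocality getHostLocality(String processorId) {
    return localities.get(processorId);
  }
}

final class SourceHostResolver {
  // Assumed fallback value for "no known host".
  static final String ANY_HOST = "ANY_HOST";

  // Each step tolerates null: the model, the entry, and the host field.
  static String resolve(SketchLocalityModel model, String processorId) {
    return Optional.ofNullable(model)
        .map(m -> m.getHostLocality(processorId))
        .map(SketchHostLocality::host)
        .filter(h -> !h.isEmpty())
        .orElse(ANY_HOST);
  }

  public static void main(String[] args) {
    SketchLocalityModel model = new SketchLocalityModel(
        Collections.singletonMap("0", new SketchHostLocality("0", "host-1")));
    System.out.println(resolve(model, "0"));
    System.out.println(resolve(model, "1"));
    System.out.println(resolve(null, "0"));
  }
}
```

Wrapping the model itself in `Optional.ofNullable` keeps the chain safe even if `readLocality()` were to return null; a null `localityManager` would still need a separate check before the call.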

##########
File path: samza-api/src/main/java/org/apache/samza/job/model/HostLocality.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import com.google.common.base.Objects;

Review comment:
       Nitpick: Can we use java.util.Objects (here and in the other classes)?
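
For reference, a minimal sketch of what the swap looks like: Guava's `Objects.equal` and `Objects.hashCode(Object...)` map to `java.util.Objects.equals` and `java.util.Objects.hash`. `LocalitySketch` is a hypothetical value class; its two fields are an assumption loosely mirroring the diff.

```java
import java.util.Objects;

// Hypothetical value class using only java.util.Objects for equals/hashCode.
final class LocalitySketch {
  private final String id;
  private final String host;

  LocalitySketch(String id, String host) {
    this.id = id;
    this.host = host;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof LocalitySketch)) {
      return false;
    }
    LocalitySketch that = (LocalitySketch) o;
    // Objects.equals is null-safe on both arguments.
    return Objects.equals(id, that.id) && Objects.equals(host, that.host);
  }

  @Override
  public int hashCode() {
    // Objects.hash accepts null elements.
    return Objects.hash(id, host);
  }
}
```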

##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import com.google.common.base.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality mapping of an application.

Review comment:
       Since this is in samza-api, let's describe what "locality mapping" etc. mean for users unfamiliar with Samza terms.

##########
File path: samza-api/src/main/java/org/apache/samza/job/model/HostLocality.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import com.google.common.base.Objects;
+
+/**
+ * A class to represent the host locality information.
+ * Fields such as <i>jmxUrl</i> and <i>jmxTunnelingUrl</i> exist for backward compatibility reasons as they were
+ * historically stored under the same name space as locality and surfaced within the framework through the locality
+ * manager.
+ */
+public class HostLocality {

Review comment:
       Should this be ContainerLocation or something? HostLocality sounds wrong because the host doesn't have a location, it is the location.

##########
File path: samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
##########
@@ -39,34 +36,14 @@
  * </p>
  */
 public class JobModel {
-  private static final String EMPTY_STRING = "";
   private final Config config;
   private final Map<String, ContainerModel> containers;
 
-  private final LocalityManager localityManager;
-  private final Map<String, String> localityMappings;
-
   public int maxChangeLogStreamPartitions;
 
   public JobModel(Config config, Map<String, ContainerModel> containers) {
-    this(config, containers, null);
-  }
-
-  public JobModel(Config config, Map<String, ContainerModel> containers, LocalityManager localityManager) {
     this.config = config;
     this.containers = Collections.unmodifiableMap(containers);
-    this.localityManager = localityManager;
-
-    // initialize container localityMappings
-    this.localityMappings = new HashMap<>();

Review comment:
       Nice, thanks for cleaning all of this up!

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.server;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.HostLocality;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+/**
+ * A servlet for locality information of a job. The servlet is hosted alongside of the {@link JobServlet} which hosts
+ * job model & configuration. Historically, locality information was part of job model but we extracted the locality
+ * as job model is static within the lifecycle of an application attempt while locality changes in the event of container
+ * movements.
+ *
+ * This separation enables us to achieve performance benefits by caching job model when it is served by the AM as it
+ * can incur significant penalty in the job start time for jobs with large number of containers.
+ */
+public class LocalityServlet extends HttpServlet {
+  private static final String PROCESSOR_ID_PARAM = "processorId";
+  private final ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+  private final LocalityManager localityManager;
+
+  public LocalityServlet(LocalityManager localityManager) {
+    this.localityManager = localityManager;
+  }
+
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+    response.setContentType("application/json");
+    response.setStatus(HttpServletResponse.SC_OK);
+    LocalityModel localityModel = localityManager.readLocality();
+
+    if (request.getParameterMap().size() == 1) {
+      String processorId = request.getParameter(PROCESSOR_ID_PARAM);
+      HostLocality hostLocality = Optional.ofNullable(localityModel.getHostLocality(processorId))
+          .orElse(new HostLocality(processorId, ""));

Review comment:
       Should this return an HTTP error (404 etc.) instead?
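
A minimal sketch of the alternative being suggested, kept free of the servlet API so it stands alone. `LocalityLookupSketch`, its `Result` type and the plain processor-to-host map are hypothetical; in the servlet itself the not-found branch would presumably become something like `response.sendError(HttpServletResponse.SC_NOT_FOUND)`.

```java
import java.net.HttpURLConnection;
import java.util.Map;

// Hypothetical lookup that distinguishes "unknown processor" from a found entry.
final class LocalityLookupSketch {

  // Simple status/body pair standing in for the HTTP response.
  static final class Result {
    final int status;
    final String body;

    Result(int status, String body) {
      this.status = status;
      this.body = body;
    }
  }

  // processorToHost is an assumed stand-in for the locality model.
  static Result lookup(Map<String, String> processorToHost, String processorId) {
    String host = processorToHost.get(processorId);
    if (host == null) {
      // Unknown processor: report 404 rather than an entry with an empty host.
      return new Result(HttpURLConnection.HTTP_NOT_FOUND, "");
    }
    // Naive JSON assembly for illustration only; values are not escaped.
    String body = "{\"id\":\"" + processorId + "\",\"host\":\"" + host + "\"}";
    return new Result(HttpURLConnection.HTTP_OK, body);
  }
}
```

With this shape a client can tell a missing processor apart from a processor whose host is not yet known, which an empty-host placeholder cannot express.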

##########
File path: samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
##########
@@ -466,11 +464,6 @@ class JobModelManager(
       debug("Stopping HTTP server.")
       server.stop
       info("Stopped HTTP server.")
-      if (localityManager != null) {
-        info("Stopping localityManager")
-        localityManager.close()

Review comment:
       Where is this called now?

##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import com.google.common.base.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality mapping of an application.
+ * Currently the locality mapping represents the container host locality of an application.
+ *
+ * We want to keep the locality mapping open and not tie it to a container to potentially

Review comment:
       What does this mean? 
   
   Phrasing: Instead of "we want to", maybe just describe what the implementation does and why.



