You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/08/21 23:34:52 UTC

[GitHub] [samza] prateekm commented on a change in pull request #1421: SAMZA-2439: Remove LocalityManager and container location information from JobModel

prateekm commented on a change in pull request #1421:
URL: https://github.com/apache/samza/pull/1421#discussion_r475007343



##########
File path: samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
##########
@@ -39,34 +36,14 @@
  * </p>
  */
 public class JobModel {
-  private static final String EMPTY_STRING = "";
   private final Config config;
   private final Map<String, ContainerModel> containers;
 
-  private final LocalityManager localityManager;
-  private final Map<String, String> localityMappings;
-
   public int maxChangeLogStreamPartitions;
 
   public JobModel(Config config, Map<String, ContainerModel> containers) {
-    this(config, containers, null);
-  }
-
-  public JobModel(Config config, Map<String, ContainerModel> containers, LocalityManager localityManager) {

Review comment:
       If it's not too much extra work, does it make sense to move JobModel to samza-api and expose it in JobContext as part of this PR?

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -494,7 +498,9 @@ private String getSourceHostForContainer(ContainerPlacementRequestMessage reques
           processorId, currentResource.getContainerId(), currentResource.getHost(), requestMessage);
       sourceHost = currentResource.getHost();
     } else {
-      sourceHost = samzaApplicationState.jobModelManager.jobModel().getContainerToHostValue(processorId, SetContainerHostMapping.HOST_KEY);
+      sourceHost = Optional.ofNullable(localityManager.readLocality().getHostLocality(processorId))

Review comment:
       Can localityManager or readLocality results be null?

##########
File path: samza-api/src/main/java/org/apache/samza/job/model/HostLocality.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import com.google.common.base.Objects;

Review comment:
       Nitpick: Can use java.util.Objects (here and other classes)?

##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import com.google.common.base.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality mapping of an application.

Review comment:
       Since this is in samza-api, let's describe what "locality mapping" etc. mean for users unfamiliar with Samza terms.

##########
File path: samza-api/src/main/java/org/apache/samza/job/model/HostLocality.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import com.google.common.base.Objects;
+
+/**
+ * A class to represent the host locality information.
+ * Fields such as <i>jmxUrl</i> and <i>jmxTunnelingUrl</i> exist for backward compatibility reasons as they were
+ * historically stored under the same name space as locality and surfaced within the framework through the locality
+ * manager.
+ */
+public class HostLocality {

Review comment:
       Should this be ContainerLocation or something? HostLocality sounds wrong because the host doesn't have a location, it is the location.

##########
File path: samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
##########
@@ -39,34 +36,14 @@
  * </p>
  */
 public class JobModel {
-  private static final String EMPTY_STRING = "";
   private final Config config;
   private final Map<String, ContainerModel> containers;
 
-  private final LocalityManager localityManager;
-  private final Map<String, String> localityMappings;
-
   public int maxChangeLogStreamPartitions;
 
   public JobModel(Config config, Map<String, ContainerModel> containers) {
-    this(config, containers, null);
-  }
-
-  public JobModel(Config config, Map<String, ContainerModel> containers, LocalityManager localityManager) {
     this.config = config;
     this.containers = Collections.unmodifiableMap(containers);
-    this.localityManager = localityManager;
-
-    // initialize container localityMappings
-    this.localityMappings = new HashMap<>();

Review comment:
       Nice, thanks for cleaning all of this up!

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.server;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.HostLocality;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+/**
+ * A servlet for locality information of a job. The servlet is hosted alongside of the {@link JobServlet} which hosts
+ * job model & configuration. Historically, locality information was part of job model but we extracted the locality
+ * as job model is static within the lifecycle of an application attempt while locality changes in the event of container
+ * movements.
+ *
+ * This separation enables us to achieve performance benefits by caching job model when it is served by the AM as it
+ * can incur significant penalty in the job start time for jobs with large number of containers.
+ */
+public class LocalityServlet extends HttpServlet {
+  private static final String PROCESSOR_ID_PARAM = "processorId";
+  private final ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+  private final LocalityManager localityManager;
+
+  public LocalityServlet(LocalityManager localityManager) {
+    this.localityManager = localityManager;
+  }
+
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+    response.setContentType("application/json");
+    response.setStatus(HttpServletResponse.SC_OK);
+    LocalityModel localityModel = localityManager.readLocality();
+
+    if (request.getParameterMap().size() == 1) {
+      String processorId = request.getParameter(PROCESSOR_ID_PARAM);
+      HostLocality hostLocality = Optional.ofNullable(localityModel.getHostLocality(processorId))
+          .orElse(new HostLocality(processorId, ""));

Review comment:
       Should this return an HTTP error (404 etc.) instead?

##########
File path: samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
##########
@@ -466,11 +464,6 @@ class JobModelManager(
       debug("Stopping HTTP server.")
       server.stop
       info("Stopped HTTP server.")
-      if (localityManager != null) {
-        info("Stopping localityManager")
-        localityManager.close()

Review comment:
       Where is this called now?

##########
File path: samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import com.google.common.base.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality mapping of an application.
+ * Currently the locality mapping represents the container host locality of an application.
+ *
+ * We want to keep the locality mapping open and not tie it to a container to potentially

Review comment:
       What does this mean? 
   
   Phrasing: Instead of "we want to", maybe just describe what the implementation does and why.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org