Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/11/20 20:58:45 UTC

[GitHub] [samza] PawasChhokra opened a new pull request #1446: [WIP] SAMZA-2605: Make Standby Container Requests Rack Aware

PawasChhokra opened a new pull request #1446:
URL: https://github.com/apache/samza/pull/1446


   Feature: This feature makes all standby container requests rack-aware, so that every active container and its corresponding standby containers are always placed on different racks. This reduces application downtime during rack failures.
   
   Changes: Added a new RackManager class, added rack retrieval code in YarnClusterResourceManager, and made standby container assignment rack-aware.
   
   Upgrade instructions: TBD


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r545283845



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.SamzaException;
+
+/**
+ * A fault domain is a set of hardware components that share a single point of failure.
+ * This class identifies the type (ex: rack) and ID (ex: rack ID) of the fault domain in question.
+ * A host can belong to multiple fault domains.
+ * A fault domain may have greater than or equal to 1 hosts.
+ * A cluster can comprise of hosts on multiple fault domains.
+ */
+public class FaultDomain {

Review comment:
       Done.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539975584



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {

Review comment:
       This makes sense. Made the changes. Thanks for the suggestions :)







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539514875



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).

Review comment:
       I've explicitly called out that this is not thread-safe; however, it does not need to be. I have also added to the interface documentation that the host-to-fault-domain map is cached, and explained in which scenarios it is updated. So, to answer your question: the manager will not reflect the current state of the cluster, only the cached state from the last time it retrieved that information. I have also noted that it stores information only for the nodes that are running in the cluster at that point in time.
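A minimal sketch of the caching policy described above, assuming the map is keyed by hostname and is reloaded only when a requested host is missing from it (the AM-restart case corresponds to simply constructing a new instance). `CachedHostRackLookup` and its loader are hypothetical names; the loader stands in for a query to the cluster manager. Like the interface, it is deliberately not thread-safe.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical sketch: cached host -> racks map, refreshed only on a cache miss. Not thread-safe. */
final class CachedHostRackLookup {
  private final Supplier<Map<String, Set<String>>> loader; // stand-in for a cluster manager query
  private Map<String, Set<String>> hostToRacks;
  private int refreshCount = 0;

  CachedHostRackLookup(Supplier<Map<String, Set<String>>> loader) {
    this.loader = loader;
    refresh(); // initial load, e.g. when the AM starts
  }

  private void refresh() {
    hostToRacks = new HashMap<>(loader.get());
    refreshCount++;
  }

  /** Returns the cached racks of a host, reloading the map once if the host is unknown. */
  Set<String> racksOf(String host) {
    if (!hostToRacks.containsKey(host)) {
      refresh();
    }
    return hostToRacks.getOrDefault(host, Collections.emptySet());
  }

  int refreshCount() {
    return refreshCount;
  }
}
```

Between refreshes the lookup can lag behind the cluster, which is exactly the "cached state" trade-off described in the comment.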







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539951301



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,18 +403,32 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
-        return false;
+      if (resource != null) {
+        if (!resource.getHost().equals(ResourceRequestState.ANY_HOST) && !host.equals(ResourceRequestState.ANY_HOST)
+                && faultDomainManager.checkHostsOnSameFaultDomain(host, resource.getHost())) {
+          log.info("Container {} cannot be started on host {} because container {} is already scheduled on this rack",
+                  containerIdToStart, host, containerID);
+          return false;
+        } else if (resource.getHost().equals(host)) {
+          log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+                  containerIdToStart, host, containerID);
+          return false;
+        }
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
-        return false;
+      if (resource != null) {
+        if (!resource.getHost().equals(ResourceRequestState.ANY_HOST) && !host.equals(ResourceRequestState.ANY_HOST)

Review comment:
       1. Addressed in the previous comment. The hosts passed here will never be ANY_HOST and will have a hostname.
   2. This should be handled here.
   3. Simplified the check and removed the ANY_HOST checks.
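Once both hosts are guaranteed to be concrete hostnames, the check reduces to "same host?" and, when rack awareness is on, "same fault domain?". A minimal sketch of that shape, assuming a plain host-to-racks map in place of the FaultDomainManager and treating any shared rack as a conflict; `StandbyPlacementCheck` and `canStart` are hypothetical names, not the PR's code.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the simplified standby constraint check (no ANY_HOST handling needed). */
final class StandbyPlacementCheck {
  private final Map<String, Set<String>> hostToRacks; // stand-in for FaultDomainManager
  private final boolean faultDomainAware;

  StandbyPlacementCheck(Map<String, Set<String>> hostToRacks, boolean faultDomainAware) {
    this.hostToRacks = hostToRacks;
    this.faultDomainAware = faultDomainAware;
  }

  /** Returns true if a container may start on candidateHost, given a conflicting container on existingHost. */
  boolean canStart(String candidateHost, String existingHost) {
    if (existingHost == null) {
      return true; // no conflicting container is pending or running
    }
    if (candidateHost.equals(existingHost)) {
      return false; // same host is always a conflict
    }
    if (faultDomainAware) {
      Set<String> candidate = hostToRacks.getOrDefault(candidateHost, Collections.emptySet());
      Set<String> existing = hostToRacks.getOrDefault(existingHost, Collections.emptySet());
      return Collections.disjoint(candidate, existing); // sharing any rack is a conflict
    }
    return true;
  }
}
```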







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546164722



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Set;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ *  This interface gets fault domain information of all hosts that are running in the cluster,
+ *  from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a container can be placed on (for ex: based on standby constraints).
+ *  The host to fault domain map used here will always be cached and only updated in case the AM dies or an active
+ *  container is assigned to a host which is not in the map.
+ *  This is not thread-safe.
+ */
+@InterfaceStability.Unstable
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the last cached fault domain values in a cluster, for all hosts that are healthy, up and running.

Review comment:
       Done.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r542557637



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);
+
+  /**
+   * This method checks if the two hostnames provided reside on the same fault domain.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same fault domain
+   */
+  boolean checkHostsOnSameFaultDomain(String host1, String host2);

Review comment:
       Discussed this further offline. We've decided to keep this method since it has a single, self-contained responsibility: it hides the details of looking up the fault domains of each host and comparing them, and it defines a clear contract with the FaultDomainManager. I've updated the documentation to reflect this.
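To illustrate the encapsulation argument, here is a hypothetical in-memory implementation mirroring the three methods quoted above, assuming one rack ID (a plain `String`) per host instead of a `FaultDomain` object. `RackLookup` and `InMemoryRackLookup` are illustrative names; callers of `checkHostsOnSameFaultDomain` never see the lookup or the comparison.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Mirrors the quoted interface methods; fault domains are plain rack IDs here for brevity. */
interface RackLookup {
  Set<String> getAllFaultDomains();

  String getFaultDomainOfNode(String host);

  boolean checkHostsOnSameFaultDomain(String host1, String host2);
}

/** Hypothetical in-memory implementation, e.g. for unit tests. */
final class InMemoryRackLookup implements RackLookup {
  private final Map<String, String> hostToRack = new HashMap<>();

  InMemoryRackLookup put(String host, String rack) {
    hostToRack.put(host, rack);
    return this;
  }

  @Override
  public Set<String> getAllFaultDomains() {
    return new HashSet<>(hostToRack.values());
  }

  @Override
  public String getFaultDomainOfNode(String host) {
    return hostToRack.get(host);
  }

  // The lookup and equality check stay behind the interface; an unknown host never matches.
  @Override
  public boolean checkHostsOnSameFaultDomain(String host1, String host2) {
    String rack1 = getFaultDomainOfNode(host1);
    return rack1 != null && Objects.equals(rack1, getFaultDomainOfNode(host2));
  }
}
```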
   







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r545335678



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
##########
@@ -381,6 +428,9 @@ public final void issueResourceRequest(SamzaResourceRequest request) {
     } else {
       state.preferredHostRequests.incrementAndGet();
     }
+    if (!request.getFaultDomains().isEmpty()) {

Review comment:
       Is it possible for getFaultDomains to return null? The expired metric increment checks for it.
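One way to make the `isEmpty()` call above safe is to normalize null to an empty set when the request is constructed. A minimal sketch, assuming rack IDs as strings; `FaultDomainRequestSketch` is a hypothetical stand-in for the resource request, not `SamzaResourceRequest` itself.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for a resource request whose fault domains are never null. */
final class FaultDomainRequestSketch {
  private final Set<String> faultDomains;

  FaultDomainRequestSketch(Set<String> faultDomains) {
    // Normalize null to an empty, immutable set so callers can rely on isEmpty().
    this.faultDomains = faultDomains == null
        ? Collections.<String>emptySet()
        : Collections.unmodifiableSet(new HashSet<>(faultDomains));
  }

  /** Never null; an empty set means "no fault domain preference". */
  Set<String> getFaultDomains() {
    return faultDomains;
  }
}
```

With this invariant, the separate null check in the expired-request path would become unnecessary.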

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
##########
@@ -170,6 +170,26 @@
    */
   public final AtomicInteger failedContainerPlacementActions = new AtomicInteger(0);
 
+  /**
+   * Number of fault domain aware container requests made for a container.

Review comment:
       nit: I'm wondering whether this reads as "# of requests per container", whereas we want to communicate "# of requests by the job", right? I don't feel strongly about this; okay to drop. If you change it, please do the same for the rest of the fields below.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -409,16 +470,18 @@ public void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest re
       log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID,
           samzaResource.getHost(), preferredHost);
       containerAllocator.runStreamProcessor(request, preferredHost);
+      samzaApplicationState.faultDomainAwareContainersStarted.incrementAndGet();

Review comment:
       This will increment and emit metrics even when the config is off, right?
   Are we okay with that?
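If the answer is no, the increment can be gated on the same flag. A minimal sketch, assuming the flag is read once at construction; `FaultDomainMetricsSketch` and `onContainerStarted` are hypothetical names, while the counter name matches the diff above.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: count fault-domain-aware starts only when the feature is enabled. */
final class FaultDomainMetricsSketch {
  final AtomicInteger faultDomainAwareContainersStarted = new AtomicInteger(0);
  private final boolean faultDomainAwareStandbyEnabled;

  FaultDomainMetricsSketch(boolean faultDomainAwareStandbyEnabled) {
    this.faultDomainAwareStandbyEnabled = faultDomainAwareStandbyEnabled;
  }

  void onContainerStarted() {
    if (faultDomainAwareStandbyEnabled) {
      faultDomainAwareContainersStarted.incrementAndGet();
    }
  }
}
```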

##########
File path: samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
##########
@@ -250,6 +258,14 @@ public String getContainerManagerClass() {
     return get(CLUSTER_MANAGER_FACTORY, CLUSTER_MANAGER_FACTORY_DEFAULT);
   }
 
+  public String getFaultDomainManagerClass() {
+    return get(FAULT_DOMAIN_MANAGER_FACTORY, FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT);
+  }
+
+  public boolean getFaultDomainAwareStandbyEnabled() {
+    return getBoolean(FAULT_DOMAIN_AWARE_STANDBY_ENABLED, false);

Review comment:
       nit: it might be nice to keep the default value in a constant, FAULT_DOMAIN_AWARE_STANDBY_ENABLED_DEFAULT = false.
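A minimal sketch of the suggestion, assuming a plain `Map<String, String>` in place of Samza's `Config`. `ClusterManagerConfigSketch` is hypothetical, and the key string is illustrative only, not necessarily the key the PR defines.

```java
import java.util.Map;

/** Hypothetical sketch; the key string below is illustrative. */
final class ClusterManagerConfigSketch {
  static final String FAULT_DOMAIN_AWARE_STANDBY_ENABLED = "cluster-manager.fault-domain-aware-standby.enabled";
  static final boolean FAULT_DOMAIN_AWARE_STANDBY_ENABLED_DEFAULT = false;

  private final Map<String, String> config;

  ClusterManagerConfigSketch(Map<String, String> config) {
    this.config = config;
  }

  boolean getFaultDomainAwareStandbyEnabled() {
    String value = config.get(FAULT_DOMAIN_AWARE_STANDBY_ENABLED);
    // Fall back to the named default instead of an inline literal.
    return value == null ? FAULT_DOMAIN_AWARE_STANDBY_ENABLED_DEFAULT : Boolean.parseBoolean(value);
  }
}
```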

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +421,39 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
     }
 
     return true;
   }
 
+  boolean checkStandbyConstraintsHelper(String containerIdToStart, String hostToStartContainerOn, SamzaResource existingResource, String existingContainerID) {
+    if (existingResource != null) {
+      ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+      if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled() && faultDomainManager.hasSameFaultDomains(hostToStartContainerOn, existingResource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this rack",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (existingResource.getHost().equals(hostToStartContainerOn)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);

Review comment:
       I'm a little confused about how the logging is impacted.
   Earlier it was:
   `if (resource != null && resource.getHost().equals(host)) { log.info("..running on this host", ...`
   
   Now it is:
   `if (existingResource != null) { // if rack aware enabled ... } else if (existingResource.getHost().equals(hostToStartContainerOn)) { log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host", ...`
   
   Did you mean that the exact log message has changed?
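One way to keep the "running" vs. "scheduled" distinction after sharing a helper is to pass the conflicting container's state in. A minimal sketch, assuming the message is built as a string rather than logged; `StandbyConflictMessages` and its parameters are hypothetical.

```java
/** Hypothetical sketch: the shared helper receives the state of the conflicting container. */
final class StandbyConflictMessages {
  private StandbyConflictMessages() { }

  /**
   * @param existingState "scheduled" for a pending container, "running" for a running one
   * @param sameRackOnly true when the conflict is only at the fault domain (rack) level
   */
  static String conflictMessage(String containerIdToStart, String host, String existingContainerId,
      String existingState, boolean sameRackOnly) {
    String scope = sameRackOnly ? "rack" : "host";
    return String.format("Container %s cannot be started on host %s because container %s is already %s on this %s",
        containerIdToStart, host, existingContainerId, existingState, scope);
  }
}
```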

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
##########
@@ -170,6 +170,26 @@
    */
   public final AtomicInteger failedContainerPlacementActions = new AtomicInteger(0);
 
+  /**
+   * Number of fault domain aware container requests made for a container.
+   */
+  public final AtomicInteger faultDomainAwareContainerRequests = new AtomicInteger(0);
+
+  /**
+   * Number of fault domain aware container requests made for a container.

Review comment:
       nit: the Javadoc is a copy of the one above :P 

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -181,13 +207,14 @@ private void handleStandbyContainerStop(String standbyContainerID, String resour
 
       // request standbycontainer's host for active-container
       SamzaResourceRequest resourceRequestForActive =
-          containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
+        containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
       // record the resource request, before issuing it to avoid race with allocation-thread
       failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
       containerAllocator.issueResourceRequest(resourceRequestForActive);
 
       // request any-host for standby container
-      containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST);
+      containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST,

Review comment:
       I feel this should also be guarded by the config; we would want the flow to be exactly the same as before when the config is off, right?
   Same for the other requests made.
   Since YarnClusterResourceManager honors the fault domains only when config = on, I'm okay with dropping this: even though the code changes when config = off, the flow does not.
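A minimal sketch of such a guard, assuming rack IDs as strings: with the feature off the standby request carries no fault domains, which matches the pre-change ANY_HOST request; with it on, the allowed racks are all known racks minus the active container's racks. `StandbyRequestFaultDomains.forStandbyRequest` is a hypothetical helper.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: fault domains to attach to a standby's ANY_HOST request. */
final class StandbyRequestFaultDomains {
  private StandbyRequestFaultDomains() { }

  static Set<String> forStandbyRequest(boolean enabled, Set<String> allRacks, Set<String> activeRacks) {
    if (!enabled) {
      return Collections.emptySet(); // identical to the old request when the config is off
    }
    Set<String> allowed = new HashSet<>(allRacks);
    allowed.removeAll(activeRacks); // exclude the racks hosting the active container
    return allowed;
  }
}
```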







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r545287853



##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+import org.apache.samza.clustermanager.SamzaApplicationState;
+
+/**
+ * This class functionality works with the assumption that the job.standbytasks.replication.factor is 2.
+ * For values greater than 2, it is possible that the standby containers could be on the same rack as the active, or the already existing standby racks.
+ */
+public class YarnFaultDomainManager implements FaultDomainManager {
+
+  private Multimap<String, FaultDomain> hostToRackMap;
+  private final SamzaApplicationState state;
+  private final YarnClientImpl yarnClient;
+
+  public YarnFaultDomainManager(SamzaApplicationState state) {
+    this.state = state;

Review comment:
       Removed `SamzaApplicationState` as mentioned above.
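Without `SamzaApplicationState`, the manager only needs node information to build its map. A minimal sketch, assuming `NodeInfo` (host, rack, state) as a hypothetical stand-in for YARN's `NodeReport` so the example runs without Hadoop on the classpath; only RUNNING nodes are kept, matching the documented contract.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch; NodeInfo and State stand in for YARN's NodeReport and NodeState. */
final class RackMapBuilder {
  enum State { RUNNING, UNHEALTHY, DECOMMISSIONED }

  static final class NodeInfo {
    final String host;
    final String rack;
    final State state;

    NodeInfo(String host, String rack, State state) {
      this.host = host;
      this.rack = rack;
      this.state = state;
    }
  }

  private RackMapBuilder() { }

  /** Builds host -> racks for RUNNING nodes only; no application state is required. */
  static Map<String, Set<String>> build(List<NodeInfo> nodes) {
    Map<String, Set<String>> hostToRacks = new HashMap<>();
    for (NodeInfo node : nodes) {
      if (node.state == State.RUNNING) {
        hostToRacks.computeIfAbsent(node.host, h -> new HashSet<>()).add(node.rack);
      }
    }
    return Collections.unmodifiableMap(hostToRacks);
  }
}
```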







[GitHub] [samza] mynameborat commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r543305421



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.SamzaException;
+
+/**
+ * A fault domain is a set of hardware components that share a single point of failure.
+ * This class identifies the type (ex: rack) and ID (ex: rack ID) of the fault domain in question.
+ * A host can belong to multiple fault domains.
+ * A fault domain may have greater than or equal to 1 hosts.
+ * A cluster can comprise of hosts on multiple fault domains.
+ */
+public class FaultDomain {
+
+  private final FaultDomainType type;
+  private final String id;
+
+  public FaultDomain(FaultDomainType type, String id) {
+    if (type == null || id == null) {
+      throw new SamzaException("Fault domain type and ID cannot be null.");

Review comment:
       can we be consistent and use `Preconditions or Objects.nonNull`? 
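A minimal sketch of the constructor using `java.util.Objects.requireNonNull`; `FaultDomainSketch` and its nested `Type` are hypothetical. Note that `requireNonNull` (like Guava's `Preconditions.checkNotNull`) throws `NullPointerException` rather than the `SamzaException` in the diff, so callers that catch `SamzaException` would need to be checked.

```java
import java.util.Objects;

/** Hypothetical sketch of FaultDomain's null checks using Objects.requireNonNull. */
final class FaultDomainSketch {
  enum Type { RACK }

  private final Type type;
  private final String id;

  FaultDomainSketch(Type type, String id) {
    // Each check throws NullPointerException with the given message.
    this.type = Objects.requireNonNull(type, "Fault domain type cannot be null.");
    this.id = Objects.requireNonNull(id, "Fault domain ID cannot be null.");
  }

  Type getType() {
    return type;
  }

  String getId() {
    return id;
  }
}
```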

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.SamzaException;
+
+/**
+ * A fault domain is a set of hardware components that share a single point of failure.
+ * This class identifies the type (ex: rack) and ID (ex: rack ID) of the fault domain in question.
+ * A host can belong to multiple fault domains.
+ * A fault domain may have greater than or equal to 1 hosts.
+ * A cluster can comprise of hosts on multiple fault domains.
+ */
+public class FaultDomain {

Review comment:
       Doesn't it also require `equals` and `hashCode` to be overridden for the fault domain check?
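Without these overrides, `equals`, `Set.contains` and `Set.equals` fall back to object identity, so two `FaultDomain` instances describing the same rack (for example, built from separate node lookups) would never match. A minimal value-type sketch, assuming equality by (type, id); `FaultDomainValue` is a hypothetical name.

```java
import java.util.Objects;

/** Hypothetical value-type sketch of FaultDomain: equal when type and id are equal. */
final class FaultDomainValue {
  enum Type { RACK }

  private final Type type;
  private final String id;

  FaultDomainValue(Type type, String id) {
    this.type = Objects.requireNonNull(type);
    this.id = Objects.requireNonNull(id);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof FaultDomainValue)) {
      return false;
    }
    FaultDomainValue that = (FaultDomainValue) o;
    return type == that.type && id.equals(that.id);
  }

  @Override
  public int hashCode() {
    return Objects.hash(type, id); // consistent with equals
  }
}
```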

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainType.java
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+public enum FaultDomainType {

Review comment:
       nit: please add Javadocs.
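A minimal sketch of what the requested Javadoc might look like, assuming rack is the only type (the only one this PR discusses); `FaultDomainTypeSketch` is a hypothetical name.

```java
/**
 * Hypothetical sketch: the kinds of fault domain a host can belong to.
 * Only RACK is listed here, since it is the only type discussed in this PR.
 */
enum FaultDomainTypeSketch {
  /** A rack of hosts that share a single point of failure, such as a top-of-rack switch or power feed. */
  RACK
}
```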

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Set;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ *  This interface gets fault domain information of all hosts that are running in the cluster,
+ *  from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a container can be placed on (for ex: based on standby constraints).
+ *  The host to fault domain map used here will always be cached and only updated in case the AM dies or an active
+ *  container is assigned to a host which is not in the map.
+ *  This is not thread-safe.
+ */
+@InterfaceStability.Unstable
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the last cached fault domain values in a cluster, for all hosts that are healthy, up and running.

Review comment:
       `cached` seems like an implementation detail, not an API detail.
   Do we require strong freshness? If so, the API should mandate it.
   
   If not, you should call out that freshness is an implementation detail and that the API doesn't mandate anything.
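Here is one way to address this, sketched under some assumptions. The interface documents only what callers may rely on, which includes no freshness guarantee. The caching behavior is documented on the implementation instead. Fault domains are plain strings to keep the example self-contained, and `CachingFaultDomainManager` is a hypothetical name:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

interface FaultDomainManager {
  /**
   * Returns the fault domains of all hosts that are healthy, up and running.
   * This API does not guarantee freshness; implementations may serve cached
   * values and should document their own refresh policy.
   */
  Set<String> getAllFaultDomains();
}

/**
 * Hypothetical implementation. Caching is described here, where it is an
 * implementation detail: the host-to-fault-domain snapshot taken at
 * construction time is returned as-is and never refreshed by this method.
 */
final class CachingFaultDomainManager implements FaultDomainManager {
  private final Map<String, String> hostToFaultDomain;

  CachingFaultDomainManager(Map<String, String> hostToFaultDomainSnapshot) {
    this.hostToFaultDomain = new HashMap<>(hostToFaultDomainSnapshot);
  }

  @Override
  public Set<String> getAllFaultDomains() {
    // Return a copy so callers can modify the result without touching the cache.
    return new HashSet<>(hostToFaultDomain.values());
  }
}
```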

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
##########
@@ -170,6 +170,31 @@
    */
   public final AtomicInteger failedContainerPlacementActions = new AtomicInteger(0);
 
+  /**
+   * Number of fault domain aware container requests made for a container.
+   */
+  public final AtomicInteger hostToFaultDomainCacheUpdates = new AtomicInteger(0);

Review comment:
       Why is this metric part of `SamzaApplicationState`? Refer to the above comment on `FaultDomainManager`: whether to cache is an implementation detail, and this metric is specific to one implementation of `FaultDomainManager`. Hence, it doesn't belong in `SamzaApplicationState`.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
##########
@@ -381,6 +428,9 @@ public final void issueResourceRequest(SamzaResourceRequest request) {
     } else {
       state.preferredHostRequests.incrementAndGet();
     }
+    if (request.getFaultDomains() != null && !request.getFaultDomains().isEmpty()) {

Review comment:
       The `null` check seems redundant, since we already ensure `faultDomains` is never `null` (it is either empty or populated).

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -156,6 +161,20 @@ public void handleContainerStopFail(String containerID, String resourceID,
     }
   }
 
+  /**
+   * This method gets the set of racks that the given active container's corresponding standby can be placed on.
+   * The set of racks returned is based on the set difference between the active container's racks,
+   * and all the available racks in the cluster based on the host to fault domain cache.
+   * @param host The hostname of the active container
+   * @return the set of racks on which this active container's standby can be scheduled
+   */
+  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(String host) {
+    Set<FaultDomain> activeContainerRack = faultDomainManager.getFaultDomainOfHost(host);

Review comment:
       This potentially passes a nullable host into `getFaultDomainOfHost`. Can we add a null check to this method, or use an annotation to ensure only non-null values can be passed in?

##########
File path: samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
##########
@@ -38,6 +38,9 @@
   private static final String CLUSTER_MANAGER_FACTORY = "samza.cluster-manager.factory";
   private static final String CLUSTER_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.YarnResourceManagerFactory";
 
+  private static final String FAULT_DOMAIN_MANAGER_FACTORY = "samza.fault-domain-manager.factory";
+  private static final String FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.RackManagerFactory";

Review comment:
       should it be `YarnFaultDomainManagerFactory`?

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -361,8 +382,29 @@ private FailoverMetadata registerActiveContainerFailure(String activeContainerID
   }
 
   /**
-   * Check if matching this SamzaResourceRequest to the given resource, meets all standby-container container constraints.
+   * This method returns the active container host given a standby or active container ID.
    *
+   * @param containerID Standby or active container container ID
+   * @return The active container host
+   */
+  String getActiveContainerHost(String containerID) {
+    String activeContainerId = containerID;
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      activeContainerId = StandbyTaskUtil.getActiveContainerId(containerID);
+    }
+    SamzaResource resource = samzaApplicationState.pendingProcessors.get(activeContainerId);
+    if (resource == null) {
+      resource = samzaApplicationState.runningProcessors.get(activeContainerId);
+    }
+    if (resource != null) {
+      return resource.getHost();
+    }
+    return null;

Review comment:
       Can we return `Optional` instead of `null`? It would force callers to handle the case where no host exists for the given container ID.
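Here is a sketch of the `Optional`-returning variant. To keep it self-contained, the pending and running processor maps are reduced to container-ID-to-host maps. The standby-to-active ID mapping, which the PR does with `StandbyTaskUtil`, is passed in as a function. `ActiveHostLookup` and its constructor parameters are hypothetical:

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;

final class ActiveHostLookup {
  private final Map<String, String> pendingHosts; // container ID -> host, like pendingProcessors
  private final Map<String, String> runningHosts; // container ID -> host, like runningProcessors
  private final UnaryOperator<String> toActiveContainerId; // standby ID -> active ID; identity for actives

  ActiveHostLookup(Map<String, String> pendingHosts, Map<String, String> runningHosts,
      UnaryOperator<String> toActiveContainerId) {
    this.pendingHosts = pendingHosts;
    this.runningHosts = runningHosts;
    this.toActiveContainerId = toActiveContainerId;
  }

  /** Returns the active container's host, or Optional.empty() if it is neither pending nor running. */
  Optional<String> getActiveContainerHost(String containerId) {
    String activeContainerId = toActiveContainerId.apply(containerId);
    String host = pendingHosts.get(activeContainerId);
    if (host == null) {
      host = runningHosts.get(activeContainerId);
    }
    return Optional.ofNullable(host);
  }
}
```

Callers must then handle the missing case explicitly, for example with `ifPresent` or `orElse`, rather than risking a `NullPointerException` further downstream.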

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Set;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ *  This interface gets fault domain information of all hosts that are running in the cluster,
+ *  from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a container can be placed on (for ex: based on standby constraints).
+ *  The host to fault domain map used here will always be cached and only updated in case the AM dies or an active
+ *  container is assigned to a host which is not in the map.
+ *  This is not thread-safe.
+ */
+@InterfaceStability.Unstable
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the last cached fault domain values in a cluster, for all hosts that are healthy, up and running.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular host resides on based on the internal cache.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  Set<FaultDomain> getFaultDomainOfHost(String host);

Review comment:
       nit: s/getFaultDomainOfHost/getFaultDomainsForHost

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -63,7 +69,11 @@
   private final Instant requestTimestamp;
 
   public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId) {
-    this(numCores, memoryMB, preferredHost, processorId, Instant.now());
+    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), null);

Review comment:
       Pass `ImmutableSet.of()` or another empty set instead of `null`, and get rid of the `null` check within the other constructor.
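Here is a minimal sketch of the suggestion. It assumes the field is a set of fault domain IDs (strings here for brevity) and uses the JDK's `Collections.emptySet()` in place of Guava's `ImmutableSet.of()`. The class name and the reduced constructor parameters are hypothetical. Because the delegating constructor always passes a non-null set, call sites only need `isEmpty()`:

```java
import java.util.Collections;
import java.util.Set;

final class ResourceRequestSketch {
  private final String processorId;
  private final Set<String> faultDomains;

  ResourceRequestSketch(String processorId) {
    // Delegate with an empty set instead of null.
    this(processorId, Collections.emptySet());
  }

  ResourceRequestSketch(String processorId, Set<String> faultDomains) {
    this.processorId = processorId;
    this.faultDomains = faultDomains;
  }

  String getProcessorId() {
    return processorId;
  }

  Set<String> getFaultDomains() {
    return faultDomains;
  }

  boolean isFaultDomainAware() {
    // No null check: the delegating constructor never passes null.
    return !getFaultDomains().isEmpty();
  }
}
```

A caller can still pass `null` to the two-argument constructor explicitly. A precondition check in that constructor closes the gap.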

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+import org.apache.samza.clustermanager.SamzaApplicationState;
+
+/**
+ * This class functionality works with the assumption that the job.standbytasks.replication.factor is 2.
+ * For values greater than 2, it is possible that the standby containers could be on the same rack as the active, or the already existing standby racks.
+ */
+public class YarnFaultDomainManager implements FaultDomainManager {
+
+  private Multimap<String, FaultDomain> hostToRackMap;
+  private final SamzaApplicationState state;
+  private final YarnClientImpl yarnClient;
+
+  public YarnFaultDomainManager(SamzaApplicationState state) {
+    this.state = state;

Review comment:
       Refer to the comments on the metrics. Addressing those should remove this dependency from `YarnFaultDomainManager`. Do we see any other need for this dependency?
   I'd expect this to take a `MetricsRegistry` instead.
   
   context: `SamzaApplicationState`
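Here is a sketch of the direction suggested here: the manager registers and owns its cache-refresh counter instead of writing to `SamzaApplicationState`. `CounterRegistry` is a hypothetical stand-in for a metrics registry such as Samza's `MetricsRegistry`. The metric group and name are made up, and fault domains are plain strings:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical stand-in for a metrics registry. */
interface CounterRegistry {
  AtomicLong newCounter(String group, String name);
}

final class InMemoryCounterRegistry implements CounterRegistry {
  private final Map<String, AtomicLong> counters = new HashMap<>();

  @Override
  public AtomicLong newCounter(String group, String name) {
    return counters.computeIfAbsent(group + "." + name, key -> new AtomicLong());
  }
}

final class MeteredFaultDomainManager {
  private final Supplier<Map<String, String>> hostToFaultDomainSource; // e.g. a call to the cluster manager
  private final AtomicLong cacheUpdates;
  private Map<String, String> hostToFaultDomain;

  MeteredFaultDomainManager(CounterRegistry registry, Supplier<Map<String, String>> hostToFaultDomainSource) {
    this.hostToFaultDomainSource = hostToFaultDomainSource;
    this.cacheUpdates = registry.newCounter("FaultDomainManager", "host-to-fault-domain-cache-updates");
    this.hostToFaultDomain = hostToFaultDomainSource.get();
  }

  /** Refreshes the cache on a miss and counts the refresh; returns null if the host is still unknown. */
  String getFaultDomainOfHost(String host) {
    if (!hostToFaultDomain.containsKey(host)) {
      hostToFaultDomain = hostToFaultDomainSource.get();
      cacheUpdates.incrementAndGet();
    }
    return hostToFaultDomain.get(host);
  }

  long getCacheUpdateCount() {
    return cacheUpdates.get();
  }
}
```

With this shape, the application state no longer needs to know that a cache exists.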

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -73,7 +83,25 @@ public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, St
     this.requestId = UUID.randomUUID().toString();
     this.processorId = processorId;
     this.requestTimestamp = requestTimestamp;
-    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at time: {} with Request ID: {}", this.processorId, this.preferredHost, this.requestTimestamp, this.requestId);
+    this.faultDomains = new HashSet<>();
+    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at time: {} with Request ID: {}, and the following list of fault domains: {}",
+            this.processorId, this.preferredHost, this.requestTimestamp, this.requestId, this.faultDomains);
+  }
+
+  public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId, Instant requestTimestamp, Set<FaultDomain> faultDomains) {
+    this.numCores = numCores;

Review comment:
       Can we add a `null` precondition check for `faultDomains`?
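Here is a sketch of such a check using the JDK's `Objects.requireNonNull`. Guava's `Preconditions.checkNotNull` is the equivalent if the module already uses Guava. Fault domains are strings here and the class name is hypothetical. The defensive, unmodifiable copy is an extra design choice, not something the review asks for:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class FaultDomainRequest {
  private final Set<String> faultDomains;

  FaultDomainRequest(Set<String> faultDomains) {
    // Fail fast at construction instead of null-checking at every use site.
    Objects.requireNonNull(faultDomains, "faultDomains must not be null; pass an empty set instead");
    // Defensive copy so later changes to the caller's set do not leak into the request.
    this.faultDomains = Collections.unmodifiableSet(new HashSet<>(faultDomains));
  }

  Set<String> getFaultDomains() {
    return faultDomains;
  }
}
```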

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -109,15 +141,16 @@ public String toString() {
             ", requestId='" + requestId + '\'' +
             ", processorId=" + processorId +
             ", requestTimestampMs=" + requestTimestamp +
+            ", faultDomains=" + faultDomains.toString() +
             '}';
   }
 
-  /**
-   * Requests are ordered by the processor type and the time at which they were created.
-   * Requests with timestamps in the future for retries take less precedence than timestamps in the past or current.
-   * Otherwise, active processors take precedence over standby processors, regardless of timestamp.
-   * @param o the other
-   */
+    /**
+     * Requests are ordered by the processor type and the time at which they were created.
+     * Requests with timestamps in the future for retries take less precedence than timestamps in the past or current.
+     * Otherwise, active processors take precedence over standby processors, regardless of timestamp.
+     * @param o the other
+     */

Review comment:
       nit: fix indentation

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -156,6 +161,20 @@ public void handleContainerStopFail(String containerID, String resourceID,
     }
   }
 
+  /**
+   * This method gets the set of racks that the given active container's corresponding standby can be placed on.
+   * The set of racks returned is based on the set difference between the active container's racks,
+   * and all the available racks in the cluster based on the host to fault domain cache.
+   * @param host The hostname of the active container
+   * @return the set of racks on which this active container's standby can be scheduled
+   */
+  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(String host) {
+    Set<FaultDomain> activeContainerRack = faultDomainManager.getFaultDomainOfHost(host);
+    Set<FaultDomain> standbyRacks = faultDomainManager.getAllFaultDomains();
+    standbyRacks.removeAll(activeContainerRack);
+    return standbyRacks;

Review comment:
       Can we rename the variables to refer to generic fault domains instead of racks, since that is what they are?
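Here is a sketch with generic naming, which also covers the earlier null-host concern. It is written as a static helper over any fault domain type `D`. The method signature and the `faultDomainsOfHost` function, which stands in for `faultDomainManager.getFaultDomainOfHost`, are assumptions. The helper copies the full set before removing entries, so a cached set returned by the manager is never mutated:

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

final class StandbyPlacement {
  private StandbyPlacement() {
  }

  /**
   * Returns the fault domains a standby may be placed on: every known fault domain
   * except those of the host running the corresponding active container.
   */
  static <D> Set<D> getAllowedFaultDomainsForStandby(String activeContainerHost, Set<D> allFaultDomains,
      Function<String, Set<D>> faultDomainsOfHost) {
    Objects.requireNonNull(activeContainerHost, "activeContainerHost must not be null");
    Set<D> activeContainerFaultDomains = faultDomainsOfHost.apply(activeContainerHost);
    // Copy first so the caller's (possibly cached) set is left untouched.
    Set<D> allowedFaultDomains = new HashSet<>(allFaultDomains);
    allowedFaultDomains.removeAll(activeContainerFaultDomains);
    return allowedFaultDomains;
  }
}
```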

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+import org.apache.samza.clustermanager.SamzaApplicationState;
+
+/**
+ * This class functionality works with the assumption that the job.standbytasks.replication.factor is 2.
+ * For values greater than 2, it is possible that the standby containers could be on the same rack as the active, or the already existing standby racks.
+ */
+public class YarnFaultDomainManager implements FaultDomainManager {
+
+  private Multimap<String, FaultDomain> hostToRackMap;
+  private final SamzaApplicationState state;
+  private final YarnClientImpl yarnClient;
+
+  public YarnFaultDomainManager(SamzaApplicationState state) {
+    this.state = state;
+    this.yarnClient = new YarnClientImpl();
+    this.hostToRackMap = computeHostToFaultDomainMap();
+  }
+
+  @VisibleForTesting
+  YarnFaultDomainManager(SamzaApplicationState state, YarnClientImpl yarnClient, Multimap<String, FaultDomain> hostToRackMap) {
+    this.state = state;
+    this.yarnClient = yarnClient;
+    this.hostToRackMap = hostToRackMap;
+  }
+
+  /**
+   * This method returns all the last cached rack values in a cluster, for all hosts that are healthy, up and running.
+   * @return a set of {@link FaultDomain}s
+   */
+  @Override
+  public Set<FaultDomain> getAllFaultDomains() {
+    return new HashSet<>(hostToRackMap.values());
+  }
+
+  /**
+   * This method returns the rack a particular host resides on based on the internal cache.
+   * In case the rack of a host does not exist in this cache, we update the cache by computing the host to rack map again using Yarn.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  @Override
+  public Set<FaultDomain> getFaultDomainOfHost(String host) {
+    if (!hostToRackMap.containsKey(host)) {
+      hostToRackMap = computeHostToFaultDomainMap();
+      state.hostToFaultDomainCacheUpdates.incrementAndGet();
+    }
+    return new HashSet<>(hostToRackMap.get(host));
+  }
+
+  /**
+   * This method checks if the two hostnames provided reside on the same rack.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same rack
+   */
+  @Override
+  public boolean hasSameFaultDomains(String host1, String host2) {
+    if (!hostToRackMap.keySet().contains(host1) || !hostToRackMap.keySet().contains(host2)) {
+      hostToRackMap = computeHostToFaultDomainMap();
+      state.hostToFaultDomainCacheUpdates.incrementAndGet();
+    }
+    return hostToRackMap.get(host1).toString().equals(hostToRackMap.get(host2).toString());

Review comment:
       This should go away once `equals` and `hashCode` are implemented for `FaultDomain`.
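With value-based `equals` and `hashCode` on the element type, the check reduces to `Set.equals`, which is order-independent. Comparing the `toString()` output of two hash-based collections is not, because their iteration order is unspecified. This sketch uses strings for fault domains and a plain `Map` instead of Guava's `Multimap`. `FaultDomainCache` is a hypothetical name. Like the `toString` version, it treats two unknown hosts as equal because both map to an empty set, which may deserve explicit handling:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class FaultDomainCache {
  private final Map<String, Set<String>> hostToFaultDomains = new HashMap<>();

  void put(String host, Set<String> faultDomains) {
    hostToFaultDomains.put(host, new HashSet<>(faultDomains));
  }

  /** True if both hosts map to equal fault-domain sets; unknown hosts map to the empty set. */
  boolean hasSameFaultDomains(String host1, String host2) {
    Set<String> faultDomains1 = hostToFaultDomains.getOrDefault(host1, Collections.emptySet());
    Set<String> faultDomains2 = hostToFaultDomains.getOrDefault(host2, Collections.emptySet());
    return faultDomains1.equals(faultDomains2);
  }
}
```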







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539888750



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -109,15 +135,24 @@ public String toString() {
             ", requestId='" + requestId + '\'' +
             ", processorId=" + processorId +
             ", requestTimestampMs=" + requestTimestamp +
+            ", faultDomains=" + convertFaultDomainSetToString() +
             '}';
   }
 
-  /**
-   * Requests are ordered by the processor type and the time at which they were created.
-   * Requests with timestamps in the future for retries take less precedence than timestamps in the past or current.
-   * Otherwise, active processors take precedence over standby processors, regardless of timestamp.
-   * @param o the other
-   */
+  private String convertFaultDomainSetToString() {
+    StringBuilder faultDomainSb = new StringBuilder();

Review comment:
       Yes, it should. I added the `FaultDomain` `toString` method later and forgot to remove this. Thanks for catching it :)







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r547140581



##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/RackManager.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+
+public class RackManager implements FaultDomainManager {
+
+  private final Map<String, FaultDomain> nodeToRackMap;
+
+  public RackManager() {
+        this.nodeToRackMap = computeNodeToFaultDomainMap();
+    }
+
+  /**
+   * This method returns all the rack values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  @Override
+  public Set<FaultDomain> getAllFaultDomains() {
+    return new HashSet<>(nodeToRackMap.values());
+  }
+
+  /**
+   * This method returns the rack a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  @Override
+  public FaultDomain getFaultDomainOfNode(String host) {
+    return nodeToRackMap.get(host);
+  }
+
+  /**
+   * This method checks if the two hostnames provided reside on the same rack.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same rack
+   */
+  @Override
+  public boolean checkHostsOnSameFaultDomain(String host1, String host2) {
+    return nodeToRackMap.get(host1).equals(nodeToRackMap.get(host2));
+  }
+
+  /**
+   * This method gets the set of racks that the given active container's corresponding standby can be placed on.
+   * @param host The hostname of the active container
+   * @return the set of racks on which this active container's standby can be scheduled
+   */
+  @Override
+  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingContainer(String host) {
+    FaultDomain activeContainerRack = nodeToRackMap.get(host);
+    Set<FaultDomain> standbyRacks = new HashSet<>(nodeToRackMap.values());
+    standbyRacks.remove(activeContainerRack);
+    return standbyRacks;
+  }
+
+  /**
+   * This method returns the cached map of nodes to racks.
+   * @return stored map of node to the rack it resides on
+   */
+  @Override
+  public Map<String, FaultDomain> getNodeToFaultDomainMap() {
+    return nodeToRackMap;
+  }
+
+  /**
+   * This method gets the node to rack (fault domain for Yarn) mapping from Yarn for all running nodes.
+   * @return A map of hostname to rack name.
+   */
+  @Override
+  public Map<String, FaultDomain> computeNodeToFaultDomainMap() {
+    YarnClientImpl yarnClient = new YarnClientImpl();
+    Map<String, FaultDomain> nodeToRackMap = new HashMap<>();
+    try {
+      List<NodeReport> nodeReport = yarnClient.getNodeReports(NodeState.RUNNING);
+      nodeReport.forEach(report -> {
+        FaultDomain rack = new FaultDomain(FaultDomainType.RACK, report.getRackName());
+        nodeToRackMap.put(report.getNodeId().getHost(), rack);
+      });
+    } catch (YarnException e) {
+      e.printStackTrace();

Review comment:
       Thanks for pointing this out. I now throw a `SamzaException` if we are unable to get node reports from Yarn, since rack awareness needs them to work. This also takes care of the other concerns you raised.
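Here is a sketch of the fail-fast behavior described above. To stay self-contained, it swaps the Yarn types for hypothetical stand-ins: `NodeInfo` for `NodeReport`, `RunningNodeSource` for `yarnClient.getNodeReports(NodeState.RUNNING)`, and a local `SamzaException`. In the real code, `getNodeReports` declares both `YarnException` and `IOException`, so both would be caught; the sketch catches a generic `Exception` instead:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in for org.apache.samza.SamzaException (an unchecked exception). */
class SamzaException extends RuntimeException {
  SamzaException(String message, Throwable cause) {
    super(message, cause);
  }
}

/** Hypothetical (host, rack) pair, standing in for a Yarn NodeReport. */
final class NodeInfo {
  final String host;
  final String rack;

  NodeInfo(String host, String rack) {
    this.host = host;
    this.rack = rack;
  }
}

/** Hypothetical source of RUNNING nodes. */
interface RunningNodeSource {
  List<NodeInfo> getRunningNodes() throws Exception;
}

final class HostToRackMapper {
  private HostToRackMapper() {
  }

  static Map<String, String> computeHostToRackMap(RunningNodeSource source) {
    List<NodeInfo> nodes;
    try {
      nodes = source.getRunningNodes();
    } catch (Exception e) {
      // Rack awareness cannot work without node reports, so fail loudly instead of printStackTrace().
      throw new SamzaException("Unable to fetch node reports from the cluster manager", e);
    }
    Map<String, String> hostToRack = new HashMap<>();
    for (NodeInfo node : nodes) {
      hostToRack.put(node.host, node.rack);
    }
    return hostToRack;
  }
}
```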







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546188896



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +421,39 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
     }
 
     return true;
   }
 
+  boolean checkStandbyConstraintsHelper(String containerIdToStart, String hostToStartContainerOn, SamzaResource existingResource, String existingContainerID) {
+    if (existingResource != null) {
+      ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+      if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled() && faultDomainManager.hasSameFaultDomains(hostToStartContainerOn, existingResource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this rack",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (existingResource.getHost().equals(hostToStartContainerOn)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);

Review comment:
       Instead of inlining this, I added a parameter for the log message to avoid code duplication, and also made `ClusterManagerConfig` a parameter. Let me know if that makes sense to you.
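Here is a rough sketch of the refactored helper, under several assumptions. The config flag and `hasSameFaultDomains` are injected as a boolean and a `BiPredicate`. `existingState` is the log parameter (for example "scheduled" or "running") that lets one helper serve both the pending and the running checks. `System.out` stands in for the SLF4J logger. All names are hypothetical:

```java
import java.util.function.BiPredicate;

final class StandbyConstraintChecker {
  private final boolean faultDomainAwareStandbyEnabled; // stand-in for ClusterManagerConfig
  private final BiPredicate<String, String> hasSameFaultDomains; // stand-in for the FaultDomainManager

  StandbyConstraintChecker(boolean faultDomainAwareStandbyEnabled, BiPredicate<String, String> hasSameFaultDomains) {
    this.faultDomainAwareStandbyEnabled = faultDomainAwareStandbyEnabled;
    this.hasSameFaultDomains = hasSameFaultDomains;
  }

  /**
   * Returns false if containerIdToStart must not start on candidateHost because of an existing
   * container on existingHost; existingHost is null when there is no such container.
   */
  boolean check(String containerIdToStart, String candidateHost, String existingContainerId,
      String existingHost, String existingState) {
    if (existingHost == null) {
      return true;
    }
    if (faultDomainAwareStandbyEnabled && hasSameFaultDomains.test(candidateHost, existingHost)) {
      System.out.printf("Container %s cannot be started on host %s because container %s is already %s on this fault domain%n",
          containerIdToStart, candidateHost, existingContainerId, existingState);
      return false;
    }
    if (existingHost.equals(candidateHost)) {
      System.out.printf("Container %s cannot be started on host %s because container %s is already %s on this host%n",
          containerIdToStart, candidateHost, existingContainerId, existingState);
      return false;
    }
    return true;
  }
}
```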







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539948685



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,18 +403,32 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
-        return false;
+      if (resource != null) {
+        if (!resource.getHost().equals(ResourceRequestState.ANY_HOST) && !host.equals(ResourceRequestState.ANY_HOST)

Review comment:
       1. Sorry for the confusion, but the hosts passed here will never be ANY_HOST; they will always be actual hostnames. Hence, I have removed the ANY_HOST checks I had added.
   2. The pending processor list will contain the actual hostname instead of ANY_HOST. Since active containers are spun up first, this check is always sufficient to keep a standby from coming up on the same fault domain as its active. It ensures that a standby container does not come up on the same host or rack as an active container whose launch is pending.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r545289191



##########
File path: samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
##########
@@ -49,6 +49,11 @@
    */
   public static final String CONTAINER_LABEL = "yarn.container.label";
 
+  /**
+   * Determines whether standby allocation is rack aware or not.
+   */
+  public static final String RACK_AWARE_STANDBY_ENABLED = "yarn.rack-aware.standby.enabled";
+

Review comment:
       Thanks for the suggestion; I agree. Point `2` is especially important, and I was looking for a way around that. 







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546189004



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -181,13 +207,14 @@ private void handleStandbyContainerStop(String standbyContainerID, String resour
 
       // request standbycontainer's host for active-container
       SamzaResourceRequest resourceRequestForActive =
-          containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
+        containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
       // record the resource request, before issuing it to avoid race with allocation-thread
       failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
       containerAllocator.issueResourceRequest(resourceRequestForActive);
 
       // request any-host for standby container
-      containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST);
+      containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST,

Review comment:
       I thought about this a lot, and I think it makes sense not to alter the current flow at all when the config is turned off. Hence, I have now added that check for every `requestResource` call.







[GitHub] [samza] mynameborat commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r543405715



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +417,38 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
     }
 
     return true;
   }
 
+  boolean checkStandbyConstraintsHelper(String containerIdToStart, String hostToStartContainerOn, SamzaResource existingResource, String existingContainerID) {

Review comment:
       Should we guard this with the configuration as well? 
   

##########
File path: samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
##########
@@ -49,6 +49,11 @@
    */
   public static final String CONTAINER_LABEL = "yarn.container.label";
 
+  /**
+   * Determines whether standby allocation is rack aware or not.
+   */
+  public static final String RACK_AWARE_STANDBY_ENABLED = "yarn.rack-aware.standby.enabled";
+

Review comment:
       What are your thoughts on repurposing this configuration under `ClusterManagerConfig`, as something like `cluster-manager.fault-domain-aware.standby.enabled`? 
   
   Here are some of the reasons why I think it would be useful:
   
   1. We will need a kill switch in Samza core to ensure we can turn off the usage of `FaultDomainManager`.
   2. The current implementation requires `FaultDomainManager` to be present regardless of whether the code path uses it, and it does work even when this flag is turned off, resulting in a half-baked experience and unnecessary work.
   3. While specific implementations can have their own cut-off switches, they can then consistently assume that the information needed for rack-aware (a.k.a. fault-domain-aware) requests will be provided by Samza core as part of `SamzaResourceRequest` whenever the feature is enabled in core.
   4. It leaves room for optimizations in which the request constraints are not used in scenarios where the feature is disabled in core but still turned on at the cluster level.
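A minimal sketch of the kill switch proposed above, assuming the key `cluster-manager.fault-domain-aware.standby.enabled` with a default of `false`. The class name, the `Map`-backed config, and `createIfEnabled` are hypothetical stand-ins for `ClusterManagerConfig`. The point is that core decides once whether a fault domain manager is built at all, which addresses reasons 1 and 2.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical stand-in for a ClusterManagerConfig getter; Samza's Config API is not used here. */
final class StandbyConfigSketch {
  // Key proposed in the review comment above.
  static final String FAULT_DOMAIN_AWARE_STANDBY_ENABLED =
      "cluster-manager.fault-domain-aware.standby.enabled";

  private final Map<String, String> config;

  StandbyConfigSketch(Map<String, String> config) {
    this.config = config;
  }

  /** Defaults to false, so jobs keep today's behavior unless they opt in. */
  boolean getFaultDomainAwareStandbyEnabled() {
    return Boolean.parseBoolean(config.getOrDefault(FAULT_DOMAIN_AWARE_STANDBY_ENABLED, "false"));
  }

  /** Builds the (hypothetical) fault domain manager only when the switch is on; otherwise no work is done. */
  <T> Optional<T> createIfEnabled(Supplier<T> factory) {
    return getFaultDomainAwareStandbyEnabled() ? Optional.ofNullable(factory.get()) : Optional.empty();
  }
}
```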







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539867459



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);
+
+  /**
+   * This method checks if the two hostnames provided reside on the same fault domain.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same fault domain
+   */
+  boolean checkHostsOnSameFaultDomain(String host1, String host2);
+
+  /**
+   * This method gets the set of fault domains that the given active container's corresponding standby can be placed on.
+   * @param host The hostname of the active container
+   * @return the set of fault domains on which this active container's standby can be scheduled
+   */
+  Set<FaultDomain> getAllowedFaultDomainsForSchedulingContainer(String host);
+
+  /**
+   * This method returns the cached map of nodes to fault domains.
+   * @return stored map of node to the fault domain it resides on

Review comment:
       If this value is null or empty, we retrieve the actual map by calling the compute function defined below it.
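The lazy-caching behavior described here could look roughly like the sketch below. `CachedNodeFaultDomains` and the supplier standing in for the compute function are hypothetical, and hosts map to sets of rack-ID strings.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical cache of host -> fault domain IDs, filled lazily from a compute function. */
final class CachedNodeFaultDomains {
  private final Supplier<Map<String, Set<String>>> computeNodeToFaultDomainMap;
  private Map<String, Set<String>> cached; // null until the first lookup

  CachedNodeFaultDomains(Supplier<Map<String, Set<String>>> computeNodeToFaultDomainMap) {
    this.computeNodeToFaultDomainMap = computeNodeToFaultDomainMap;
  }

  /** Returns the cached map, recomputing it whenever the cache is null or empty. */
  synchronized Map<String, Set<String>> getNodeToFaultDomainMap() {
    if (cached == null || cached.isEmpty()) {
      cached = Collections.unmodifiableMap(computeNodeToFaultDomainMap.get());
    }
    return cached;
  }
}
```

An empty result is never cached in this sketch, so a cluster that reports no racks triggers a recomputation on every call.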







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r547120067



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -233,9 +275,13 @@ private void initiateStandbyAwareAllocation(String activeContainerID, String res
             standbyHost, activeContainerID, standbyHost, resourceID);
         FailoverMetadata failoverMetadata = this.registerActiveContainerFailure(activeContainerID, resourceID);
 
+        Set<FaultDomain> allowedFaultDomains = new HashSet<>();
+        if (isFaultDomainAwareStandbyEnabled()) {
+          allowedFaultDomains = getAllowedFaultDomainsForStandbyContainerGivenContainerId(activeContainerID);
+        }
         // record the resource request, before issuing it to avoid race with allocation-thread
         SamzaResourceRequest resourceRequestForActive =
-            containerAllocator.getResourceRequest(activeContainerID, standbyHost);
+                containerAllocator.getResourceRequest(activeContainerID, standbyHost, allowedFaultDomains);

Review comment:
       I've simplified this now as explained in the next comment.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r547138145



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -168,11 +173,13 @@ public void handleContainerStopFail(String containerID, String resourceID,
    * @param host The hostname of the active container
    * @return the set of racks on which this active container's standby can be scheduled
    */
-  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(String host) {
-    Set<FaultDomain> activeContainerRack = faultDomainManager.getFaultDomainOfHost(host);
-    Set<FaultDomain> standbyRacks = faultDomainManager.getAllFaultDomains();
-    standbyRacks.removeAll(activeContainerRack);
-    return standbyRacks;
+  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(Optional<String> host) {
+    Set<FaultDomain> standbyFaultDomain = faultDomainManager.getAllFaultDomains();
+    if (host.isPresent()) {
+      Set<FaultDomain> activeContainerFaultDomain = faultDomainManager.getFaultDomainsForHost(host.get());
+      standbyFaultDomain.removeAll(activeContainerFaultDomain);
+    }
+    return standbyFaultDomain;

Review comment:
       I've extracted the simplified part into a separate method (`getAllowedFaultDomainsForStandbyContainerGivenActiveContainerHost`) and moved the additional, overloaded functionality into the parent method (`getAllowedFaultDomainsForStandbyContainerGivenHostToExclude`). This way, the caller can decide which method to invoke. I've also removed `Optional` from the argument. 
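A minimal sketch of the set-difference computation, assuming fault domains are rack-ID strings and the host-to-fault-domain map is already available. The helper name is hypothetical and does not correspond to either method named above. It copies the input before calling `removeAll`, because mutating the set returned by `getAllFaultDomains()` in place (as the diff does) would corrupt that set if it were ever cached.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper computing where a standby may be placed. */
final class StandbyFaultDomains {
  private StandbyFaultDomains() {}

  /** All fault domains minus those of the active container's host; the inputs are never mutated. */
  static Set<String> allowedForStandby(Set<String> allFaultDomains, String activeHost,
      Map<String, Set<String>> hostToFaultDomains) {
    Set<String> allowed = new HashSet<>(allFaultDomains); // defensive copy
    allowed.removeAll(hostToFaultDomains.getOrDefault(activeHost, Collections.emptySet()));
    return allowed;
  }
}
```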







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539872461



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -63,7 +69,11 @@
   private final Instant requestTimestamp;
 
   public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId) {
-    this(numCores, memoryMB, preferredHost, processorId, Instant.now());
+    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), null);

Review comment:
       When we make the actual container request to Yarn, the null value ensures that the rack requirement for that request is ignored, just as it is today without rack awareness. The code on the Yarn side checks specifically for null.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r547119767



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -361,8 +407,41 @@ private FailoverMetadata registerActiveContainerFailure(String activeContainerID
   }
 
   /**
-   * Check if matching this SamzaResourceRequest to the given resource, meets all standby-container container constraints.
+   * This method checks from the config if standby allocation is fault domain aware or not, and requests resources accordingly.
+   *
+   * @param containerAllocator ContainerAllocator object that requests for resources from the resource manager
+   * @param containerID Samza container ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   */
+  void checkFaultDomainAwarenessEnabledAndRequestResource(ContainerAllocator containerAllocator, String containerID, String preferredHost) {

Review comment:
       Makes sense. However, I've renamed the method to `getResourceRequest` and extracted the `issueResourceRequest` part out of it, because the call site inside `initiateStandbyAwareAllocation` mentioned in the comment above records the resource request separately before issuing it, to avoid race conditions.
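The record-before-issue ordering mentioned here can be sketched as follows. `FailoverRequestTracker` is hypothetical, and a request is reduced to its ID. The only idea shown is that the request becomes visible to the allocation thread before it can possibly be fulfilled.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical tracker shared between the request path and the allocation thread. */
final class FailoverRequestTracker {
  // Thread-safe set of request IDs that the allocation thread may look up.
  private final Set<String> recordedRequestIds = ConcurrentHashMap.newKeySet();

  /** Records the request before handing it to the issuer, so a fast allocation callback always finds it. */
  void recordThenIssue(String requestId, Consumer<String> issuer) {
    recordedRequestIds.add(requestId);
    issuer.accept(requestId);
  }

  boolean isRecorded(String requestId) {
    return recordedRequestIds.contains(requestId);
  }
}
```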







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r544913652



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -63,7 +69,11 @@
   private final Instant requestTimestamp;
 
   public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId) {
-    this(numCores, memoryMB, preferredHost, processorId, Instant.now());
+    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), null);

Review comment:
       @lakshmi-manasa-g it should still work as is.







[GitHub] [samza] mynameborat commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r547411491



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -512,7 +529,13 @@ public void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest re
           "Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request",
           containerID, samzaResource.getHost());
       releaseUnstartableContainer(request, samzaResource, preferredHost, resourceRequestState);
-      checkFaultDomainAwarenessEnabledAndRequestResource(containerAllocator, containerID, ResourceRequestState.ANY_HOST);
+      Optional<String> activeContainerHostOpt = getActiveContainerHost(containerID);
+      String activeContainerHost = null;
+      if (activeContainerHostOpt.isPresent()) {
+        activeContainerHost = activeContainerHostOpt.get();
+      }

Review comment:
       Can be simplified to:
   ```
   String activeContainerHost = getActiveContainerHost(containerID).orElse(null);
   ```

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -56,8 +58,15 @@
   // Resource-manager, used to stop containers
   private ClusterResourceManager clusterResourceManager;
 
-  public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
-      ClusterResourceManager clusterResourceManager, LocalityManager localityManager) {
+  // FaultDomainManager, used to get fault domain information of different hosts from the cluster manager.
+  private final FaultDomainManager faultDomainManager;
+
+  private final Config config;

Review comment:
       Remove `config`, since you have already persisted the switch in the boolean.

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -241,6 +241,11 @@ public void requestResources(SamzaResourceRequest resourceRequest) {
     String processorId = resourceRequest.getProcessorId();
     String requestId = resourceRequest.getRequestId();
     String preferredHost = resourceRequest.getPreferredHost();
+    String[] racks = null;
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+    if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled()) {
+      racks = resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
+    }

Review comment:
       What do you think about dropping the control flow here and passing an empty array instead of null, if that doesn't change the semantics in YARN?
   
   Doing so eliminates unnecessary control flow, and since `faultDomains` is guaranteed to be either empty or populated, null handling is not necessary either.
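A sketch of the control-flow-free conversion suggested above, assuming a minimal stand-in `FaultDomainSketch` with a `getId()` accessor (hypothetical, not Samza's class). An empty set simply produces an empty array. Whether YARN treats an empty rack array exactly like `null` is the open question in this comment and is not verified here.

```java
import java.util.Set;

/** Hypothetical minimal stand-in for a fault domain such as a YARN rack. */
final class FaultDomainSketch {
  private final String id;

  FaultDomainSketch(String id) {
    this.id = id;
  }

  String getId() {
    return id;
  }
}

final class RackArrays {
  private RackArrays() {}

  /** Maps fault domains to rack IDs; an empty set yields an empty array, never null. */
  static String[] toRackArray(Set<FaultDomainSketch> faultDomains) {
    return faultDomains.stream().map(FaultDomainSketch::getId).toArray(String[]::new);
  }
}
```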

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -436,7 +448,11 @@ void checkFaultDomainAwarenessEnabledAndRequestResource(ContainerAllocator conta
     if (resource == null) {
       resource = samzaApplicationState.runningProcessors.get(activeContainerId);
     }
-    return Optional.ofNullable(resource.getHost());
+    if (resource != null) {
+      return Optional.ofNullable(resource.getHost());
+    } else {
+      return Optional.empty();
+    }

Review comment:
       Can be simplified to:
   ```
   return Optional.ofNullable(resource)
       .map(SamzaResource::getHost);
   ```
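Here is a self-contained sketch of the pending-then-running lookup. `Resource` is a hypothetical stand-in for `SamzaResource`. The sketch uses `Optional.ofNullable` because `Optional.of(null)` throws a `NullPointerException` when the container is in neither map.

```java
import java.util.Map;
import java.util.Optional;

final class ActiveHostLookup {
  /** Minimal stand-in for SamzaResource: only the host is modeled. */
  static final class Resource {
    private final String host;

    Resource(String host) {
      this.host = host;
    }

    String getHost() {
      return host;
    }
  }

  /** Checks pending processors first, then running ones; empty if the container is in neither. */
  static Optional<String> getActiveContainerHost(String containerId,
      Map<String, Resource> pending, Map<String, Resource> running) {
    Resource resource = pending.get(containerId);
    if (resource == null) {
      resource = running.get(containerId);
    }
    // ofNullable (not of), so a missing container yields Optional.empty() instead of an NPE.
    return Optional.ofNullable(resource).map(Resource::getHost);
  }
}
```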







[GitHub] [samza] mynameborat commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r538881117



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {

Review comment:
       The interface exposes too much functionality that is either unnecessary upstream or conflates responsibilities:
    
   1. `computeNodeToFaultDomainMap`
   2. `getAllowedFaultDomainsForSchedulingContainer`
   3. `getNodeToFaultDomainMap`
   
   The notion of allowed fault domains for a container is determined not by the cluster manager's fault domain manager but by `StandbyContainerManager`. Hence, [2] shouldn't be exposed as a functionality.
   [3] seems redundant given that you have `getFaultDomainOfNode`.
   [1] is completely internal to the manager and shouldn't be exposed.
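One possible shape for a trimmed interface is sketched below, assuming fault domains are rack-ID strings. Only `getAllFaultDomains` and a per-host lookup that returns a set remain. Computing and caching the map stays private to the implementation, and choosing the allowed fault domains is left to the caller (for example, `StandbyContainerManager`). Both type names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical slimmed-down contract: only the lookups that callers upstream need. */
interface SlimFaultDomainManager {
  /** All fault domains (e.g. rack IDs) known to the cluster. */
  Set<String> getAllFaultDomains();

  /** Fault domains of a host; a set, because a host may belong to several. */
  Set<String> getFaultDomainsForHost(String host);
}

/** In-memory implementation; how the map is obtained stays private to the class. */
final class InMemoryFaultDomainManager implements SlimFaultDomainManager {
  private final Map<String, Set<String>> hostToFaultDomains;

  InMemoryFaultDomainManager(Map<String, Set<String>> hostToFaultDomains) {
    this.hostToFaultDomains = new HashMap<>(hostToFaultDomains);
  }

  @Override
  public Set<String> getAllFaultDomains() {
    Set<String> all = new HashSet<>();
    hostToFaultDomains.values().forEach(all::addAll);
    return Collections.unmodifiableSet(all);
  }

  @Override
  public Set<String> getFaultDomainsForHost(String host) {
    return hostToFaultDomains.getOrDefault(host, Collections.emptySet());
  }
}
```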

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -114,6 +114,11 @@
    */
   private final ClusterResourceManager clusterResourceManager;
 
+  /**
+   * An interface to get information about nodes and the fault domains they reside on.
+   */
+  private final FaultDomainManager faultDomainManager;

Review comment:
       Can be removed, since it's only used to wire up the `ContainerManager`.
   

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);
+
+  /**
+   * This method checks if the two hostnames provided reside on the same fault domain.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same fault domain
+   */
+  boolean checkHostsOnSameFaultDomain(String host1, String host2);

Review comment:
       Given that nodes can belong to multiple fault domains, what does this functionality offer? Does it return `true` for overlapping fault domains, or only for an exact 1:1 match?
   
   Why do you need this functionality in the first place? I thought we were using the set-difference approach: get the fault domains of the active container, compute the difference between all available fault domains and the active's fault domains, and use the result when requesting resources.
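The two readings of "same fault domain" raised above give different answers once a host belongs to several fault domains. This hypothetical helper contrasts them, with rack and zone IDs modeled as plain strings.

```java
import java.util.Collections;
import java.util.Set;

/** Hypothetical helper contrasting two possible meanings of "same fault domain". */
final class SameFaultDomainSemantics {
  private SameFaultDomainSemantics() {}

  /** Reading 1: the hosts "share" a fault domain if their sets overlap at all. */
  static boolean overlaps(Set<String> domainsOfHost1, Set<String> domainsOfHost2) {
    return !Collections.disjoint(domainsOfHost1, domainsOfHost2);
  }

  /** Reading 2: the hosts are on the "same" fault domain only if their sets are identical. */
  static boolean exactMatch(Set<String> domainsOfHost1, Set<String> domainsOfHost2) {
    return domainsOfHost1.equals(domainsOfHost2);
  }
}
```

With `{rack1, zoneA}` and `{rack2, zoneA}`, the first reading says the hosts share a fault domain, while the second says they do not.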

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -183,6 +191,7 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri
       SamzaApplicationState state,
       MetricsRegistryMap registry,
       ClusterResourceManager resourceManager,
+      FaultDomainManager faultDomainManager,

Review comment:
       nit: add new parameters to the end of the signature

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/RackManagerFactory.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainManagerFactory;
+
+/**
+ * A factory to build a {@link RackManager}.
+ */
+public class RackManagerFactory implements FaultDomainManagerFactory {

Review comment:
       Refer to the comment above about renaming this to `YarnFaultDomainManager`; I guess that will change this class as well.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);

Review comment:
       Should be `Set<FaultDomain>`, given that a node can belong to multiple fault domains.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManagerFactory.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+/**
+ * A factory to build a {@link FaultDomainManager}.
+ */
+public interface FaultDomainManagerFactory {
+
+  public FaultDomainManager getFaultDomainManager();

Review comment:
       Should we take `Config` and `MetricsRegistry` as the bare-minimum parameters? That would let us add metrics, use a cut-off switch, or configure the behavior as it evolves.
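A rough sketch of what the suggested factory signature could enable. Samza's `Config` is a `Map<String, String>`, so a plain map stands in for it. `SimpleMetricsRegistry`, the config key and the placeholder rack lookup are all hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Samza's MetricsRegistry: hands out named counters.
final class SimpleMetricsRegistry {
  private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

  AtomicLong newCounter(String group, String name) {
    return counters.computeIfAbsent(group + "/" + name, key -> new AtomicLong());
  }
}

// Simplified manager: hosts and fault domains are plain strings.
interface FaultDomainManager {
  Set<String> getFaultDomainsForHost(String host);
}

// Config and metrics are passed in up front, so implementations can register metrics
// or honour a kill switch without changing the factory signature later.
interface FaultDomainManagerFactory {
  FaultDomainManager getFaultDomainManager(Map<String, String> config, SimpleMetricsRegistry metricsRegistry);
}

final class ExampleFaultDomainManagerFactory implements FaultDomainManagerFactory {
  // Hypothetical config key acting as a cut-off switch.
  static final String ENABLED_KEY = "example.fault-domain-manager.enabled";

  @Override
  public FaultDomainManager getFaultDomainManager(Map<String, String> config, SimpleMetricsRegistry metricsRegistry) {
    AtomicLong lookups = metricsRegistry.newCounter("example-fault-domain-manager", "lookups");
    boolean enabled = Boolean.parseBoolean(config.getOrDefault(ENABLED_KEY, "true"));
    return host -> {
      lookups.incrementAndGet();
      if (!enabled) {
        // Switched off: report no fault domains, so callers fall back to host-only placement.
        return Collections.emptySet();
      }
      // Placeholder lookup; a real implementation would query the cluster manager.
      return Collections.singleton("rack-of-" + host);
    };
  }
}
```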

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/RackManager.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+
+public class RackManager implements FaultDomainManager {

Review comment:
       I suppose this needs to be `YarnFaultDomainManager`.
   `FaultDomainManager` by itself is capable of handling all types of fault domains for a given cluster type. Hence `RackManager` doesn't seem like a suitable abstraction; rather, there should be one for YARN that is capable of reporting all the sorts of fault domains YARN supports.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);

Review comment:
       +1, use consistent terminology.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539862368



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);
+
+  /**
+   * This method checks if the two hostnames provided reside on the same fault domain.

Review comment:
       Added this info in the FaultDomain class.







[GitHub] [samza] mynameborat commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r545285119



##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -43,20 +42,24 @@
 public class YarnFaultDomainManager implements FaultDomainManager {
 
   private Multimap<String, FaultDomain> hostToRackMap;
-  private final SamzaApplicationState state;
   private final YarnClientImpl yarnClient;
+  private final MetricsRegistry metricsRegistry;
+  private final String groupName = "yarn-fault-domain-manager";

Review comment:
       nit: make it static

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -168,11 +173,13 @@ public void handleContainerStopFail(String containerID, String resourceID,
    * @param host The hostname of the active container
    * @return the set of racks on which this active container's standby can be scheduled
    */
-  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(String host) {
-    Set<FaultDomain> activeContainerRack = faultDomainManager.getFaultDomainOfHost(host);
-    Set<FaultDomain> standbyRacks = faultDomainManager.getAllFaultDomains();
-    standbyRacks.removeAll(activeContainerRack);
-    return standbyRacks;
+  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(Optional<String> host) {
+    Set<FaultDomain> standbyFaultDomain = faultDomainManager.getAllFaultDomains();
+    if (host.isPresent()) {
+      Set<FaultDomain> activeContainerFaultDomain = faultDomainManager.getFaultDomainsForHost(host.get());
+      standbyFaultDomain.removeAll(activeContainerFaultDomain);
+    }
+    return standbyFaultDomain;

Review comment:
       Sounds like this method is overloaded and has implicit contracts.
   e.g., when the incoming host is not present, it returns all the fault domains of the cluster, which isn't clear and probably shouldn't be the case.
   
   Either we should rename the method and call out the implication of passing an empty input parameter, or just simplify the contract of the method and not overload it.
   
   I'd prefer the latter, since:
   
   1. Getting the allowed fault domains vs. falling back to something else may differ at the caller's end.
   2. Getting all the fault domains is a straightforward call to `faultDomainManager`.
   3. Additionally, optionals are suitable as return types, but taking an optional parameter is highly discouraged where possible. Ideally, callers should decide how to handle the downstream call in the absence of input data, not the downstream method itself.
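A minimal sketch of the simplified contract, using hypothetical names and plain rack-name strings. The method takes a plain `String` host, and the caller handles a missing host explicitly. Falling back to all fault domains is only an example policy. Building a fresh set in `getAllFaultDomains` also means that `removeAll` cannot mutate any cached collection.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in: fault domains are rack names.
final class StandbyFaultDomains {
  private final Map<String, Set<String>> hostToRacks;

  StandbyFaultDomains(Map<String, Set<String>> hostToRacks) {
    this.hostToRacks = hostToRacks;
  }

  Set<String> getAllFaultDomains() {
    Set<String> all = new HashSet<>(); // fresh set, so callers may mutate it safely
    hostToRacks.values().forEach(all::addAll);
    return all;
  }

  // Single, explicit contract: every fault domain except those of the active container's host.
  Set<String> getAllowedFaultDomainsForStandby(String activeHost) {
    Set<String> allowed = getAllFaultDomains();
    allowed.removeAll(hostToRacks.getOrDefault(activeHost, Collections.emptySet()));
    return allowed;
  }
}

final class StandbyRequestCaller {
  // The caller, not the callee, decides what to do when the active container's host is unknown.
  static Set<String> faultDomainsForStandbyOf(String activeContainerId, Map<String, String> containerToHost,
      StandbyFaultDomains domains) {
    String activeHost = containerToHost.get(activeContainerId);
    if (activeHost == null) {
      return domains.getAllFaultDomains(); // explicit fallback, visible at the call site
    }
    return domains.getAllowedFaultDomainsForStandby(activeHost);
  }
}
```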
   

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +421,39 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
     }
 
     return true;
   }
 
+  boolean checkStandbyConstraintsHelper(String containerIdToStart, String hostToStartContainerOn, SamzaResource existingResource, String existingContainerID) {
+    if (existingResource != null) {
+      ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+      if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled() && faultDomainManager.hasSameFaultDomains(hostToStartContainerOn, existingResource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this rack",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (existingResource.getHost().equals(hostToStartContainerOn)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);

Review comment:
       Seems like the logging is now impacted and differs from the previous flow in the absence of rack-aware standby.
   i.e., this method gets invoked with both `pendingResource` and `runningResource`, and it seems both will end up logging `scheduled on this host`.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +421,39 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
     }
 
     return true;
   }
 
+  boolean checkStandbyConstraintsHelper(String containerIdToStart, String hostToStartContainerOn, SamzaResource existingResource, String existingContainerID) {
+    if (existingResource != null) {
+      ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+      if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled() && faultDomainManager.hasSameFaultDomains(hostToStartContainerOn, existingResource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this rack",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (existingResource.getHost().equals(hostToStartContainerOn)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);

Review comment:
       Can we not inline this within `checkStandbyConstraints`? There doesn't seem to be much to extract. Inlining also means you don't need to recreate `ClusterManagerConfig` on every invocation, and it addresses the problem I raised above about the logging being different.

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -117,4 +120,8 @@ public boolean hasSameFaultDomains(String host1, String host2) {
     }
     return hostToRackMap;
   }
+
+  private void initMetrics() {
+    hostToFaultDomainCacheUpdates = metricsRegistry.newCounter(groupName, "host-to-fault-domain-cache-updates");
+  }

Review comment:
       Why not inline? What are we getting by extracting it to a method?
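A small sketch of the two nits together: the group name as a `static final` constant, and the counter registered inline in the constructor instead of via `initMetrics()`. `SimpleMetricsRegistry` is a hypothetical stand-in for Samza's `MetricsRegistry`, and the cache is reduced to a host-to-rack map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Samza's MetricsRegistry.
final class SimpleMetricsRegistry {
  private final Map<String, AtomicLong> counters = new HashMap<>();

  AtomicLong newCounter(String group, String name) {
    return counters.computeIfAbsent(group + "/" + name, key -> new AtomicLong());
  }
}

final class FaultDomainCacheSketch {
  // Shared by all instances, so static final.
  private static final String GROUP_NAME = "yarn-fault-domain-manager";

  private final AtomicLong hostToFaultDomainCacheUpdates;
  private Map<String, String> hostToRack = new HashMap<>();

  FaultDomainCacheSketch(SimpleMetricsRegistry metricsRegistry) {
    // Registered inline; a one-line helper method adds little.
    this.hostToFaultDomainCacheUpdates =
        metricsRegistry.newCounter(GROUP_NAME, "host-to-fault-domain-cache-updates");
  }

  void updateCache(Map<String, String> freshHostToRack) {
    hostToRack = new HashMap<>(freshHostToRack);
    hostToFaultDomainCacheUpdates.incrementAndGet();
  }

  String rackOf(String host) {
    return hostToRack.get(host);
  }
}
```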







[GitHub] [samza] mynameborat commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r545396931



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Set;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ *  This interface gets fault domain information of all hosts that are running in the cluster,
+ *  from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a container can be placed on (for ex: based on standby constraints).
+ *  The host to fault domain map used here will always be cached and only updated in case the AM dies or an active
+ *  container is assigned to a host which is not in the map.
+ *  This is not thread-safe.
+ */
+@InterfaceStability.Unstable
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the last cached fault domain values in a cluster, for all hosts that are healthy, up and running.

Review comment:
       Synced up offline. Please update the doc to reflect this.
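The Javadoc quoted above says the host-to-fault-domain map is always cached and only updated when the AM restarts or when a host is missing from the map. It also says the map is not thread-safe. Below is a minimal sketch of that policy. The `loader` supplier is a hypothetical placeholder for the cluster-manager query, and a refresh on lookup of an unknown host approximates "assigned to a host which is not in the map".

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Not thread-safe, matching the Javadoc above; callers must synchronize externally.
final class CachedHostToFaultDomains {
  // Hypothetical loader, e.g. wrapping a cluster-manager query for healthy hosts.
  private final Supplier<Map<String, Set<String>>> loader;
  private Map<String, Set<String>> cache;
  private int refreshes;

  CachedHostToFaultDomains(Supplier<Map<String, Set<String>>> loader) {
    this.loader = loader;
    refresh(); // initial load, e.g. when a (new) AM starts
  }

  private void refresh() {
    cache = new HashMap<>(loader.get());
    refreshes++;
  }

  Set<String> getFaultDomainsForHost(String host) {
    if (!cache.containsKey(host)) {
      refresh(); // host missing from the cache: reload the whole map once
    }
    return cache.getOrDefault(host, Collections.emptySet());
  }

  int refreshCount() {
    return refreshes;
  }
}
```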







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r545287416



##########
File path: samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
##########
@@ -38,6 +38,9 @@
   private static final String CLUSTER_MANAGER_FACTORY = "samza.cluster-manager.factory";
   private static final String CLUSTER_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.YarnResourceManagerFactory";
 
+  private static final String FAULT_DOMAIN_MANAGER_FACTORY = "samza.fault-domain-manager.factory";
+  private static final String FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.RackManagerFactory";

Review comment:
       Changed. Thanks :) 







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539518439



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */

Review comment:
       I've marked it as unstable for now and will change it to evolving after more thorough testing.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539872461



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -63,7 +69,11 @@
   private final Instant requestTimestamp;
 
   public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId) {
-    this(numCores, memoryMB, preferredHost, processorId, Instant.now());
+    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), null);

Review comment:
       When we make the actual container request to Yarn, a null value means the rack requirement for that request is ignored, which matches the current behavior without rack awareness. However, I have changed it to an empty set now. Thanks!
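As the comment above explains, a null rack value at the Yarn request means "no rack requirement". Switching to an empty set internally therefore needs a conversion at the boundary. Below is a sketch of that conversion, with a hypothetical helper name. The resulting array is what would be passed as the `racks` argument of a YARN container request; that call is not made here.

```java
import java.util.Set;
import java.util.TreeSet;

final class RackRequestHelper {
  // Hypothetical helper: converts the allowed fault domains (rack names) into the
  // racks argument for a YARN container request. An empty (or null) set maps to null,
  // i.e. "no rack requirement", matching the behaviour without rack awareness.
  static String[] toYarnRackArray(Set<String> allowedRacks) {
    if (allowedRacks == null || allowedRacks.isEmpty()) {
      return null;
    }
    // Sorted for deterministic ordering (handy in logs and tests).
    return new TreeSet<>(allowedRacks).toArray(new String[0]);
  }
}
```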







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539525379



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.

Review comment:
       1. Yarn defines running nodes as the nodes that are healthy and up and running in the cluster. I've clarified that bit here.
   2. It gets all the fault domain values in the cluster that were last cached by the FaultDomainManager.
   3. Changed everything to hosts. :)
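A sketch of building the cached host-to-fault-domain map from RUNNING hosts only. With a real YARN client, node reports filtered by state can be fetched via `YarnClient#getNodeReports(NodeState...)`. Here, hypothetical `NodeStatus` and `NodeInfo` types stand in for YARN's `NodeState` and `NodeReport`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for YARN's NodeState and NodeReport.
enum NodeStatus { RUNNING, UNHEALTHY, DECOMMISSIONED, LOST }

final class NodeInfo {
  final String host;
  final String rack;
  final NodeStatus status;

  NodeInfo(String host, String rack, NodeStatus status) {
    this.host = host;
    this.rack = rack;
    this.status = status;
  }
}

final class HostToRackBuilder {
  // Only RUNNING nodes (healthy, up and running) contribute to the cached map.
  static Map<String, Set<String>> build(List<NodeInfo> nodes) {
    Map<String, Set<String>> hostToRacks = new HashMap<>();
    for (NodeInfo node : nodes) {
      if (node.status == NodeStatus.RUNNING) {
        hostToRacks.computeIfAbsent(node.host, h -> new HashSet<>()).add(node.rack);
      }
    }
    return hostToRacks;
  }

  // All fault domains = union of the racks of the cached RUNNING hosts.
  static Set<String> allFaultDomains(Map<String, Set<String>> hostToRacks) {
    Set<String> all = new HashSet<>();
    hostToRacks.values().forEach(all::addAll);
    return all;
  }
}
```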







[GitHub] [samza] mynameborat commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546927562



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -56,8 +58,13 @@
   // Resource-manager, used to stop containers
   private ClusterResourceManager clusterResourceManager;
 
-  public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
-      ClusterResourceManager clusterResourceManager, LocalityManager localityManager) {
+  // FaultDomainManager, used to get fault domain information of different hosts from the cluster manager.
+  private final FaultDomainManager faultDomainManager;
+
+  private final Config config;
+
+  public StandbyContainerManager(SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
+                                 FaultDomainManager faultDomainManager, LocalityManager localityManager, Config config) {

Review comment:
       Consistency: keep the new parameters at the end.
   
   

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -469,6 +566,10 @@ void releaseUnstartableContainer(SamzaResourceRequest request, SamzaResource res
     resourceRequestState.cancelResourceRequest(request);
   }
 
+  private boolean isFaultDomainAwareStandbyEnabled() {
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+    return clusterManagerConfig.getFaultDomainAwareStandbyEnabled();
+  }

Review comment:
       Could this be more efficient by determining whether it is enabled in the constructor and storing that in an instance variable?
   Storing the config seems unnecessary, and creating a `ClusterManagerConfig` on the fly for every invocation seems sub-optimal.
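A minimal sketch of the suggestion: the flag is resolved once in the constructor and kept as a `final` field. Samza's `Config` is a `Map<String, String>`, so a plain map stands in for it. The config key is hypothetical; the real one is whatever `ClusterManagerConfig` reads.

```java
import java.util.Map;

final class StandbyContainerManagerSketch {
  // Hypothetical key; the real one is defined by ClusterManagerConfig.
  static final String FAULT_DOMAIN_AWARE_STANDBY_ENABLED = "example.fault-domain-aware.standby.enabled";

  // Resolved once; neither the Config nor a ClusterManagerConfig is kept around.
  private final boolean faultDomainAwareStandbyEnabled;

  StandbyContainerManagerSketch(Map<String, String> config) {
    this.faultDomainAwareStandbyEnabled =
        Boolean.parseBoolean(config.getOrDefault(FAULT_DOMAIN_AWARE_STANDBY_ENABLED, "false"));
  }

  boolean isFaultDomainAwareStandbyEnabled() {
    return faultDomainAwareStandbyEnabled;
  }
}
```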
   

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -233,9 +275,13 @@ private void initiateStandbyAwareAllocation(String activeContainerID, String res
             standbyHost, activeContainerID, standbyHost, resourceID);
         FailoverMetadata failoverMetadata = this.registerActiveContainerFailure(activeContainerID, resourceID);
 
+        Set<FaultDomain> allowedFaultDomains = new HashSet<>();
+        if (isFaultDomainAwareStandbyEnabled()) {
+          allowedFaultDomains = getAllowedFaultDomainsForStandbyContainerGivenContainerId(activeContainerID);
+        }
         // record the resource request, before issuing it to avoid race with allocation-thread
         SamzaResourceRequest resourceRequestForActive =
-            containerAllocator.getResourceRequest(activeContainerID, standbyHost);
+                containerAllocator.getResourceRequest(activeContainerID, standbyHost, allowedFaultDomains);

Review comment:
       This is boilerplate, since the function introduced below in this PR already does this.
   Context: `checkFaultDomainAwarenessEnabledAndRequestResource`

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +454,39 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID, "pending")) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID, "running")) {

Review comment:
       I still feel this is unclean, especially passing in parameters that are not relevant to the core responsibility of the method, e.g., "pending" vs. "running", which is purely for logging and isn't state per se, as one only realizes after reading through the code underneath.
   
   If you don't want to inline it and still feel a refactor would help make it clear, I'd suggest extracting the existing logic into `checkActiveAndStandbyOnSameHost`, then adding your logic into `checkActiveAndStandbyOnSameFaultDomain`. Within `checkStandbyConstraints` you can then fire off both of these checks, or only one of them, depending on whether fault domain awareness is enabled.
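A sketch of the suggested split, simplified to one conflicting container with an optional pending host and an optional running host. The real method loops over several conflicting container IDs. The `BiPredicate` is a hypothetical stand-in for `FaultDomainManager#hasSameFaultDomains`. The "pending" vs. "running" wording now lives only in `checkStandbyConstraints`, which already knows which lookup it made.

```java
import java.util.function.BiPredicate;
import java.util.logging.Logger;

final class StandbyConstraintChecker {
  private static final Logger LOG = Logger.getLogger(StandbyConstraintChecker.class.getName());

  private final boolean faultDomainAwareStandbyEnabled;
  // Hypothetical stand-in for FaultDomainManager#hasSameFaultDomains(host1, host2).
  private final BiPredicate<String, String> hasSameFaultDomains;

  StandbyConstraintChecker(boolean faultDomainAwareStandbyEnabled,
      BiPredicate<String, String> hasSameFaultDomains) {
    this.faultDomainAwareStandbyEnabled = faultDomainAwareStandbyEnabled;
    this.hasSameFaultDomains = hasSameFaultDomains;
  }

  // Existing logic, extracted as-is: a conflicting container already uses this host.
  static boolean checkActiveAndStandbyOnSameHost(String candidateHost, String existingHost) {
    return existingHost != null && existingHost.equals(candidateHost);
  }

  // New logic: a conflicting container already uses this host's fault domain(s).
  boolean checkActiveAndStandbyOnSameFaultDomain(String candidateHost, String existingHost) {
    return existingHost != null && hasSameFaultDomains.test(candidateHost, existingHost);
  }

  private boolean conflicts(String candidateHost, String existingHost) {
    return checkActiveAndStandbyOnSameHost(candidateHost, existingHost)
        || (faultDomainAwareStandbyEnabled && checkActiveAndStandbyOnSameFaultDomain(candidateHost, existingHost));
  }

  // pendingHost / runningHost: host of the conflicting container's pending or running resource, or null.
  boolean checkStandbyConstraints(String containerIdToStart, String candidateHost,
      String pendingHost, String runningHost) {
    if (conflicts(candidateHost, pendingHost)) {
      LOG.info("Container " + containerIdToStart + " cannot be started on host " + candidateHost
          + " because a conflicting container is pending launch on host " + pendingHost);
      return false;
    }
    if (conflicts(candidateHost, runningHost)) {
      LOG.info("Container " + containerIdToStart + " cannot be started on host " + candidateHost
          + " because a conflicting container is running on host " + runningHost);
      return false;
    }
    return true;
  }
}
```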

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -361,8 +407,41 @@ private FailoverMetadata registerActiveContainerFailure(String activeContainerID
   }
 
   /**
-   * Check if matching this SamzaResourceRequest to the given resource, meets all standby-container container constraints.
+   * This method checks from the config if standby allocation is fault domain aware or not, and requests resources accordingly.
+   *
+   * @param containerAllocator ContainerAllocator object that requests for resources from the resource manager
+   * @param containerID Samza container ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   */
+  void checkFaultDomainAwarenessEnabledAndRequestResource(ContainerAllocator containerAllocator, String containerID, String preferredHost) {

Review comment:
       nit: I'd prefer to rename the method to `requestResource`, since that makes its intent clear, i.e., it only requests a resource and potentially returns the `SamzaResourceRequest`.
   
   How it requests the resource stays encapsulated and can be inferred by reading the method implementation. The current name says too much, and the fact that it returns void makes it unusable in some places that contain the exact same boilerplate code.
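A sketch of the suggested shape, assuming simplified hypothetical types: `ResourceRequest` stands in for `SamzaResourceRequest`, fault domains are plain rack-ID strings, and a `Consumer` stands in for `ContainerAllocator#issueResourceRequest`. The point is only that the method is named for what it does and returns the request it issued.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for SamzaResourceRequest with only the fields used here.
final class ResourceRequest {
  final String processorId;
  final String preferredHost;
  final Set<String> faultDomains; // empty = no fault-domain constraint

  ResourceRequest(String processorId, String preferredHost, Set<String> faultDomains) {
    this.processorId = processorId;
    this.preferredHost = preferredHost;
    this.faultDomains = Collections.unmodifiableSet(new HashSet<>(faultDomains));
  }
}

// Hypothetical helper; `issuer` stands in for ContainerAllocator#issueResourceRequest.
class StandbyResourceRequester {
  private final boolean faultDomainAwareStandbyEnabled;
  private final Consumer<ResourceRequest> issuer;

  StandbyResourceRequester(boolean faultDomainAwareStandbyEnabled, Consumer<ResourceRequest> issuer) {
    this.faultDomainAwareStandbyEnabled = faultDomainAwareStandbyEnabled;
    this.issuer = issuer;
  }

  // The name states the intent; whether fault domains are attached is an implementation detail.
  // Returning the request lets callers that need it reuse this method instead of copying boilerplate.
  ResourceRequest requestResource(String containerId, String preferredHost, Set<String> allowedFaultDomains) {
    Set<String> domains = faultDomainAwareStandbyEnabled ? allowedFaultDomains : Collections.<String>emptySet();
    ResourceRequest request = new ResourceRequest(containerId, preferredHost, domains);
    issuer.accept(request);
    return request;
  }
}
```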

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * This class functionality works with the assumption that the job.standbytasks.replication.factor is 2.
+ * For values greater than 2, it is possible that the standby containers could be on the same rack as the active, or the already existing standby racks.
+ */
+public class YarnFaultDomainManager implements FaultDomainManager {
+
+  private static final String FAULT_DOMAIN_MANAGER_GROUP = "yarn-fault-domain-manager";
+  private static final String HOST_TO_FAULT_DOMAIN_CACHE_UPDATES = "host-to-fault-domain-cache-updates";
+  private Multimap<String, FaultDomain> hostToRackMap;
+  private final YarnClientImpl yarnClient;
+  private Counter hostToFaultDomainCacheUpdates;
+
+  public YarnFaultDomainManager(MetricsRegistry metricsRegistry) {
+    this.yarnClient = new YarnClientImpl();
+    yarnClient.init(new YarnConfiguration());
+    yarnClient.start();
+    this.hostToRackMap = computeHostToFaultDomainMap();
+    hostToFaultDomainCacheUpdates = metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP, HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
+  }
+
+  @VisibleForTesting
+  YarnFaultDomainManager(MetricsRegistry metricsRegistry, YarnClientImpl yarnClient, Multimap<String, FaultDomain> hostToRackMap) {
+    this.yarnClient = yarnClient;
+    yarnClient.init(new YarnConfiguration());
+    yarnClient.start();
+    this.hostToRackMap = hostToRackMap;
+    hostToFaultDomainCacheUpdates = metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP, HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
+  }
+
+  /**
+   * This method returns all the last cached rack values in a cluster, for all hosts that are healthy, up and running.
+   * @return a set of {@link FaultDomain}s
+   */
+  @Override
+  public Set<FaultDomain> getAllFaultDomains() {
+    return new HashSet<>(hostToRackMap.values());
+  }
+
+  /**
+   * This method returns the rack a particular host resides on based on the internal cache.
+   * In case the rack of a host does not exist in this cache, we update the cache by computing the host to rack map again using Yarn.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  @Override
+  public Set<FaultDomain> getFaultDomainsForHost(String host) {
+    if (!hostToRackMap.containsKey(host)) {
+      hostToRackMap = computeHostToFaultDomainMap();
+      hostToFaultDomainCacheUpdates.inc();
+    }
+    return new HashSet<>(hostToRackMap.get(host));
+  }
+
+  /**
+   * This method checks if the two hostnames provided reside on the same rack.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same rack
+   */
+  @Override
+  public boolean hasSameFaultDomains(String host1, String host2) {
+    if (!hostToRackMap.keySet().contains(host1) || !hostToRackMap.keySet().contains(host2)) {
+      hostToRackMap = computeHostToFaultDomainMap();
+      hostToFaultDomainCacheUpdates.inc();
+    }
+    return hostToRackMap.get(host1).equals(hostToRackMap.get(host2));
+  }
+
+  /**
+   * This method computes the host to rack map from Yarn.
+   * Only the hosts that are running in the cluster will be a part of this map.
+   * @return map of the host and the rack it resides on
+   */
+  @VisibleForTesting
+  protected Multimap<String, FaultDomain> computeHostToFaultDomainMap() {

Review comment:
       nit: make this package-private instead if it is only used for testing.







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r540356064



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -63,7 +69,11 @@
   private final Instant requestTimestamp;
 
   public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId) {
-    this(numCores, memoryMB, preferredHost, processorId, Instant.now());
+    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), null);

Review comment:
       Just for my clarification: what is the behavior on the YARN side for an empty set in the request?







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r547138145



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -168,11 +173,13 @@ public void handleContainerStopFail(String containerID, String resourceID,
    * @param host The hostname of the active container
    * @return the set of racks on which this active container's standby can be scheduled
    */
-  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(String host) {
-    Set<FaultDomain> activeContainerRack = faultDomainManager.getFaultDomainOfHost(host);
-    Set<FaultDomain> standbyRacks = faultDomainManager.getAllFaultDomains();
-    standbyRacks.removeAll(activeContainerRack);
-    return standbyRacks;
+  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(Optional<String> host) {
+    Set<FaultDomain> standbyFaultDomain = faultDomainManager.getAllFaultDomains();
+    if (host.isPresent()) {
+      Set<FaultDomain> activeContainerFaultDomain = faultDomainManager.getFaultDomainsForHost(host.get());
+      standbyFaultDomain.removeAll(activeContainerFaultDomain);
+    }
+    return standbyFaultDomain;

Review comment:
       I've extracted the simplified part of the method into a separate method (`getAllowedFaultDomainsForStandbyContainerGivenActiveContainerHost`), and the extra overloaded functionality now lives in the parent method (`getAllowedFaultDomainsForStandbyContainerGivenHostToExclude`). This way, the caller can decide which method to invoke. I've also removed Optional from the argument.







[GitHub] [samza] mynameborat commented on pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
mynameborat commented on pull request #1446:
URL: https://github.com/apache/samza/pull/1446#issuecomment-749241472


   A few consistency-related comments. Please resolve the conflicts as well.





[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546188838



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +421,39 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
     }
 
     return true;
   }
 
+  boolean checkStandbyConstraintsHelper(String containerIdToStart, String hostToStartContainerOn, SamzaResource existingResource, String existingContainerID) {
+    if (existingResource != null) {
+      ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+      if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled() && faultDomainManager.hasSameFaultDomains(hostToStartContainerOn, existingResource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this rack",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (existingResource.getHost().equals(hostToStartContainerOn)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);

Review comment:
       Thanks for noticing the logging difference. I've added a parameter to the helper method to log differently in each case.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r548372801



##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
##########
@@ -241,6 +241,11 @@ public void requestResources(SamzaResourceRequest resourceRequest) {
     String processorId = resourceRequest.getProcessorId();
     String requestId = resourceRequest.getRequestId();
     String preferredHost = resourceRequest.getPreferredHost();
+    String[] racks = null;
+    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+    if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled()) {
+      racks = resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
+    }

Review comment:
       Sure, that makes sense. I've removed the unnecessary check.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r547138145



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -168,11 +173,13 @@ public void handleContainerStopFail(String containerID, String resourceID,
    * @param host The hostname of the active container
    * @return the set of racks on which this active container's standby can be scheduled
    */
-  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(String host) {
-    Set<FaultDomain> activeContainerRack = faultDomainManager.getFaultDomainOfHost(host);
-    Set<FaultDomain> standbyRacks = faultDomainManager.getAllFaultDomains();
-    standbyRacks.removeAll(activeContainerRack);
-    return standbyRacks;
+  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingStandbyContainer(Optional<String> host) {
+    Set<FaultDomain> standbyFaultDomain = faultDomainManager.getAllFaultDomains();
+    if (host.isPresent()) {
+      Set<FaultDomain> activeContainerFaultDomain = faultDomainManager.getFaultDomainsForHost(host.get());
+      standbyFaultDomain.removeAll(activeContainerFaultDomain);
+    }
+    return standbyFaultDomain;

Review comment:
       I've extracted the simplified part of the method into a separate method (`getAllowedFaultDomainsForStandbyContainerGivenActiveContainerHost`), and the extra overloaded functionality now lives in the parent method (`getAllowedFaultDomainsForStandbyContainerGivenHostToExclude`). This way, the caller can decide which method to invoke. I've also removed Optional from the argument. Let me know if this makes sense to you.
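A hedged sketch of how the two methods named above might relate. The method names come from the comment, but their signatures and bodies here are assumptions: fault domains are rack-ID strings, a host-to-fault-domains map stands in for `FaultDomainManager`, and the "parent" method is read as the replacement for the `Optional<String>` version in the diff (no host to exclude means all fault domains are allowed). Note that `allFaultDomains()` returns a fresh copy, so `removeAll` never mutates the cache.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the relevant part of StandbyContainerManager.
class StandbyFaultDomainSelector {
  private final Map<String, Set<String>> hostToFaultDomains; // stand-in for FaultDomainManager

  StandbyFaultDomainSelector(Map<String, Set<String>> hostToFaultDomains) {
    this.hostToFaultDomains = hostToFaultDomains;
  }

  // Fresh, mutable copy of every known fault domain.
  Set<String> allFaultDomains() {
    Set<String> all = new HashSet<>();
    hostToFaultDomains.values().forEach(all::addAll);
    return all;
  }

  // Simple case: every fault domain except the active container's.
  Set<String> getAllowedFaultDomainsForStandbyContainerGivenActiveContainerHost(String activeHost) {
    Set<String> allowed = allFaultDomains();
    allowed.removeAll(hostToFaultDomains.getOrDefault(activeHost, Set.of()));
    return allowed;
  }

  // Assumed "parent" behavior: null means there is no host to exclude, replacing Optional.empty().
  Set<String> getAllowedFaultDomainsForStandbyContainerGivenHostToExclude(String hostToExclude) {
    if (hostToExclude == null) {
      return allFaultDomains();
    }
    return getAllowedFaultDomainsForStandbyContainerGivenActiveContainerHost(hostToExclude);
  }
}
```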







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546153415



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
##########
@@ -381,6 +428,9 @@ public final void issueResourceRequest(SamzaResourceRequest request) {
     } else {
       state.preferredHostRequests.incrementAndGet();
     }
+    if (!request.getFaultDomains().isEmpty()) {

Review comment:
       No, I double-checked, and this should never be null. I will remove the null check for the `expiredFaultDomainAwareContainerRequests` metric. Thanks!







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1446: [WIP] SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r536411116



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -88,7 +88,7 @@
 
   public ContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
       SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
-      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
+      FaultDomainManager faultDomainManager, boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {

Review comment:
       Can faultDomainManager be null?
   If not, maybe add a notNull check?
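If the answer is "never null", a fail-fast check in the constructor is one option. A minimal sketch using `java.util.Objects.requireNonNull` (Guava's `Preconditions.checkNotNull` would work the same way); `ContainerManagerSketch` and its nested empty `FaultDomainManager` are hypothetical stand-ins with most constructor parameters omitted. If a null manager is legitimate when standby is disabled, this check would need to be conditional instead.

```java
import java.util.Objects;

// Hypothetical, trimmed-down version of the ContainerManager constructor.
class ContainerManagerSketch {
  interface FaultDomainManager { } // empty stand-in, not Samza's interface

  private final FaultDomainManager faultDomainManager;
  private final boolean hostAffinityEnabled;
  private final boolean standbyEnabled;

  ContainerManagerSketch(FaultDomainManager faultDomainManager, boolean hostAffinityEnabled,
      boolean standbyEnabled) {
    // Fail at construction time instead of with an NPE later, deep inside standby placement.
    this.faultDomainManager =
        Objects.requireNonNull(faultDomainManager, "faultDomainManager cannot be null");
    this.hostAffinityEnabled = hostAffinityEnabled;
    this.standbyEnabled = standbyEnabled;
  }

  FaultDomainManager getFaultDomainManager() {
    return faultDomainManager;
  }
}
```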

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.

Review comment:
       Placing standby containers is only one of its uses, so I feel we should not put this in the interface doc.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -63,7 +69,11 @@
   private final Instant requestTimestamp;
 
   public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String processorId) {
-    this(numCores, memoryMB, preferredHost, processorId, Instant.now());
+    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), null);

Review comment:
       Empty set instead of null?
   However, what does an empty set/null mean?
   Will the request ignore fault domain notions if it's null but somehow break the behavior if it's an empty set?
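A minimal sketch of the empty-set option raised here, assuming fault domains are modeled as rack-ID strings and `ResourceRequestSketch` is a hypothetical trimmed version of `SamzaResourceRequest`. The short constructor defaults to `Collections.emptySet()`, documented as "no fault-domain constraint", so callers can test `isEmpty()` (as the `issueResourceRequest` hunk in this thread does) without a null check. How YARN itself treats an empty rack list is a separate question this sketch does not answer.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical trimmed version of SamzaResourceRequest.
final class ResourceRequestSketch {
  private final int numCores;
  private final int memoryMB;
  private final String preferredHost;
  private final String processorId;
  private final Instant requestTimestamp;
  private final Set<String> faultDomains; // never null; empty means "no fault-domain constraint"

  ResourceRequestSketch(int numCores, int memoryMB, String preferredHost, String processorId) {
    this(numCores, memoryMB, preferredHost, processorId, Instant.now(), Collections.<String>emptySet());
  }

  ResourceRequestSketch(int numCores, int memoryMB, String preferredHost, String processorId,
      Instant requestTimestamp, Set<String> faultDomains) {
    this.numCores = numCores;
    this.memoryMB = memoryMB;
    this.preferredHost = preferredHost;
    this.processorId = processorId;
    this.requestTimestamp = requestTimestamp;
    // Normalize null to empty so there is exactly one representation of "unconstrained".
    this.faultDomains = faultDomains == null
        ? Collections.<String>emptySet()
        : Collections.unmodifiableSet(new HashSet<>(faultDomains));
  }

  Set<String> getFaultDomains() {
    return faultDomains;
  }

  boolean hasFaultDomainConstraint() {
    return !faultDomains.isEmpty();
  }
}
```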

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+public class FaultDomain {
+
+  FaultDomainType type;
+  String id;

Review comment:
       These seem to be final. If they are not expected to change, maybe mark them so.
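A sketch of `FaultDomain` as an immutable value class, under assumptions: `FaultDomainType` is modeled here with a single `RACK` constant, and the hunk above does not show whether the real class overrides `equals`/`hashCode`. Since fault domains are collected into `HashSet`s and compared with `removeAll`/`equals` elsewhere in this PR, value semantics matter as much as `final`.

```java
import java.util.Objects;

// Assumed stand-in for FaultDomainType; only a rack-level value is modeled.
enum FaultDomainType { RACK }

// Immutable value class: final fields, validated once, with value-based equality.
final class FaultDomain {
  private final FaultDomainType type;
  private final String id;

  FaultDomain(FaultDomainType type, String id) {
    this.type = Objects.requireNonNull(type, "type");
    this.id = Objects.requireNonNull(id, "id");
  }

  FaultDomainType getType() {
    return type;
  }

  String getId() {
    return id;
  }

  // Needed so HashSet membership, removeAll and equals work on content, not identity.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof FaultDomain)) {
      return false;
    }
    FaultDomain that = (FaultDomain) o;
    return type == that.type && id.equals(that.id);
  }

  @Override
  public int hashCode() {
    return Objects.hash(type, id);
  }

  @Override
  public String toString() {
    return "FaultDomain{type=" + type + ", id=" + id + "}";
  }
}
```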

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
##########
@@ -114,6 +114,11 @@
    */
   private final ClusterResourceManager clusterResourceManager;
 
+  /**
+   * An interface to get information about nodes and the fault domains they reside on.

Review comment:
       minor: "interface" here is a bit confusing. No strong objection if you prefer to keep it.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
##########
@@ -87,6 +88,12 @@ public ClusterResourceManager(Callback callback) {
    */
   public abstract void requestResources(SamzaResourceRequest resourceRequest);
 
+  /**
+   * Get the node to fault domain map from the cluster resource manager.
+   * @return A map of the nodes to the fault domain they reside in.
+   */
+  public abstract Map<String, String> getNodeToFaultDomainMap();

Review comment:
       minor: prefer "host" to "node", as the rest of the Samza (non-YARN-specific) code uses "host".

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).

Review comment:
       Can you add a few lines about thread safety: what an implementor of this interface should guarantee/adhere to, what a user should expect, and how to use it?
   
   It looks like all supported functions are read-only, i.e., they are all getters/checks with no writes (setters), so thread safety might not be a big concern. But it is still worth checking and calling out.
   
   I am also wondering: a cluster is a dynamic system, so hosts/domains might change, and this manager's internal state is then somehow updated by the implementor (?). In such a case, what guarantee does this manager provide? Will it always show the current state of the cluster, or the last read from the cluster manager, or some other guarantee, e.g., if a host/domain is down in the cluster, it will definitely not be part of any get method's return value?
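One possible way to answer these questions, sketched as a hypothetical `SnapshotFaultDomainCache` that is not part of the PR: the contract lives in the Javadoc, the host-to-fault-domain map is published as an immutable snapshot through a `volatile` field, and a `Supplier` stands in for the YARN node query. Readers never see a half-updated map, and answers reflect the last snapshot rather than the live cluster. By contrast, a plain field reassigned on a cache miss, as in the `YarnFaultDomainManager` hunk earlier in this thread, gives no cross-thread visibility guarantee if it is ever called from multiple threads.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/**
 * Hypothetical caching fault-domain lookup illustrating one possible contract.
 * Thread safety: all methods may be called concurrently; readers always see a complete,
 * immutable snapshot, and refreshes replace it atomically.
 * Freshness: answers reflect the last snapshot read from the cluster manager, not necessarily
 * the live cluster; a lookup for an unknown host triggers one refresh.
 */
final class SnapshotFaultDomainCache {
  private final Supplier<Map<String, Set<String>>> fetcher; // stand-in for a cluster-manager query
  private volatile Map<String, Set<String>> snapshot;
  private int refreshes; // guarded by this

  SnapshotFaultDomainCache(Supplier<Map<String, Set<String>>> fetcher) {
    this.fetcher = fetcher;
    this.snapshot = copyOf(fetcher.get());
  }

  Set<String> getFaultDomainsForHost(String host) {
    Map<String, Set<String>> current = snapshot;
    if (!current.containsKey(host)) {
      current = refresh();
    }
    return current.getOrDefault(host, Collections.<String>emptySet());
  }

  Set<String> getAllFaultDomains() {
    Set<String> all = new HashSet<>();
    snapshot.values().forEach(all::addAll);
    return all;
  }

  synchronized int refreshCount() {
    return refreshes;
  }

  // Serialized so only one refresh queries the cluster manager at a time.
  private synchronized Map<String, Set<String>> refresh() {
    Map<String, Set<String>> fresh = copyOf(fetcher.get());
    snapshot = fresh;
    refreshes++;
    return fresh;
  }

  // Deep, unmodifiable copy so later changes to the source never leak into a published snapshot.
  private static Map<String, Set<String>> copyOf(Map<String, Set<String>> source) {
    Map<String, Set<String>> copy = new HashMap<>();
    source.forEach((host, domains) -> copy.put(host, Collections.unmodifiableSet(new HashSet<>(domains))));
    return Collections.unmodifiableMap(copy);
  }
}
```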

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */

Review comment:
       Since it's a new interface, should we mark it evolving?

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.

Review comment:
       1. How do we define RUNNING nodes? Nodes where containers of this job are running, nodes that are healthy in the cluster, or nodes with available resources to run a container?
   2. Does it get all domains of the cluster, or only those that the job can access?
   
   3. nit: hosts instead of nodes.
   

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);
+
+  /**
+   * This method checks if the two hostnames provided reside on the same fault domain.

Review comment:
       Fault domain is a concept that may be worth spending a couple of lines on, e.g., a host can only belong to one domain, a domain has >= 1 hosts, and every host in the cluster belongs to one domain or another.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);

Review comment:
       Would be nice to stick to one of the two terms, host or node, to keep it consistent :)

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);
+
+  /**
+   * This method checks if the two hostnames provided reside on the same fault domain.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same fault domain
+   */
+  boolean checkHostsOnSameFaultDomain(String host1, String host2);
+
+  /**
+   * This method gets the set of fault domains that the given active container's corresponding standby can be placed on.
+   * @param host The hostname of the active container
+   * @return the set of fault domains on which this active container's standby can be scheduled

Review comment:
       1. Hmm, I think this manager is designed specifically for standby containers only. I feel this interface has potential beyond standbys, so we should strive to keep it generic, e.g. usable for scheduling actives too?
   2. What guarantee does the manager provide to the caller of this method? For rack upgrades, for example, we would want it to return only domains that are neither under maintenance nor about to undergo maintenance; there are other cases like this too.
   3. Adding to #2 above: maybe we should be able to define a "rule set" that the domain manager follows to fetch allowed domains. Rules could include things like "not on a particular host/domain" (then this method becomes extensible beyond standbys). wdyt?
   4. nit: if sticking to the standby-only use case, indicate it in the name, e.g. `getAllowedFaultDomainsForSchedulingStandbyContainer`
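The "rule set" idea in point 3 could be modelled as composable predicates over candidate domains, so the standby case becomes just one rule among several. A sketch under stated assumptions: `PlacementRule` and `RuleBasedDomainSelector` are hypothetical names, and domains are plain `String` IDs rather than `FaultDomain`:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

/** Hypothetical sketch: a rule decides whether a candidate fault domain is allowed. */
interface PlacementRule extends Predicate<String> {

  /** Rejects the domain that the given host lives on (the standby use case). */
  static PlacementRule notOnDomainOfHost(Map<String, String> hostToDomain, String host) {
    String excluded = hostToDomain.get(host);
    return domain -> !domain.equals(excluded);
  }

  /** Rejects domains that are under, or about to undergo, maintenance. */
  static PlacementRule notInMaintenance(Set<String> maintenanceDomains) {
    return domain -> !maintenanceDomains.contains(domain);
  }
}

final class RuleBasedDomainSelector {
  private RuleBasedDomainSelector() {}

  /** Returns every known domain that satisfies all of the given rules. */
  static Set<String> allowedDomains(Map<String, String> hostToDomain, PlacementRule... rules) {
    Set<String> allowed = new TreeSet<>(hostToDomain.values());
    for (PlacementRule rule : rules) {
      // Drop every domain the rule rejects.
      allowed.removeIf(rule.negate());
    }
    return Collections.unmodifiableSet(allowed);
  }
}
```

Scheduling an active container would then simply pass a different list of rules, which keeps the interface generic.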

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+public class FaultDomain {
+
+  FaultDomainType type;
+  String id;
+
+  public FaultDomain(FaultDomainType type, String id) {
+    this.type = type;
+    this.id = id;

Review comment:
       Can they be null?
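One way to answer the null question is to reject nulls in the constructor. Value equality also matters here, because `RackManager` puts `FaultDomain`s into a `HashSet` and calls `remove` on it. The excerpt does not show whether the PR's class defines `equals`/`hashCode`, so this is a sketch of what it could look like; the local `FaultDomainType` enum is a stand-in with only `RACK`:

```java
import java.util.Objects;

// Stand-in for the PR's FaultDomainType; only RACK is assumed here.
enum FaultDomainType { RACK }

/** Sketch of an immutable, null-safe value class for a fault domain. */
final class FaultDomain {
  private final FaultDomainType type;
  private final String id;

  FaultDomain(FaultDomainType type, String id) {
    // Fail fast instead of letting a null surface later in equals()/hashCode().
    this.type = Objects.requireNonNull(type, "type cannot be null");
    this.id = Objects.requireNonNull(id, "id cannot be null");
  }

  FaultDomainType getType() { return type; }

  String getId() { return id; }

  // Needed so HashSet operations such as set.remove(domain) match by value, not identity.
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof FaultDomain)) return false;
    FaultDomain other = (FaultDomain) o;
    return type == other.type && id.equals(other.id);
  }

  @Override
  public int hashCode() { return Objects.hash(type, id); }

  @Override
  public String toString() { return "FaultDomain{type=" + type + ", id='" + id + "'}"; }
}
```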

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);
+
+  /**
+   * This method checks if the two hostnames provided reside on the same fault domain.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same fault domain
+   */
+  boolean checkHostsOnSameFaultDomain(String host1, String host2);
+
+  /**
+   * This method gets the set of fault domains that the given active container's corresponding standby can be placed on.
+   * @param host The hostname of the active container
+   * @return the set of fault domains on which this active container's standby can be scheduled
+   */
+  Set<FaultDomain> getAllowedFaultDomainsForSchedulingContainer(String host);
+
+  /**
+   * This method returns the cached map of nodes to fault domains.
+   * @return stored map of node to the fault domain it resides on

Review comment:
       It may not be cached either, right? That depends on the implementation.
   This is where the manager's guarantee about its view of the cluster (how fresh or stale that view may be) comes into play.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);
+
+  /**
+   * This method checks if the two hostnames provided reside on the same fault domain.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same fault domain
+   */
+  boolean checkHostsOnSameFaultDomain(String host1, String host2);
+
+  /**
+   * This method gets the set of fault domains that the given active container's corresponding standby can be placed on.
+   * @param host The hostname of the active container
+   * @return the set of fault domains on which this active container's standby can be scheduled
+   */
+  Set<FaultDomain> getAllowedFaultDomainsForSchedulingContainer(String host);
+
+  /**
+   * This method returns the cached map of nodes to fault domains.
+   * @return stored map of node to the fault domain it resides on
+   */
+  Map<String, FaultDomain> getNodeToFaultDomainMap();
+
+  /**
+   * This method computes the node to fault domain map from the cluster resource manager.
+   * @return map of node to the fault domain it resides on

Review comment:
       Ah, I realize now: this is the method that updates the manager's fault domain map, right?
   So it is in some sense a setter? In that case we should think about thread-safety between this setter and the other getters.
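A common way to handle the setter/getter race is to build the new map off to the side and publish it through a single `volatile` write, so readers always see one complete, immutable snapshot. A sketch, assuming a `Supplier` stands in for the actual cluster query (e.g. fetching YARN node reports); `SnapshotFaultDomainCache` is a hypothetical name and domains are plain `String` IDs:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Sketch: readers always see a complete, immutable snapshot; refresh() swaps in a new one. */
final class SnapshotFaultDomainCache {
  // Assumed stand-in for the cluster resource manager query.
  private final Supplier<Map<String, String>> source;
  private volatile Map<String, String> hostToDomain;

  SnapshotFaultDomainCache(Supplier<Map<String, String>> source) {
    this.source = source;
    refresh();
  }

  /** Copies the source into a new unmodifiable map, then publishes it with one volatile write. */
  void refresh() {
    hostToDomain = Collections.unmodifiableMap(new HashMap<>(source.get()));
  }

  Map<String, String> getHostToDomainMap() {
    return hostToDomain; // already unmodifiable, so safe to hand out
  }

  /** Reads the reference once so both lookups come from the same snapshot. */
  boolean onSameDomain(String host1, String host2) {
    Map<String, String> snapshot = hostToDomain;
    String d1 = snapshot.get(host1);
    return d1 != null && d1.equals(snapshot.get(host2));
  }

  Set<String> allDomains() {
    return new HashSet<>(hostToDomain.values());
  }
}
```

Concurrent `refresh()` calls are not serialized here; the last write wins, but every published map is complete.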

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -73,6 +83,18 @@ public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, St
     this.requestId = UUID.randomUUID().toString();
     this.processorId = processorId;
     this.requestTimestamp = requestTimestamp;
+    this.faultDomains = new HashSet<>();
+    log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at time: {} with Request ID: {}", this.processorId, this.preferredHost, this.requestTimestamp, this.requestId);

Review comment:
       nit: adding fault domains to the log might be useful for debugging

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManagerFactory.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+/**
+ * A factory to build a {@link FaultDomainManager}.
+ */

Review comment:
       Same comment as above about the stability of the interface.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -125,7 +129,8 @@ public void handleContainerLaunchFail(String containerID, String resourceID,
 
     if (StandbyTaskUtil.isStandbyContainer(containerID)) {
       log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", containerID);
-      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
+      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST,
+              faultDomainManager.getAllowedFaultDomainsForSchedulingContainer(getActiveContainerHost(containerID)));

Review comment:
       hmm, wondering if we should place all these changes behind a config?
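If the changes go behind a config, the gate can be a single boolean that falls back to the old behaviour when absent. A sketch under two assumptions: the config key below is hypothetical, and an empty fault-domain set is taken to mean "no fault-domain constraint" (the PR's `SamzaResourceRequest` does default `faultDomains` to an empty `HashSet`, but the exact semantics are not confirmed here):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Sketch of gating fault-domain-aware standby requests behind a boolean config. */
final class FaultDomainAwareGate {
  // Hypothetical config key, chosen for illustration only.
  static final String FAULT_DOMAIN_AWARE_STANDBY_ENABLED =
      "cluster-manager.fault-domain-aware.standby.enabled";

  private final boolean enabled;

  FaultDomainAwareGate(Map<String, String> config) {
    // Default to the pre-existing behaviour when the key is absent.
    this.enabled = Boolean.parseBoolean(config.getOrDefault(FAULT_DOMAIN_AWARE_STANDBY_ENABLED, "false"));
  }

  /**
   * Returns the allowed fault domains when the feature is on, or an empty set
   * (assumed to mean "no fault-domain constraint") when it is off.
   */
  <D> Set<D> allowedDomains(String activeHost, Function<String, Set<D>> allowedDomainsForHost) {
    return enabled ? allowedDomainsForHost.apply(activeHost) : Collections.<D>emptySet();
  }
}
```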

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
##########
@@ -109,15 +135,24 @@ public String toString() {
             ", requestId='" + requestId + '\'' +
             ", processorId=" + processorId +
             ", requestTimestampMs=" + requestTimestamp +
+            ", faultDomains=" + convertFaultDomainSetToString() +
             '}';
   }
 
-  /**
-   * Requests are ordered by the processor type and the time at which they were created.
-   * Requests with timestamps in the future for retries take less precedence than timestamps in the past or current.
-   * Otherwise, active processors take precedence over standby processors, regardless of timestamp.
-   * @param o the other
-   */
+  private String convertFaultDomainSetToString() {
+    StringBuilder faultDomainSb = new StringBuilder();

Review comment:
       Just curious, won't it work without this, especially since you already defined "toString" for FaultDomain?
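For context on the question above: string concatenation with a `Set` calls the set's `toString()`, which for the standard JDK collections (via `AbstractCollection.toString`) renders `[e1, e2, ...]` using each element's own `toString()`. A small illustration with a hypothetical `Domain` stand-in for `FaultDomain`:

```java
import java.util.Set;

final class SetToStringDemo {
  private SetToStringDemo() {}

  // Hypothetical stand-in with its own toString(), like FaultDomain in the PR.
  static final class Domain {
    private final String id;

    Domain(String id) { this.id = id; }

    @Override
    public String toString() { return "rack:" + id; }
  }

  /** Concatenation calls Set.toString(), which in turn calls each element's toString(). */
  static String describe(Set<Domain> faultDomains) {
    return "faultDomains=" + faultDomains;
  }
}
```

With a `LinkedHashSet` holding domains "1" and "2", `describe` yields `faultDomains=[rack:1, rack:2]`, so a hand-rolled `StringBuilder` loop is only needed if a different format is wanted.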

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -181,13 +186,14 @@ private void handleStandbyContainerStop(String standbyContainerID, String resour
 
       // request standbycontainer's host for active-container
       SamzaResourceRequest resourceRequestForActive =
-          containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
+              containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);

Review comment:
       nit - some new extra spaces sneaked in :)







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539517529



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.

Review comment:
       Makes sense.  Added the standby part as a possible example instead.







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1446: [WIP] SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r537893226



##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/RackManager.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+
+public class RackManager implements FaultDomainManager {
+
+  private final Map<String, FaultDomain> nodeToRackMap;
+
+  public RackManager() {
+        this.nodeToRackMap = computeNodeToFaultDomainMap();
+    }
+
+  /**
+   * This method returns all the rack values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  @Override
+  public Set<FaultDomain> getAllFaultDomains() {
+    return new HashSet<>(nodeToRackMap.values());
+  }
+
+  /**
+   * This method returns the rack a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  @Override
+  public FaultDomain getFaultDomainOfNode(String host) {
+    return nodeToRackMap.get(host);
+  }
+
+  /**
+   * This method checks if the two hostnames provided reside on the same rack.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same rack
+   */
+  @Override
+  public boolean checkHostsOnSameFaultDomain(String host1, String host2) {
+    return nodeToRackMap.get(host1).equals(nodeToRackMap.get(host2));
+  }
+
+  /**
+   * This method gets the set of racks that the given active container's corresponding standby can be placed on.
+   * @param host The hostname of the active container
+   * @return the set of racks on which this active container's standby can be scheduled
+   */
+  @Override
+  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingContainer(String host) {
+    FaultDomain activeContainerRack = nodeToRackMap.get(host);
+    Set<FaultDomain> standbyRacks = new HashSet<>(nodeToRackMap.values());
+    standbyRacks.remove(activeContainerRack);

Review comment:
       This works for a standby replication factor of 2 (i.e. one active + one standby): the PR guarantees that the standby is not on the same rack as the active. With a replication factor > 2, the standbys themselves might end up on the same rack. We need to call out that this works for 2 and what to expect for > 2; mentioning it in the PR description is enough.
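To make the replication-factor caveat concrete: excluding only the active's rack leaves every other rack open, so two standbys can both pick the same one. Keeping all replicas apart would require excluding the racks of every replica already placed. A sketch with hypothetical names and plain `String` rack IDs, not what the PR implements:

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class ReplicaAwareRackSelector {
  private ReplicaAwareRackSelector() {}

  /**
   * Racks on which a new replica may be placed, excluding the racks of every
   * existing replica (the active plus any standbys already placed), not just the active's.
   */
  static Set<String> allowedRacks(Map<String, String> hostToRack, Collection<String> replicaHosts) {
    Set<String> allowed = new TreeSet<>(hostToRack.values());
    for (String host : replicaHosts) {
      String rack = hostToRack.get(host);
      if (rack != null) { // unknown hosts exclude nothing (TreeSet.remove(null) would throw)
        allowed.remove(rack);
      }
    }
    return allowed;
  }
}
```

With three racks and three replicas the result becomes empty, so the caller would still need a fallback policy (e.g. relaxing the constraint) for that case.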

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,18 +403,32 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
-        return false;
+      if (resource != null) {
+        if (!resource.getHost().equals(ResourceRequestState.ANY_HOST) && !host.equals(ResourceRequestState.ANY_HOST)

Review comment:
       1. What if host = ANY_HOST but resource.getHost() is not ANY_HOST, or vice versa?
   
   2. Earlier in StandbyContainerManager, a request is issued with ANY_HOST and a set of fault domains.
   Would it be possible that there is a pending processor with ANY_HOST as well, but in the same fault domain?
   Actually, since a resource request carries a set of allowed fault domains and not a specific fault domain, is it possible that a pending processor gets started on the same fault domain as this standby?
   Or do we hope to catch it when that pending processor runs, invokes this same `checkStandbyConstraints`, finds the current standby in runningProcessors and is rejected by the check below? If that is the case, then what was the original reason for checking pendingProcessors at all?

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,18 +403,32 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
-        return false;
+      if (resource != null) {
+        if (!resource.getHost().equals(ResourceRequestState.ANY_HOST) && !host.equals(ResourceRequestState.ANY_HOST)
+                && faultDomainManager.checkHostsOnSameFaultDomain(host, resource.getHost())) {
+          log.info("Container {} cannot be started on host {} because container {} is already scheduled on this rack",
+                  containerIdToStart, host, containerID);
+          return false;
+        } else if (resource.getHost().equals(host)) {
+          log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+                  containerIdToStart, host, containerID);
+          return false;
+        }
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
-        return false;
+      if (resource != null) {
+        if (!resource.getHost().equals(ResourceRequestState.ANY_HOST) && !host.equals(ResourceRequestState.ANY_HOST)
+                && faultDomainManager.checkHostsOnSameFaultDomain(host, resource.getHost())) {
+          log.info("Container {} cannot be started on host {} because container {} is already running on this rack",
+                  containerIdToStart, host, containerID);
+          return false;
+        } else if (resource.getHost().equals(host)) {
+          log.info("Container {} cannot be started on host {} because container {} is already running on this host",
+                  containerIdToStart, host, containerID);
+          return false;

Review comment:
       This looks the same as the check above. Would it be worth extracting a helper? wdyt?
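The duplicated pending/running checks could be folded into one helper that also makes the ANY_HOST cases explicit. A sketch under assumptions: `PlacementConflict` and its signature are hypothetical, the `ANY_HOST` literal mirrors the role of `ResourceRequestState.ANY_HOST` but its value is assumed, and the same-fault-domain check is passed in as a `BiPredicate` instead of calling `FaultDomainManager` directly. It keeps the PR's behaviour of treating two identical host strings (including ANY_HOST vs ANY_HOST) as a conflict:

```java
import java.util.function.BiPredicate;

/** Sketch of a helper that folds the duplicated pending/running checks into one place. */
final class PlacementConflict {
  // Plays the role of ResourceRequestState.ANY_HOST; the literal value is an assumption.
  static final String ANY_HOST = "ANY_HOST";

  private PlacementConflict() {}

  /**
   * Returns true if a container that is on (or headed to) existingHost conflicts with
   * starting another replica on candidateHost. If either side is still ANY_HOST, only an
   * exact host-string match is detected, because the fault domain is not known yet.
   */
  static boolean conflicts(String candidateHost, String existingHost,
      BiPredicate<String, String> sameFaultDomain) {
    if (existingHost == null) {
      return false; // no pending/running resource for the other container
    }
    if (ANY_HOST.equals(candidateHost) || ANY_HOST.equals(existingHost)) {
      return candidateHost.equals(existingHost);
    }
    return candidateHost.equals(existingHost) || sameFaultDomain.test(candidateHost, existingHost);
  }
}
```

The caller would invoke it once with the pending resource's host and once with the running resource's host, logging "scheduled" or "running" accordingly.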

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,18 +403,32 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
-        return false;
+      if (resource != null) {
+        if (!resource.getHost().equals(ResourceRequestState.ANY_HOST) && !host.equals(ResourceRequestState.ANY_HOST)
+                && faultDomainManager.checkHostsOnSameFaultDomain(host, resource.getHost())) {
+          log.info("Container {} cannot be started on host {} because container {} is already scheduled on this rack",
+                  containerIdToStart, host, containerID);
+          return false;
+        } else if (resource.getHost().equals(host)) {
+          log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+                  containerIdToStart, host, containerID);
+          return false;
+        }
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
-        return false;
+      if (resource != null) {
+        if (!resource.getHost().equals(ResourceRequestState.ANY_HOST) && !host.equals(ResourceRequestState.ANY_HOST)

Review comment:
       1. What if host = ANY_HOST but resource.getHost() is not ANY_HOST?
   
   2. When alternative resources are being used, checkStandbyAndRunStreamProc is called with ANY_HOST and alternativeResource.get (which is a resource with a proper host name, not ANY_HOST). Are we handling this scenario? It looks like it might be handled (checkStandbyConstraints in this case is called with the container ID and the proper host name), but I wanted to double-check.
   
   3. IIUC, a resource is added to SamzaApplicationState.runningProcessors in YarnClusterResourceManager.onContainerStarted, which creates a new SamzaResource with values taken from the YARN Container, and hence will never have ANY_HOST, right? If so, maybe we can simplify this check? Or was there another reason (like guarding against ANY_HOST in the resource)?
   

##########
File path: samza-core/src/test/java/org/apache/samza/clustermanager/MockFaultDomainManager.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class MockFaultDomainManager implements FaultDomainManager {
+
+  private final Map<String, FaultDomain> nodeToFaultDomainMap;
+
+  public MockFaultDomainManager() {
+    FaultDomain faultDomain1 = new FaultDomain(FaultDomainType.RACK, "rack-1");
+    FaultDomain faultDomain2 = new FaultDomain(FaultDomainType.RACK, "rack-2");
+    FaultDomain faultDomain3 = new FaultDomain(FaultDomainType.RACK, "rack-1");
+    FaultDomain faultDomain4 = new FaultDomain(FaultDomainType.RACK, "rack-3");
+    FaultDomain faultDomain5 = new FaultDomain(FaultDomainType.RACK, "rack-4");
+    nodeToFaultDomainMap = ImmutableMap.of("host-1", faultDomain1, "host-2", faultDomain2,

Review comment:
       Might be good to have at least 2 hosts in the same rack so that case can be tested.

##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/RackManager.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+
+public class RackManager implements FaultDomainManager {
+
+  private final Map<String, FaultDomain> nodeToRackMap;
+
+  public RackManager() {
+        this.nodeToRackMap = computeNodeToFaultDomainMap();
+    }
+
+  /**
+   * This method returns all the rack values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  @Override
+  public Set<FaultDomain> getAllFaultDomains() {
+    return new HashSet<>(nodeToRackMap.values());
+  }
+
+  /**
+   * This method returns the rack a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  @Override
+  public FaultDomain getFaultDomainOfNode(String host) {
+    return nodeToRackMap.get(host);
+  }
+
+  /**
+   * This method checks if the two hostnames provided reside on the same rack.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same rack
+   */
+  @Override
+  public boolean checkHostsOnSameFaultDomain(String host1, String host2) {
+    return nodeToRackMap.get(host1).equals(nodeToRackMap.get(host2));
+  }
+
+  /**
+   * This method gets the set of racks that the given active container's corresponding standby can be placed on.
+   * @param host The hostname of the active container
+   * @return the set of racks on which this active container's standby can be scheduled
+   */
+  @Override
+  public Set<FaultDomain> getAllowedFaultDomainsForSchedulingContainer(String host) {
+    FaultDomain activeContainerRack = nodeToRackMap.get(host);
+    Set<FaultDomain> standbyRacks = new HashSet<>(nodeToRackMap.values());
+    standbyRacks.remove(activeContainerRack);
+    return standbyRacks;
+  }
+
+  /**
+   * This method returns the cached map of nodes to racks.
+   * @return stored map of node to the rack it resides on
+   */
+  @Override
+  public Map<String, FaultDomain> getNodeToFaultDomainMap() {
+    return nodeToRackMap;
+  }
+
+  /**
+   * This method gets the node to rack (fault domain for Yarn) mapping from Yarn for all running nodes.
+   * @return A map of hostname to rack name.
+   */
+  @Override
+  public Map<String, FaultDomain> computeNodeToFaultDomainMap() {
+    YarnClientImpl yarnClient = new YarnClientImpl();
+    Map<String, FaultDomain> nodeToRackMap = new HashMap<>();
+    try {
+      List<NodeReport> nodeReport = yarnClient.getNodeReports(NodeState.RUNNING);
+      nodeReport.forEach(report -> {
+        FaultDomain rack = new FaultDomain(FaultDomainType.RACK, report.getRackName());
+        nodeToRackMap.put(report.getNodeId().getHost(), rack);
+      });
+    } catch (YarnException e) {
+      e.printStackTrace();

Review comment:
       This exception is swallowed, no? What happens to the rack manager in this case? Will it still give a correct view of the cluster's host-to-rack mapping? How do we ensure the feature still works?
   
   It possibly returns an empty map. Then `nodeToRackMap.get(host)` in `getAllowedFaultDomainsForSchedulingContainer` above will return null, and removing a null from a Set could throw an NPE (HashSet doesn't, I think).
   Even if no NPE occurs, `getAllowedFaultDomainsForSchedulingContainer` returns an empty set. What is the behavior of this feature when a resource request is made with an empty set of racks? Will Yarn just pick any rack, or fail the request?
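One defensive shape for this, sketched under assumptions: rack names are plain strings, and `AllowedRacksSketch` / `allowedRacksForStandby` are hypothetical names, not the PR's API. It makes the empty-map and unknown-host cases explicit instead of relying on `HashSet` tolerating `remove(null)` (it does: `HashSet` permits null and `remove(null)` simply returns false). How Yarn treats an empty rack list is deliberately not assumed here; the caller has to decide.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class AllowedRacksSketch {
  /**
   * Returns the racks a standby may use: every known rack except the active host's rack.
   * If the host-to-rack map is empty (for example because the Yarn call failed) or the active
   * host is unknown, this returns an empty set rather than guessing.
   */
  static Set<String> allowedRacksForStandby(Map<String, String> hostToRack, String activeHost) {
    String activeRack = hostToRack.get(activeHost);
    if (activeRack == null) {
      // Unknown host or empty cache: the caller must decide whether to refresh the cache,
      // fall back to a non-rack-aware request, or fail.
      return Collections.emptySet();
    }
    Set<String> racks = new HashSet<>(hostToRack.values());
    racks.remove(activeRack);
    return racks;
  }
}
```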







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r545284839



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Set;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ *  This interface gets fault domain information of all hosts that are running in the cluster,
+ *  from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a container can be placed on (for ex: based on standby constraints).
+ *  The host to fault domain map used here will always be cached and only updated in case the AM dies or an active
+ *  container is assigned to a host which is not in the map.
+ *  This is not thread-safe.
+ */
+@InterfaceStability.Unstable
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the last cached fault domain values in a cluster, for all hosts that are healthy, up and running.

Review comment:
       I've added to the documentation that the cache update is an implementation detail.
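The caching behavior described in the javadoc (a cached host-to-fault-domain map that is only refreshed when the AM restarts or a host is missing from it) can be sketched roughly as follows. This is an illustration under assumptions, not the merged implementation: rack names are plain strings, the cluster-manager call is a hypothetical `Supplier`, and, like the interface, it is not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical cache that re-fetches the host-to-rack map only on a cache miss.
final class CachedHostToRackLookup {
  private final Supplier<Map<String, String>> fetcher; // stands in for a call to the cluster manager
  private Map<String, String> cache;
  private int cacheUpdates = 0;

  CachedHostToRackLookup(Supplier<Map<String, String>> fetcher) {
    this.fetcher = fetcher;
    refresh(); // initial load, e.g. when the AM starts
  }

  private void refresh() {
    cache = new HashMap<>(fetcher.get()); // copy so later cluster changes are only seen on refresh
    cacheUpdates++;
  }

  String rackOf(String host) {
    if (!cache.containsKey(host)) {
      refresh(); // host not seen before: re-fetch once
    }
    return cache.get(host); // may still be null if the cluster manager does not know the host
  }

  int cacheUpdates() {
    return cacheUpdates;
  }
}
```

One design caveat: a host the cluster manager never reports triggers a re-fetch on every lookup, so a real implementation may want to limit how often it refreshes.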







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539867974



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);
+
+  /**
+   * This method checks if the two hostnames provided reside on the same fault domain.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same fault domain
+   */
+  boolean checkHostsOnSameFaultDomain(String host1, String host2);
+
+  /**
+   * This method gets the set of fault domains that the given active container's corresponding standby can be placed on.
+   * @param host The hostname of the active container
+   * @return the set of fault domains on which this active container's standby can be scheduled
+   */
+  Set<FaultDomain> getAllowedFaultDomainsForSchedulingContainer(String host);
+
+  /**
+   * This method returns the cached map of nodes to fault domains.
+   * @return stored map of node to the fault domain it resides on
+   */
+  Map<String, FaultDomain> getNodeToFaultDomainMap();
+
+  /**
+   * This method computes the node to fault domain map from the cluster resource manager.
+   * @return map of node to the fault domain it resides on

Review comment:
       AFAIK, since this will run on the main thread, it should be thread safe.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546192683



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -409,16 +470,18 @@ public void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest re
       log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID,
           samzaResource.getHost(), preferredHost);
       containerAllocator.runStreamProcessor(request, preferredHost);
+      samzaApplicationState.faultDomainAwareContainersStarted.incrementAndGet();

Review comment:
       I've added a check that will ensure that this metric is updated only when that config is on.
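A minimal sketch of the gating described here, assuming a hypothetical `StandbyStartMetricsSketch` holder. The counter name comes from the diff above and the flag mirrors `getFaultDomainAwareStandbyEnabled()`; the rest is illustrative.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder: counts fault-domain-aware starts only when the feature flag is on.
final class StandbyStartMetricsSketch {
  final AtomicInteger faultDomainAwareContainersStarted = new AtomicInteger(0);
  private final boolean faultDomainAwareStandbyEnabled;

  StandbyStartMetricsSketch(boolean faultDomainAwareStandbyEnabled) {
    this.faultDomainAwareStandbyEnabled = faultDomainAwareStandbyEnabled;
  }

  void onStreamProcessorStarted() {
    // In real code, containerAllocator.runStreamProcessor(request, preferredHost) runs just before this.
    if (faultDomainAwareStandbyEnabled) {
      faultDomainAwareContainersStarted.incrementAndGet();
    }
  }
}
```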







[GitHub] [samza] mynameborat commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r543320208



##########
File path: samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
##########
@@ -38,6 +38,9 @@
   private static final String CLUSTER_MANAGER_FACTORY = "samza.cluster-manager.factory";
   private static final String CLUSTER_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.YarnResourceManagerFactory";
 
+  private static final String FAULT_DOMAIN_MANAGER_FACTORY = "samza.fault-domain-manager.factory";
+  private static final String FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.RackManagerFactory";

Review comment:
       Should it be `YarnFaultDomainManagerFactory`?
   
   Also, I feel the configuration should be `cluster-manager.fault-domain-manager.factory`, since the fault domain manager is associated with the cluster manager.
   
   What do you think?
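If the suggestion were adopted, the lookup could look roughly like this. Both the key `cluster-manager.fault-domain-manager.factory` and the default class `org.apache.samza.job.yarn.YarnFaultDomainManagerFactory` are the reviewer's proposals, not confirmed final values, and a plain `Map` stands in for Samza's `Config`.

```java
import java.util.Map;

final class FaultDomainConfigSketch {
  // Proposed key and default from this review thread; the merged values may differ.
  static final String FAULT_DOMAIN_MANAGER_FACTORY = "cluster-manager.fault-domain-manager.factory";
  static final String FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT =
      "org.apache.samza.job.yarn.YarnFaultDomainManagerFactory";

  // Returns the configured factory class name, or the Yarn default when the key is absent.
  static String getFaultDomainManagerFactory(Map<String, String> config) {
    return config.getOrDefault(FAULT_DOMAIN_MANAGER_FACTORY, FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT);
  }
}
```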







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r539866505



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomainManager.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface gets fault domain information of different nodes from the cluster manager (Yarn/Kubernetes/etc.).
+ *  It also provides other functionality like exposing all the available fault domains, checking if two hosts belong to
+ *  the same fault domain, and getting the valid fault domains that a standby container can be placed on.
+ */
+public interface FaultDomainManager {
+
+  /**
+   * This method returns all the fault domain values in a cluster for RUNNING nodes.
+   * @return a set of {@link FaultDomain}s
+   */
+  Set<FaultDomain> getAllFaultDomains();
+
+  /**
+   * This method returns the fault domain a particular node resides on.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  FaultDomain getFaultDomainOfNode(String host);
+
+  /**
+   * This method checks if the two hostnames provided reside on the same fault domain.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same fault domain
+   */
+  boolean checkHostsOnSameFaultDomain(String host1, String host2);
+
+  /**
+   * This method gets the set of fault domains that the given active container's corresponding standby can be placed on.
+   * @param host The hostname of the active container
+   * @return the set of fault domains on which this active container's standby can be scheduled

Review comment:
       1. The purpose of this method is to find the fault domains on which the standby of the given host's active container can be placed. What are you suggesting changing it to, given that the method is specific to active and standby containers?
   2. Any rack about to undergo maintenance will be marked unhealthy. Hence, even if that rack is still in our local cache, Yarn will not return a host on an unhealthy rack to us.
   3. Caught up with Manasa and leaving this aside for now.
   4. Done.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r545286825



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
##########
@@ -170,6 +170,31 @@
    */
   public final AtomicInteger failedContainerPlacementActions = new AtomicInteger(0);
 
+  /**
+   * Number of fault domain aware container requests made for a container.
+   */
+  public final AtomicInteger hostToFaultDomainCacheUpdates = new AtomicInteger(0);

Review comment:
       Removed this from `SamzaApplicationState` and added it as part of `YarnFaultDomainManager` instead.







[GitHub] [samza] mynameborat merged pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
mynameborat merged pull request #1446:
URL: https://github.com/apache/samza/pull/1446


   





[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r547101266



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +454,39 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID, "pending")) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID, "running")) {

Review comment:
       Got it. I've inlined the method as you suggested.







[GitHub] [samza] mynameborat commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r545380246



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +421,39 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
     }
 
     return true;
   }
 
+  boolean checkStandbyConstraintsHelper(String containerIdToStart, String hostToStartContainerOn, SamzaResource existingResource, String existingContainerID) {
+    if (existingResource != null) {
+      ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+      if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled() && faultDomainManager.hasSameFaultDomains(hostToStartContainerOn, existingResource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this rack",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (existingResource.getHost().equals(hostToStartContainerOn)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);

Review comment:
       That is correct. The log message has changed, and that is also a semantic difference when debugging: the message used to tell you whether the pending code path or the running code path triggered this.
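A simplified sketch of an inlined check that keeps the pending/running distinction in the message. `StandbyConstraintSketch` and `rejectionReason` are hypothetical; rack lookups use a plain host-to-rack map, and instead of logging, the method returns the message (or null when the host is acceptable) so the behavior is easy to test. `state` is "scheduled" for the pending path and "running" for the running path, matching the original log wording.

```java
import java.util.Map;

final class StandbyConstraintSketch {
  static String rejectionReason(String containerIdToStart, String host, String existingContainerId,
      String existingHost, String state, boolean faultDomainAware, Map<String, String> hostToRack) {
    if (existingHost == null) {
      return null; // no conflicting container in this state
    }
    if (faultDomainAware) {
      String rackToStart = hostToRack.get(host);
      if (rackToStart != null && rackToStart.equals(hostToRack.get(existingHost))) {
        return String.format(
            "Container %s cannot be started on host %s because container %s is already %s on this rack",
            containerIdToStart, host, existingContainerId, state);
      }
    }
    if (existingHost.equals(host)) {
      return String.format(
          "Container %s cannot be started on host %s because container %s is already %s on this host",
          containerIdToStart, host, existingContainerId, state);
    }
    return null;
  }
}
```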







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r538854823



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/FaultDomain.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+public class FaultDomain {
+
+  FaultDomainType type;
+  String id;
+
+  public FaultDomain(FaultDomainType type, String id) {
+    this.type = type;
+    this.id = id;

Review comment:
       No. I've added a check for the same.








[GitHub] [samza] mynameborat commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546929602



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -361,8 +407,41 @@ private FailoverMetadata registerActiveContainerFailure(String activeContainerID
   }
 
   /**
-   * Check if matching this SamzaResourceRequest to the given resource, meets all standby-container container constraints.
+   * This method checks from the config if standby allocation is fault domain aware or not, and requests resources accordingly.
+   *
+   * @param containerAllocator ContainerAllocator object that requests for resources from the resource manager
+   * @param containerID Samza container ID that will be run when a resource is allocated for this request
+   * @param preferredHost name of the host that you prefer to run the processor on
+   */
+  void checkFaultDomainAwarenessEnabledAndRequestResource(ContainerAllocator containerAllocator, String containerID, String preferredHost) {

Review comment:
       nit: I'd prefer to rename the method to `requestResource`, since that makes its intent clear: it only requests a resource and could return the `SamzaResourceRequest`.
   
   How it requests the resource stays inside the method and can be understood by reading the implementation. The current name seems too long, and because the method returns void it can't be used in some places that contain exactly this boilerplate code.
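A rough sketch of the suggested shape, using hypothetical stand-ins (`ResourceRequestSketch` for `SamzaResourceRequest`, `StandbyRequesterSketch` for the manager) rather than the real Samza types. The point is only that returning the request lets call sites reuse it instead of repeating the boilerplate.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for SamzaResourceRequest; only the fields needed here.
final class ResourceRequestSketch {
  final String containerId;
  final String preferredHost;
  final Set<String> allowedRacks; // empty means "no rack restriction"

  ResourceRequestSketch(String containerId, String preferredHost, Set<String> allowedRacks) {
    this.containerId = containerId;
    this.preferredHost = preferredHost;
    this.allowedRacks = Collections.unmodifiableSet(new HashSet<>(allowedRacks));
  }
}

final class StandbyRequesterSketch {
  private final boolean faultDomainAware;

  StandbyRequesterSketch(boolean faultDomainAware) {
    this.faultDomainAware = faultDomainAware;
  }

  /**
   * Builds the request (real code would also hand it to the allocator) and returns it,
   * so callers that need the request object do not duplicate this logic.
   */
  ResourceRequestSketch requestResource(String containerId, String preferredHost, Set<String> allowedRacks) {
    Set<String> racks = faultDomainAware ? allowedRacks : Collections.<String>emptySet();
    return new ResourceRequestSketch(containerId, preferredHost, racks);
  }
}
```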







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546189549



##########
File path: samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -117,4 +120,8 @@ public boolean hasSameFaultDomains(String host1, String host2) {
     }
     return hostToRackMap;
   }
+
+  private void initMetrics() {
+    hostToFaultDomainCacheUpdates = metricsRegistry.newCounter(groupName, "host-to-fault-domain-cache-updates");
+  }

Review comment:
       I extracted it in case we add other metrics in the future. Having said that, I don't see any other metrics being needed right now, so I have inlined it.







[GitHub] [samza] PawasChhokra commented on a change in pull request #1446: SAMZA-2605: Make Standby Container Requests Rack Aware

Posted by GitBox <gi...@apache.org>.
PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546166427



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
##########
@@ -170,6 +170,26 @@
    */
   public final AtomicInteger failedContainerPlacementActions = new AtomicInteger(0);
 
+  /**
+   * Number of fault domain aware container requests made for a container.

Review comment:
       Yes, it should be "for a job". I've changed that in the documentation everywhere. Thanks :)



