Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/11/17 21:32:20 UTC

[GitHub] [samza] lakshmi-manasa-g opened a new pull request #1442: SAMZA-2602: Dynamic heartbeat establish with new AM

lakshmi-manasa-g opened a new pull request #1442:
URL: https://github.com/apache/samza/pull/1442


   **Feature:** The main feature is Cluster-based Job Coordinator (aka AM) high availability (HA) (TODO: sep/doc how?). It ensures that a new AM can establish a connection with already running containers, so that all running containers do not have to be restarted when the AM dies. This PR enables an already running container to establish a heartbeat connection with the new AM, and introduces a config that gates all of the code for this feature.
   
   **Changes:** 
   1. New job config 
   2. ContainerHeartbeatMonitor: fetches the new AM URL from the coordinator stream and establishes a heartbeat with the new AM
   
   **Tests:** Added a unit test.
   
   **API changes:** 
   1. A config can be set for a job to enable the AM HA feature.
   2. If the config is enabled, the container will read the coordinator stream until it times out or successfully re-establishes the heartbeat.
   
   **Usage instructions:** To enable AM HA, set the config to "true"; the default value is "false".
   
   **Upgrade instructions:** None
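   To make the gating concrete, here is a minimal sketch of the decision a container might make when a heartbeat check fails, assuming HA is a single boolean flag and that reconnecting with the new AM is a yes/no attempt. The class and member names (`HeartbeatFailureHandler`, `reconnectWithNewAm`, `onHeartbeatLost`) are hypothetical and are not Samza's actual API.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch: what a container does when its current AM heartbeat is lost.
final class HeartbeatFailureHandler {
  private final boolean highAvailabilityEnabled;
  // Tries to discover the new AM and re-establish the heartbeat; true on success.
  private final BooleanSupplier reconnectWithNewAm;
  // Invoked when the container should give up and shut down.
  private final Runnable onExpired;

  HeartbeatFailureHandler(boolean highAvailabilityEnabled,
                          BooleanSupplier reconnectWithNewAm,
                          Runnable onExpired) {
    this.highAvailabilityEnabled = highAvailabilityEnabled;
    this.reconnectWithNewAm = reconnectWithNewAm;
    this.onExpired = onExpired;
  }

  /** Called when the current AM reports the heartbeat as not alive. */
  void onHeartbeatLost() {
    // With HA disabled (the default), the reconnect attempt is skipped entirely.
    if (highAvailabilityEnabled && reconnectWithNewAm.getAsBoolean()) {
      return; // heartbeat re-established with the new AM; keep running
    }
    onExpired.run();
  }
}
```

   With the flag off, the `&&` short-circuits and the container expires exactly as before, which is why the feature can ship disabled by default.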


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat commented on a change in pull request #1442: SAMZA-2602: Dynamic heartbeat establish with new AM

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1442:
URL: https://github.com/apache/samza/pull/1442#discussion_r528730829



##########
File path: samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
##########
@@ -142,7 +143,14 @@ private static void run(
           listener = new ClusterBasedProcessorLifecycleListener(config, processorLifecycleListener, container::shutdown);
       container.setContainerListener(listener);
 
-      ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
+      JobConfig jobConfig = new JobConfig(config);
+      boolean isJobCoordinatorHighAvailabilityEnabled = jobConfig.getJobCoordinatorHighAvailabilityEnabled();
+      long retryCount = jobConfig.getJobCoordinatorDynamicHeartbeatRetryCount();
+      long sleepDurationForReconnectWithAM = jobConfig.getJobCoordinatorHeartbeatReconnectSleepDurationWithAmMs();

Review comment:
       nit: inline these instead, since the variables aren't used afterwards and the getters already make explicit what is being passed as arguments.

##########
File path: samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
##########
@@ -189,9 +197,13 @@ private static void run(
   /**
    * Creates a new container heartbeat monitor if possible.
    * @param container the container to monitor
+   * @param coordinatorStreamStore the metadata store to fetch coordinator url from
+   * @param isJobCoordinatorHighAvailabilityEnabled whether coordinator HA is enabled to fetch new coordinator url

Review comment:
       nit: missing docs for the newly added parameters.

##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -93,6 +103,79 @@ public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException
     verify(this.scheduler).shutdown();
   }
 
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    ContainerHeartbeatResponse response1 = new ContainerHeartbeatResponse(false);
+    ContainerHeartbeatResponse response2 = new ContainerHeartbeatResponse(true);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(response1).thenReturn(response2);
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenReturn(serde.toBytes(newCoordinatorUrl));
+
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);

Review comment:
       We should instead pass our own mocked executor that runs the task passed to it and decrements the latch. Here is an example: https://github.com/apache/samza/pull/1334
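   A minimal sketch of such an executor, assuming the monitor schedules its heartbeat check with `scheduleAtFixedRate` (as the latch name `schedulerFixedRateExecutionLatch` suggests). It is written as a plain JDK subclass rather than a Mockito mock so it stays self-contained; `RunOnceScheduledExecutor` is a hypothetical name, and the linked PR may do this differently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Test-only executor: runs a fixed-rate task once, synchronously, then counts down a latch.
final class RunOnceScheduledExecutor extends ScheduledThreadPoolExecutor {
  private final CountDownLatch fixedRateExecutionLatch;

  RunOnceScheduledExecutor(CountDownLatch fixedRateExecutionLatch) {
    // Daemon threads so a forgotten shutdown cannot keep the test JVM alive.
    super(1, runnable -> {
      Thread thread = new Thread(runnable);
      thread.setDaemon(true);
      return thread;
    });
    this.fixedRateExecutionLatch = fixedRateExecutionLatch;
  }

  @Override
  public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    try {
      command.run(); // run on the calling (test) thread, with no delay and no repetition
    } finally {
      fixedRateExecutionLatch.countDown(); // signal completion even if the task throws
    }
    // Hand back a no-op future so callers can still cancel() what they scheduled.
    return super.schedule(() -> { }, 0, TimeUnit.MILLISECONDS);
  }
}
```

   Because the task has already run by the time `start()` returns, the test can assert on the latch count directly instead of waiting on a timed `await`.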

##########
File path: docs/learn/documentation/versioned/jobs/samza-configurations.md
##########
@@ -324,6 +324,9 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |job.container.count|1|The number of YARN containers to request for running your job. This is the main parameter for controlling the scale (allocated computing resources) of your job: to increase the parallelism of processing, you need to increase the number of containers. The minimum is one container, and the maximum number of containers is the number of task instances (usually the number of input stream partitions). Task instances are evenly distributed across the number of containers that you specify.|
 |cluster-manager.container.memory.mb|1024|How much memory, in megabytes, to request from the cluster manager per container of your job. Along with cluster-manager.container.cpu.cores, this property determines how many containers the cluster manager will run on one machine. If the container exceeds this limit, it will be killed, so it is important that the container's actual memory use remains below the limit. The amount of memory used is normally the JVM heap size (configured with task.opts), plus the size of any off-heap memory allocation (for example stores.*.container.cache.size.bytes), plus a safety margin to allow for JVM overheads.|
 |cluster-manager.container.cpu.cores|1|The number of CPU cores to request per container of your job. Each node in the cluster has a certain number of CPU cores available, so this number (along with cluster-manager.container.memory.mb) determines how many containers can be run on one machine.|
+|job.coordinator.high-availability.enabled|false|If true, enables Job Coordinator (AM) high availability (HA) where a new AM can establish connection with already running containers.  
+|job.coordinator.dynamic-heartbeat.retry.count|5|If AM-HA is enabled, when a running container loses heartbeat with AM, this count gives the number of times an already running container will attempt to establish heartbeat with new AM|
+|job.coordinator.dynamic-heartbeat.reconnect-sleep-duration.ms|10000|If AM-HA is enabled, when a running container loses heartbeat with AM, this duration gives the amount of time a running container will sleep between attempts to establish heartbeat with new AM.|

Review comment:
       Referring to the same thread in the refactor PR, we should keep these configurations as YARN configurations.
   I'd also recommend removing "dynamic" from the configuration names, e.g. `yarn.container.heartbeat.retry.count` and `yarn.container.heartbeat.retry-sleep-duration.ms`, or something similar.
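   For illustration, a hypothetical accessor for the renamed settings. The keys are the reviewer's example names and the defaults mirror the values proposed in the docs table (5 retries, 10000 ms); neither is a final Samza config, and a plain `Map<String, String>` stands in for Samza's config object.

```java
import java.util.Map;

// Hypothetical accessor; key names are the review's examples, not final Samza configs.
final class YarnHeartbeatRetrySettings {
  static final String RETRY_COUNT = "yarn.container.heartbeat.retry.count";
  static final String RETRY_SLEEP_DURATION_MS = "yarn.container.heartbeat.retry-sleep-duration.ms";
  // Defaults mirror the proposed docs table.
  static final int DEFAULT_RETRY_COUNT = 5;
  static final long DEFAULT_RETRY_SLEEP_DURATION_MS = 10_000L;

  private final Map<String, String> config;

  YarnHeartbeatRetrySettings(Map<String, String> config) {
    this.config = config;
  }

  int getRetryCount() {
    String value = config.get(RETRY_COUNT);
    return value == null ? DEFAULT_RETRY_COUNT : Integer.parseInt(value);
  }

  long getRetrySleepDurationMs() {
    String value = config.get(RETRY_SLEEP_DURATION_MS);
    return value == null ? DEFAULT_RETRY_SLEEP_DURATION_MS : Long.parseLong(value);
  }
}
```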

##########
File path: docs/learn/documentation/versioned/jobs/samza-configurations.md
##########
@@ -324,6 +324,9 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |job.container.count|1|The number of YARN containers to request for running your job. This is the main parameter for controlling the scale (allocated computing resources) of your job: to increase the parallelism of processing, you need to increase the number of containers. The minimum is one container, and the maximum number of containers is the number of task instances (usually the number of input stream partitions). Task instances are evenly distributed across the number of containers that you specify.|
 |cluster-manager.container.memory.mb|1024|How much memory, in megabytes, to request from the cluster manager per container of your job. Along with cluster-manager.container.cpu.cores, this property determines how many containers the cluster manager will run on one machine. If the container exceeds this limit, it will be killed, so it is important that the container's actual memory use remains below the limit. The amount of memory used is normally the JVM heap size (configured with task.opts), plus the size of any off-heap memory allocation (for example stores.*.container.cache.size.bytes), plus a safety margin to allow for JVM overheads.|
 |cluster-manager.container.cpu.cores|1|The number of CPU cores to request per container of your job. Each node in the cluster has a certain number of CPU cores available, so this number (along with cluster-manager.container.memory.mb) determines how many containers can be run on one machine.|
+|job.coordinator.high-availability.enabled|false|If true, enables Job Coordinator (AM) high availability (HA) where a new AM can establish connection with already running containers.  

Review comment:
       I am leaning towards having this under the `YARN` namespace as well, since it is very specific to how we can achieve availability in YARN when the AM goes down. For example, in the ZK model this doesn't mean much.
   IIRC, in the long run we want to abstract the container away (exposing only `StreamProcessor`) and keep it as an internal construct.
   @prateekm can you chime in with your opinion here?

##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -93,6 +103,79 @@ public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException
     verify(this.scheduler).shutdown();
   }
 
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    ContainerHeartbeatResponse response1 = new ContainerHeartbeatResponse(false);
+    ContainerHeartbeatResponse response2 = new ContainerHeartbeatResponse(true);

Review comment:
       Can we extract these into class-level constants?
   Perhaps name them `FAILURE_RESPONSE` and `SUCCESS_RESPONSE` to be explicit.
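   A sketch of the suggested extraction. `HeartbeatResponse` is a hypothetical stand-in for `ContainerHeartbeatResponse` so the example compiles on its own; sharing the constants across tests assumes the response object is immutable (the diff only shows a constructor and `isAlive()`).

```java
// Hypothetical stand-in for ContainerHeartbeatResponse, used only in this sketch.
final class HeartbeatResponse {
  private final boolean alive;

  HeartbeatResponse(boolean alive) {
    this.alive = alive;
  }

  boolean isAlive() {
    return alive;
  }
}

// Class-level fixtures with names that state intent, as suggested in review.
final class HeartbeatTestFixtures {
  static final HeartbeatResponse FAILURE_RESPONSE = new HeartbeatResponse(false);
  static final HeartbeatResponse SUCCESS_RESPONSE = new HeartbeatResponse(true);

  private HeartbeatTestFixtures() { }
}
```

   The stubbing in the test then reads as `thenReturn(FAILURE_RESPONSE).thenReturn(SUCCESS_RESPONSE)`, which documents the scenario without local variables.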







[GitHub] [samza] mynameborat merged pull request #1442: SAMZA-2602: Dynamic heartbeat establish with new AM

mynameborat merged pull request #1442:
URL: https://github.com/apache/samza/pull/1442


   





[GitHub] [samza] mynameborat commented on a change in pull request #1442: SAMZA-2602: Dynamic heartbeat establish with new AM

mynameborat commented on a change in pull request #1442:
URL: https://github.com/apache/samza/pull/1442#discussion_r527005009



##########
File path: samza-core/src/main/java/org/apache/samza/config/JobConfig.java
##########
@@ -147,6 +147,11 @@
 
   private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
 
+  // Enable ClusterBasedJobCoordinator aka ApplicationMaster High Availability.
+  // High availability allows new AM to establish connection with already running containers
+  public static final String JOB_COORDINATOR_HIGH_AVAILABILITY_ENABLED = "job.coordinator.high-availability.enabled";

Review comment:
       Can we update the configuration table in the docs?

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -34,26 +38,45 @@
 public class ContainerHeartbeatMonitor {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
   private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
+  private static final CoordinatorStreamValueSerde SERDE = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+  private static final int RETRY_COUNT = 5;

Review comment:
       Should this be configurable? Together with `sleepDurationForReconnectWithAM`, this determines how long we want the container to retry establishing a connection with the AM.
   
   That said, it is also how long you can potentially have overlapping (duplicate) containers if the AM dies and a new deployment happens. Hence, both of these should be configuration knobs, with good documentation on what the impact of these configurations is.
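   Put numerically, and assuming the retry loop in `checkAndEstablishConnectionWithNewAM` sleeps once per attempt while the coordinator URL is unchanged, the worst-case wait is retryCount × sleepDuration. With the defaults proposed in the docs table (5 retries, 10000 ms) that is 5 × 10,000 ms = 50,000 ms, so a duplicate container could overlap for roughly 50 seconds, plus the time spent reading the coordinator stream. The helper below is hypothetical.

```java
// Hypothetical helper: upper bound on time spent sleeping between reconnect attempts.
final class HeartbeatRetryWindow {
  static long maxRetryWindowMs(int retryCount, long sleepDurationMs) {
    if (retryCount < 0 || sleepDurationMs < 0) {
      throw new IllegalArgumentException("retryCount and sleepDurationMs must be non-negative");
    }
    // multiplyExact throws ArithmeticException on overflow instead of silently wrapping.
    return Math.multiplyExact((long) retryCount, sleepDurationMs);
  }
}
```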

##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -93,6 +103,31 @@ public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException
     verify(this.scheduler).shutdown();
   }
 
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, true, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    byte[] newCoordinatorUrl = serde.toBytes("http://some-host-2.prod.linkedin.com");
+    ContainerHeartbeatResponse response1 = new ContainerHeartbeatResponse(false);
+    ContainerHeartbeatResponse response2 = new ContainerHeartbeatResponse(true);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(response1).thenReturn(response2);
+    when(this.containerHeartbeatMonitor.getContainerHeartbeatClient()).thenReturn(this.containerHeartbeatClient);

Review comment:
       This looks a bit strange, since for the sake of the test we are not actually recreating the heartbeat client.
   Ideally, you want to verify that a new client is created with the new URL and that it gets used.
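   One way to make that verifiable, sketched under the assumption that the monitor accepts a client factory (a `Function<String, ...>`) instead of calling an overridable method: the test supplies a factory that records each URL it receives and returns a stubbed client. `ReconnectingMonitor`, `HeartbeatClient`, and `switchTo` are hypothetical names.

```java
import java.util.function.Function;

// Hypothetical client abstraction, used only in this sketch.
interface HeartbeatClient {
  boolean requestHeartbeatIsAlive();
}

// Hypothetical monitor that builds every client through an injected factory.
final class ReconnectingMonitor {
  private final Function<String, HeartbeatClient> clientFactory;
  private String coordinatorUrl;
  private HeartbeatClient client;

  ReconnectingMonitor(String coordinatorUrl, Function<String, HeartbeatClient> clientFactory) {
    this.clientFactory = clientFactory;
    this.coordinatorUrl = coordinatorUrl;
    this.client = clientFactory.apply(coordinatorUrl);
  }

  /** Points the monitor at a new AM, always creating a fresh client for its URL. */
  boolean switchTo(String newCoordinatorUrl) {
    this.coordinatorUrl = newCoordinatorUrl;
    this.client = clientFactory.apply(newCoordinatorUrl);
    return client.requestHeartbeatIsAlive();
  }

  String currentCoordinatorUrl() {
    return coordinatorUrl;
  }
}
```

   The test can then assert both that the factory saw the new URL and that the heartbeat result came from the client built for it.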
   
   

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -84,6 +113,38 @@ public void stop() {
     }
   }
 
+  private boolean checkAndEstablishConnectionWithNewAM() {
+    boolean response = false;
+    int attempt = 1;
+
+    while (attempt <= RETRY_COUNT) {
+      String newCoordinatorUrl = SERDE.fromBytes(coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL));
+      try {
+        if (coordinatorUrl.equals(newCoordinatorUrl)) {
+          LOG.info("Attempt {} to discover new AM. Sleep for {}ms before next attempt.", attempt, sleepDurationForReconnectWithAM);
+          Thread.sleep(sleepDurationForReconnectWithAM);
+        } else {
+          LOG.info("Found new AM: {}. Establishing heartbeat with the new AM.", newCoordinatorUrl);
+          coordinatorUrl = newCoordinatorUrl;
+          containerHeartbeatClient = getContainerHeartbeatClient();
+          response = containerHeartbeatClient.requestHeartbeat().isAlive();
+          LOG.info("Received heartbeat response: {} from new AM: {}", response, this.coordinatorUrl);
+          break;
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted during sleep.");
+        Thread.currentThread().interrupt();
+      }
+      attempt++;
+    }
+    return response;
+  }
+
+  @VisibleForTesting
+  ContainerHeartbeatClient getContainerHeartbeatClient() {

Review comment:
       minor: suggest naming this `createContainerHeartbeatClient` instead of `getContainerHeartbeatClient`.

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -84,6 +113,38 @@ public void stop() {
     }
   }
 
+  private boolean checkAndEstablishConnectionWithNewAM() {

Review comment:
       It might be nice to break down the check and the establishing parts.
   [1] It makes it easier to write simpler tests that still test each piece of logic separately.
   [2] It allows each modular function to be handled and evolved independently, e.g. today we have retries only for checking, not for establishing.
   
   wdyt?
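   A sketch of what the split might look like, assuming the coordinator URL lookup, client creation, and sleeping are injected as functions. `NewAmConnector` and its methods are hypothetical; unlike the loop in the diff, this version treats a missing URL as "no new AM yet" and stops retrying when interrupted.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical split of checkAndEstablishConnectionWithNewAM into "check" and "establish" steps.
final class NewAmConnector {
  interface AmHeartbeatClient {
    boolean isAlive();
  }

  interface Sleeper {
    void sleep(long millis) throws InterruptedException;
  }

  private final Supplier<String> latestCoordinatorUrl; // e.g. a read from the coordinator stream
  private final Function<String, AmHeartbeatClient> clientFactory;
  private final Sleeper sleeper;
  private final int retryCount;
  private final long sleepDurationMs;

  NewAmConnector(Supplier<String> latestCoordinatorUrl, Function<String, AmHeartbeatClient> clientFactory,
      Sleeper sleeper, int retryCount, long sleepDurationMs) {
    this.latestCoordinatorUrl = latestCoordinatorUrl;
    this.clientFactory = clientFactory;
    this.sleeper = sleeper;
    this.retryCount = retryCount;
    this.sleepDurationMs = sleepDurationMs;
  }

  /** Check step: polls for a URL different from currentUrl, sleeping between attempts. */
  Optional<String> discoverNewCoordinatorUrl(String currentUrl) {
    for (int attempt = 1; attempt <= retryCount; attempt++) {
      String url = latestCoordinatorUrl.get();
      if (url != null && !url.equals(currentUrl)) {
        return Optional.of(url);
      }
      try {
        sleeper.sleep(sleepDurationMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
        return Optional.empty();
      }
    }
    return Optional.empty();
  }

  /** Establish step: builds a client for the new AM and checks one heartbeat (no retries yet). */
  boolean establishHeartbeat(String newCoordinatorUrl) {
    return clientFactory.apply(newCoordinatorUrl).isAlive();
  }
}
```

   Keeping `establishHeartbeat` separate leaves a natural place to add retries for establishing later, which is the evolution point raised in [2].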







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1442: SAMZA-2602: Dynamic heartbeat establish with new AM

lakshmi-manasa-g commented on a change in pull request #1442:
URL: https://github.com/apache/samza/pull/1442#discussion_r527293025



##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -84,6 +113,38 @@ public void stop() {
     }
   }
 
+  private boolean checkAndEstablishConnectionWithNewAM() {

Review comment:
       Modular does sound good, but in this case the smaller functions would be really small and wouldn't accomplish much beyond a single call to the client or the store, so I'm keeping this as is. It also doesn't give an advantage for testing.

##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -93,6 +103,31 @@ public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException
     verify(this.scheduler).shutdown();
   }
 
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, true, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    byte[] newCoordinatorUrl = serde.toBytes("http://some-host-2.prod.linkedin.com");
+    ContainerHeartbeatResponse response1 = new ContainerHeartbeatResponse(false);
+    ContainerHeartbeatResponse response2 = new ContainerHeartbeatResponse(true);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(response1).thenReturn(response2);
+    when(this.containerHeartbeatMonitor.getContainerHeartbeatClient()).thenReturn(this.containerHeartbeatClient);

Review comment:
       The client's heartbeat response needs to be mocked, so the test can't create a new client.
   I updated the test to return the mock when the new URL is used.
   I also added failure scenario tests.







[GitHub] [samza] mynameborat commented on a change in pull request #1442: SAMZA-2602: Dynamic heartbeat establish with new AM

mynameborat commented on a change in pull request #1442:
URL: https://github.com/apache/samza/pull/1442#discussion_r528933321



##########
File path: docs/learn/documentation/versioned/jobs/samza-configurations.md
##########
@@ -324,6 +324,9 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |job.container.count|1|The number of YARN containers to request for running your job. This is the main parameter for controlling the scale (allocated computing resources) of your job: to increase the parallelism of processing, you need to increase the number of containers. The minimum is one container, and the maximum number of containers is the number of task instances (usually the number of input stream partitions). Task instances are evenly distributed across the number of containers that you specify.|
 |cluster-manager.container.memory.mb|1024|How much memory, in megabytes, to request from the cluster manager per container of your job. Along with cluster-manager.container.cpu.cores, this property determines how many containers the cluster manager will run on one machine. If the container exceeds this limit, it will be killed, so it is important that the container's actual memory use remains below the limit. The amount of memory used is normally the JVM heap size (configured with task.opts), plus the size of any off-heap memory allocation (for example stores.*.container.cache.size.bytes), plus a safety margin to allow for JVM overheads.|
 |cluster-manager.container.cpu.cores|1|The number of CPU cores to request per container of your job. Each node in the cluster has a certain number of CPU cores available, so this number (along with cluster-manager.container.memory.mb) determines how many containers can be run on one machine.|
+|job.coordinator.high-availability.enabled|false|If true, enables Job Coordinator (AM) high availability (HA) where a new AM can establish connection with already running containers.  

Review comment:
       Synced offline: AM HA is very specific to YARN, so it makes more sense for it to be under the yarn namespace.







[GitHub] [samza] mynameborat commented on a change in pull request #1442: SAMZA-2602: Dynamic heartbeat establish with new AM

mynameborat commented on a change in pull request #1442:
URL: https://github.com/apache/samza/pull/1442#discussion_r528932983



##########
File path: docs/learn/documentation/versioned/jobs/samza-configurations.md
##########
@@ -324,6 +324,9 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |job.container.count|1|The number of YARN containers to request for running your job. This is the main parameter for controlling the scale (allocated computing resources) of your job: to increase the parallelism of processing, you need to increase the number of containers. The minimum is one container, and the maximum number of containers is the number of task instances (usually the number of input stream partitions). Task instances are evenly distributed across the number of containers that you specify.|
 |cluster-manager.container.memory.mb|1024|How much memory, in megabytes, to request from the cluster manager per container of your job. Along with cluster-manager.container.cpu.cores, this property determines how many containers the cluster manager will run on one machine. If the container exceeds this limit, it will be killed, so it is important that the container's actual memory use remains below the limit. The amount of memory used is normally the JVM heap size (configured with task.opts), plus the size of any off-heap memory allocation (for example stores.*.container.cache.size.bytes), plus a safety margin to allow for JVM overheads.|
 |cluster-manager.container.cpu.cores|1|The number of CPU cores to request per container of your job. Each node in the cluster has a certain number of CPU cores available, so this number (along with cluster-manager.container.memory.mb) determines how many containers can be run on one machine.|
+|job.coordinator.high-availability.enabled|false|If true, enables Job Coordinator (AM) high availability (HA) where a new AM can establish connection with already running containers.  
+|job.coordinator.dynamic-heartbeat.retry.count|5|If AM-HA is enabled, when a running container loses heartbeat with AM, this count gives the number of times an already running container will attempt to establish heartbeat with new AM|
+|job.coordinator.dynamic-heartbeat.reconnect-sleep-duration.ms|10000|If AM-HA is enabled, when a running container loses heartbeat with AM, this duration gives the amount of time a running container will sleep between attempts to establish heartbeat with new AM.|

Review comment:
       For bookkeeping: while the container heartbeat could serve as a generic model for interaction between the job coordinator and the container, today's contracts aren't generic enough and are tied to the YARN-specific implementation.
   
   Standalone, for example, uses a different mechanism for container/stream processor membership, with quorum and disconnects. For now, it is best to move this to a YARN-specific namespace and revisit it later, once we have defined clear contracts for heartbeat as a concept (agnostic to the underlying cluster runtime) between the container and the job coordinator.



