Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/11/19 16:30:32 UTC

[GitHub] [samza] mynameborat commented on a change in pull request #1442: SAMZA-2602: Dynamic heartbeat establish with new AM

mynameborat commented on a change in pull request #1442:
URL: https://github.com/apache/samza/pull/1442#discussion_r527005009



##########
File path: samza-core/src/main/java/org/apache/samza/config/JobConfig.java
##########
@@ -147,6 +147,11 @@
 
   private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
 
+  // Enable ClusterBasedJobCoordinator aka ApplicationMaster High Availability.
+  // High availability allows new AM to establish connection with already running containers
+  public static final String JOB_COORDINATOR_HIGH_AVAILABILITY_ENABLED = "job.coordinator.high-availability.enabled";

Review comment:
       Can we update the configuration table in the docs?

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -34,26 +38,45 @@
 public class ContainerHeartbeatMonitor {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
   private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
+  private static final CoordinatorStreamValueSerde SERDE = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+  private static final int RETRY_COUNT = 5;

Review comment:
       Should this be configurable? In tandem with `sleepDurationForReconnectWithAM`, this determines how long we want the container to keep retrying to establish a connection with the AM.
   
   That said, it also means that's how long you can potentially have overlapping (duplicate) containers if the AM dies and a new deployment happens. Hence, both of these should be configuration knobs, with good documentation of what their impact is.
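
   A rough sketch of what the two knobs could look like, assuming a plain `Map<String, String>` config. The class name, the config keys, and the sleep default are hypothetical and not part of this PR; only the retry default of 5 mirrors the hard-coded `RETRY_COUNT`. It also exposes the worst-case window (retries times sleep) so the duplicate-container impact can be documented:

```java
import java.util.Map;

// Hypothetical holder for the two reconnect knobs; not an existing Samza class.
final class AmReconnectConfig {
  // Hypothetical keys: the PR does not define these names.
  static final String RETRY_COUNT_KEY = "job.coordinator.am-reconnect.retry.count";
  static final String SLEEP_MS_KEY = "job.coordinator.am-reconnect.sleep.ms";
  static final int DEFAULT_RETRY_COUNT = 5;     // same value as the hard-coded RETRY_COUNT
  static final long DEFAULT_SLEEP_MS = 10_000L; // assumed default, not taken from the PR

  private final Map<String, String> config;

  AmReconnectConfig(Map<String, String> config) {
    this.config = config;
  }

  int getRetryCount() {
    String value = config.get(RETRY_COUNT_KEY);
    return value == null ? DEFAULT_RETRY_COUNT : Integer.parseInt(value);
  }

  long getSleepMs() {
    String value = config.get(SLEEP_MS_KEY);
    return value == null ? DEFAULT_SLEEP_MS : Long.parseLong(value);
  }

  // Upper bound on how long a container keeps looking for a new AM, which is
  // also roughly how long a duplicate container could keep running.
  long getMaxOverlapMs() {
    return getRetryCount() * getSleepMs();
  }
}
```

   With something like this, the docs can state the overlap bound directly in terms of the two settings.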

##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -93,6 +103,31 @@ public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException
     verify(this.scheduler).shutdown();
   }
 
+  @Test
+  public void testReestablishConnectionWithNewAM() throws InterruptedException {
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            "0", coordinatorStreamStore, true, 10));
+    CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    byte[] newCoordinatorUrl = serde.toBytes("http://some-host-2.prod.linkedin.com");
+    ContainerHeartbeatResponse response1 = new ContainerHeartbeatResponse(false);
+    ContainerHeartbeatResponse response2 = new ContainerHeartbeatResponse(true);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(response1).thenReturn(response2);
+    when(this.containerHeartbeatMonitor.getContainerHeartbeatClient()).thenReturn(this.containerHeartbeatClient);

Review comment:
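       This looks a bit strange: the heartbeat client is not actually recreated in the test, since `getContainerHeartbeatClient()` is stubbed to return the same mock.
   Ideally, you want to verify that a new client is created with the new URL and that the new client is the one that gets used.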
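
   One way to make that verifiable, sketched in plain Java without Mockito: inject a client factory and record the URLs it is called with. `HeartbeatClient`, `RecordingClientFactory`, and `ReconnectingMonitor` are hypothetical stand-ins, not the classes in this PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for ContainerHeartbeatClient.
interface HeartbeatClient {
  boolean isAlive();
}

// Test double: remembers every URL a client was created for.
final class RecordingClientFactory implements Function<String, HeartbeatClient> {
  final List<String> createdFor = new ArrayList<>();

  @Override
  public HeartbeatClient apply(String url) {
    createdFor.add(url);
    return () -> true; // every created client reports the AM as alive
  }
}

// Hypothetical, stripped-down monitor that creates its client through the factory.
final class ReconnectingMonitor {
  private final Function<String, HeartbeatClient> clientFactory;
  private String coordinatorUrl;
  private HeartbeatClient client;

  ReconnectingMonitor(String initialUrl, Function<String, HeartbeatClient> clientFactory) {
    this.clientFactory = clientFactory;
    this.coordinatorUrl = initialUrl;
    this.client = clientFactory.apply(initialUrl);
  }

  // Switches to the new AM: creates a fresh client for the new URL and uses it.
  boolean reconnect(String newUrl) {
    coordinatorUrl = newUrl;
    client = clientFactory.apply(newUrl);
    return client.isAlive();
  }

  String getCoordinatorUrl() {
    return coordinatorUrl;
  }
}
```

   The test can then assert that the factory saw the old URL followed by the new one, instead of stubbing the getter on a spy.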
   
   

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -84,6 +113,38 @@ public void stop() {
     }
   }
 
+  private boolean checkAndEstablishConnectionWithNewAM() {
+    boolean response = false;
+    int attempt = 1;
+
+    while (attempt <= RETRY_COUNT) {
+      String newCoordinatorUrl = SERDE.fromBytes(coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL));
+      try {
+        if (coordinatorUrl.equals(newCoordinatorUrl)) {
+          LOG.info("Attempt {} to discover new AM. Sleep for {}ms before next attempt.", attempt, sleepDurationForReconnectWithAM);
+          Thread.sleep(sleepDurationForReconnectWithAM);
+        } else {
+          LOG.info("Found new AM: {}. Establishing heartbeat with the new AM.", newCoordinatorUrl);
+          coordinatorUrl = newCoordinatorUrl;
+          containerHeartbeatClient = getContainerHeartbeatClient();
+          response = containerHeartbeatClient.requestHeartbeat().isAlive();
+          LOG.info("Received heartbeat response: {} from new AM: {}", response, this.coordinatorUrl);
+          break;
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted during sleep.");
+        Thread.currentThread().interrupt();
+      }
+      attempt++;
+    }
+    return response;
+  }
+
+  @VisibleForTesting
+  ContainerHeartbeatClient getContainerHeartbeatClient() {

Review comment:
       minor: suggest prefixing this method name with `create` instead of `get`, since it builds a new client.

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -84,6 +113,38 @@ public void stop() {
     }
   }
 
+  private boolean checkAndEstablishConnectionWithNewAM() {

Review comment:
       It might be nice to break down the check and the establish parts into separate methods.
   [1] It makes it easier to write simpler tests that still cover each piece of logic separately.
   [2] It allows each function to be handled and evolved independently, e.g. today we retry only the check, not the establish.
   
   wdyt?
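
   A minimal sketch of the split, assuming the coordinator-stream lookup and the heartbeat call are passed in as functions. `AmReconnector` and its method names are hypothetical. As in the PR, only the check is retried; unlike the PR, an interrupt ends the polling instead of continuing the loop:

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper that separates "check for a new AM" from "establish heartbeat".
final class AmReconnector {
  private final Supplier<String> coordinatorUrlReader; // stands in for the coordinator stream store read
  private final Predicate<String> heartbeat;           // stands in for create client + requestHeartbeat().isAlive()
  private final int retryCount;
  private final long sleepMs;

  AmReconnector(Supplier<String> coordinatorUrlReader, Predicate<String> heartbeat,
      int retryCount, long sleepMs) {
    this.coordinatorUrlReader = coordinatorUrlReader;
    this.heartbeat = heartbeat;
    this.retryCount = retryCount;
    this.sleepMs = sleepMs;
  }

  // Check: poll until the published URL differs from the current one or retries run out.
  Optional<String> discoverNewAmUrl(String currentUrl) {
    for (int attempt = 1; attempt <= retryCount; attempt++) {
      String candidate = coordinatorUrlReader.get();
      if (candidate != null && !candidate.equals(currentUrl)) {
        return Optional.of(candidate);
      }
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the flag and stop polling
        return Optional.empty();
      }
    }
    return Optional.empty();
  }

  // Establish: a single heartbeat against the new AM, no retry.
  boolean establishHeartbeat(String newUrl) {
    return heartbeat.test(newUrl);
  }
}
```

   The caller would then do something like `discoverNewAmUrl(coordinatorUrl).map(this::establishHeartbeat).orElse(false)`, and each half can be tested on its own.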



