Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/01/07 19:06:20 UTC

[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1455: SAMZA-2615: Metrics for Application Master High Availability

lakshmi-manasa-g commented on a change in pull request #1455:
URL: https://github.com/apache/samza/pull/1455#discussion_r553003257



##########
File path: docs/learn/documentation/versioned/container/metrics-table.html
##########
@@ -977,6 +983,57 @@ <h1>Samza Metrics Reference</h1>
         <td>single-barrier-rebalancing-time</td>
         <td><a href="#average-time">Average time</a> taken for all the processors to get the latest version of the job model after single processor change (without the occurence of a barrier timeout)</td>
     </tr>
+    <tr>
+        <th colspan="2" class="section" id="job-coordinator-metadata-manager-metrics">org.apache.samza.coordinator.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following metrics are applicable when Application Master High Availability is enabled</span></th>
+    </tr>
+    <tr>
+        <td>application-attempt-count</td>
+        <td>Denotes the application attempt count within the scope of a single deployment</td>
+    </tr>
+    <tr>
+        <td>config-changed</td>
+        <td>Denotes configuration changed across attempts within the scope of a single deployment</td>
+    </tr>
+    <tr>
+        <td>job-model-changed</td>
+        <td>Denotes job model changed across attempts within the scope of a single deployment</td>
+    </tr>
+    <tr>
+        <td>metadata-read-failed-count</td>
+        <td>Read failure count from the underlying metadata store</td>
+    </tr>
+    <tr>
+        <td>metadata-write-failed-count</td>
+        <td>Write failure count to the underlying metadata store</td>
+    </tr>
+    <tr>
+        <td>metadata-generation-failed-count</td>
+        <td>Number of times the metadata generation failed</td>
+    </tr>
+    <tr>
+        <td>new-deployment</td>
+        <td>Denotes a new deployment due to changes in metadata either attributed to config or job model</td>
+    </tr>
+    </tbody>
+    <tr>
+        <th colspan="2" class="section" id="container-heartbeat-monitor-metrics">org.apache.samza.container.ContainerHeartbeatMonitor.ContainerHeartbeatMetrics<br><span style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following metrics are applicable when Application Master High Availability is enabled</span></th>
+    </tr>
+    <tr>
+        <td>heartbeat-discovery-time-ms</td>
+        <td>Time taken in millis for the container to discover new AM in the event AM changes for heartbeat establishment</td>
+    </tr>
+    <tr>
+        <td>heartbeat-established-failure-count</td>
+        <td>Number of times failed to establish heartbeat in the event of AM changes</td>
+    </tr>
+    <tr>
+        <td>heartbeat-established-with-new-am-count</td>
+        <td>Number of times heartbeat is established with the new AM in the event of AM changes</td>
+    </tr>
+    <tr>
+        <td>heartbeat-expired-count</td>
+        <td>Number of times heartbeat expired with the active AM</td>

Review comment:
       Are these per-container or job-level metrics?

##########
File path: docs/learn/documentation/versioned/container/metrics-table.html
##########
@@ -977,6 +983,57 @@ <h1>Samza Metrics Reference</h1>
         <td>single-barrier-rebalancing-time</td>
         <td><a href="#average-time">Average time</a> taken for all the processors to get the latest version of the job model after single processor change (without the occurence of a barrier timeout)</td>
     </tr>
+    <tr>
+        <th colspan="2" class="section" id="job-coordinator-metadata-manager-metrics">org.apache.samza.coordinator.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following metrics are applicable when Application Master High Availability is enabled</span></th>
+    </tr>
+    <tr>
+        <td>application-attempt-count</td>
+        <td>Denotes the application attempt count within the scope of a single deployment</td>
+    </tr>
+    <tr>
+        <td>config-changed</td>
+        <td>Denotes configuration changed across attempts within the scope of a single deployment</td>
+    </tr>
+    <tr>
+        <td>job-model-changed</td>
+        <td>Denotes job model changed across attempts within the scope of a single deployment</td>
+    </tr>
+    <tr>
+        <td>metadata-read-failed-count</td>
+        <td>Read failure count from the underlying metadata store</td>
+    </tr>
+    <tr>
+        <td>metadata-write-failed-count</td>
+        <td>Write failure count to the underlying metadata store</td>

Review comment:
       These reads and writes are specific to the AM HA code, not all reads/writes to the metadata store, right?

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -55,31 +66,42 @@
   private final long retryCount;
 
   private ContainerHeartbeatClient containerHeartbeatClient;
+  private ContainerHeartbeatMetrics metrics;
+  private Map<String, MetricsReporter> reporters;
   private String coordinatorUrl;
   private boolean started = false;
 
   public ContainerHeartbeatMonitor(Runnable onContainerExpired, String coordinatorUrl, String containerExecutionId,
-      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled, long retryCount,
-      long sleepDurationForReconnectWithAM) {
+      MetadataStore coordinatorStreamStore, Config config) {
     this(onContainerExpired, new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId),
         Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), coordinatorUrl, containerExecutionId,
-        coordinatorStreamStore, isApplicationMasterHighAvailabilityEnabled, retryCount, sleepDurationForReconnectWithAM);
+        coordinatorStreamStore, config);
   }
 
   @VisibleForTesting
   ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient,
       ScheduledExecutorService scheduler, String coordinatorUrl, String containerExecutionId,
-      MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled,
-      long retryCount, long sleepDurationForReconnectWithAM) {
+      MetadataStore coordinatorStreamStore, Config config) {
     this.onContainerExpired = onContainerExpired;
     this.containerHeartbeatClient = containerHeartbeatClient;
     this.scheduler = scheduler;
     this.coordinatorUrl = coordinatorUrl;
     this.containerExecutionId = containerExecutionId;
     this.coordinatorStreamStore = coordinatorStreamStore;
-    this.isApplicationMasterHighAvailabilityEnabled = isApplicationMasterHighAvailabilityEnabled;
-    this.retryCount = retryCount;
-    this.sleepDurationForReconnectWithAM = sleepDurationForReconnectWithAM;
+
+    JobConfig jobConfig = new JobConfig(config);
+    this.isApplicationMasterHighAvailabilityEnabled = jobConfig.getApplicationMasterHighAvailabilityEnabled();
+    this.retryCount = jobConfig.getContainerHeartbeatRetryCount();
+    this.sleepDurationForReconnectWithAM = jobConfig.getContainerHeartbeatRetrySleepDurationMs();
+
+    initializeMetrics(config);
+  }
+
+  private void initializeMetrics(Config config) {

Review comment:
       nit: please put private methods below the public ones.

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -100,7 +123,7 @@ public void start() {
           } catch (Exception e) {
             // On exception in re-establish connection with new AM, force exit.
             LOG.error("Exception trying to connect with new AM", e);
-            forceExit("failure in establishing cconnection with new AM", 0);
+            forceExit("failure in establishing cconnection with new AM", SHUTDOWN_TIMOUT_MS);

Review comment:
       Thank you for fixing this.
       If an exception is caught in `checkAndEstablishConnectionWithNewAM`, we should increment `getHeartbeatEstablishedFailureCount` as well, right?
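
A minimal sketch of the suggestion above, not the actual Samza code. `ReconnectSketch`, `Reconnect`, and `establishedFailureCount` are hypothetical stand-ins for the monitor, the `checkAndEstablishConnectionWithNewAM` call, and the counter behind `getHeartbeatEstablishedFailureCount`. It also assumes that a `false` return already counts as a failure.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the heartbeat monitor; not the Samza class.
class ReconnectSketch {
  // Stand-in for the heartbeat-established-failure-count counter.
  final AtomicLong establishedFailureCount = new AtomicLong();
  String lastExitMessage;

  // Hypothetical hook for the reconnect attempt; it may return false or throw.
  interface Reconnect {
    boolean attempt() throws Exception;
  }

  void onHeartbeatExpired(Reconnect reconnect) {
    boolean connected;
    try {
      connected = reconnect.attempt();
    } catch (Exception e) {
      // Count the exceptional path as a failure before forcing the exit.
      establishedFailureCount.incrementAndGet();
      forceExit("failure in establishing connection with new AM");
      return;
    }
    if (!connected) {
      // Assumption: an unsuccessful attempt is counted as a failure as well.
      establishedFailureCount.incrementAndGet();
      forceExit("could not establish heartbeat with new AM");
    }
  }

  void forceExit(String message) {
    // The real code schedules a process exit; the sketch only records the reason.
    lastExitMessage = message;
  }
}
```

With this shape, a test that makes the reconnect attempt throw can assert that the failure counter is 1.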

##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -177,22 +213,26 @@ public void testConnectToNewAMFailed() throws InterruptedException {
 
   @Test
   public void testConnectToNewAMSerdeException() throws InterruptedException {
-    String containerExecutionId = "0";
     String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
-    this.containerHeartbeatMonitor =
-        spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
-            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    this.containerHeartbeatMonitor = spy(buildContainerHeartbeatMonitor(true));
     CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+    ContainerHeartbeatMonitor.ContainerHeartbeatMetrics metrics = this.containerHeartbeatMonitor.getMetrics();
+
     when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
-    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
-    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenThrow(new NullPointerException("serde failed"));
+    when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, CONTAINER_EXECUTION_ID))
+        .thenReturn(this.containerHeartbeatClient);
+    when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL))
+        .thenThrow(new NullPointerException("serde failed"));
 
     this.containerHeartbeatMonitor.start();
     // wait for the executor to finish the heartbeat check task
-    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(10, TimeUnit.SECONDS);
+
     assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    assertEquals("Heartbeat expired count should be 1", 1, metrics.getHeartbeatExpiredCount().getCount());

Review comment:
       This should check that `getHeartbeatEstablishedFailureCount` equals 1.

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -91,6 +113,7 @@ public void start() {
     scheduler.scheduleAtFixedRate(() -> {
       ContainerHeartbeatResponse response = containerHeartbeatClient.requestHeartbeat();
       if (!response.isAlive()) {
+        metrics.incrementHeartbeatExpiredCount();

Review comment:
       This metric is emitted even when AM HA is not enabled. Do we want to emit the expired count, but not the established counts, when the feature is off?
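
If the answer is no, one option is to gate the increment on the HA flag. The sketch below is only an illustration under that assumption. `ExpiredCountSketch` and its fields are hypothetical names, not the Samza classes.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the heartbeat monitor; not the Samza class.
class ExpiredCountSketch {
  // Stand-in for the heartbeat-expired-count counter.
  final AtomicLong heartbeatExpiredCount = new AtomicLong();
  // Counts how often the expiry handling ran, with or without HA.
  final AtomicLong expiryHandled = new AtomicLong();
  private final boolean amHighAvailabilityEnabled;

  ExpiredCountSketch(boolean amHighAvailabilityEnabled) {
    this.amHighAvailabilityEnabled = amHighAvailabilityEnabled;
  }

  void onHeartbeatResponse(boolean alive) {
    if (alive) {
      return;
    }
    if (amHighAvailabilityEnabled) {
      // Only emit the HA-specific metric when the feature is on.
      heartbeatExpiredCount.incrementAndGet();
    }
    // Expiry handling itself is unchanged and runs in both modes.
    expiryHandled.incrementAndGet();
  }
}
```

The alternative is to keep emitting the metric unconditionally and drop the "applicable when AM HA is enabled" note for it in the docs.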

##########
File path: docs/learn/documentation/versioned/container/metrics-table.html
##########
@@ -977,6 +983,57 @@ <h1>Samza Metrics Reference</h1>
         <td>single-barrier-rebalancing-time</td>
         <td><a href="#average-time">Average time</a> taken for all the processors to get the latest version of the job model after single processor change (without the occurence of a barrier timeout)</td>
     </tr>
+    <tr>
+        <th colspan="2" class="section" id="job-coordinator-metadata-manager-metrics">org.apache.samza.coordinator.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following metrics are applicable when Application Master High Availability is enabled</span></th>
+    </tr>
+    <tr>
+        <td>application-attempt-count</td>
+        <td>Denotes the application attempt count within the scope of a single deployment</td>
+    </tr>
+    <tr>
+        <td>config-changed</td>
+        <td>Denotes configuration changed across attempts within the scope of a single deployment</td>
+    </tr>
+    <tr>
+        <td>job-model-changed</td>
+        <td>Denotes job model changed across attempts within the scope of a single deployment</td>
+    </tr>
+    <tr>
+        <td>metadata-read-failed-count</td>
+        <td>Read failure count from the underlying metadata store</td>
+    </tr>
+    <tr>
+        <td>metadata-write-failed-count</td>
+        <td>Write failure count to the underlying metadata store</td>
+    </tr>
+    <tr>
+        <td>metadata-generation-failed-count</td>
+        <td>Number of times the metadata generation failed</td>
+    </tr>
+    <tr>
+        <td>new-deployment</td>
+        <td>Denotes a new deployment due to changes in metadata either attributed to config or job model</td>
+    </tr>
+    </tbody>
+    <tr>
+        <th colspan="2" class="section" id="container-heartbeat-monitor-metrics">org.apache.samza.container.ContainerHeartbeatMonitor.ContainerHeartbeatMetrics<br><span style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following metrics are applicable when Application Master High Availability is enabled</span></th>
+    </tr>
+    <tr>
+        <td>heartbeat-discovery-time-ms</td>
+        <td>Time taken in millis for the container to discover new AM in the event AM changes for heartbeat establishment</td>

Review comment:
       We might want to clarify that this time is set even when the returned heartbeat is false.

##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -150,8 +183,17 @@ ContainerHeartbeatClient createContainerHeartbeatClient(String coordinatorUrl, S
     return new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
   }
 
+  @VisibleForTesting
+  ContainerHeartbeatMetrics getMetrics() {
+    return metrics;
+  }
+
   private void forceExit(String message, int timeout) {
     scheduler.schedule(() -> {
+      if (started) {
+        reporters.values().forEach(MetricsReporter::stop);

Review comment:
       IIUC, this is executed when the `container.shutdown` invoked in `onContainerExpired.run` does not complete within the timeout, right? I feel we should also shut down the metrics reporters in the path where `onContainerExpired.run` is executed.
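
A rough sketch of one way to do that, assuming the reporters can be stopped from either path and should be stopped only once. `ReporterShutdownSketch`, `Reporter`, and `handleExpiry` are hypothetical names used for illustration. The real `MetricsReporter` and monitor differ.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the heartbeat monitor; not the Samza class.
class ReporterShutdownSketch {
  // Minimal stand-in for a metrics reporter with a stop() method.
  interface Reporter {
    void stop();
  }

  private final List<Reporter> reporters;
  private final Runnable onContainerExpired;
  private final AtomicBoolean reportersStopped = new AtomicBoolean(false);

  ReporterShutdownSketch(List<Reporter> reporters, Runnable onContainerExpired) {
    this.reporters = reporters;
    this.onContainerExpired = onContainerExpired;
  }

  // Normal path: run the expiry callback, then stop the reporters.
  void handleExpiry() {
    try {
      onContainerExpired.run();
    } finally {
      stopReporters();
    }
  }

  // Timeout path: the callback did not finish in time, so stop and exit.
  void forceExit() {
    stopReporters();
    // The real code would terminate the process here.
  }

  // Idempotent, so running both paths does not stop a reporter twice.
  private void stopReporters() {
    if (reportersStopped.compareAndSet(false, true)) {
      reporters.forEach(Reporter::stop);
    }
  }
}
```

The `compareAndSet` guard matters because both paths can run for the same expiry: the callback path finishes late while the scheduled force exit also fires.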



