You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2022/05/25 18:17:23 UTC

[GitHub] [gobblin] ZihanLi58 commented on a diff in pull request #3513: [GOBBLIN-1652]Add more log in the KafkaJobStatusMonitor in case it fails to process one GobblinTrackingEvent

ZihanLi58 commented on code in PR #3513:
URL: https://github.com/apache/gobblin/pull/3513#discussion_r881976009


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -221,56 +221,63 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
   @VisibleForTesting
   static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus, StateStore stateStore)
       throws IOException {
-    if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
-      jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, JobStatusRetriever.NA_KEY);
-    }
-    if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
-      jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
-    }
-    String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
-    String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
-    String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
-    String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
-    String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
-    String storeName = jobStatusStoreName(flowGroup, flowName);
-    String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
-
-    List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
-    if (states.size() > 0) {
-      org.apache.gobblin.configuration.State previousJobStatus = states.get(states.size() - 1);
-      String previousStatus = previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-      String currentStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
-      int previousGeneration = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 1);
-      // This is to make the change backward compatible as we may not have this info in cluster events
-      // If we does not have those info, we treat the event as coming from the same attempts as previous one
-      int currentGeneration = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, previousGeneration);
-      int previousAttempts = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
-      int currentAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, previousAttempts);
-      // We use three things to accurately count and thereby bound retries, even amidst out-of-order events (by skipping late arrivals).
-      // The generation is monotonically increasing, while the attempts may re-initialize back to 0. this two-part form prevents the composite value from ever repeating.
-      // And job status reflect the execution status in one attempt
-      if (previousStatus != null && currentStatus != null &&
-          (previousGeneration > currentGeneration
-              || (previousGeneration == currentGeneration && previousAttempts > currentAttempts)
-              || (previousGeneration == currentGeneration && previousAttempts == currentAttempts
-              && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
-        log.warn(String.format("Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
-            currentStatus, currentGeneration, currentAttempts, previousStatus, previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
-        jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
-      } else {
-        jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+    try {
+      if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
+        jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, JobStatusRetriever.NA_KEY);
+      }
+      if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
+        jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
+      }
+      String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+      String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+      String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+      String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+      String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+      String storeName = jobStatusStoreName(flowGroup, flowName);
+      String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
+
+      List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
+      if (states.size() > 0) {
+        org.apache.gobblin.configuration.State previousJobStatus = states.get(states.size() - 1);
+        String previousStatus = previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+        String currentStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+        int previousGeneration = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 1);
+        // This is to make the change backward compatible as we may not have this info in cluster events
+        // If we does not have those info, we treat the event as coming from the same attempts as previous one
+        int currentGeneration = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, previousGeneration);
+        int previousAttempts = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
+        int currentAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, previousAttempts);
+        // We use three things to accurately count and thereby bound retries, even amidst out-of-order events (by skipping late arrivals).
+        // The generation is monotonically increasing, while the attempts may re-initialize back to 0. this two-part form prevents the composite value from ever repeating.
+        // And job status reflect the execution status in one attempt
+        if (previousStatus != null && currentStatus != null && (previousGeneration > currentGeneration || (
+            previousGeneration == currentGeneration && previousAttempts > currentAttempts) || (previousGeneration == currentGeneration && previousAttempts == currentAttempts
+            && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
+            < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
+          log.warn(String.format(
+              "Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
+              currentStatus, currentGeneration, currentAttempts, previousStatus, previousGeneration, previousAttempts,
+              flowGroup, flowName, flowExecutionId, jobGroup, jobName));
+          jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
+        } else {
+          jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
+        }
       }
-    }
 
-    modifyStateIfRetryRequired(jobStatus);
-    stateStore.put(storeName, tableName, jobStatus);
+      modifyStateIfRetryRequired(jobStatus);
+      stateStore.put(storeName, tableName, jobStatus);
+    } catch (Exception e) {

Review Comment:
   We are seeing NPEs thrown from this method without any accompanying information, so we want to wrap the entire method body in a try/catch and log the failure with context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org