Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/09/20 20:04:56 UTC

[GitHub] [gobblin] aplex commented on a change in pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

aplex commented on a change in pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399#discussion_r712474473



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -147,33 +181,49 @@ protected void createMetrics() {
 
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
-    try {
-      GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
+    GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
 
-      if (gobblinTrackingEvent == null) {
-        return;
-      }
+    if (gobblinTrackingEvent == null) {
+      return;
+    }
 
-      if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
-        try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
-          jobIssueEventHandler.processEvent(gobblinTrackingEvent);
-        }
+    if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
+      try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
+        jobIssueEventHandler.processEvent(gobblinTrackingEvent);
       }
+    }
 
-      if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
-        emitWorkUnitCountMetric(gobblinTrackingEvent);
-        return;
-      }
+    if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
+      emitWorkUnitCountMetric(gobblinTrackingEvent);
+      return;
+    }
 
-      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(gobblinTrackingEvent);
-      if (jobStatus != null) {
-        try(Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
-          addJobStatusToStateStore(jobStatus, this.stateStore);
+    try {
+      persistJobStatusRetryer.call(() -> {

Review comment:
       I think we should retry all the preceding statements in this method as well, such as deserializing the event (which can fail if the Kafka schema registry is broken) or saving issues (which can fail if MySQL is down).
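
To illustrate the suggestion above, here is a minimal sketch in plain Java, not Gobblin's actual code. `RetryWholeMessage`, `Step`, `callWithRetry`, and the attempt count and sleep interval are all hypothetical names and values chosen for the example. The point is only that the deserialization and issue-saving steps sit inside the same retried unit as the state store write.

```java
import java.util.concurrent.Callable;

/** Hypothetical stand-in for the monitor; none of these names come from Gobblin. */
final class RetryWholeMessage {

  /** One step of message processing (deserialize, save issues, persist status). */
  interface Step {
    void run() throws Exception;
  }

  /** Calls the task up to maxAttempts times, sleeping sleepMs between failed attempts. */
  static <T> T callWithRetry(Callable<T> task, int maxAttempts, long sleepMs) throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return task.call();
      } catch (Exception e) {
        last = e;
        if (attempt < maxAttempts) {
          Thread.sleep(sleepMs);
        }
      }
    }
    // Every attempt failed; surface the most recent failure to the caller.
    throw last;
  }

  /** Runs all three steps as a single retried unit, so a failure in any of them triggers a retry. */
  static void processMessage(Step deserialize, Step saveIssues, Step persistStatus) throws Exception {
    callWithRetry(() -> {
      deserialize.run();
      saveIssues.run();
      persistStatus.run();
      return null;
    }, 5, 10L); // assumed values for the sketch only
  }
}
```

One consequence of this shape is that a failure in a later step re-runs the earlier ones, so the earlier steps would need to be safe to repeat.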

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -90,6 +100,10 @@
   @Getter
   private final StateStore<org.apache.gobblin.configuration.State> stateStore;
   private final ScheduledExecutorService scheduledExecutorService;
+  private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume non-transient and give up

Review comment:
       Why wouldn't we retry indefinitely? If GaaS is not processing Kafka messages, it will not work correctly anyway, so giving up on processing a Kafka partition is effectively the same as shutting down the service.
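
As a rough sketch of what retrying indefinitely could look like, the following plain Java loop retries until the task succeeds or the thread is interrupted, doubling the wait after each failure up to a cap. It does not use the retryer configured in this PR. `IndefiniteRetry`, `Sleeper`, `retryForever`, and the delay parameters are hypothetical names introduced for the example.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper; not part of Gobblin or of the retryer used in this PR. */
final class IndefiniteRetry {

  /** Abstraction over sleeping, so the wait can be replaced in tests. */
  interface Sleeper {
    void sleep(long millis) throws InterruptedException;
  }

  /**
   * Calls the task until it succeeds. After each failure it waits, starting at
   * initialMs and doubling up to maxMs. Only an interrupt ends the loop without a result.
   */
  static <T> T retryForever(Callable<T> task, long initialMs, long maxMs, Sleeper sleeper)
      throws InterruptedException {
    long delay = initialMs;
    while (true) {
      try {
        return task.call();
      } catch (InterruptedException e) {
        // Respect shutdown requests instead of retrying through them.
        throw e;
      } catch (Exception e) {
        sleeper.sleep(delay);
        delay = Math.min(delay * 2, maxMs);
      }
    }
  }
}
```

In a real caller the sleeper would presumably be `Thread::sleep`, and each failure would be logged or counted in a metric so that a long outage stays visible even though the loop never gives up.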



