Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/09/20 18:49:50 UTC

[GitHub] [gobblin] phet opened a new pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

phet opened a new pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399


   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1549
    "GaaS should retry processing Kafka events in case of an error"
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   
   At times the state store may be unavailable because of a transient connection failure. Once that resolves, we would like Gobblin Tracking Events to be processed without needing to restart the Kafka consumer. Thus, we keep retrying the state store updates encapsulated within the message, so that recovery is automatic once the connection is restored (provided that happens within the configurable retry window).
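
As a rough illustration of the retry window described above, here is a minimal sketch in plain Java. It is not the code in this PR, which goes through `persistJobStatusRetryer`; the class `TimeBoundedRetrySketch`, the method `callWithRetry`, and the fixed pause between attempts are all assumptions made for the example.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Hypothetical helper for illustration; not the retryer used in the PR. */
final class TimeBoundedRetrySketch {

  /**
   * Calls the action until it succeeds or the retry window has elapsed.
   * Only the most recent failure is reported, so nothing accumulates across attempts.
   */
  static <T> T callWithRetry(Supplier<T> action, Duration retryWindow, Duration pause) {
    long deadlineNanos = System.nanoTime() + retryWindow.toNanos();
    while (true) {
      try {
        return action.get();
      } catch (RuntimeException e) {
        // Past the window: presume the failure is not transient and give up.
        if (System.nanoTime() - deadlineNanos >= 0) {
          throw new IllegalStateException("giving up: retry window elapsed", e);
        }
      }
      try {
        Thread.sleep(pause.toMillis());
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting to retry", ie);
      }
    }
  }

  public static void main(String[] args) {
    int[] attempts = {0};
    // Simulated state store write that fails twice before the connection recovers.
    String result = callWithRetry(() -> {
      if (++attempts[0] < 3) {
        throw new IllegalStateException("state store unavailable");
      }
      return "persisted";
    }, Duration.ofSeconds(5), Duration.ofMillis(10));
    System.out.println(result + " after " + attempts[0] + " attempts");
  }
}
```

The consumer thread stays blocked inside `callWithRetry` while the store is down, which is what lets processing resume without a restart once the connection comes back.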
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   Additional unit tests are included.
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [gobblin] aplex merged pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

Posted by GitBox <gi...@apache.org>.
aplex merged pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399


   





[GitHub] [gobblin] aplex commented on a change in pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

Posted by GitBox <gi...@apache.org>.
aplex commented on a change in pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399#discussion_r712474473



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -147,33 +181,49 @@ protected void createMetrics() {
 
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
-    try {
-      GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
+    GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
 
-      if (gobblinTrackingEvent == null) {
-        return;
-      }
+    if (gobblinTrackingEvent == null) {
+      return;
+    }
 
-      if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
-        try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
-          jobIssueEventHandler.processEvent(gobblinTrackingEvent);
-        }
+    if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
+      try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
+        jobIssueEventHandler.processEvent(gobblinTrackingEvent);
       }
+    }
 
-      if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
-        emitWorkUnitCountMetric(gobblinTrackingEvent);
-        return;
-      }
+    if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
+      emitWorkUnitCountMetric(gobblinTrackingEvent);
+      return;
+    }
 
-      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(gobblinTrackingEvent);
-      if (jobStatus != null) {
-        try(Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
-          addJobStatusToStateStore(jobStatus, this.stateStore);
+    try {
+      persistJobStatusRetryer.call(() -> {

Review comment:
       I think we should retry all previous statements in this method as well, like deserializing an event (that can fail if the Kafka schema registry is broken), or saving issues, which can cause trouble if MySQL is down.

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -90,6 +100,10 @@
   @Getter
   private final StateStore<org.apache.gobblin.configuration.State> stateStore;
   private final ScheduledExecutorService scheduledExecutorService;
+  private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume non-transient and give up

Review comment:
       Why wouldn't we retry indefinitely? If GaaS is not processing Kafka messages, it's not going to work correctly anyway, so if we give up on processing a Kafka partition, it's like shutting down the service.







[GitHub] [gobblin] phet commented on a change in pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

Posted by GitBox <gi...@apache.org>.
phet commented on a change in pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399#discussion_r715271733



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -90,6 +100,10 @@
   @Getter
   private final StateStore<org.apache.gobblin.configuration.State> stateStore;
   private final ScheduledExecutorService scheduledExecutorService;
+  private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume non-transient and give up

Review comment:
       As for how long/whether to terminate: I agree there could be a >24 hour lag before this gets attention (although, again, the timeout could be configured higher). While a steady stream of error messages would hopefully get attention, it may be even more important to alarm on the topic offset lag; that's where I've figured the monitoring should sit. The question I considered was whether, after 24 hours, the system would be likely to resolve on its own through retries, and I judged that unlikely. Overall, in terms of getting attention, I'd recommend setting the alarm on the Kafka topic offset lag to catch when (at the average rate) we might fall behind by 30 minutes of (per-partition) messages.
   
   What do you think?
   
   Overall, I'm not against setting the default value higher. But because I'd like to make the maximum (timeout) configurable, it's easiest to presume it's always finite. If you prefer, we could choose a higher value, like 7 * 24 hours, or whatever we feel is just too long to continue retrying (certainly before the Kafka retention would expire!).
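
The lag alarm suggested above comes down to simple arithmetic: the threshold is the number of messages a partition receives, at its average rate, during the tolerated delay. A minimal sketch follows, assuming a hypothetical `OffsetLagAlarmSketch` class and a made-up average rate of 2.5 messages per second; real rates and offsets would come from whatever monitoring system is in use.

```java
import java.time.Duration;

/** Hypothetical sketch; the names and the sample rate are assumptions. */
final class OffsetLagAlarmSketch {

  /** Messages a partition receives, at its average rate, during the tolerated delay. */
  static long lagThreshold(double avgMessagesPerSecond, Duration toleratedDelay) {
    return Math.round(avgMessagesPerSecond * toleratedDelay.getSeconds());
  }

  /** Lag is the distance between the newest offset in the partition and the committed offset. */
  static boolean shouldAlarm(long logEndOffset, long committedOffset, long threshold) {
    return logEndOffset - committedOffset > threshold;
  }

  public static void main(String[] args) {
    // 2.5 msgs/sec * 1800 sec = 4500 messages of tolerated lag.
    long threshold = lagThreshold(2.5, Duration.ofMinutes(30));
    System.out.println("threshold = " + threshold);
    // A lag of 6000 messages exceeds the threshold, so this prints true.
    System.out.println(shouldAlarm(10_000, 4_000, threshold));
  }
}
```

An alarm of this kind fires whether the consumer is stuck retrying or has stopped altogether, which is why it complements a finite retry timeout.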







[GitHub] [gobblin] phet commented on a change in pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

Posted by GitBox <gi...@apache.org>.
phet commented on a change in pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399#discussion_r715266818



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -90,6 +100,10 @@
   @Getter
   private final StateStore<org.apache.gobblin.configuration.State> stateStore;
   private final ScheduledExecutorService scheduledExecutorService;
+  private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume non-transient and give up

Review comment:
       Good suggestion... it seems there's no construction of a retry chain; rather, the last attempt/exception overwrites the prior one, so there's no memory growth:
   https://github.com/rholder/guava-retrying/blob/master/src/main/java/com/github/rholder/retry/Retryer.java#L155







[GitHub] [gobblin] phet commented on a change in pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

Posted by GitBox <gi...@apache.org>.
phet commented on a change in pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399#discussion_r715274296



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -147,33 +181,49 @@ protected void createMetrics() {
 
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
-    try {
-      GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
+    GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
 
-      if (gobblinTrackingEvent == null) {
-        return;
-      }
+    if (gobblinTrackingEvent == null) {
+      return;
+    }
 
-      if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
-        try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
-          jobIssueEventHandler.processEvent(gobblinTrackingEvent);
-        }
+    if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
+      try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
+        jobIssueEventHandler.processEvent(gobblinTrackingEvent);
       }
+    }
 
-      if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
-        emitWorkUnitCountMetric(gobblinTrackingEvent);
-        return;
-      }
+    if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
+      emitWorkUnitCountMetric(gobblinTrackingEvent);
+      return;
+    }
 
-      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(gobblinTrackingEvent);
-      if (jobStatus != null) {
-        try(Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
-          addJobStatusToStateStore(jobStatus, this.stateStore);
+    try {
+      persistJobStatusRetryer.call(() -> {

Review comment:
       As far as Kafka deserialization errors go: at present, such bad messages are skipped, not retried, which I believe is best. For the specific case of failure to contact the schema registry, we could retry there too... although I thought a separate effort is underway to draw from a previously cached value of schema IDs...
   
   Overall, we may differentiate between retrying and silent failures. Let's restrict the former to operations likely to succeed if retried. When the thread dies (for whatever reason), we probably want to raise an externally monitorable condition to alarm on; that way it's not "silent". If we don't already have that, we should probably create a separate ticket to track that monitoring enhancement.







[GitHub] [gobblin] codecov-commenter commented on pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399#issuecomment-923211467


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3399](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1a3b2d2) into [master](https://codecov.io/gh/apache/gobblin/commit/47707df00a6884ada5974a5f5203408ce1efb890?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (47707df) will **decrease** coverage by `3.22%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/gobblin/pull/3399/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3399      +/-   ##
   ============================================
   - Coverage     46.54%   43.31%   -3.23%     
   + Complexity    10243     2005    -8238     
   ============================================
     Files          2062      401    -1661     
     Lines         80413    17187   -63226     
     Branches       8981     2115    -6866     
   ============================================
   - Hits          37429     7445   -29984     
   + Misses        39516     8904   -30612     
   + Partials       3468      838    -2630     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../org/apache/gobblin/util/retry/RetryerFactory.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvcmV0cnkvUmV0cnllckZhY3RvcnkuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...in/java/org/apache/gobblin/cluster/HelixUtils.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhVdGlscy5qYXZh) | `34.71% <0.00%> (-3.31%)` | :arrow_down: |
   | [.../gobblin/iceberg/writer/IcebergMetadataWriter.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1pY2ViZXJnL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ljZWJlcmcvd3JpdGVyL0ljZWJlcmdNZXRhZGF0YVdyaXRlci5qYXZh) | `72.11% <0.00%> (-0.09%)` | :arrow_down: |
   | [...gobblin/writer/http/AbstractHttpWriterBuilder.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3dyaXRlci9odHRwL0Fic3RyYWN0SHR0cFdyaXRlckJ1aWxkZXIuamF2YQ==) | | |
   | [...rg/apache/gobblin/rest/JobExecutionInfoServer.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1yZXN0LXNlcnZpY2UvZ29iYmxpbi1yZXN0LXNlcnZlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9yZXN0L0pvYkV4ZWN1dGlvbkluZm9TZXJ2ZXIuamF2YQ==) | | |
   | [...n/java/org/apache/gobblin/fork/CopyableSchema.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ZvcmsvQ29weWFibGVTY2hlbWEuamF2YQ==) | | |
   | [...ava/org/apache/gobblin/scheduler/JobScheduler.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NjaGVkdWxlci9Kb2JTY2hlZHVsZXIuamF2YQ==) | | |
   | [...urce/extractor/DefaultCheckpointableWatermark.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc291cmNlL2V4dHJhY3Rvci9EZWZhdWx0Q2hlY2twb2ludGFibGVXYXRlcm1hcmsuamF2YQ==) | | |
   | [...pache/gobblin/compliance/HivePartitionVersion.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tY29tcGxpYW5jZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9jb21wbGlhbmNlL0hpdmVQYXJ0aXRpb25WZXJzaW9uLmphdmE=) | | |
   | [...g/apache/gobblin/service/NoopRequesterService.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi1mbG93LWNvbmZpZy1zZXJ2aWNlL2dvYmJsaW4tZmxvdy1jb25maWctc2VydmljZS1zZXJ2ZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9Ob29wUmVxdWVzdGVyU2VydmljZS5qYXZh) | | |
   | ... and [1653 more](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [47707df...1a3b2d2](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [gobblin] phet commented on a change in pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

Posted by GitBox <gi...@apache.org>.
phet commented on a change in pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399#discussion_r713708449



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -147,33 +181,49 @@ protected void createMetrics() {
 
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
-    try {
-      GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
+    GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
 
-      if (gobblinTrackingEvent == null) {
-        return;
-      }
+    if (gobblinTrackingEvent == null) {
+      return;
+    }
 
-      if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
-        try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
-          jobIssueEventHandler.processEvent(gobblinTrackingEvent);
-        }
+    if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
+      try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
+        jobIssueEventHandler.processEvent(gobblinTrackingEvent);
       }
+    }
 
-      if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
-        emitWorkUnitCountMetric(gobblinTrackingEvent);
-        return;
-      }
+    if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
+      emitWorkUnitCountMetric(gobblinTrackingEvent);
+      return;
+    }
 
-      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(gobblinTrackingEvent);
-      if (jobStatus != null) {
-        try(Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
-          addJobStatusToStateStore(jobStatus, this.stateStore);
+    try {
+      persistJobStatusRetryer.call(() -> {

Review comment:
       I looked at that, but grew concerned we might retry the wrong things. For instance, if a corrupted message somehow lacked the frame-aligning magic byte:
   ```
     public Schema readSchemaVersioningInformation(DataInputStream inputStream)
         throws IOException {
       if (inputStream.readByte() != KafkaAvroSchemaRegistry.MAGIC_BYTE) {
         throw new IOException("MAGIC_BYTE not found in Avro message.");
       }
   ```
   we'd retry, although the outcome would never change. Of the two solutions:
     a. throw a more specific exception for that condition, so the retryer would accept the attempt and stop
     b. localize retrying (separately) around that particular invocation of the schema registry
   ... I vote for the latter.
   
   Of the other two sub-tasks:
     1. `JobIssueEventHandler.processEvent` doesn't unwind errors to the caller, so there's no indication of failure within. While there could be correlation between (suppressed) failure there and later failure with the state store, I wondered whether that would be too presumptuous... and I was unclear whether `processEvent` is idempotent.
     2. Work unit metrics recording only seems to throw on caller error (IllegalArg), and given potentially time-series metrics, I again wasn't clear on idempotence.
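
Option (b) above might look roughly like the following sketch, in which the magic-byte check fails immediately and only the registry lookup is retried. Everything here is an assumption for illustration: the class `LocalizedRetrySketch`, the `RegistryUnavailableException` type, the 4-byte schema ID, and the fixed attempt count (a real retryer would also pause between attempts); none of it is taken from the Gobblin code base.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.function.IntFunction;

/** Hypothetical sketch of retrying only the schema registry call. */
final class LocalizedRetrySketch {
  static final byte MAGIC_BYTE = 0x0;

  /** Stand-in for a transient registry failure; a made-up type. */
  static final class RegistryUnavailableException extends RuntimeException {
    RegistryUnavailableException(String message) {
      super(message);
    }
  }

  static String readSchema(byte[] payload, IntFunction<String> registryLookup, int maxAttempts)
      throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
    // Permanent failure: a corrupted message never becomes valid, so do not retry it.
    if (in.readByte() != MAGIC_BYTE) {
      throw new IOException("MAGIC_BYTE not found in Avro message.");
    }
    int schemaId = in.readInt(); // assumed ID layout, for illustration only
    RegistryUnavailableException last = null;
    // Transient failure: retry just the registry lookup, keeping only the latest error.
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return registryLookup.apply(schemaId);
      } catch (RegistryUnavailableException e) {
        last = e;
      }
    }
    throw new IOException("schema registry still unavailable after " + maxAttempts + " attempts", last);
  }

  public static void main(String[] args) throws IOException {
    int[] calls = {0};
    // Simulated registry that is unreachable for the first two calls.
    IntFunction<String> flakyRegistry = id -> {
      if (++calls[0] < 3) {
        throw new RegistryUnavailableException("registry down");
      }
      return "schema-" + id;
    };
    System.out.println(readSchema(new byte[] {MAGIC_BYTE, 0, 0, 0, 7}, flakyRegistry, 5));
    try {
      readSchema(new byte[] {42, 0, 0, 0, 7}, flakyRegistry, 5);
    } catch (IOException e) {
      System.out.println("skipped without retry: " + e.getMessage());
    }
  }
}
```

Keeping the retry this narrow means a corrupt message is still skipped right away, while a registry outage does not cause otherwise valid messages to be dropped.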







[GitHub] [gobblin] aplex commented on a change in pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

Posted by GitBox <gi...@apache.org>.
aplex commented on a change in pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399#discussion_r715183873



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -90,6 +100,10 @@
   @Getter
   private final StateStore<org.apache.gobblin.configuration.State> stateStore;
   private final ScheduledExecutorService scheduledExecutorService;
+  private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume non-transient and give up

Review comment:
       Although, it's worth checking how this retryer logic works. If it tries to accumulate all exceptions that ever occurred, then it will eventually run out of memory with infinite retries and crash the process.







[GitHub] [gobblin] codecov-commenter edited a comment on pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399#issuecomment-923211467


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3399](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1a3b2d2) into [master](https://codecov.io/gh/apache/gobblin/commit/47707df00a6884ada5974a5f5203408ce1efb890?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (47707df) will **decrease** coverage by `0.02%`.
   > The diff coverage is `7.40%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/gobblin/pull/3399/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3399      +/-   ##
   ============================================
   - Coverage     46.54%   46.52%   -0.03%     
     Complexity    10243    10243              
   ============================================
     Files          2062     2062              
     Lines         80413    80458      +45     
     Branches       8981     8988       +7     
   ============================================
   + Hits          37429    37430       +1     
   - Misses        39516    39558      +42     
   - Partials       3468     3470       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...pache/gobblin/configuration/ConfigurationKeys.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vY29uZmlndXJhdGlvbi9Db25maWd1cmF0aW9uS2V5cy5qYXZh) | `0.00% <ø> (ø)` | |
   | [.../org/apache/gobblin/util/retry/RetryerFactory.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvcmV0cnkvUmV0cnllckZhY3RvcnkuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...blin/service/monitoring/KafkaJobStatusMonitor.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9LYWZrYUpvYlN0YXR1c01vbml0b3IuamF2YQ==) | `31.29% <9.52%> (-4.28%)` | :arrow_down: |
   | [...metrics/reporter/util/NoopSchemaVersionWriter.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tZXRyaWNzLWxpYnMvZ29iYmxpbi1tZXRyaWNzLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vbWV0cmljcy9yZXBvcnRlci91dGlsL05vb3BTY2hlbWFWZXJzaW9uV3JpdGVyLmphdmE=) | `50.00% <0.00%> (-16.67%)` | :arrow_down: |
   | [...re/ControllerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9jb3JlL0NvbnRyb2xsZXJVc2VyRGVmaW5lZE1lc3NhZ2VIYW5kbGVyRmFjdG9yeS5qYXZh) | `34.37% <0.00%> (-4.69%)` | :arrow_down: |
   | [...che/gobblin/runtime/TaskStateCollectorService.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvVGFza1N0YXRlQ29sbGVjdG9yU2VydmljZS5qYXZh) | `59.48% <0.00%> (-3.82%)` | :arrow_down: |
   | [...in/java/org/apache/gobblin/cluster/HelixUtils.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhVdGlscy5qYXZh) | `34.71% <0.00%> (-3.31%)` | :arrow_down: |
   | [...ics/reporter/util/SchemaRegistryVersionWriter.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL21ldHJpY3MvcmVwb3J0ZXIvdXRpbC9TY2hlbWFSZWdpc3RyeVZlcnNpb25Xcml0ZXIuamF2YQ==) | `27.45% <0.00%> (-1.72%)` | :arrow_down: |
   | [...gobblin/service/modules/core/GitConfigMonitor.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9jb3JlL0dpdENvbmZpZ01vbml0b3IuamF2YQ==) | `81.35% <0.00%> (-1.70%)` | :arrow_down: |
   | [.../gobblin/iceberg/writer/IcebergMetadataWriter.java](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1pY2ViZXJnL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ljZWJlcmcvd3JpdGVyL0ljZWJlcmdNZXRhZGF0YVdyaXRlci5qYXZh) | `72.11% <0.00%> (-0.09%)` | :arrow_down: |
   | ... and [3 more](https://codecov.io/gh/apache/gobblin/pull/3399/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [47707df...1a3b2d2](https://codecov.io/gh/apache/gobblin/pull/3399?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [gobblin] aplex commented on a change in pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

Posted by GitBox <gi...@apache.org>.
aplex commented on a change in pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399#discussion_r715185960



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -147,33 +181,49 @@ protected void createMetrics() {
 
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
-    try {
-      GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
+    GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
 
-      if (gobblinTrackingEvent == null) {
-        return;
-      }
+    if (gobblinTrackingEvent == null) {
+      return;
+    }
 
-      if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
-        try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
-          jobIssueEventHandler.processEvent(gobblinTrackingEvent);
-        }
+    if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
+      try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
+        jobIssueEventHandler.processEvent(gobblinTrackingEvent);
       }
+    }
 
-      if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
-        emitWorkUnitCountMetric(gobblinTrackingEvent);
-        return;
-      }
+    if (gobblinTrackingEvent.getName().equals(JobEvent.WORK_UNITS_CREATED)) {
+      emitWorkUnitCountMetric(gobblinTrackingEvent);
+      return;
+    }
 
-      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(gobblinTrackingEvent);
-      if (jobStatus != null) {
-        try(Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
-          addJobStatusToStateStore(jobStatus, this.stateStore);
+    try {
+      persistJobStatusRetryer.call(() -> {

Review comment:
       Even if it's a Kafka deserialization problem, I think it could be beneficial to spam this error in the logs repeatedly, so that it's easier to find (we would just need to look at the recent errors). Also, those implementations can change over time.
   
   It kind of feels that if any of the Kafka processing threads dies, the service will be in a state that it should never be in, and it will be silent about it. Maybe those retries should happen at an even higher level: the entry point of the thread.
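
   The idea of retrying at the thread's entry point, rather than letting the thread die silently, might look roughly like the sketch below. It is not Gobblin code: `SupervisedWorker`, `Task`, and `runSupervised` are hypothetical names, and a fixed pause with a maximum attempt count is an assumed stand-in for whatever retry policy the service would actually use.

   ```java
   import java.util.function.Consumer;

   // Hypothetical helper, not part of Gobblin: keeps a worker alive across failures.
   class SupervisedWorker {
     /** A unit of work that may throw, e.g. one run of a message-processing loop. */
     @FunctionalInterface
     interface Task {
       void run() throws Exception;
     }

     /**
      * Runs the task until it completes normally. Every failure is logged, so the
      * problem stays visible in recent logs. Returns the number of failed attempts.
      * Throws IllegalStateException once maxAttempts failures have occurred.
      */
     static int runSupervised(Task task, long pauseMillis, int maxAttempts,
         Consumer<String> log) {
       int failures = 0;
       while (true) {
         try {
           task.run();
           return failures;
         } catch (Exception e) {
           if (e instanceof InterruptedException) {
             // Treat interruption as a shutdown request, not as a failure to retry.
             Thread.currentThread().interrupt();
             throw new IllegalStateException("interrupted while processing", e);
           }
           failures++;
           log.accept("attempt " + failures + " failed: " + e);
           if (failures >= maxAttempts) {
             throw new IllegalStateException(
                 "giving up after " + failures + " attempts", e);
           }
           try {
             Thread.sleep(pauseMillis);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new IllegalStateException("interrupted while waiting to retry", ie);
           }
         }
       }
     }
   }
   ```

   A thread's `run()` method could wrap its whole processing loop in `runSupervised(...)`, so that a failure anywhere in the loop is logged and retried instead of ending the thread.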







[GitHub] [gobblin] aplex commented on a change in pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

Posted by GitBox <gi...@apache.org>.
aplex commented on a change in pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399#discussion_r715181274



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -90,6 +100,10 @@
   @Getter
   private final StateStore<org.apache.gobblin.configuration.State> stateStore;
   private final ScheduledExecutorService scheduledExecutorService;
+  private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume non-transient and give up

Review comment:
       I can think of a couple of examples where those retries can be beneficial over a longer period:
   
   - If something is broken in the EI environment on a weekend, people might not jump in to repair it for a couple of days.
   - While retries are happening, there is a persistent stream of error messages that will continuously remind SREs that the problem is still happening. If one of the processing threads dies, the service will just go quiet. Then it will be hard to tell why exactly it died and when.







[GitHub] [gobblin] phet commented on a change in pull request #3399: Add retries to KafkaJobStatusMonitor for recovery from transient JobStatus state store failure.

Posted by GitBox <gi...@apache.org>.
phet commented on a change in pull request #3399:
URL: https://github.com/apache/gobblin/pull/3399#discussion_r713689693



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
##########
@@ -90,6 +100,10 @@
   @Getter
   private final StateStore<org.apache.gobblin.configuration.State> stateStore;
   private final ScheduledExecutorService scheduledExecutorService;
+  private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume non-transient and give up

Review comment:
       We could consider indefinite retries, although I wanted to set a finite boundary to avoid "infinite looping". If the error we're presuming transient takes >24 hours to resolve, it probably merits human intervention. At that point an operator is there to restart the service. Realistically, the probability of a truly transient error continuing >24 hours seems low.
   
   If it's actually a problem with the message itself and/or an incompatibility with the DB schema, that may require manually advancing the offset to skip the message, or adjusting systems. Concern that we might misdiagnose something like that as a transient error made me want to prevent an infinite loop.
   
   Still, while this is finite, it may always be overridden by config:
   ```
     public static final String KAFKA_JOB_STATUS_MONITOR_RETRY_TIME_OUT_MINUTES =
         "gobblin.kafka.jobStatusMonitor.retry.timeOut.minutes";
   ```
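
   As a rough illustration of a finite retry window with a config override, here is a minimal stdlib-only sketch. It is not the PR's implementation: `BoundedRetry`, `resolveTimeOutMs`, and `callWithinWindow` are hypothetical names, a plain `Map` is an assumed stand-in for the real config object, and converting the minutes-valued key to milliseconds is an assumption about how the override would feed the retryer.

   ```java
   import java.util.Map;
   import java.util.concurrent.Callable;
   import java.util.concurrent.TimeUnit;

   // Hypothetical sketch, not part of Gobblin.
   class BoundedRetry {
     // Key name quoted from the comment above; looking it up in a Map is an assumption.
     static final String RETRY_TIME_OUT_MINUTES_KEY =
         "gobblin.kafka.jobStatusMonitor.retry.timeOut.minutes";
     // After a day, presume the failure is non-transient and give up.
     static final long FALLBACK_TIME_OUT_MS = TimeUnit.HOURS.toMillis(24L);

     /** Returns the retry window in ms: the override (in minutes) if set, else the fallback. */
     static long resolveTimeOutMs(Map<String, String> overrides) {
       String minutes = overrides.get(RETRY_TIME_OUT_MINUTES_KEY);
       return minutes == null
           ? FALLBACK_TIME_OUT_MS
           : TimeUnit.MINUTES.toMillis(Long.parseLong(minutes));
     }

     /** Retries the call until it succeeds or the window elapses, then rethrows the last failure. */
     static <T> T callWithinWindow(Callable<T> call, long timeOutMs, long pauseMs)
         throws Exception {
       long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeOutMs);
       while (true) {
         try {
           return call.call();
         } catch (Exception e) {
           if (System.nanoTime() - deadline >= 0) {
             throw e; // window exhausted: surface the error instead of looping forever
           }
           Thread.sleep(pauseMs);
         }
       }
     }
   }
   ```

   With no override, the window is the 24-hour fallback; an operator who wants retries to span a weekend could set the key to a larger number of minutes rather than changing code.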



