You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/10/26 23:39:08 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1300] Rethrow exceptions when storing job status in state store

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d1875e6  [GOBBLIN-1300] Rethrow exceptions when storing job status in state store
d1875e6 is described below

commit d1875e61080ef68251e76a27452a2126ddd77e7f
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Mon Oct 26 16:38:59 2020 -0700

    [GOBBLIN-1300] Rethrow exceptions when storing job status in state store
    
    Closes #3138 from jack-moseley/state-store-exception
---
 .../gobblin/runtime/KafkaAvroJobStatusMonitorTest.java   |  2 +-
 .../service/monitoring/KafkaAvroJobStatusMonitor.java    | 16 +++++++---------
 .../service/monitoring/KafkaJobStatusMonitor.java        |  8 ++++----
 3 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index a4df1c5..c443b42 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -176,7 +176,7 @@ public class KafkaAvroJobStatusMonitorTest {
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
 
     messageAndMetadata = iterator.next();
-    Assert.assertNull(jobStatusMonitor.parseJobStatus(messageAndMetadata.message()));
+    Assert.assertNull(jobStatusMonitor.parseJobStatus(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata)));
 
     // Check that state didn't get set to running since it was already complete
     messageAndMetadata = iterator.next();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index a4250d5..3ea701c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
 
-import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
@@ -36,6 +35,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
@@ -86,21 +86,19 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
   }
 
   @Override
-  public org.apache.gobblin.configuration.State parseJobStatus(byte[] message)
-      throws IOException {
-    InputStream is = new ByteArrayInputStream(message);
-    schemaVersionWriter.readSchemaVersioningInformation(new DataInputStream(is));
-
-    Decoder decoder = DecoderFactory.get().binaryDecoder(is, this.decoder.get());
+  public org.apache.gobblin.configuration.State parseJobStatus(DecodeableKafkaRecord<byte[],byte[]> message) {
     try {
+      InputStream is = new ByteArrayInputStream(message.getValue());
+      schemaVersionWriter.readSchemaVersioningInformation(new DataInputStream(is));
+      Decoder decoder = DecoderFactory.get().binaryDecoder(is, this.decoder.get());
       GobblinTrackingEvent decodedMessage = this.reader.get().read(null, decoder);
       return parseJobStatus(decodedMessage);
     } catch (Exception exc) {
       this.messageParseFailures.mark();
       if (this.messageParseFailures.getFiveMinuteRate() < 1) {
-        log.warn("Unable to decode input message.", exc);
+        log.warn("Unable to decode input message at kafka offset" + message.getOffset(), exc);
       } else {
-        log.warn("Unable to decode input message.");
+        log.warn("Unable to decode input message at kafka offset" + message.getOffset());
       }
       return null;
     }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index ed879df..68f48c9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -130,15 +130,15 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     try {
-      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message.getValue());
+      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message);
       if (jobStatus != null) {
         try(Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
           addJobStatusToStateStore(jobStatus, this.stateStore);
         }
       }
     } catch (IOException ioe) {
-      String messageStr = new String(message.getValue(), Charsets.UTF_8);
-      log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.getOffset(), messageStr), ioe);
+      // Throw RuntimeException to avoid advancing kafka offsets without updating state store
+      throw new RuntimeException("Failed to add job status to state store for kafka offset " + message.getOffset(), ioe);
     }
   }
 
@@ -232,6 +232,6 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
     return Long.parseLong(Splitter.on(STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
   }
 
-  public abstract org.apache.gobblin.configuration.State parseJobStatus(byte[] message) throws IOException;
+  public abstract org.apache.gobblin.configuration.State parseJobStatus(DecodeableKafkaRecord<byte[],byte[]> message);
 
 }