You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/10/26 23:39:08 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1300] Rethrow exceptions when storing job status in state store
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d1875e6 [GOBBLIN-1300] Rethrow exceptions when storing job status in state store
d1875e6 is described below
commit d1875e61080ef68251e76a27452a2126ddd77e7f
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Mon Oct 26 16:38:59 2020 -0700
[GOBBLIN-1300] Rethrow exceptions when storing job status in state store
Closes #3138 from jack-moseley/state-store-exception
---
.../gobblin/runtime/KafkaAvroJobStatusMonitorTest.java | 2 +-
.../service/monitoring/KafkaAvroJobStatusMonitor.java | 16 +++++++---------
.../service/monitoring/KafkaJobStatusMonitor.java | 8 ++++----
3 files changed, 12 insertions(+), 14 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index a4df1c5..c443b42 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -176,7 +176,7 @@ public class KafkaAvroJobStatusMonitorTest {
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
messageAndMetadata = iterator.next();
- Assert.assertNull(jobStatusMonitor.parseJobStatus(messageAndMetadata.message()));
+ Assert.assertNull(jobStatusMonitor.parseJobStatus(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata)));
// Check that state didn't get set to running since it was already complete
messageAndMetadata = iterator.next();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index a4250d5..3ea701c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
-import org.apache.avro.AvroRuntimeException;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
@@ -36,6 +35,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
@@ -86,21 +86,19 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
}
@Override
- public org.apache.gobblin.configuration.State parseJobStatus(byte[] message)
- throws IOException {
- InputStream is = new ByteArrayInputStream(message);
- schemaVersionWriter.readSchemaVersioningInformation(new DataInputStream(is));
-
- Decoder decoder = DecoderFactory.get().binaryDecoder(is, this.decoder.get());
+ public org.apache.gobblin.configuration.State parseJobStatus(DecodeableKafkaRecord<byte[],byte[]> message) {
try {
+ InputStream is = new ByteArrayInputStream(message.getValue());
+ schemaVersionWriter.readSchemaVersioningInformation(new DataInputStream(is));
+ Decoder decoder = DecoderFactory.get().binaryDecoder(is, this.decoder.get());
GobblinTrackingEvent decodedMessage = this.reader.get().read(null, decoder);
return parseJobStatus(decodedMessage);
} catch (Exception exc) {
this.messageParseFailures.mark();
if (this.messageParseFailures.getFiveMinuteRate() < 1) {
- log.warn("Unable to decode input message.", exc);
+ log.warn("Unable to decode input message at kafka offset" + message.getOffset(), exc);
} else {
- log.warn("Unable to decode input message.");
+ log.warn("Unable to decode input message at kafka offset" + message.getOffset());
}
return null;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index ed879df..68f48c9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -130,15 +130,15 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
@Override
protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
try {
- org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message.getValue());
+ org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message);
if (jobStatus != null) {
try(Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
addJobStatusToStateStore(jobStatus, this.stateStore);
}
}
} catch (IOException ioe) {
- String messageStr = new String(message.getValue(), Charsets.UTF_8);
- log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.getOffset(), messageStr), ioe);
+ // Throw RuntimeException to avoid advancing kafka offsets without updating state store
+ throw new RuntimeException("Failed to add job status to state store for kafka offset " + message.getOffset(), ioe);
}
}
@@ -232,6 +232,6 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
return Long.parseLong(Splitter.on(STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
}
- public abstract org.apache.gobblin.configuration.State parseJobStatus(byte[] message) throws IOException;
+ public abstract org.apache.gobblin.configuration.State parseJobStatus(DecodeableKafkaRecord<byte[],byte[]> message);
}