You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/09/03 02:47:28 UTC
[airavata-data-lake] branch master updated: Error handling in kafka serializers
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new d74f1c1 Error handling in kafka serializers
d74f1c1 is described below
commit d74f1c1ec6b434b036c10ea0024278a73746b33d
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Sep 2 22:45:32 2021 -0400
Error handling in kafka serializers
---
.../messaging/consumer/MessageConsumer.java | 4 ----
.../messaging/model/NotificationEventDeserializer.java | 14 ++++++++++++--
.../messaging/model/NotificationEventSerializer.java | 14 ++++++++++++--
3 files changed, 24 insertions(+), 8 deletions(-)
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java
index 9af5a59..f787fe7 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java
@@ -44,8 +44,6 @@ public class MessageConsumer {
LOGGER.info("Received data orchestrator records {}", partitionRecords.size());
for (ConsumerRecord<String, NotificationEvent> record : partitionRecords) {
-
-
try {
callback.process(record.value());
} catch (Exception exception) {
@@ -53,8 +51,6 @@ public class MessageConsumer {
}finally {
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
}
-
-
}
}
}
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
index bf0265d..a14d4ce 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
@@ -2,6 +2,8 @@ package org.apache.airavata.dataorchestrator.messaging.model;
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
@@ -9,6 +11,9 @@ import java.util.Map;
* Notification event deserializer
*/
public class NotificationEventDeserializer implements Deserializer<NotificationEvent> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(NotificationEventDeserializer.class);
+
@Override
public void configure(Map<String, ?> map, boolean b) {
@@ -17,8 +22,13 @@ public class NotificationEventDeserializer implements Deserializer<NotificationE
@Override
public NotificationEvent deserialize(String topic, byte[] bytes) {
String deserialized = new String(bytes);
- Gson gson = new Gson();
- return gson.fromJson(deserialized, NotificationEvent.class);
+ try {
+ Gson gson = new Gson();
+ return gson.fromJson(deserialized, NotificationEvent.class);
+ } catch (Exception e) {
+ LOGGER.error("Failed to deserialize the message {}. So returning null", deserialized, e);
+ return null;
+ }
}
@Override
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
index 3ebe998..a98d477 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
@@ -2,6 +2,8 @@ package org.apache.airavata.dataorchestrator.messaging.model;
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@@ -10,6 +12,9 @@ import java.util.Map;
* Notification event serializer
*/
public class NotificationEventSerializer implements Serializer<NotificationEvent> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(NotificationEventSerializer.class);
+
@Override
public void configure(Map<String, ?> map, boolean b) {
@@ -18,8 +23,13 @@ public class NotificationEventSerializer implements Serializer<NotificationEvent
@Override
public byte[] serialize(String s, NotificationEvent notificationEvent) {
- Gson gson = new Gson();
- return gson.toJson(notificationEvent).getBytes(StandardCharsets.UTF_8);
+ try {
+ Gson gson = new Gson();
+ return gson.toJson(notificationEvent).getBytes(StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ LOGGER.error("Failed to serialize message {}. So returning null", s, e);
+ return null;
+ }
}
@Override