You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/09/03 02:47:28 UTC

[airavata-data-lake] branch master updated: Error handling in kafka serializers

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new d74f1c1  Error handling in kafka serializers
d74f1c1 is described below

commit d74f1c1ec6b434b036c10ea0024278a73746b33d
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Sep 2 22:45:32 2021 -0400

    Error handling in kafka serializers
---
 .../messaging/consumer/MessageConsumer.java                |  4 ----
 .../messaging/model/NotificationEventDeserializer.java     | 14 ++++++++++++--
 .../messaging/model/NotificationEventSerializer.java       | 14 ++++++++++++--
 3 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java
index 9af5a59..f787fe7 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/consumer/MessageConsumer.java
@@ -44,8 +44,6 @@ public class MessageConsumer {
                     LOGGER.info("Received data orchestrator records {}", partitionRecords.size());
 
                     for (ConsumerRecord<String, NotificationEvent> record : partitionRecords) {
-
-
                         try {
                             callback.process(record.value());
                         } catch (Exception exception) {
@@ -53,8 +51,6 @@ public class MessageConsumer {
                         }finally {
                             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
                         }
-
-
                     }
                 }
             }
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
index bf0265d..a14d4ce 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
@@ -2,6 +2,8 @@ package org.apache.airavata.dataorchestrator.messaging.model;
 
 import com.google.gson.Gson;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
@@ -9,6 +11,9 @@ import java.util.Map;
  * Notification event deserializer
  */
 public class NotificationEventDeserializer implements Deserializer<NotificationEvent> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationEventDeserializer.class);
+
     @Override
     public void configure(Map<String, ?> map, boolean b) {
 
@@ -17,8 +22,13 @@ public class NotificationEventDeserializer implements Deserializer<NotificationE
     @Override
     public NotificationEvent deserialize(String topic, byte[] bytes) {
         String deserialized = new String(bytes);
-        Gson gson = new Gson();
-        return gson.fromJson(deserialized, NotificationEvent.class);
+        try {
+            Gson gson = new Gson();
+            return gson.fromJson(deserialized, NotificationEvent.class);
+        } catch (Exception e) {
+            LOGGER.error("Failed to deserialize the message {}. So returning null", deserialized, e);
+            return null;
+        }
     }
 
     @Override
diff --git a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
index 3ebe998..a98d477 100644
--- a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
+++ b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
@@ -2,6 +2,8 @@ package org.apache.airavata.dataorchestrator.messaging.model;
 
 import com.google.gson.Gson;
 import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
@@ -10,6 +12,9 @@ import java.util.Map;
  * Notification event serializer
  */
 public class NotificationEventSerializer implements Serializer<NotificationEvent> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationEventSerializer.class);
+
     @Override
     public void configure(Map<String, ?> map, boolean b) {
 
@@ -18,8 +23,13 @@ public class NotificationEventSerializer implements Serializer<NotificationEvent
     @Override
     public byte[] serialize(String s, NotificationEvent notificationEvent) {
 
-        Gson gson = new Gson();
-        return gson.toJson(notificationEvent).getBytes(StandardCharsets.UTF_8);
+        try {
+            Gson gson = new Gson();
+            return gson.toJson(notificationEvent).getBytes(StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            LOGGER.error("Failed to serialize message {}. So returning null", s, e);
+            return null;
+        }
     }
 
     @Override