You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by pi...@apache.org on 2022/06/15 15:45:04 UTC

[atlas] branch master updated: ATLAS-4575 : SDX Version Compatibility - Handle cross version messages

This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new bad730c76 ATLAS-4575 : SDX Version Compatibility - Handle cross version messages
bad730c76 is described below

commit bad730c76de50ae9b4682b1c0a357d70edccbcc9
Author: vinayak.marraiya <vi...@freestoneinfotech.com>
AuthorDate: Tue Jun 14 14:14:55 2022 +0530

    ATLAS-4575 : SDX Version Compatibility - Handle cross version messages
    
    Signed-off-by: Pinal Shah <pi...@freestoneinfotech.com>
---
 .../AtlasNotificationMessageDeserializer.java      | 13 ++++++++++
 .../org/apache/atlas/kafka/KafkaConsumerTest.java  | 28 ++++++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index b43bc7c66..207747d7d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -26,6 +26,7 @@ import org.apache.atlas.model.notification.AtlasNotificationMessage;
 import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.model.notification.MessageVersion;
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -233,6 +234,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
 
                 AtlasNotificationMessage<T> atlasNotificationMessage = AtlasType.fromV1Json(msgJson, notificationMessageType);
 
+                checkCrossCombatMessageVersion(atlasNotificationMessage);
                 checkVersion(atlasNotificationMessage, msgJson);
 
                 ret = atlasNotificationMessage.getMessage();
@@ -317,4 +319,15 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
             notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, notificationMessage.getVersion(), messageJson));
         }
     }
+
+    /** Warns in the log when the version reported by the message's source differs, ignoring case, from the version of a MessageSource built for this class; a message with no source is skipped. */
+    protected void checkCrossCombatMessageVersion(AtlasNotificationBaseMessage notificationMessage) {
+        MessageSource notificationSource = notificationMessage.getSource(); // read once so the null check and the comparison use the same object
+        if (notificationSource != null) {
+            String sourceVersion = new MessageSource(this.getClass().getSimpleName()).getVersion(); // only built when there is a source to compare against
+            if (!StringUtils.equalsIgnoreCase(notificationSource.getVersion(), sourceVersion)) {
+                LOG.warn("Hook and Atlas server build versions are not similar : {}, {}", notificationSource.getVersion(), sourceVersion);
+            }
+        }
+    }
 }
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 1af1f3efa..425c8941b 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.atlas.kafka;
 
 import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.notification.IncompatibleVersionException;
@@ -61,6 +62,9 @@ public class KafkaConsumerTest {
     @Mock
     private KafkaConsumer kafkaConsumer;
 
+    @Mock
+    private MessageSource messageSource;
+
     @BeforeMethod
     public void setup() {
         MockitoAnnotations.initMocks(this);
@@ -170,4 +174,28 @@ public class KafkaConsumerTest {
         assertEquals(deserializedEntity.getTraits(), entity.getTraits());
         assertEquals(deserializedEntity.getTrait(TRAIT_NAME), entity.getTrait(TRAIT_NAME));
     }
+
+    // Pushes one hook message whose mocked source reports version "9.9.9" through the consumer; makes no assertions.
+    @Test
+    public void checkCrossCombatMessageVersionTest() throws Exception {
+        // Build the payload: an entity-update request wrapped in a 2.0.0 notification that carries the mocked source.
+        EntityUpdateRequest request = new EntityUpdateRequest("user1", getEntity(TRAIT_NAME));
+        when(messageSource.getVersion()).thenReturn("9.9.9");
+        String payload = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), request, "", "", false, messageSource));
+
+        // A single record on partition 0 of the hook topic, handed back by the stubbed poll below.
+        TopicPartition                 partition = new TopicPartition(ATLAS_HOOK_TOPIC, 0);
+        ConsumerRecord<String, String> record    = new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, "mykey", payload);
+        ConsumerRecords                polled    = new ConsumerRecords(Collections.singletonMap(partition, Collections.singletonList(record)));
+
+        kafkaConsumer.assign(Collections.singletonList(partition));
+        when(kafkaConsumer.poll(100L)).thenReturn(polled);
+        AtlasKafkaConsumer hookConsumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L);
+
+        try {
+            hookConsumer.receive(); // returned messages are not inspected
+        } catch (IncompatibleVersionException e) {
+            e.printStackTrace();
+        }
+    }
 }