You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by pi...@apache.org on 2022/06/15 15:45:04 UTC
[atlas] branch master updated: ATLAS-4575 : SDX Version Compatibility - Handle cross version messages
This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new bad730c76 ATLAS-4575 : SDX Version Compatibility - Handle cross version messages
bad730c76 is described below
commit bad730c76de50ae9b4682b1c0a357d70edccbcc9
Author: vinayak.marraiya <vi...@freestoneinfotech.com>
AuthorDate: Tue Jun 14 14:14:55 2022 +0530
ATLAS-4575 : SDX Version Compatibility - Handle cross version messages
Signed-off-by: Pinal Shah <pi...@freestoneinfotech.com>
---
.../AtlasNotificationMessageDeserializer.java | 13 ++++++++++
.../org/apache/atlas/kafka/KafkaConsumerTest.java | 28 ++++++++++++++++++++++
2 files changed, 41 insertions(+)
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index b43bc7c66..207747d7d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -26,6 +26,7 @@ import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.model.notification.MessageVersion;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -233,6 +234,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
AtlasNotificationMessage<T> atlasNotificationMessage = AtlasType.fromV1Json(msgJson, notificationMessageType);
+ checkCrossCombatMessageVersion(atlasNotificationMessage);
checkVersion(atlasNotificationMessage, msgJson);
ret = atlasNotificationMessage.getMessage();
@@ -317,4 +319,15 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, notificationMessage.getVersion(), messageJson));
}
}
+
+ /**
+  * Compares the build version carried by the incoming message's source with the
+  * build version of a {@link MessageSource} created for this deserializer class,
+  * and logs a warning when the two differ (comparison is case-insensitive).
+  *
+  * This check only logs; it does not throw on a mismatch.
+  *
+  * @param notificationMessage the deserialized notification message; messages
+  *                            without a source are skipped
+  */
+ protected void checkCrossCombatMessageVersion(AtlasNotificationBaseMessage notificationMessage) {
+     MessageSource messageSource = notificationMessage.getSource();
+
+     // No source on the message means there is no version to compare against,
+     // so avoid building the local MessageSource at all.
+     if (messageSource == null) {
+         return;
+     }
+
+     String messageBuildVersion = messageSource.getVersion();
+     String localBuildVersion   = new MessageSource(this.getClass().getSimpleName()).getVersion();
+
+     if (!StringUtils.equalsIgnoreCase(messageBuildVersion, localBuildVersion)) {
+         LOG.warn("Hook and Atlas server build versions are not similar : {}, {}", messageBuildVersion, localBuildVersion);
+     }
+ }
}
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 1af1f3efa..425c8941b 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.atlas.kafka;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.notification.IncompatibleVersionException;
@@ -61,6 +62,9 @@ public class KafkaConsumerTest {
@Mock
private KafkaConsumer kafkaConsumer;
+ @Mock
+ private MessageSource messageSource;
+
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
@@ -170,4 +174,28 @@ public class KafkaConsumerTest {
assertEquals(deserializedEntity.getTraits(), entity.getTraits());
assertEquals(deserializedEntity.getTrait(TRAIT_NAME), entity.getTrait(TRAIT_NAME));
}
+
+ /**
+  * Drives {@code AtlasKafkaConsumer.receive()} with a single hook message whose
+  * message version is "2.0.0" and whose mocked source reports version "9.9.9".
+  *
+  * NOTE(review): this test makes no assertions, and an IncompatibleVersionException
+  * is only printed, so it passes either way - confirm the intended expectation.
+  */
+ @Test
+ public void checkCrossCombatMessageVersionTest() throws Exception {
+     Referenceable       referenceable = getEntity(TRAIT_NAME);
+     EntityUpdateRequest updateRequest = new EntityUpdateRequest("user1", referenceable);
+
+     // The mocked source reports a version chosen for this test.
+     when(messageSource.getVersion()).thenReturn("9.9.9");
+
+     MessageVersion messageVersion = new MessageVersion("2.0.0");
+     String         payload        = AtlasType.toV1Json(new AtlasNotificationMessage<>(messageVersion, updateRequest, "", "", false, messageSource));
+
+     // One record on partition 0 of the hook topic.
+     TopicPartition                       partition  = new TopicPartition(ATLAS_HOOK_TOPIC, 0);
+     ConsumerRecord<String, String>       record     = new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, "mykey", payload);
+     List<ConsumerRecord<String, String>> recordList = Collections.singletonList(record);
+     Map                                  byPartition = Collections.singletonMap(partition, recordList);
+     ConsumerRecords                      polled      = new ConsumerRecords(byPartition);
+
+     kafkaConsumer.assign(Collections.singletonList(partition));
+
+     when(kafkaConsumer.poll(100L)).thenReturn(polled);
+
+     AtlasKafkaConsumer atlasConsumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L);
+
+     try {
+         atlasConsumer.receive();
+     } catch (IncompatibleVersionException e) {
+         e.printStackTrace();
+     }
+ }
}