You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2023/04/19 01:43:49 UTC

[ranger] branch master updated: RANGER-4130: Improve performance of event processing in tagsync by optimizing number of commits to Kafka broker

This is an automated email from the ASF dual-hosted git repository.

abhay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new e8a6125ba RANGER-4130: Improve performance of event processing in tagsync by optimizing number of commits to Kafka broker
e8a6125ba is described below

commit e8a6125ba99b5ca4f62923552ddb251ee476cfdd
Author: Abhay Kulkarni <ab...@apache.org>
AuthorDate: Tue Apr 18 18:07:32 2023 -0700

    RANGER-4130: Improve performance of event processing in tagsync by optimizing number of commits to Kafka broker
---
 .../tagsync/source/atlas/AtlasTagSource.java       | 68 ++++++++++------------
 1 file changed, 32 insertions(+), 36 deletions(-)

diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index a618cc986..34a39f73c 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -52,9 +52,9 @@ public class AtlasTagSource extends AbstractTagSource {
 
 	public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "atlas-application.properties";
 
-	public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers";
-	public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect";
-	public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id";
+	public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS      = "atlas.kafka.bootstrap.servers";
+	public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT   = "atlas.kafka.zookeeper.connect";
+	public static final String TAGSYNC_ATLAS_CONSUMER_GROUP       = "atlas.kafka.entities.group.id";
 
 	public static final int    MAX_WAIT_TIME_IN_MILLIS = 1000;
 
@@ -168,11 +168,10 @@ public class AtlasTagSource extends AbstractTagSource {
 
 		private final List<RangerAtlasEntityWithTags>             atlasEntitiesWithTags = new ArrayList<>();
 		private final List<AtlasKafkaMessage<EntityNotification>> messages              = new ArrayList<>();
+		private       AtlasKafkaMessage<EntityNotification>       lastUnhandledMessage  = null;
 
-		private long    offsetOfLastMessageDeliveredToRanger = -1L;
 		private long    offsetOfLastMessageCommittedToKafka  = -1L;
-
-		private boolean isHandlingDeleteOps   = false;
+		private boolean isHandlingDeleteOps                  = false;
 
 		private ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) {
 			this.consumer = consumer;
@@ -222,10 +221,11 @@ public class AtlasTagSource extends AbstractTagSource {
 										}
 
 										atlasEntitiesWithTags.add(new RangerAtlasEntityWithTags(notificationWrapper));
+										messages.add(message);
 									} else {
 										AtlasNotificationMapper.logUnhandledEntityNotification(notificationWrapper);
+										lastUnhandledMessage = message;
 									}
-									messages.add(message);
 								}
 							} else {
 								LOG.error("Null entityNotification received from Kafka!! Ignoring..");
@@ -235,6 +235,10 @@ public class AtlasTagSource extends AbstractTagSource {
 							buildAndUploadServiceTags();
 						}
 					}
+					if (lastUnhandledMessage != null) {
+						commitToKafka(lastUnhandledMessage);
+						lastUnhandledMessage = null;
+					}
 
 				} catch (Exception exception) {
 					LOG.error("Caught exception..: ", exception);
@@ -255,9 +259,7 @@ public class AtlasTagSource extends AbstractTagSource {
 				LOG.debug("==> buildAndUploadServiceTags()");
 			}
 
-			if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) {
-
-				commitToKafka();
+			if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags) && CollectionUtils.isNotEmpty(messages)) {
 
 				Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags);
 
@@ -284,17 +286,16 @@ public class AtlasTagSource extends AbstractTagSource {
 					}
 				}
 
-				offsetOfLastMessageDeliveredToRanger = messages.get(messages.size() - 1).getOffset();
+				AtlasKafkaMessage<EntityNotification> latestMessageDeliveredToRanger       = messages.get(messages.size() - 1);
+				commitToKafka(latestMessageDeliveredToRanger);
+
+				atlasEntitiesWithTags.clear();
+				messages.clear();
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Completed processing batch of messages of size:[" + messages.size() + "] received from NotificationConsumer");
 				}
 
-				commitToKafka();
-
-				atlasEntitiesWithTags.clear();
-				messages.clear();
-
 			}
 
 			if (LOG.isDebugEnabled()) {
@@ -302,34 +303,29 @@ public class AtlasTagSource extends AbstractTagSource {
 			}
 		}
 
-		private void commitToKafka() {
+		private void commitToKafka(AtlasKafkaMessage<EntityNotification> messageToCommit) {
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("==> commitToKafka()");
+				LOG.debug("==> commitToKafka(" + messageToCommit + ")");
 			}
 
-			for (AtlasKafkaMessage<EntityNotification> message : messages) {
-				if (message.getOffset() > offsetOfLastMessageCommittedToKafka) {
-					if (message.getOffset() <= offsetOfLastMessageDeliveredToRanger) {
-						// Already delivered to Ranger
-						TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
-						try {
-							if (LOG.isDebugEnabled()) {
-								LOG.debug("Committing message with offset:[" + message.getOffset() + "] to Kafka");
-							}
-							consumer.commit(partition, message.getOffset());
-							offsetOfLastMessageCommittedToKafka = message.getOffset();
-						} catch (Exception commitException) {
-							LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException);
-							LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! This may be unrecoverable error!!");
-						}
-					} else {
-						break;
+			long messageOffset = messageToCommit.getOffset();
+			int  partitionId   = messageToCommit.getPartition();
+
+			if (offsetOfLastMessageCommittedToKafka < messageOffset) {
+				TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", partitionId);
+				try {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Committing message with offset:[" + messageOffset + "] to Kafka");
 					}
+					consumer.commit(partition, messageOffset);
+					offsetOfLastMessageCommittedToKafka = messageOffset;
+				} catch (Exception commitException) {
+					LOG.warn("Ranger tagsync already processed message at offset " + messageOffset + ". Ignoring failure in committing message:[" + messageToCommit + "]", commitException);
 				}
 			}
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("<== commitToKafka()");
+				LOG.debug("<== commitToKafka(" + messageToCommit + ")");
 			}
 		}
 	}