You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2023/04/19 01:43:49 UTC
[ranger] branch master updated: RANGER-4130: Improve performance of event processing in tagsync by optimizing number of commits to Kafka broker
This is an automated email from the ASF dual-hosted git repository.
abhay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push:
new e8a6125ba RANGER-4130: Improve performance of event processing in tagsync by optimizing number of commits to Kafka broker
e8a6125ba is described below
commit e8a6125ba99b5ca4f62923552ddb251ee476cfdd
Author: Abhay Kulkarni <ab...@apache.org>
AuthorDate: Tue Apr 18 18:07:32 2023 -0700
RANGER-4130: Improve performance of event processing in tagsync by optimizing number of commits to Kafka broker
---
.../tagsync/source/atlas/AtlasTagSource.java | 68 ++++++++++------------
1 file changed, 32 insertions(+), 36 deletions(-)
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index a618cc986..34a39f73c 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -52,9 +52,9 @@ public class AtlasTagSource extends AbstractTagSource {
public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "atlas-application.properties";
- public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers";
- public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect";
- public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id";
+ public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers";
+ public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect";
+ public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id";
public static final int MAX_WAIT_TIME_IN_MILLIS = 1000;
@@ -168,11 +168,10 @@ public class AtlasTagSource extends AbstractTagSource {
private final List<RangerAtlasEntityWithTags> atlasEntitiesWithTags = new ArrayList<>();
private final List<AtlasKafkaMessage<EntityNotification>> messages = new ArrayList<>();
+ private AtlasKafkaMessage<EntityNotification> lastUnhandledMessage = null;
- private long offsetOfLastMessageDeliveredToRanger = -1L;
private long offsetOfLastMessageCommittedToKafka = -1L;
-
- private boolean isHandlingDeleteOps = false;
+ private boolean isHandlingDeleteOps = false;
private ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) {
this.consumer = consumer;
@@ -222,10 +221,11 @@ public class AtlasTagSource extends AbstractTagSource {
}
atlasEntitiesWithTags.add(new RangerAtlasEntityWithTags(notificationWrapper));
+ messages.add(message);
} else {
AtlasNotificationMapper.logUnhandledEntityNotification(notificationWrapper);
+ lastUnhandledMessage = message;
}
- messages.add(message);
}
} else {
LOG.error("Null entityNotification received from Kafka!! Ignoring..");
@@ -235,6 +235,10 @@ public class AtlasTagSource extends AbstractTagSource {
buildAndUploadServiceTags();
}
}
+ if (lastUnhandledMessage != null) {
+ commitToKafka(lastUnhandledMessage);
+ lastUnhandledMessage = null;
+ }
} catch (Exception exception) {
LOG.error("Caught exception..: ", exception);
@@ -255,9 +259,7 @@ public class AtlasTagSource extends AbstractTagSource {
LOG.debug("==> buildAndUploadServiceTags()");
}
- if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) {
-
- commitToKafka();
+ if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags) && CollectionUtils.isNotEmpty(messages)) {
Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags);
@@ -284,17 +286,16 @@ public class AtlasTagSource extends AbstractTagSource {
}
}
- offsetOfLastMessageDeliveredToRanger = messages.get(messages.size() - 1).getOffset();
+ AtlasKafkaMessage<EntityNotification> latestMessageDeliveredToRanger = messages.get(messages.size() - 1);
+ commitToKafka(latestMessageDeliveredToRanger);
+
+ atlasEntitiesWithTags.clear();
+ messages.clear();
if (LOG.isDebugEnabled()) {
LOG.debug("Completed processing batch of messages of size:[" + messages.size() + "] received from NotificationConsumer");
}
- commitToKafka();
-
- atlasEntitiesWithTags.clear();
- messages.clear();
-
}
if (LOG.isDebugEnabled()) {
@@ -302,34 +303,29 @@ public class AtlasTagSource extends AbstractTagSource {
}
}
- private void commitToKafka() {
+ private void commitToKafka(AtlasKafkaMessage<EntityNotification> messageToCommit) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> commitToKafka()");
+ LOG.debug("==> commitToKafka(" + messageToCommit + ")");
}
- for (AtlasKafkaMessage<EntityNotification> message : messages) {
- if (message.getOffset() > offsetOfLastMessageCommittedToKafka) {
- if (message.getOffset() <= offsetOfLastMessageDeliveredToRanger) {
- // Already delivered to Ranger
- TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing message with offset:[" + message.getOffset() + "] to Kafka");
- }
- consumer.commit(partition, message.getOffset());
- offsetOfLastMessageCommittedToKafka = message.getOffset();
- } catch (Exception commitException) {
- LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException);
- LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! This may be unrecoverable error!!");
- }
- } else {
- break;
+ long messageOffset = messageToCommit.getOffset();
+ int partitionId = messageToCommit.getPartition();
+
+ if (offsetOfLastMessageCommittedToKafka < messageOffset) {
+ TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", partitionId);
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing message with offset:[" + messageOffset + "] to Kafka");
}
+ consumer.commit(partition, messageOffset);
+ offsetOfLastMessageCommittedToKafka = messageOffset;
+ } catch (Exception commitException) {
+ LOG.warn("Ranger tagsync already processed message at offset " + messageOffset + ". Ignoring failure in committing message:[" + messageToCommit + "]", commitException);
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug("<== commitToKafka()");
+ LOG.debug("<== commitToKafka(" + messageToCommit + ")");
}
}
}