You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by pi...@apache.org on 2022/08/18 04:41:39 UTC
[atlas] branch master updated: ATLAS-4619: Refactor Atlas webapp module to remove Kafka core dependency
This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 3c7d8c072 ATLAS-4619: Refactor Atlas webapp module to remove Kafka core dependency
3c7d8c072 is described below
commit 3c7d8c072055c246200a5617488ace5a1d64ae57
Author: Patrik Marton <pm...@cloudera.com>
AuthorDate: Fri Jul 29 11:50:44 2022 +0200
ATLAS-4619: Refactor Atlas webapp module to remove Kafka core dependency
Change-Id: Ie3422851cb711da4e7c4d0845319db6c33333f65
Signed-off-by: Pinal Shah <pi...@freestoneinfotech.com>
---
.../notification/NotificationHookConsumer.java | 25 ++++++++--------------
.../NotificationHookConsumerKafkaTest.java | 22 ++++++++-----------
2 files changed, 18 insertions(+), 29 deletions(-)
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 49c504f9f..1cdfcef8a 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -19,7 +19,6 @@ package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import kafka.utils.ShutdownableThread;
import org.apache.atlas.*;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
@@ -122,6 +121,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
private static final String EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION = "JanusGraphException";
private static final String EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
+ private static final int KAFKA_CONSUMER_SHUTDOWN_WAIT = 30000;
// from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
public static final String DUMMY_DATABASE = "_dummy_database";
@@ -379,7 +379,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (executors != null) {
executors.shutdown();
- if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+ if (!executors.awaitTermination(KAFKA_CONSUMER_SHUTDOWN_WAIT, TimeUnit.MILLISECONDS)) {
LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
@@ -523,21 +523,21 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
@VisibleForTesting
- class HookConsumer extends ShutdownableThread {
+ class HookConsumer extends Thread {
private final NotificationConsumer<HookNotification> consumer;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
private final List<String> failedMessages = new ArrayList<>();
private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
public HookConsumer(NotificationConsumer<HookNotification> consumer) {
- super("atlas-hook-consumer-thread", false);
+ super("atlas-hook-consumer-thread");
this.consumer = consumer;
}
@Override
- public void doWork() {
- LOG.info("==> HookConsumer doWork()");
+ public void run() {
+ LOG.info("==> HookConsumer run()");
shouldRun.set(true);
@@ -572,12 +572,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
consumer.close();
}
- LOG.info("<== HookConsumer doWork()");
+ LOG.info("<== HookConsumer run()");
}
}
@VisibleForTesting
- void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException {
+ void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
AtlasPerfTracer perf = null;
HookNotification message = kafkaMsg.getMessage();
String messageUser = message.getUser();
@@ -957,26 +957,19 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return true;
}
- @Override
public void shutdown() {
LOG.info("==> HookConsumer shutdown()");
// handle the case where thread was not started at all
// and shutdown called
- if (shouldRun.get() == false) {
+ if (!shouldRun.compareAndSet(true, false)) {
return;
}
- super.initiateShutdown();
-
- shouldRun.set(false);
-
if (consumer != null) {
consumer.wakeup();
}
- super.awaitShutdown();
-
LOG.info("<== HookConsumer shutdown()");
}
}
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index fdfc2560d..716f592f5 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -186,23 +186,19 @@ public class NotificationHookConsumerKafkaTest {
}
void consumeOneMessage(NotificationConsumer<HookNotification> consumer,
- NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
- try {
- long startTime = System.currentTimeMillis(); //fetch starting time
+ NotificationHookConsumer.HookConsumer hookConsumer) {
+ long startTime = System.currentTimeMillis(); //fetch starting time
- while ((System.currentTimeMillis() - startTime) < 10000) {
- List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
+ while ((System.currentTimeMillis() - startTime) < 10000) {
+ List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
- for (AtlasKafkaMessage<HookNotification> msg : messages) {
- hookConsumer.handleMessage(msg);
- }
+ for (AtlasKafkaMessage<HookNotification> msg : messages) {
+ hookConsumer.handleMessage(msg);
+ }
- if (messages.size() > 0) {
- break;
- }
+ if (messages.size() > 0) {
+ break;
}
- } catch (AtlasServiceException | AtlasException e) {
- Assert.fail("Consumer failed with exception ", e);
}
}