You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by pi...@apache.org on 2022/08/18 04:41:39 UTC

[atlas] branch master updated: ATLAS-4619: Refactor Atlas webapp module to remove Kafka core dependency

This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c7d8c072 ATLAS-4619: Refactor Atlas webapp module to remove Kafka core dependency
3c7d8c072 is described below

commit 3c7d8c072055c246200a5617488ace5a1d64ae57
Author: Patrik Marton <pm...@cloudera.com>
AuthorDate: Fri Jul 29 11:50:44 2022 +0200

    ATLAS-4619: Refactor Atlas webapp module to remove Kafka core dependency
    
    Change-Id: Ie3422851cb711da4e7c4d0845319db6c33333f65
    Signed-off-by: Pinal Shah <pi...@freestoneinfotech.com>
---
 .../notification/NotificationHookConsumer.java     | 25 ++++++++--------------
 .../NotificationHookConsumerKafkaTest.java         | 22 ++++++++-----------
 2 files changed, 18 insertions(+), 29 deletions(-)

diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 49c504f9f..1cdfcef8a 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -19,7 +19,6 @@ package org.apache.atlas.notification;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import kafka.utils.ShutdownableThread;
 import org.apache.atlas.*;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
@@ -122,6 +121,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
     private static final String EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION = "JanusGraphException";
     private static final String EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
+    private static final int KAFKA_CONSUMER_SHUTDOWN_WAIT = 30000;
 
     // from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
     public static final String DUMMY_DATABASE               = "_dummy_database";
@@ -379,7 +379,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             if (executors != null) {
                 executors.shutdown();
 
-                if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+                if (!executors.awaitTermination(KAFKA_CONSUMER_SHUTDOWN_WAIT, TimeUnit.MILLISECONDS)) {
                     LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                 }
 
@@ -523,21 +523,21 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     }
 
     @VisibleForTesting
-    class HookConsumer extends ShutdownableThread {
+    class HookConsumer extends Thread {
         private final NotificationConsumer<HookNotification> consumer;
         private final AtomicBoolean                          shouldRun      = new AtomicBoolean(false);
         private final List<String>                           failedMessages = new ArrayList<>();
         private final AdaptiveWaiter                         adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
 
         public HookConsumer(NotificationConsumer<HookNotification> consumer) {
-            super("atlas-hook-consumer-thread", false);
+            super("atlas-hook-consumer-thread");
 
             this.consumer = consumer;
         }
 
         @Override
-        public void doWork() {
-            LOG.info("==> HookConsumer doWork()");
+        public void run() {
+            LOG.info("==> HookConsumer run()");
 
             shouldRun.set(true);
 
@@ -572,12 +572,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                     consumer.close();
                 }
 
-                LOG.info("<== HookConsumer doWork()");
+                LOG.info("<== HookConsumer run()");
             }
         }
 
         @VisibleForTesting
-        void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException {
+        void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
             AtlasPerfTracer  perf           = null;
             HookNotification message        = kafkaMsg.getMessage();
             String           messageUser    = message.getUser();
@@ -957,26 +957,19 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             return true;
         }
 
-        @Override
         public void shutdown() {
             LOG.info("==> HookConsumer shutdown()");
 
             // handle the case where thread was not started at all
             // and shutdown called
-            if (shouldRun.get() == false) {
+            if (!shouldRun.compareAndSet(true, false)) {
                 return;
             }
 
-            super.initiateShutdown();
-
-            shouldRun.set(false);
-
             if (consumer != null) {
                 consumer.wakeup();
             }
 
-            super.awaitShutdown();
-
             LOG.info("<== HookConsumer shutdown()");
         }
     }
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index fdfc2560d..716f592f5 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -186,23 +186,19 @@ public class NotificationHookConsumerKafkaTest {
     }
 
     void consumeOneMessage(NotificationConsumer<HookNotification> consumer,
-                           NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException {
-        try {
-            long startTime = System.currentTimeMillis(); //fetch starting time
+                           NotificationHookConsumer.HookConsumer hookConsumer) {
+        long startTime = System.currentTimeMillis(); //fetch starting time
 
-            while ((System.currentTimeMillis() - startTime) < 10000) {
-                List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
+        while ((System.currentTimeMillis() - startTime) < 10000) {
+            List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive();
 
-                for (AtlasKafkaMessage<HookNotification> msg : messages) {
-                    hookConsumer.handleMessage(msg);
-                }
+            for (AtlasKafkaMessage<HookNotification> msg : messages) {
+                hookConsumer.handleMessage(msg);
+            }
 
-                if (messages.size() > 0) {
-                    break;
-                }
+            if (messages.size() > 0) {
+                break;
             }
-        } catch (AtlasServiceException | AtlasException e) {
-            Assert.fail("Consumer failed with exception ", e);
         }
     }