You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by si...@apache.org on 2021/10/28 22:23:35 UTC

[atlas] branch branch-2.0 updated: ATLAS-4463: Fixed Infinite loop at Index Health Monitor

This is an automated email from the ASF dual-hosted git repository.

sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 25f84d8  ATLAS-4463: Fixed Infinite loop at Index Health Monitor
25f84d8 is described below

commit 25f84d83768011a9dcfeaa59bf67a8a6121433ce
Author: Sidharth Mishra <si...@gmail.com>
AuthorDate: Thu Oct 28 14:10:07 2021 -0700

    ATLAS-4463: Fixed Infinite loop at Index Health Monitor
    
    Signed-off-by: Sidharth Mishra <si...@apache.org>
    (cherry picked from commit 680f23fab55f35cf65a5189ae3882ebe360b1b01)
---
 .../repository/graph/IndexRecoveryService.java     | 28 ++++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
index 2f11610..b316354 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
@@ -37,6 +37,7 @@ import java.text.SimpleDateFormat;
 import java.time.Instant;
 import java.util.Iterator;
 import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
 import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_NAME;
@@ -58,6 +59,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
     private final RecoveryInfoManagement recoveryInfoManagement;
     private       Configuration          configuration;
     private       boolean                isIndexRecoveryEnabled;
+    private       RecoveryThread         recoveryThread;
 
     @Inject
     public IndexRecoveryService(Configuration config, AtlasGraph graph) {
@@ -67,7 +69,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
         long healthCheckFrequencyMillis  = config.getLong(SOLR_STATUS_CHECK_RETRY_INTERVAL, SOLR_STATUS_RETRY_DEFAULT_MS);
         this.recoveryInfoManagement      = new RecoveryInfoManagement(graph);
 
-        RecoveryThread recoveryThread = new RecoveryThread(recoveryInfoManagement, graph, recoveryStartTimeFromConfig, healthCheckFrequencyMillis);
+        this.recoveryThread = new RecoveryThread(recoveryInfoManagement, graph, recoveryStartTimeFromConfig, healthCheckFrequencyMillis);
         this.indexHealthMonitor = new Thread(recoveryThread, INDEX_HEALTH_MONITOR_THREAD_NAME);
     }
 
@@ -102,6 +104,8 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
     @Override
     public void stop() throws AtlasException {
         try {
+            recoveryThread.shutdown();
+
             indexHealthMonitor.join();
         } catch (InterruptedException e) {
             LOG.error("indexHealthMonitor: Interrupted", e);
@@ -143,6 +147,8 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
         private       long                   indexStatusCheckRetryMillis;
         private       Object                 txRecoveryObject;
 
+        private final AtomicBoolean          shouldRun = new AtomicBoolean(false);
+
         private RecoveryThread(RecoveryInfoManagement recoveryInfoManagement, AtlasGraph graph, long startTimeFromConfig, long healthCheckFrequencyMillis) {
             this.graph                       = graph;
             this.recoveryInfoManagement      = recoveryInfoManagement;
@@ -154,9 +160,11 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
         }
 
         public void run() {
+            shouldRun.set(true);
+
             LOG.info("Index Health Monitor: Starting...");
 
-            while (true) {
+            while (shouldRun.get()) {
                 try {
                     boolean solrHealthy = isSolrHealthy();
 
@@ -173,6 +181,22 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
             }
         }
 
+        public void shutdown() {
+            try {
+                LOG.info("Index Health Monitor: Shutdown: Starting...");
+
+                // handle the case where thread was not started at all
+                // and shutdown called
+                if (shouldRun.get() == false) {
+                    return;
+                }
+
+                shouldRun.set(false);
+            } finally {
+                LOG.info("Index Health Monitor: Shutdown: Done!");
+            }
+        }
+
         private boolean isSolrHealthy() throws AtlasException, InterruptedException {
             Thread.sleep(indexStatusCheckRetryMillis);