You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/10/11 04:51:52 UTC

[atlas] branch branch-2.0 updated: ATLAS-4408: Dynamic handling of failure in updating index

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 9021859  ATLAS-4408: Dynamic handling of failure in updating index
9021859 is described below

commit 9021859cd90ec56e770c8e68907441807951e894
Author: Radhika Kundam <rk...@cloudera.com>
AuthorDate: Sun Oct 10 21:50:29 2021 -0700

    ATLAS-4408: Dynamic handling of failure in updating index
    
    (cherry picked from commit 261331bbd1fa8e50517421ab834f01f89f4997e1)
---
 .../org/apache/atlas/repository/Constants.java     |   8 +
 .../repository/graphdb/AtlasGraphIndexClient.java  |   6 +
 .../repository/graphdb/AtlasGraphManagement.java   |  19 ++
 .../graphdb/janus/AtlasJanusGraphDatabase.java     |  74 +++++-
 .../graphdb/janus/AtlasJanusGraphIndexClient.java  |  55 +++-
 .../graphdb/janus/AtlasJanusGraphManagement.java   |  85 ++++++-
 .../org/apache/atlas/ApplicationProperties.java    |   2 +
 .../repository/graph/GraphBackedSearchIndexer.java |   3 +
 .../repository/graph/IndexRecoveryService.java     | 283 +++++++++++++++++++++
 .../graph/RecoveryInfoManagementTest.java          |  65 +++++
 .../atlas/listener/ActiveStateChangeHandler.java   |   3 +-
 11 files changed, 575 insertions(+), 28 deletions(-)

diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index aea0c13..2669c8a 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -228,6 +228,14 @@ public final class Constants {
     public static final String TASK_START_TIME        = encodePropertyKey(TASK_PREFIX + "startTime");
     public static final String TASK_END_TIME          = encodePropertyKey(TASK_PREFIX + "endTime");
 
+    /**
+     * Index Recovery vertex property keys.
+     */
+    public static final String INDEX_RECOVERY_PREFIX                  = INTERNAL_PROPERTY_KEY_PREFIX + "idxRecovery_";
+    public static final String PROPERTY_KEY_INDEX_RECOVERY_NAME       = encodePropertyKey(INDEX_RECOVERY_PREFIX + "name");
+    public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime");
+    public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME  = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
+
     /*
      * All supported file-format extensions for Bulk Imports through file upload
      */
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphIndexClient.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphIndexClient.java
index 9960d89..54cf5f6 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphIndexClient.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphIndexClient.java
@@ -54,4 +54,10 @@ public interface AtlasGraphIndexClient {
      * @param suggestionProperties the list of suggestion properties.
      */
     void applySuggestionFields(String collectionName, List<String> suggestionProperties);
+
+    /**
+     * Returns the health status of the index client.
+     * @return true if the index client is healthy, false otherwise
+     */
+    boolean isHealthy();
 }
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
index 7e3b2f4..50d17a2 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
@@ -180,4 +180,23 @@ public interface AtlasGraphManagement {
      * @throws Exception
      */
     void reindex(String indexName, List<AtlasElement> elements) throws Exception;
+
+    /**
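+     * Starts index recovery from the specified time and returns the transaction recovery object.
+     * @param startTime time, in epoch milliseconds, from which index recovery should start
+     * @return the transaction recovery object, to be passed to stopIndexRecovery and printIndexRecoveryStats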
+     */
+    Object startIndexRecovery(long startTime);
+
+    /**
+     * Stops index recovery.
+     * @param txRecoveryObject the transaction recovery object returned by startIndexRecovery
+     */
+    void stopIndexRecovery(Object txRecoveryObject);
+
+    /**
+     * Prints index recovery statistics.
+     * @param txRecoveryObject the transaction recovery object returned by startIndexRecovery
+     */
+    void printIndexRecoveryStats(Object txRecoveryObject);
 }
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
index 11267c4..0d47e38 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
@@ -28,7 +28,6 @@ import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer
 import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer;
 import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer;
 import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
-import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
 import org.janusgraph.core.JanusGraph;
@@ -48,28 +47,36 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
+import static org.apache.atlas.ApplicationProperties.INDEX_RECOVERY_CONF;
+
 /**
  * Default implementation for Graph Provider that doles out JanusGraph.
  */
 public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, AtlasJanusEdge> {
     private static final Logger LOG      = LoggerFactory.getLogger(AtlasJanusGraphDatabase.class);
-    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("AtlasJanusGraphDatabase");
 
     private static final String OLDER_STORAGE_EXCEPTION = "Storage version is incompatible with current client";
 
     /**
      * Constant for the configuration property that indicates the prefix.
      */
-    public static final String GRAPH_PREFIX         = "atlas.graph";
-    public static final String INDEX_BACKEND_CONF   = "index.search.backend";
-    public static final String SOLR_ZOOKEEPER_URL   = "atlas.graph.index.search.solr.zookeeper-url";
-    public static final String SOLR_ZOOKEEPER_URLS  = "atlas.graph.index.search.solr.zookeeper-urls";
-    public static final String INDEX_BACKEND_LUCENE = "lucene";
-    public static final String INDEX_BACKEND_ES     = "elasticsearch";
+    public static final String GRAPH_PREFIX               = "atlas.graph";
+    public static final String INDEX_BACKEND_CONF         = "index.search.backend";
+    public static final String SOLR_ZOOKEEPER_URL         = "atlas.graph.index.search.solr.zookeeper-url";
+    public static final String SOLR_ZOOKEEPER_URLS        = "atlas.graph.index.search.solr.zookeeper-urls";
+    public static final String INDEX_BACKEND_LUCENE       = "lucene";
+    public static final String INDEX_BACKEND_ES           = "elasticsearch";
+    public static final String GRAPH_TX_LOG_CONF          = "tx.log-tx";
+    public static final String GRAPH_TX_LOG_VERBOSE_CONF  = "tx.recovery.verbose";
+    public static final String SOLR_INDEX_TX_LOG_TTL_CONF = "write.ahead.log.ttl.in.hours";
+    public static final String GRAPH_TX_LOG_TTL_CONF      = "log.tx.ttl";
+    public static final long   DEFAULT_GRAPH_TX_LOG_TTL   = 72; //Hrs
 
     private static volatile AtlasJanusGraph atlasGraphInstance = null;
     private static volatile JanusGraph graphInstance;
@@ -166,8 +173,11 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
                         throw new RuntimeException(e);
                     }
 
-                    graphInstance = initJanusGraph(config);
+                    configureTxLogBasedIndexRecovery();
+
+                    graphInstance      = initJanusGraph(config);
                     atlasGraphInstance = new AtlasJanusGraph();
+
                     validateIndexBackend(config);
 
                 }
@@ -192,6 +202,52 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
         }
     }
 
+    public static void configureTxLogBasedIndexRecovery() {
+        try {
+            boolean  recoveryEnabled = ApplicationProperties.get().getBoolean(INDEX_RECOVERY_CONF, DEFAULT_INDEX_RECOVERY);
+            long     ttl             = ApplicationProperties.get().getLong(SOLR_INDEX_TX_LOG_TTL_CONF, DEFAULT_GRAPH_TX_LOG_TTL);
+            Duration txLogTtlSecs    = Duration.ofSeconds(Duration.ofHours(ttl).getSeconds());
+
+            Map<String, Object> properties = new HashMap<String, Object>() {{
+                put(GRAPH_TX_LOG_CONF, recoveryEnabled);
+                put(GRAPH_TX_LOG_VERBOSE_CONF, recoveryEnabled);
+                put(GRAPH_TX_LOG_TTL_CONF, txLogTtlSecs);
+            }};
+
+            updateGlobalConfiguration(properties);
+
+            LOG.info("Tx Log-based Index Recovery: {}!", recoveryEnabled ? "Enabled" : "Disabled");
+        } catch (Exception e) {
+            LOG.error("Error: Failed!", e);
+        }
+    }
+
+    private static void updateGlobalConfiguration(Map<String, Object> map) {
+        JanusGraph           graph            = null;
+        JanusGraphManagement managementSystem = null;
+
+        try {
+            graph            = initJanusGraph(getConfiguration());
+            managementSystem = graph.openManagement();
+
+            for (Map.Entry<String, Object> entry : map.entrySet()) {
+                managementSystem.set(entry.getKey(), entry.getValue());
+            }
+
+            LOG.info("Global properties updated!: {}", map);
+        } catch (Exception ex) {
+            LOG.error("Error updating global configuration: {}", map, ex);
+        } finally {
+            if (managementSystem != null) {
+                managementSystem.commit();
+            }
+
+            if (graph != null) {
+                graph.close();
+            }
+        }
+    }
+
     public static JanusGraph getBulkLoadingGraphInstance() {
         try {
             Configuration cfg = getConfiguration();
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
index ef42dbd..9e9fdd8 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
@@ -40,16 +40,22 @@ import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.client.solrj.response.FacetField;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.TermsResponse;
-import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.util.ContentStream;
 import org.apache.solr.common.util.NamedList;
 import org.janusgraph.diskstorage.solr.Solr6Index;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
 
 import static org.apache.atlas.repository.Constants.FREETEXT_REQUEST_HANDLER;
 import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
@@ -57,11 +63,15 @@ import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
 public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphIndexClient.class);
 
-    private static final FreqComparator FREQ_COMPARATOR          = new FreqComparator();
-    private static final int            DEFAULT_SUGGESTION_COUNT = 5;
-    private static final int            MIN_FACET_COUNT_REQUIRED = 1;
-    private static final String         TERMS_PREFIX             = "terms.prefix";
-    private static final String         TERMS_FIELD              = "terms.fl";
+    private static final FreqComparator FREQ_COMPARATOR              = new FreqComparator();
+    private static final int            DEFAULT_SUGGESTION_COUNT     = 5;
+    private static final int            MIN_FACET_COUNT_REQUIRED     = 1;
+    private static final String         TERMS_PREFIX                 = "terms.prefix";
+    private static final String         TERMS_FIELD                  = "terms.fl";
+    private static final int            SOLR_HEALTHY_STATUS          = 0;
+    private static final long           SOLR_STATUS_LOG_FREQUENCY_MS = 60000;//Prints SOLR DOWN status for every 1 min
+    private static long                 prevSolrHealthCheckTime;
+
 
     private final Configuration configuration;
 
@@ -70,6 +80,29 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
         this.configuration = configuration;
     }
 
+    public boolean isHealthy() {
+        boolean isHealthy   = false;
+        long    currentTime = System.currentTimeMillis();
+
+        try {
+            if (isSolrHealthy()) {
+                isHealthy = true;
+            }
+        } catch (Exception exception) {
+            if (LOG.isDebugEnabled()) {
+                LOG.error("Error: isHealthy", exception);
+            }
+        }
+
+        if (!isHealthy && (prevSolrHealthCheckTime == 0 || currentTime - prevSolrHealthCheckTime > SOLR_STATUS_LOG_FREQUENCY_MS)) {
+            LOG.info("Solr Health: Unhealthy!");
+
+            prevSolrHealthCheckTime = currentTime;
+        }
+
+        return isHealthy;
+    }
+
     @Override
     public void applySearchWeight(String collectionName, Map<String, Integer> indexFieldName2SearchWeightMap) {
         SolrClient solrClient = null;
@@ -340,6 +373,12 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
         return Collections.EMPTY_LIST;
     }
 
+    private boolean isSolrHealthy() throws SolrServerException, IOException {
+        SolrClient client = Solr6Index.getSolrClient();
+
+        return client != null && client.ping(Constants.VERTEX_INDEX).getStatus() == SOLR_HEALTHY_STATUS;
+    }
+
     private void graphManagementCommit(AtlasGraphManagement management) {
         try {
             management.commit();
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
index 1cc7f8b..b3eb071 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
@@ -19,37 +19,46 @@ package org.apache.atlas.repository.graphdb.janus;
 
 import com.google.common.base.Preconditions;
 import org.apache.atlas.AtlasConfiguration;
-import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.repository.graphdb.AtlasCardinality;
 import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
 import org.apache.atlas.repository.graphdb.AtlasElement;
+import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
+import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
+import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
+import org.apache.commons.lang.StringUtils;
 import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.janusgraph.core.Cardinality;
 import org.janusgraph.core.EdgeLabel;
+import org.janusgraph.core.JanusGraph;
 import org.janusgraph.core.JanusGraphElement;
+import org.janusgraph.core.JanusGraphFactory;
 import org.janusgraph.core.PropertyKey;
-import org.janusgraph.core.schema.*;
+import org.janusgraph.core.log.TransactionRecovery;
+import org.janusgraph.core.schema.ConsistencyModifier;
+import org.janusgraph.core.schema.JanusGraphIndex;
+import org.janusgraph.core.schema.JanusGraphManagement;
 import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder;
+import org.janusgraph.core.schema.Mapping;
+import org.janusgraph.core.schema.Parameter;
+import org.janusgraph.core.schema.PropertyKeyMaker;
 import org.janusgraph.diskstorage.BackendTransaction;
 import org.janusgraph.diskstorage.indexing.IndexEntry;
 import org.janusgraph.graphdb.database.IndexSerializer;
 import org.janusgraph.graphdb.database.StandardJanusGraph;
 import org.janusgraph.graphdb.database.management.ManagementSystem;
 import org.janusgraph.graphdb.internal.Token;
-import org.apache.atlas.repository.graphdb.AtlasCardinality;
-import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
-import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
-import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
-import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
-import org.apache.commons.lang.StringUtils;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.janusgraph.graphdb.log.StandardTransactionLogProcessor;
 import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
 import org.janusgraph.graphdb.types.IndexType;
 import org.janusgraph.graphdb.types.MixedIndexType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -347,6 +356,62 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement {
         }
     }
 
+    @Override
+    public Object startIndexRecovery(long recoveryStartTime) {
+        Instant    recoveryStartInstant = Instant.ofEpochMilli(recoveryStartTime);
+        JanusGraph janusGraph           = this.graph.getGraph();
+
+        return JanusGraphFactory.startTransactionRecovery(janusGraph, recoveryStartInstant);
+    }
+
+    @Override
+    public void stopIndexRecovery(Object txRecoveryObject) {
+        if (txRecoveryObject == null) {
+            return;
+        }
+
+        try {
+            if (txRecoveryObject instanceof TransactionRecovery) {
+                TransactionRecovery txRecovery = (TransactionRecovery) txRecoveryObject;
+                StandardJanusGraph  janusGraph = (StandardJanusGraph) this.graph.getGraph();
+
+                LOG.info("stopIndexRecovery: Index Client is unhealthy. Index recovery: Paused!");
+
+                janusGraph.getBackend().getSystemTxLog().close();
+
+                txRecovery.shutdown();
+            } else {
+                LOG.error("stopIndexRecovery({}): Invalid transaction recovery object!", txRecoveryObject);
+            }
+        } catch (Exception e) {
+            LOG.warn("stopIndexRecovery: Error while shutting down transaction recovery", e);
+        }
+    }
+
+    @Override
+    public void printIndexRecoveryStats(Object txRecoveryObject) {
+        if (txRecoveryObject == null) {
+            return;
+        }
+
+        try {
+            if (txRecoveryObject instanceof TransactionRecovery) {
+                StandardTransactionLogProcessor txRecovery = (StandardTransactionLogProcessor) txRecoveryObject;
+                long[]                          statistics = txRecovery.getStatistics();
+
+                if (statistics.length >= 2) {
+                    LOG.info("Index Recovery: Stats: Success:{}: Failed: {}", statistics[0], statistics[1]);
+                } else {
+                    LOG.info("Index Recovery: Stats: {}", statistics);
+                }
+            } else {
+                LOG.error("Transaction stats: Invalid transaction recovery object!: Unexpected type: {}: Details: {}", txRecoveryObject.getClass().toString(), txRecoveryObject);
+            }
+        } catch (Exception e) {
+            LOG.error("Error: Retrieving log transaction stats!", e);
+        }
+    }
+
     private void reindexElement(ManagementSystem managementSystem, IndexSerializer indexSerializer, MixedIndexType indexType, List<AtlasElement> elements) throws Exception {
         Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new HashMap<>();
         StandardJanusGraphTx tx = managementSystem.getWrappedTx();
diff --git a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
index 682206d..4a37c77 100644
--- a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
@@ -50,6 +50,7 @@ public final class ApplicationProperties extends PropertiesConfiguration {
     public static final String  INDEX_BACKEND_CONF              = "atlas.graph.index.search.backend";
     public static final String  INDEX_MAP_NAME_CONF             = "atlas.graph.index.search.map-name";
     public static final String  SOLR_WAIT_SEARCHER_CONF         = "atlas.graph.index.search.solr.wait-searcher";
+    public static final String  INDEX_RECOVERY_CONF             = "atlas.index.recovery.enable";
     public static final String  ENABLE_FULLTEXT_SEARCH_CONF     = "atlas.search.fulltext.enable";
     public static final String  ENABLE_FREETEXT_SEARCH_CONF     = "atlas.search.freetext.enable";
     public static final String  ATLAS_RUN_MODE                  = "atlas.run.mode";
@@ -66,6 +67,7 @@ public final class ApplicationProperties extends PropertiesConfiguration {
     public static final String  DEFAULT_GRAPHDB_BACKEND         = GRAPHBD_BACKEND_JANUS;
     public static final boolean DEFAULT_SOLR_WAIT_SEARCHER      = false;
     public static final boolean DEFAULT_INDEX_MAP_NAME          = false;
+    public static final boolean DEFAULT_INDEX_RECOVERY          = true;
     public static final AtlasRunMode DEFAULT_ATLAS_RUN_MODE     = AtlasRunMode.PROD;
     public static final String INDEX_SEARCH_MAX_RESULT_SET_SIZE = "atlas.graph.index.search.max-result-set-size";
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index ddfb008..d65bb1a 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -353,6 +353,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
             createCommonVertexIndex(management, TASK_CREATED_TIME, UniqueKind.NONE, Long.class, SINGLE, true, false);
             createCommonVertexIndex(management, TASK_STATUS, UniqueKind.NONE, String.class, SINGLE, true, false);
 
+            // index recovery
+            createCommonVertexIndex(management, PROPERTY_KEY_INDEX_RECOVERY_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
+
             // create vertex-centric index
             createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE);
             createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class, SINGLE);
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
new file mode 100644
index 0000000..2f11610
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.graph;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.listener.ActiveStateChangeHandler;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.service.Service;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.util.Iterator;
+import java.util.TimeZone;
+
+import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
+import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_NAME;
+import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME;
+import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_START_TIME;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
+
+@Component
+@Order(8)
+public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
+    private static final Logger LOG                                       = LoggerFactory.getLogger(IndexRecoveryService.class);
+    private static final String DATE_FORMAT                               = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    private static final String INDEX_HEALTH_MONITOR_THREAD_NAME          = "index-health-monitor";
+    private static final String SOLR_STATUS_CHECK_RETRY_INTERVAL          = "atlas.graph.index.status.check.frequency";
+    private static final String SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME = "atlas.graph.index.recovery.start.time";
+    private static final long   SOLR_STATUS_RETRY_DEFAULT_MS              = 30000; // 30 secs default
+
+    private final Thread                 indexHealthMonitor;
+    private final RecoveryInfoManagement recoveryInfoManagement;
+    private       Configuration          configuration;
+    private       boolean                isIndexRecoveryEnabled;
+
+    @Inject
+    public IndexRecoveryService(Configuration config, AtlasGraph graph) {
+        this.configuration               = config;
+        this.isIndexRecoveryEnabled      = config.getBoolean(ApplicationProperties.INDEX_RECOVERY_CONF, DEFAULT_INDEX_RECOVERY);
+        long recoveryStartTimeFromConfig = getRecoveryStartTimeFromConfig(config);
+        long healthCheckFrequencyMillis  = config.getLong(SOLR_STATUS_CHECK_RETRY_INTERVAL, SOLR_STATUS_RETRY_DEFAULT_MS);
+        this.recoveryInfoManagement      = new RecoveryInfoManagement(graph);
+
+        RecoveryThread recoveryThread = new RecoveryThread(recoveryInfoManagement, graph, recoveryStartTimeFromConfig, healthCheckFrequencyMillis);
+        this.indexHealthMonitor = new Thread(recoveryThread, INDEX_HEALTH_MONITOR_THREAD_NAME);
+    }
+
+    private long getRecoveryStartTimeFromConfig(Configuration config) {
+        long ret = 0L;
+
+        try {
+            String time = config.getString(SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME);
+
+            SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+            dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+            ret = dateFormat.parse(time).toInstant().toEpochMilli();
+        } catch (Exception e) {
+            LOG.debug("Error fetching: {}", SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public void start() throws AtlasException {
+        if (configuration == null || !HAConfiguration.isHAEnabled(configuration)) {
+            LOG.info("==> IndexRecoveryService.start()");
+
+            startTxLogMonitoring();
+
+            LOG.info("<== IndexRecoveryService.start()");
+        }
+    }
+
+    @Override
+    public void stop() throws AtlasException {
+        try {
+            indexHealthMonitor.join();
+        } catch (InterruptedException e) {
+            LOG.error("indexHealthMonitor: Interrupted", e);
+        }
+    }
+
+    @Override
+    public void instanceIsActive() throws AtlasException {
+        LOG.info("==> IndexRecoveryService.instanceIsActive()");
+
+        startTxLogMonitoring();
+
+        LOG.info("<== IndexRecoveryService.instanceIsActive()");
+    }
+
+    @Override
+    public void instanceIsPassive() throws AtlasException {
+        LOG.info("IndexRecoveryService.instanceIsPassive(): no action needed.");
+    }
+
+    @Override
+    public int getHandlerOrder() {
+        return ActiveStateChangeHandler.HandlerOrder.INDEX_RECOVERY.getOrder();
+    }
+
+    private void startTxLogMonitoring() {
+        if (!isIndexRecoveryEnabled) {
+            LOG.warn("IndexRecoveryService: Recovery should be enabled.");
+
+            return;
+        }
+
+        indexHealthMonitor.start();
+    }
+
+    /**
+     * Background task that polls the index backend's health and switches index recovery on or off.
+     *
+     * <p>Each loop iteration sleeps for {@code indexStatusCheckRetryMillis}, asks the graph index client
+     * whether it is healthy, and then:
+     * <ul>
+     *   <li>starts recovery when the index is healthy and no recovery object is currently held;</li>
+     *   <li>stops recovery when the index is unhealthy and a recovery object is held.</li>
+     * </ul>
+     * The loop ends when the executing thread is interrupted.</p>
+     */
+    private static class RecoveryThread implements Runnable {
+        private final AtlasGraph             graph;
+        private final RecoveryInfoManagement recoveryInfoManagement;
+        private       long                   indexStatusCheckRetryMillis;
+        private       Object                 txRecoveryObject;
+
+        /**
+         * @param recoveryInfoManagement     store used to read and persist the recovery start time
+         * @param graph                      graph whose index client and management system are used
+         * @param startTimeFromConfig        when greater than 0, persisted immediately as the recovery start time
+         * @param healthCheckFrequencyMillis pause between two health checks, in milliseconds
+         */
+        private RecoveryThread(RecoveryInfoManagement recoveryInfoManagement, AtlasGraph graph, long startTimeFromConfig, long healthCheckFrequencyMillis) {
+            this.graph                       = graph;
+            this.recoveryInfoManagement      = recoveryInfoManagement;
+            this.indexStatusCheckRetryMillis = healthCheckFrequencyMillis;
+
+            if (startTimeFromConfig > 0) {
+                this.recoveryInfoManagement.updateStartTime(startTimeFromConfig);
+            }
+        }
+
+        /**
+         * Runs the health-check loop until the executing thread is interrupted. Any other failure in
+         * an iteration is logged and the loop continues with the next iteration.
+         */
+        @Override
+        public void run() {
+            LOG.info("Index Health Monitor: Starting...");
+
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    boolean solrHealthy = isSolrHealthy();
+
+                    if (this.txRecoveryObject == null && solrHealthy) {
+                        startMonitoring();
+                    }
+
+                    if (this.txRecoveryObject != null && !solrHealthy) {
+                        stopMonitoring();
+                    }
+                } catch (InterruptedException e) {
+                    // Thread.sleep() cleared the interrupt flag when it threw; restore it so the
+                    // loop condition sees the interruption and the monitor shuts down.
+                    Thread.currentThread().interrupt();
+
+                    LOG.info("Index Health Monitor: Interrupted! Stopping...");
+                } catch (Exception e) {
+                    LOG.error("Error: Index recovery monitoring!", e);
+                }
+            }
+        }
+
+        /**
+         * Waits for the configured check interval, then queries the index client.
+         *
+         * @return the value reported by the graph index client's {@code isHealthy()}
+         * @throws AtlasException       propagated from the graph / index client calls
+         * @throws InterruptedException if the thread is interrupted while sleeping
+         */
+        private boolean isSolrHealthy() throws AtlasException, InterruptedException {
+            Thread.sleep(indexStatusCheckRetryMillis);
+
+            return this.graph.getGraphIndexClient().isHealthy();
+        }
+
+        /**
+         * Starts index recovery from the persisted start time and remembers the returned recovery
+         * object. Failures are logged; {@code txRecoveryObject} then keeps its previous value, so the
+         * next healthy iteration tries again.
+         */
+        private void startMonitoring() {
+            try {
+                Long storedStartTime = recoveryInfoManagement.getStartTime();
+                // A missing stored value is treated like "no recovery info yet" (0), the same fallback
+                // RecoveryInfoManagement uses when no vertex exists; this also avoids unboxing null.
+                long startTime       = (storedStartTime != null) ? storedStartTime : 0L;
+
+                txRecoveryObject = this.graph.getManagementSystem().startIndexRecovery(startTime);
+
+                // Logged only after a successful start, so the log never claims a start that failed.
+                LOG.info("Index Recovery: Started! Recovery time: {}", Instant.ofEpochMilli(startTime));
+
+                printIndexRecoveryStats();
+            } catch (Exception e) {
+                LOG.error("Index Recovery: Start: Error!", e);
+            }
+        }
+
+        /**
+         * Stops the running recovery and persists a new start time of "now minus one check interval",
+         * so the period since the last successful health check is covered by the next recovery.
+         * The recovery object is always cleared, even if stopping failed.
+         */
+        private void stopMonitoring() {
+            Instant newStartTime = Instant.now().minusMillis(indexStatusCheckRetryMillis);
+
+            try {
+                this.graph.getManagementSystem().stopIndexRecovery(txRecoveryObject);
+
+                recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli());
+
+                printIndexRecoveryStats();
+            } catch (Exception e) {
+                LOG.error("Index Recovery: Stopped! Error!", e);
+            } finally {
+                this.txRecoveryObject = null;
+
+                LOG.info("Index Recovery: Stopped! Recovery time: {}", newStartTime);
+            }
+        }
+
+        /** Asks the graph management system to print statistics for the current recovery object. */
+        private void printIndexRecoveryStats() {
+            this.graph.getManagementSystem().printIndexRecoveryStats(txRecoveryObject);
+        }
+    }
+
+    /**
+     * Reads and writes the index-recovery start time on a single graph vertex, identified by the
+     * property {@code PROPERTY_KEY_INDEX_RECOVERY_NAME} having the value {@code "__solrIndexRecoveryInfo"}.
+     */
+    @VisibleForTesting
+    static class RecoveryInfoManagement {
+        private static final String INDEX_RECOVERY_TYPE_NAME = "__solrIndexRecoveryInfo";
+
+        private final AtlasGraph graph;
+
+        /**
+         * @param graph graph in which the recovery-info vertex is looked up, created and updated
+         */
+        public RecoveryInfoManagement(AtlasGraph graph) {
+            this.graph = graph;
+        }
+
+        /**
+         * Stores {@code startTime} on the recovery-info vertex, creating the vertex if it does not
+         * exist, and keeps the previously stored start time in the "previous time" property.
+         * The change is committed on success and rolled back if any step fails; failures are logged
+         * and not rethrown.
+         *
+         * @param startTime new recovery start time, in epoch milliseconds
+         */
+        public void updateStartTime(Long startTime) {
+            boolean updated = false;
+
+            try {
+                Long        prevStartTime = null;
+                AtlasVertex vertex        = findVertex();
+
+                if (vertex == null) {
+                    vertex = graph.addVertex();
+                } else {
+                    prevStartTime = getStartTime(vertex);
+                }
+
+                setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
+                setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_START_TIME, startTime);
+                setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, prevStartTime);
+
+                updated = true;
+            } catch (Exception ex) {
+                // The throwable must be the last argument after the placeholder value; otherwise
+                // the "{}" is printed literally.
+                LOG.error("Error: Updating: {}!", startTime, ex);
+            } finally {
+                // Do not persist a half-written vertex (e.g. created but without its name property).
+                if (updated) {
+                    graph.commit();
+                } else {
+                    graph.rollback();
+                }
+            }
+        }
+
+        /**
+         * @return the stored recovery start time in epoch milliseconds, or 0 when no recovery-info
+         *         vertex exists, the property is absent, or reading it failed
+         */
+        public Long getStartTime() {
+            AtlasVertex vertex = findVertex();
+
+            return getStartTime(vertex);
+        }
+
+        /**
+         * Reads the start-time property from {@code vertex}.
+         *
+         * @param vertex recovery-info vertex, may be {@code null}
+         * @return the stored value; 0 when {@code vertex} is {@code null}, the property is absent,
+         *         or the read throws (the error is logged)
+         */
+        private Long getStartTime(AtlasVertex vertex) {
+            if (vertex == null) {
+                LOG.warn("Vertex passed is NULL: Returned is 0");
+
+                return 0L;
+            }
+
+            Long startTime = null;
+
+            try {
+                startTime = vertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_START_TIME, Long.class);
+            } catch (Exception e) {
+                LOG.error("Error retrieving startTime", e);
+            }
+
+            // Callers unbox the result, so never hand back null for an absent property.
+            return (startTime != null) ? startTime : 0L;
+        }
+
+        /**
+         * @return the first vertex whose recovery-name property matches, or {@code null} if none exists
+         */
+        private AtlasVertex findVertex() {
+            AtlasGraphQuery       query   = graph.query().has(PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
+            Iterator<AtlasVertex> results = query.vertices().iterator();
+
+            return results.hasNext() ? results.next() : null;
+        }
+    }
+}
\ No newline at end of file
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/RecoveryInfoManagementTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/RecoveryInfoManagementTest.java
new file mode 100644
index 0000000..d0382f2
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/RecoveryInfoManagementTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.graph;
+
+import com.google.inject.Inject;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.repository.AtlasTestBase;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+/** Checks that RecoveryInfoManagement persists a start time in the graph and returns the same value. */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class RecoveryInfoManagementTest extends AtlasTestBase {
+    @Inject
+    private AtlasGraph atlasGraph;
+
+    @BeforeTest
+    public void setupTest() {
+        RequestContext.clear();
+        RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
+    }
+
+    @BeforeClass
+    public void initialize() throws Exception {
+        super.initialize();
+    }
+
+    @AfterClass
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    /** Writes the current time through updateStartTime() and expects getStartTime() to read it back. */
+    @Test
+    public void verifyCreateUpdate() {
+        IndexRecoveryService.RecoveryInfoManagement rm = new IndexRecoveryService.RecoveryInfoManagement(atlasGraph);
+        long now = System.currentTimeMillis();
+        rm.updateStartTime(now);
+
+        long storedTime = rm.getStartTime();
+        Assert.assertEquals(storedTime, now); // TestNG argument order is (actual, expected)
+    }
+}
diff --git a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
index ba8f088..2916d6b 100644
--- a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
+++ b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
@@ -33,7 +33,8 @@ public interface ActiveStateChangeHandler {
         ATLAS_PATCH_SERVICE(3),
         DEFAULT_METADATA_SERVICE(4),
         NOTIFICATION_HOOK_CONSUMER(5),
-        TASK_MANAGEMENT(6);
+        TASK_MANAGEMENT(6),
+        INDEX_RECOVERY(7);
 
         private final int order;