You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/10/11 04:50:58 UTC
[atlas] branch master updated: ATLAS-4408: Dynamic handling of failure in updating index
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 261331b ATLAS-4408: Dynamic handling of failure in updating index
261331b is described below
commit 261331bbd1fa8e50517421ab834f01f89f4997e1
Author: Radhika Kundam <rk...@cloudera.com>
AuthorDate: Sun Oct 10 21:50:29 2021 -0700
ATLAS-4408: Dynamic handling of failure in updating index
---
.../org/apache/atlas/repository/Constants.java | 8 +
.../repository/graphdb/AtlasGraphIndexClient.java | 6 +
.../repository/graphdb/AtlasGraphManagement.java | 19 ++
.../graphdb/janus/AtlasJanusGraphDatabase.java | 74 +++++-
.../graphdb/janus/AtlasJanusGraphIndexClient.java | 55 +++-
.../graphdb/janus/AtlasJanusGraphManagement.java | 85 ++++++-
.../org/apache/atlas/ApplicationProperties.java | 2 +
.../repository/graph/GraphBackedSearchIndexer.java | 3 +
.../repository/graph/IndexRecoveryService.java | 283 +++++++++++++++++++++
.../graph/RecoveryInfoManagementTest.java | 65 +++++
.../atlas/listener/ActiveStateChangeHandler.java | 3 +-
11 files changed, 575 insertions(+), 28 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index aea0c13..2669c8a 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -228,6 +228,14 @@ public final class Constants {
public static final String TASK_START_TIME = encodePropertyKey(TASK_PREFIX + "startTime");
public static final String TASK_END_TIME = encodePropertyKey(TASK_PREFIX + "endTime");
+ /**
+ * Index Recovery vertex property keys.
+ * They are stored on the bookkeeping vertex that IndexRecoveryService uses to persist
+ * the time from which transaction-log based index recovery (re)starts.
+ */
+ public static final String INDEX_RECOVERY_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "idxRecovery_";
+ // identifies the recovery-info vertex (IndexRecoveryService looks it up by this property)
+ public static final String PROPERTY_KEY_INDEX_RECOVERY_NAME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "name");
+ // recovery start time, in epoch milliseconds
+ public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime");
+ // start time that was stored before the most recent update
+ public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
+
/*
* All supported file-format extensions for Bulk Imports through file upload
*/
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphIndexClient.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphIndexClient.java
index 9960d89..54cf5f6 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphIndexClient.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphIndexClient.java
@@ -54,4 +54,10 @@ public interface AtlasGraphIndexClient {
* @param suggestionProperties the list of suggestion properties.
*/
void applySuggestionFields(String collectionName, List<String> suggestionProperties);
+
+ /**
+ * Returns the health status of the index client.
+ * IndexRecoveryService polls this periodically to decide whether to start or pause index recovery;
+ * the JanusGraph implementation pings the Solr vertex index collection.
+ * @return true if the index client is active (ping succeeded with a healthy status), false otherwise
+ */
+ boolean isHealthy();
}
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
index 7e3b2f4..50d17a2 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
@@ -180,4 +180,23 @@ public interface AtlasGraphManagement {
* @throws Exception
*/
void reindex(String indexName, List<AtlasElement> elements) throws Exception;
+
+ /**
+ * Starts recovering indices from the transaction log, beginning at the specified recovery time.
+ * @param startTime recovery start time; the JanusGraph implementation interprets it as epoch milliseconds
+ * @return an opaque recovery handle (a JanusGraph TransactionRecovery in the JanusGraph implementation)
+ * to be passed to {@link #stopIndexRecovery(Object)} and {@link #printIndexRecoveryStats(Object)}
+ */
+ Object startIndexRecovery(long startTime);
+
+ /**
+ * Stops the index recovery represented by the given handle.
+ * @param txRecoveryObject handle returned by {@link #startIndexRecovery(long)}; the JanusGraph implementation ignores null
+ */
+ void stopIndexRecovery(Object txRecoveryObject);
+
+ /**
+ * Logs the statistics of the index recovery represented by the given handle.
+ * @param txRecoveryObject handle returned by {@link #startIndexRecovery(long)}; the JanusGraph implementation ignores null
+ */
+ void printIndexRecoveryStats(Object txRecoveryObject);
}
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
index 11267c4..0d47e38 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
@@ -28,7 +28,6 @@ import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer
import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer;
import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer;
import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
-import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
import org.janusgraph.core.JanusGraph;
@@ -48,28 +47,36 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
+import static org.apache.atlas.ApplicationProperties.INDEX_RECOVERY_CONF;
+
/**
* Default implementation for Graph Provider that doles out JanusGraph.
*/
public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, AtlasJanusEdge> {
private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphDatabase.class);
- private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("AtlasJanusGraphDatabase");
private static final String OLDER_STORAGE_EXCEPTION = "Storage version is incompatible with current client";
/**
* Constant for the configuration property that indicates the prefix.
*/
- public static final String GRAPH_PREFIX = "atlas.graph";
- public static final String INDEX_BACKEND_CONF = "index.search.backend";
- public static final String SOLR_ZOOKEEPER_URL = "atlas.graph.index.search.solr.zookeeper-url";
- public static final String SOLR_ZOOKEEPER_URLS = "atlas.graph.index.search.solr.zookeeper-urls";
- public static final String INDEX_BACKEND_LUCENE = "lucene";
- public static final String INDEX_BACKEND_ES = "elasticsearch";
+ public static final String GRAPH_PREFIX = "atlas.graph";
+ public static final String INDEX_BACKEND_CONF = "index.search.backend";
+ public static final String SOLR_ZOOKEEPER_URL = "atlas.graph.index.search.solr.zookeeper-url";
+ public static final String SOLR_ZOOKEEPER_URLS = "atlas.graph.index.search.solr.zookeeper-urls";
+ public static final String INDEX_BACKEND_LUCENE = "lucene";
+ public static final String INDEX_BACKEND_ES = "elasticsearch";
+ // JanusGraph global option keys written by configureTxLogBasedIndexRecovery(); both are set to the value of atlas.index.recovery.enable
+ public static final String GRAPH_TX_LOG_CONF = "tx.log-tx";
+ public static final String GRAPH_TX_LOG_VERBOSE_CONF = "tx.recovery.verbose";
+ // Atlas application property holding the tx log TTL in hours. NOTE(review): key has no "atlas." prefix - confirm intended
+ public static final String SOLR_INDEX_TX_LOG_TTL_CONF = "write.ahead.log.ttl.in.hours";
+ // JanusGraph global option that receives the tx log TTL as a java.time.Duration
+ public static final String GRAPH_TX_LOG_TTL_CONF = "log.tx.ttl";
+ // default tx log TTL, in hours
+ public static final long DEFAULT_GRAPH_TX_LOG_TTL = 72; //Hrs
private static volatile AtlasJanusGraph atlasGraphInstance = null;
private static volatile JanusGraph graphInstance;
@@ -166,8 +173,11 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
throw new RuntimeException(e);
}
- graphInstance = initJanusGraph(config);
+ configureTxLogBasedIndexRecovery();
+
+ graphInstance = initJanusGraph(config);
atlasGraphInstance = new AtlasJanusGraph();
+
validateIndexBackend(config);
}
@@ -192,6 +202,52 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
}
}
+    /**
+     * Enables or disables JanusGraph's transaction log according to atlas.index.recovery.enable and
+     * stores the related options (log-tx, recovery verbosity, log TTL) in JanusGraph's global configuration.
+     * Errors are logged and not propagated, so graph initialization continues without recovery support.
+     */
+    public static void configureTxLogBasedIndexRecovery() {
+        try {
+            Configuration appConfig       = ApplicationProperties.get();
+            boolean       recoveryEnabled = appConfig.getBoolean(INDEX_RECOVERY_CONF, DEFAULT_INDEX_RECOVERY);
+            long          ttlHours        = appConfig.getLong(SOLR_INDEX_TX_LOG_TTL_CONF, DEFAULT_GRAPH_TX_LOG_TTL);
+            Duration      txLogTtl        = Duration.ofHours(ttlHours);
+
+            // a plain map: double-brace initialization would create an anonymous HashMap subclass
+            Map<String, Object> properties = new HashMap<>();
+
+            properties.put(GRAPH_TX_LOG_CONF, recoveryEnabled);
+            properties.put(GRAPH_TX_LOG_VERBOSE_CONF, recoveryEnabled);
+            properties.put(GRAPH_TX_LOG_TTL_CONF, txLogTtl);
+
+            updateGlobalConfiguration(properties);
+
+            LOG.info("Tx Log-based Index Recovery: {}!", recoveryEnabled ? "Enabled" : "Disabled");
+        } catch (Exception e) {
+            LOG.error("Error configuring tx log-based index recovery!", e);
+        }
+    }
+
+    /**
+     * Applies the given options to JanusGraph's global configuration through a short-lived graph instance.
+     * Changes are committed only after every option was set; on failure they are rolled back and the
+     * error is logged. The temporary graph instance is always closed.
+     *
+     * @param map JanusGraph option keys mapped to their new values
+     */
+    private static void updateGlobalConfiguration(Map<String, Object> map) {
+        JanusGraph           graph            = null;
+        JanusGraphManagement managementSystem = null;
+
+        try {
+            graph            = initJanusGraph(getConfiguration());
+            managementSystem = graph.openManagement();
+
+            for (Map.Entry<String, Object> entry : map.entrySet()) {
+                managementSystem.set(entry.getKey(), entry.getValue());
+            }
+
+            managementSystem.commit();
+
+            LOG.info("Global properties updated!: {}", map);
+        } catch (Exception ex) {
+            LOG.error("Error updating global configuration: {}", map, ex);
+
+            // do not persist a partially applied configuration
+            if (managementSystem != null && managementSystem.isOpen()) {
+                try {
+                    managementSystem.rollback();
+                } catch (Exception rollbackEx) {
+                    LOG.warn("Error rolling back global configuration update", rollbackEx);
+                }
+            }
+        } finally {
+            // closed regardless of the outcome, so the temporary graph instance is never leaked
+            if (graph != null) {
+                graph.close();
+            }
+        }
+    }
+
public static JanusGraph getBulkLoadingGraphInstance() {
try {
Configuration cfg = getConfiguration();
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
index ef42dbd..9e9fdd8 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
@@ -40,16 +40,22 @@ import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.FacetField;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.TermsResponse;
-import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.janusgraph.diskstorage.solr.Solr6Index;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
import static org.apache.atlas.repository.Constants.FREETEXT_REQUEST_HANDLER;
import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
@@ -57,11 +63,15 @@ import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphIndexClient.class);
- private static final FreqComparator FREQ_COMPARATOR = new FreqComparator();
- private static final int DEFAULT_SUGGESTION_COUNT = 5;
- private static final int MIN_FACET_COUNT_REQUIRED = 1;
- private static final String TERMS_PREFIX = "terms.prefix";
- private static final String TERMS_FIELD = "terms.fl";
+ private static final FreqComparator FREQ_COMPARATOR = new FreqComparator();
+ private static final int DEFAULT_SUGGESTION_COUNT = 5;
+ private static final int MIN_FACET_COUNT_REQUIRED = 1;
+ private static final String TERMS_PREFIX = "terms.prefix";
+ private static final String TERMS_FIELD = "terms.fl";
+ // Solr ping status treated as healthy by isSolrHealthy()
+ private static final int SOLR_HEALTHY_STATUS = 0;
+ private static final long SOLR_STATUS_LOG_FREQUENCY_MS = 60000;// minimum interval between "Solr Health: Unhealthy!" log messages (1 min)
+ // time (System.currentTimeMillis) of the last "Unhealthy" log message; static, so shared by all instances of this class
+ private static long prevSolrHealthCheckTime;
+
private final Configuration configuration;
@@ -70,6 +80,29 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
this.configuration = configuration;
}
+    /**
+     * Checks whether the Solr index backend is reachable by pinging the vertex index collection.
+     * While unhealthy, "Solr Health: Unhealthy!" is logged at most once per SOLR_STATUS_LOG_FREQUENCY_MS.
+     *
+     * @return true if the ping returned the healthy status, false otherwise (including when the ping fails)
+     */
+    @Override
+    public boolean isHealthy() {
+        boolean isHealthy   = false;
+        long    currentTime = System.currentTimeMillis();
+
+        try {
+            isHealthy = isSolrHealthy();
+        } catch (Exception exception) {
+            // failures are expected while Solr is down: details only at debug level
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Error: isHealthy", exception);
+            }
+        }
+
+        if (!isHealthy && (prevSolrHealthCheckTime == 0 || currentTime - prevSolrHealthCheckTime > SOLR_STATUS_LOG_FREQUENCY_MS)) {
+            LOG.info("Solr Health: Unhealthy!");
+
+            prevSolrHealthCheckTime = currentTime;
+        }
+
+        return isHealthy;
+    }
+
@Override
public void applySearchWeight(String collectionName, Map<String, Integer> indexFieldName2SearchWeightMap) {
SolrClient solrClient = null;
@@ -340,6 +373,12 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
return Collections.EMPTY_LIST;
}
+    /**
+     * Pings the vertex index collection through the shared Solr client.
+     *
+     * @return false when no Solr client is available; otherwise whether the ping status equals SOLR_HEALTHY_STATUS
+     */
+    private boolean isSolrHealthy() throws SolrServerException, IOException {
+        final SolrClient solrClient = Solr6Index.getSolrClient();
+
+        if (solrClient == null) {
+            return false;
+        }
+
+        final int pingStatus = solrClient.ping(Constants.VERTEX_INDEX).getStatus();
+
+        return pingStatus == SOLR_HEALTHY_STATUS;
+    }
+
private void graphManagementCommit(AtlasGraphManagement management) {
try {
management.commit();
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
index 1cc7f8b..b3eb071 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
@@ -19,37 +19,46 @@ package org.apache.atlas.repository.graphdb.janus;
import com.google.common.base.Preconditions;
import org.apache.atlas.AtlasConfiguration;
-import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
import org.apache.atlas.repository.graphdb.AtlasElement;
+import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
+import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
+import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
+import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.EdgeLabel;
+import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphElement;
+import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.PropertyKey;
-import org.janusgraph.core.schema.*;
+import org.janusgraph.core.log.TransactionRecovery;
+import org.janusgraph.core.schema.ConsistencyModifier;
+import org.janusgraph.core.schema.JanusGraphIndex;
+import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder;
+import org.janusgraph.core.schema.Mapping;
+import org.janusgraph.core.schema.Parameter;
+import org.janusgraph.core.schema.PropertyKeyMaker;
import org.janusgraph.diskstorage.BackendTransaction;
import org.janusgraph.diskstorage.indexing.IndexEntry;
import org.janusgraph.graphdb.database.IndexSerializer;
import org.janusgraph.graphdb.database.StandardJanusGraph;
import org.janusgraph.graphdb.database.management.ManagementSystem;
import org.janusgraph.graphdb.internal.Token;
-import org.apache.atlas.repository.graphdb.AtlasCardinality;
-import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
-import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
-import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
-import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
-import org.apache.commons.lang.StringUtils;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.janusgraph.graphdb.log.StandardTransactionLogProcessor;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.types.IndexType;
import org.janusgraph.graphdb.types.MixedIndexType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -347,6 +356,62 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement {
}
}
+    /**
+     * Starts JanusGraph transaction recovery at the given time.
+     *
+     * @param recoveryStartTime epoch milliseconds from which the transaction log is processed
+     * @return the handle returned by JanusGraphFactory.startTransactionRecovery
+     */
+    @Override
+    public Object startIndexRecovery(long recoveryStartTime) {
+        return JanusGraphFactory.startTransactionRecovery(this.graph.getGraph(), Instant.ofEpochMilli(recoveryStartTime));
+    }
+
+    /**
+     * Pauses index recovery: closes the graph's system transaction log, then shuts the recovery handle down.
+     * A null handle is ignored; a handle that is not a TransactionRecovery is reported and ignored.
+     * Errors while shutting down are logged, not propagated.
+     *
+     * @param txRecoveryObject handle previously returned by startIndexRecovery
+     */
+    @Override
+    public void stopIndexRecovery(Object txRecoveryObject) {
+        if (txRecoveryObject == null) {
+            return;
+        }
+
+        if (!(txRecoveryObject instanceof TransactionRecovery)) {
+            LOG.error("stopIndexRecovery({}): Invalid transaction recovery object!", txRecoveryObject);
+
+            return;
+        }
+
+        try {
+            StandardJanusGraph standardGraph = (StandardJanusGraph) this.graph.getGraph();
+
+            LOG.info("stopIndexRecovery: Index Client is unhealthy. Index recovery: Paused!");
+
+            // NOTE(review): presumably closing the system tx log is what stops the recovery's log reader - confirm
+            standardGraph.getBackend().getSystemTxLog().close();
+
+            ((TransactionRecovery) txRecoveryObject).shutdown();
+        } catch (Exception e) {
+            LOG.warn("stopIndexRecovery: Error while shutting down transaction recovery", e);
+        }
+    }
+
+    /**
+     * Logs the success/failure counters of a running transaction recovery.
+     * A null handle is ignored; handles of an unexpected type are reported; errors are logged, not propagated.
+     *
+     * @param txRecoveryObject handle previously returned by startIndexRecovery
+     */
+    @Override
+    public void printIndexRecoveryStats(Object txRecoveryObject) {
+        if (txRecoveryObject == null) {
+            return;
+        }
+
+        try {
+            // check against the type that is actually cast to: any other TransactionRecovery implementation
+            // previously failed with a ClassCastException instead of being reported as an unexpected type
+            if (txRecoveryObject instanceof StandardTransactionLogProcessor) {
+                long[] statistics = ((StandardTransactionLogProcessor) txRecoveryObject).getStatistics();
+
+                if (statistics != null && statistics.length >= 2) {
+                    LOG.info("Index Recovery: Stats: Success:{}: Failed: {}", statistics[0], statistics[1]);
+                } else {
+                    LOG.info("Index Recovery: Stats: {}", statistics);
+                }
+            } else {
+                LOG.error("Transaction stats: Invalid transaction recovery object!: Unexpected type: {}: Details: {}", txRecoveryObject.getClass().toString(), txRecoveryObject);
+            }
+        } catch (Exception e) {
+            LOG.error("Error: Retrieving log transaction stats!", e);
+        }
+    }
+
private void reindexElement(ManagementSystem managementSystem, IndexSerializer indexSerializer, MixedIndexType indexType, List<AtlasElement> elements) throws Exception {
Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new HashMap<>();
StandardJanusGraphTx tx = managementSystem.getWrappedTx();
diff --git a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
index 682206d..4a37c77 100644
--- a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
@@ -50,6 +50,7 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static final String INDEX_BACKEND_CONF = "atlas.graph.index.search.backend";
public static final String INDEX_MAP_NAME_CONF = "atlas.graph.index.search.map-name";
public static final String SOLR_WAIT_SEARCHER_CONF = "atlas.graph.index.search.solr.wait-searcher";
+ // enables tx-log based index recovery (IndexRecoveryService) and JanusGraph's transaction log; default: DEFAULT_INDEX_RECOVERY
+ public static final String INDEX_RECOVERY_CONF = "atlas.index.recovery.enable";
public static final String ENABLE_FULLTEXT_SEARCH_CONF = "atlas.search.fulltext.enable";
public static final String ENABLE_FREETEXT_SEARCH_CONF = "atlas.search.freetext.enable";
public static final String ATLAS_RUN_MODE = "atlas.run.mode";
@@ -66,6 +67,7 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static final String DEFAULT_GRAPHDB_BACKEND = GRAPHBD_BACKEND_JANUS;
public static final boolean DEFAULT_SOLR_WAIT_SEARCHER = false;
public static final boolean DEFAULT_INDEX_MAP_NAME = false;
+ // index recovery is enabled unless atlas.index.recovery.enable is set to false
+ public static final boolean DEFAULT_INDEX_RECOVERY = true;
public static final AtlasRunMode DEFAULT_ATLAS_RUN_MODE = AtlasRunMode.PROD;
public static final String INDEX_SEARCH_MAX_RESULT_SET_SIZE = "atlas.graph.index.search.max-result-set-size";
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index ddfb008..d65bb1a 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -353,6 +353,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
createCommonVertexIndex(management, TASK_CREATED_TIME, UniqueKind.NONE, Long.class, SINGLE, true, false);
createCommonVertexIndex(management, TASK_STATUS, UniqueKind.NONE, String.class, SINGLE, true, false);
+ // index recovery
+ createCommonVertexIndex(management, PROPERTY_KEY_INDEX_RECOVERY_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
+
// create vertex-centric index
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE);
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class, SINGLE);
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
new file mode 100644
index 0000000..2f11610
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.graph;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.listener.ActiveStateChangeHandler;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.service.Service;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.util.Iterator;
+import java.util.TimeZone;
+
+import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
+import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_NAME;
+import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME;
+import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_START_TIME;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
+
+@Component
+@Order(8)
+public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
+    private static final Logger LOG                                       = LoggerFactory.getLogger(IndexRecoveryService.class);
+    private static final String DATE_FORMAT                               = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    private static final String INDEX_HEALTH_MONITOR_THREAD_NAME          = "index-health-monitor";
+    private static final String SOLR_STATUS_CHECK_RETRY_INTERVAL          = "atlas.graph.index.status.check.frequency";
+    private static final String SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME = "atlas.graph.index.recovery.start.time";
+    private static final long   SOLR_STATUS_RETRY_DEFAULT_MS              = 30000; // 30 secs default
+
+    private final Thread                 indexHealthMonitor;
+    private final RecoveryThread         recoveryThread;
+    private final RecoveryInfoManagement recoveryInfoManagement;
+    private final Configuration          configuration;
+    private final boolean                isIndexRecoveryEnabled;
+
+    /**
+     * Creates the service and its (not yet started) index health monitor thread.
+     * If atlas.graph.index.recovery.start.time is configured, it is persisted as the recovery start time.
+     */
+    @Inject
+    public IndexRecoveryService(Configuration config, AtlasGraph graph) {
+        this.configuration          = config;
+        this.isIndexRecoveryEnabled = config.getBoolean(ApplicationProperties.INDEX_RECOVERY_CONF, DEFAULT_INDEX_RECOVERY);
+
+        long recoveryStartTimeFromConfig = getRecoveryStartTimeFromConfig(config);
+        long healthCheckFrequencyMillis  = config.getLong(SOLR_STATUS_CHECK_RETRY_INTERVAL, SOLR_STATUS_RETRY_DEFAULT_MS);
+
+        this.recoveryInfoManagement = new RecoveryInfoManagement(graph);
+        this.recoveryThread         = new RecoveryThread(recoveryInfoManagement, graph, recoveryStartTimeFromConfig, healthCheckFrequencyMillis);
+        this.indexHealthMonitor     = new Thread(recoveryThread, INDEX_HEALTH_MONITOR_THREAD_NAME);
+    }
+
+    /**
+     * Parses atlas.graph.index.recovery.start.time (format DATE_FORMAT, UTC).
+     *
+     * @return the configured time in epoch milliseconds, or 0 when it is not set or cannot be parsed
+     */
+    private long getRecoveryStartTimeFromConfig(Configuration config) {
+        String time = null;
+
+        try {
+            time = config.getString(SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME);
+
+            if (time == null || time.trim().isEmpty()) {
+                return 0L; // not configured: the start time persisted in the graph is used
+            }
+
+            SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+            dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+            return dateFormat.parse(time.trim()).toInstant().toEpochMilli();
+        } catch (Exception e) {
+            LOG.warn("Invalid value '{}' for {}: expected format {} (UTC). Ignoring it.", time, SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME, DATE_FORMAT, e);
+
+            return 0L;
+        }
+    }
+
+    @Override
+    public void start() throws AtlasException {
+        if (configuration == null || !HAConfiguration.isHAEnabled(configuration)) {
+            LOG.info("==> IndexRecoveryService.start()");
+
+            startTxLogMonitoring();
+
+            LOG.info("<== IndexRecoveryService.start()");
+        }
+    }
+
+    @Override
+    public void stop() throws AtlasException {
+        // The monitor loops until told to stop: signal it and interrupt its sleep, otherwise join() never returns.
+        recoveryThread.shutdown();
+        indexHealthMonitor.interrupt();
+
+        try {
+            indexHealthMonitor.join();
+        } catch (InterruptedException e) {
+            LOG.error("indexHealthMonitor: Interrupted", e);
+
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void instanceIsActive() throws AtlasException {
+        LOG.info("==> IndexRecoveryService.instanceIsActive()");
+
+        startTxLogMonitoring();
+
+        LOG.info("<== IndexRecoveryService.instanceIsActive()");
+    }
+
+    @Override
+    public void instanceIsPassive() throws AtlasException {
+        LOG.info("IndexRecoveryService.instanceIsPassive(): no action needed.");
+    }
+
+    @Override
+    public int getHandlerOrder() {
+        return ActiveStateChangeHandler.HandlerOrder.INDEX_RECOVERY.getOrder();
+    }
+
+    /**
+     * Starts the index health monitor thread, at most once; no-op when index recovery is disabled.
+     */
+    private synchronized void startTxLogMonitoring() {
+        if (!isIndexRecoveryEnabled) {
+            LOG.warn("IndexRecoveryService: Recovery should be enabled.");
+
+            return;
+        }
+
+        // A Thread can be started only once: start() and instanceIsActive() (or repeated activations) can both get here.
+        if (indexHealthMonitor.getState() != Thread.State.NEW) {
+            LOG.info("IndexRecoveryService: index health monitor already started.");
+
+            return;
+        }
+
+        indexHealthMonitor.start();
+    }
+
+    /**
+     * Periodically checks index health. It starts transaction-log recovery when the index becomes healthy
+     * and pauses it (persisting a resume time) when the index becomes unhealthy.
+     */
+    private static class RecoveryThread implements Runnable {
+        private final    AtlasGraph             graph;
+        private final    RecoveryInfoManagement recoveryInfoManagement;
+        private final    long                   indexStatusCheckRetryMillis;
+        private          Object                 txRecoveryObject;
+        private          long                   lastHealthyCheckTime; // accessed only by the monitor thread
+        private volatile boolean                shouldRun = true;
+
+        private RecoveryThread(RecoveryInfoManagement recoveryInfoManagement, AtlasGraph graph, long startTimeFromConfig, long healthCheckFrequencyMillis) {
+            this.graph                       = graph;
+            this.recoveryInfoManagement      = recoveryInfoManagement;
+            this.indexStatusCheckRetryMillis = healthCheckFrequencyMillis;
+
+            if (startTimeFromConfig > 0) {
+                this.recoveryInfoManagement.updateStartTime(startTimeFromConfig);
+            }
+        }
+
+        /** Asks the monitoring loop to exit after its current iteration. */
+        private void shutdown() {
+            shouldRun = false;
+        }
+
+        @Override
+        public void run() {
+            LOG.info("Index Health Monitor: Starting...");
+
+            while (shouldRun) {
+                try {
+                    boolean solrHealthy = isSolrHealthy();
+
+                    if (this.txRecoveryObject == null && solrHealthy) {
+                        startMonitoring();
+                    }
+
+                    if (this.txRecoveryObject != null && !solrHealthy) {
+                        stopMonitoring();
+                    }
+                } catch (InterruptedException e) {
+                    // stop() interrupts the sleep in isSolrHealthy(): restore the flag and leave the loop
+                    Thread.currentThread().interrupt();
+
+                    break;
+                } catch (Exception e) {
+                    LOG.error("Error: Index recovery monitoring!", e);
+                }
+            }
+
+            LOG.info("Index Health Monitor: Stopped.");
+        }
+
+        /**
+         * Sleeps for one check interval, then asks the index client for its health. Remembers when the
+         * last successful check started.
+         */
+        private boolean isSolrHealthy() throws AtlasException, InterruptedException {
+            Thread.sleep(indexStatusCheckRetryMillis);
+
+            long    checkStartTime = System.currentTimeMillis();
+            boolean isHealthy      = this.graph.getGraphIndexClient().isHealthy();
+
+            if (isHealthy) {
+                lastHealthyCheckTime = checkStartTime;
+            }
+
+            return isHealthy;
+        }
+
+        private void startMonitoring() {
+            long startTime = 0L;
+
+            try {
+                startTime        = recoveryInfoManagement.getStartTime(); // never null, see RecoveryInfoManagement
+                txRecoveryObject = this.graph.getManagementSystem().startIndexRecovery(startTime);
+
+                printIndexRecoveryStats();
+
+                LOG.info("Index Recovery: Started! Recovery time: {}", Instant.ofEpochMilli(startTime));
+            } catch (Exception e) {
+                LOG.error("Index Recovery: Start: Error! Recovery time: {}", Instant.ofEpochMilli(startTime), e);
+            }
+        }
+
+        private void stopMonitoring() {
+            // Resume from the earlier of (a) the start of the last successful health check and (b) one interval ago.
+            // A slow, failing ping (e.g. Solr hanging until a timeout) can push (b) past the moment index updates
+            // began to fail; transactions committed in between would then be skipped on resume.
+            long    fallbackStartMillis = Instant.now().minusMillis(indexStatusCheckRetryMillis).toEpochMilli();
+            long    resumeMillis        = lastHealthyCheckTime > 0 ? Math.min(lastHealthyCheckTime, fallbackStartMillis) : fallbackStartMillis;
+            Instant newStartTime        = Instant.ofEpochMilli(resumeMillis);
+
+            try {
+                this.graph.getManagementSystem().stopIndexRecovery(txRecoveryObject);
+
+                recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli());
+
+                printIndexRecoveryStats();
+            } catch (Exception e) {
+                LOG.error("Index Recovery: Stopped! Error!", e);
+            } finally {
+                this.txRecoveryObject = null;
+
+                LOG.info("Index Recovery: Stopped! Recovery time: {}", newStartTime);
+            }
+        }
+
+        private void printIndexRecoveryStats() {
+            this.graph.getManagementSystem().printIndexRecoveryStats(txRecoveryObject);
+        }
+    }
+
+    /**
+     * Persists the index recovery start time on a dedicated vertex identified by PROPERTY_KEY_INDEX_RECOVERY_NAME.
+     */
+    @VisibleForTesting
+    static class RecoveryInfoManagement {
+        private static final String INDEX_RECOVERY_TYPE_NAME = "__solrIndexRecoveryInfo";
+
+        private final AtlasGraph graph;
+
+        public RecoveryInfoManagement(AtlasGraph graph) {
+            this.graph = graph;
+        }
+
+        /**
+         * Stores startTime (creating the vertex if needed) and keeps the previously stored value as prevTime.
+         * Commits on success; rolls back and logs on failure.
+         */
+        public void updateStartTime(Long startTime) {
+            try {
+                Long        prevStartTime = null;
+                AtlasVertex vertex        = findVertex();
+
+                if (vertex == null) {
+                    vertex = graph.addVertex();
+                } else {
+                    prevStartTime = getStartTime(vertex);
+                }
+
+                setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
+                setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_START_TIME, startTime);
+                setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, prevStartTime);
+
+                graph.commit();
+            } catch (Exception ex) {
+                // startTime fills the placeholder, so ex is logged as a throwable with its stack trace
+                LOG.error("Error: Updating: {}!", startTime, ex);
+
+                graph.rollback();
+            }
+        }
+
+        /**
+         * @return the stored start time in epoch milliseconds, or 0 when none is stored (never null)
+         */
+        public Long getStartTime() {
+            Long ret = getStartTime(findVertex());
+
+            // the vertex may exist without a start-time property; callers unbox the result
+            return ret != null ? ret : 0L;
+        }
+
+        private Long getStartTime(AtlasVertex vertex) {
+            if (vertex == null) {
+                LOG.warn("Vertex passed is NULL: Returned is 0");
+
+                return 0L;
+            }
+
+            Long startTime = 0L;
+
+            try {
+                startTime = vertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_START_TIME, Long.class);
+            } catch (Exception e) {
+                LOG.error("Error retrieving startTime", e);
+            }
+
+            return startTime;
+        }
+
+        private AtlasVertex findVertex() {
+            AtlasGraphQuery       query   = graph.query().has(PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
+            Iterator<AtlasVertex> results = query.vertices().iterator();
+
+            return results.hasNext() ? results.next() : null;
+        }
+    }
+}
\ No newline at end of file
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/RecoveryInfoManagementTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/RecoveryInfoManagementTest.java
new file mode 100644
index 0000000..d0382f2
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/RecoveryInfoManagementTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.graph;
+
+import com.google.inject.Inject;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.repository.AtlasTestBase;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class RecoveryInfoManagementTest extends AtlasTestBase {
+    @Inject
+    private AtlasGraph atlasGraph;
+
+    @BeforeTest
+    public void setupTest() {
+        RequestContext.clear();
+        RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
+    }
+
+    @BeforeClass
+    public void initialize() throws Exception {
+        super.initialize();
+    }
+
+    @AfterClass
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    /**
+     * Exercises both paths of RecoveryInfoManagement.updateStartTime(): the first call stores a start time
+     * (creating the recovery-info vertex if it does not exist yet); the second must replace it in place.
+     */
+    @Test
+    public void verifyCreateUpdate() {
+        IndexRecoveryService.RecoveryInfoManagement rm = new IndexRecoveryService.RecoveryInfoManagement(atlasGraph);
+
+        long createdTime = System.currentTimeMillis();
+
+        rm.updateStartTime(createdTime);
+
+        Assert.assertEquals((long) rm.getStartTime(), createdTime);
+
+        long updatedTime = createdTime + 1000L;
+
+        rm.updateStartTime(updatedTime);
+
+        Assert.assertEquals((long) rm.getStartTime(), updatedTime);
+    }
+}
diff --git a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
index ba8f088..2916d6b 100644
--- a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
+++ b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
@@ -33,7 +33,8 @@ public interface ActiveStateChangeHandler {
ATLAS_PATCH_SERVICE(3),
DEFAULT_METADATA_SERVICE(4),
NOTIFICATION_HOOK_CONSUMER(5),
- TASK_MANAGEMENT(6);
+ TASK_MANAGEMENT(6),
+ INDEX_RECOVERY(7);
private final int order;