You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mk...@apache.org on 2020/05/14 07:27:50 UTC
svn commit: r1877724 - in /jackrabbit/oak/trunk/oak-search-elastic/src:
main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/
main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/
main/java/org/apache/jackrabbit/oak/plugins/...
Author: mkataria
Date: Thu May 14 07:27:50 2020
New Revision: 1877724
URL: http://svn.apache.org/viewvc?rev=1877724&view=rev
Log:
OAK-9053: Reindexing Strategy for ES indexes. Patch provided by: armverma
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatisticsTest.java
Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java?rev=1877724&r1=1877723&r2=1877724&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java Thu May 14 07:27:50 2020
@@ -19,6 +19,7 @@
package org.apache.jackrabbit.oak.plugins.index.elasticsearch;
import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -63,11 +64,15 @@ public class ElasticsearchIndexDefinitio
public final int bulkRetries;
public final long bulkRetriesBackoff;
private final String indexPrefix;
+ private final String remoteAlias;
public ElasticsearchIndexDefinition(NodeState root, NodeState defn, String indexPath, String indexPrefix) {
super(root, getIndexDefinitionState(defn), determineIndexFormatVersion(defn), determineUniqueId(defn), indexPath);
+ boolean isReindex = defn.getBoolean(IndexConstants.REINDEX_PROPERTY_NAME);
+ String indexSuffix = "-" + (getReindexCount() + (isReindex ? 1 : 0));
this.indexPrefix = indexPrefix != null ? indexPrefix : "";
- this.remoteIndexName = setupIndexName();
+ this.remoteAlias = setupAlias();
+ this.remoteIndexName = getESSafeIndexName(this.remoteAlias + indexSuffix);
this.bulkActions = getOptionalValue(defn, BULK_ACTIONS, BULK_ACTIONS_DEFAULT);
this.bulkSizeBytes = getOptionalValue(defn, BULK_SIZE_BYTES, BULK_SIZE_BYTES_DEFAULT);
this.bulkFlushIntervalMs = getOptionalValue(defn, BULK_FLUSH_INTERVAL_MS, BULK_FLUSH_INTERVAL_MS_DEFAULT);
@@ -76,17 +81,27 @@ public class ElasticsearchIndexDefinitio
}
/**
+     * Gets the stable alias under which this index is exposed on the Elasticsearch cluster. Index related
+     * operations (reads, updates, statistics) are expected to go through this alias rather than through the
+     * concrete index returned by {@link #getRemoteIndexName()}.
+     *
+     * @return the Elasticsearch index alias
+     */
+    public String getRemoteIndexAlias() {
+        return this.remoteAlias;
+    }
+
+ /**
* Returns the index identifier on the Elasticsearch cluster. Notice this can be different from the value returned
- * from {@code getIndexName}.
+ * from {@code getIndexName}. The name ends with a suffix derived from the reindex count (plus one when the
+ * definition is flagged for reindexing), so it changes when the index is rebuilt. Do not use it for index reads
+ * or updates; use the alias obtained from {@link #getRemoteIndexAlias()} instead.
* @return the Elasticsearch index identifier
*/
public String getRemoteIndexName() {
return remoteIndexName;
}
- private String setupIndexName() {
+ /**
+ * Builds the alias for this index from the configured prefix and the index path, passed through
+ * {@code getESSafeIndexName}. Unlike the concrete index name it carries no reindex-count suffix, so it stays
+ * the same across reindexing.
+ * @return the Elasticsearch alias for this index
+ */
+ private String setupAlias() {
// TODO: implement advanced remote index name strategy that takes into account multiple tenants and re-index process
- return getESSafeIndexName(indexPrefix + "." + getIndexPath() + "-" + getReindexCount());
+ return getESSafeIndexName(indexPrefix + "." + getIndexPath());
}
/**
Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java?rev=1877724&r1=1877723&r2=1877724&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java Thu May 14 07:27:50 2020
@@ -21,6 +21,9 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
@@ -28,9 +31,13 @@ import org.elasticsearch.action.bulk.Bul
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.GetAliasesResponse;
+import org.elasticsearch.client.IndicesClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -44,8 +51,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchDocument.pathToId;
@@ -92,7 +100,7 @@ class ElasticsearchIndexWriter implement
@Override
public void updateDocument(String path, ElasticsearchDocument doc) throws IOException {
- IndexRequest request = new IndexRequest(indexDefinition.getRemoteIndexName())
+ IndexRequest request = new IndexRequest(indexDefinition.getRemoteIndexAlias())
.id(pathToId(path))
.source(doc.build(), XContentType.JSON);
bulkProcessor.add(request);
@@ -100,7 +108,7 @@ class ElasticsearchIndexWriter implement
@Override
public void deleteDocuments(String path) throws IOException {
- DeleteRequest request = new DeleteRequest(indexDefinition.getRemoteIndexName())
+ // Address the alias rather than the concrete index so the delete reaches whichever index currently backs it.
+ // NOTE(review): only the document with id pathToId(path) is removed here; confirm callers do not expect
+ // descendants of path to be deleted as well, despite the plural method name.
+ DeleteRequest request = new DeleteRequest(indexDefinition.getRemoteIndexAlias())
.id(pathToId(path));
bulkProcessor.add(request);
}
@@ -142,7 +150,59 @@ class ElasticsearchIndexWriter implement
// TODO: we need to check if the index already exists and in that case we have to figure out if there are
// "breaking changes" in the index definition
protected void provisionIndex() throws IOException {
- CreateIndexRequest request = new CreateIndexRequest(indexDefinition.getRemoteIndexName());
+
+ IndicesClient indicesClient = elasticsearchConnection.getClient().indices();
+ final String indexName = indexDefinition.getRemoteIndexName();
+
+ CreateIndexRequest createIndexRequest = constructCreateIndexRequest(indexName);
+ String requestMsg = Strings.toString(createIndexRequest.toXContent(jsonBuilder(), EMPTY_PARAMS));
+ CreateIndexResponse response = indicesClient.create(createIndexRequest, RequestOptions.DEFAULT);
+ checkResponseAcknowledgement(response, "Create index call not acknowledged for index " + indexName);
+
+ LOG.info("Updated settings for index {} = {}. Response acknowledged: {}",
+ indexDefinition.getRemoteIndexAlias(), requestMsg, response.isAcknowledged());
+
+
+ GetAliasesRequest getAliasesRequest = new GetAliasesRequest(indexDefinition.getRemoteIndexAlias());
+ GetAliasesResponse aliasesResponse = indicesClient.getAlias(getAliasesRequest, RequestOptions.DEFAULT);
+ Map<String, Set<AliasMetaData>> aliases = aliasesResponse.getAliases();
+ IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
+ for (String oldIndexName : aliases.keySet()) {
+ IndicesAliasesRequest.AliasActions removeAction = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE);
+ removeAction.index(oldIndexName).alias(indexDefinition.getRemoteIndexAlias());
+ indicesAliasesRequest.addAliasAction(removeAction);
+ }
+ IndicesAliasesRequest.AliasActions addAction = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD);
+ addAction.index(indexName).alias(indexDefinition.getRemoteIndexAlias());
+ indicesAliasesRequest.addAliasAction(addAction);
+ AcknowledgedResponse updateAliasResponse = indicesClient.updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT);
+ checkResponseAcknowledgement(updateAliasResponse, "Update alias call not acknowledged for alias "
+ + indexDefinition.getRemoteIndexAlias());
+ LOG.info("Updated alias {} to index {}. Response acknowledged: {}", indexDefinition.getRemoteIndexAlias(),
+ indexName, updateAliasResponse.isAcknowledged());
+ deleteOldIndices(indicesClient, aliases.keySet());
+ }
+
+    /**
+     * Fails fast when the cluster did not acknowledge a request.
+     *
+     * @param response the response to inspect
+     * @param exceptionMessage message of the exception raised when the response is not acknowledged
+     * @throws IllegalStateException if {@code response} is not acknowledged
+     */
+    private void checkResponseAcknowledgement(AcknowledgedResponse response, String exceptionMessage) {
+        boolean acknowledged = response.isAcknowledged();
+        if (acknowledged) {
+            return;
+        }
+        throw new IllegalStateException(exceptionMessage);
+    }
+
+    /**
+     * Deletes the given indices with a single delete-index request. Does nothing when {@code indices} is empty.
+     *
+     * @param indicesClient client used to issue the request
+     * @param indices names of the indices to delete
+     * @throws IOException if the call to the Elasticsearch cluster fails
+     * @throws IllegalStateException if the cluster does not acknowledge the deletion
+     */
+    private void deleteOldIndices(IndicesClient indicesClient, Set<String> indices) throws IOException {
+        if (indices.isEmpty()) {
+            return;
+        }
+        // DeleteIndexRequest#indices(String...) replaces the target list instead of appending to it. Calling it once
+        // per index would therefore delete only the last one, so all names are passed at once instead.
+        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indices.toArray(new String[0]));
+        AcknowledgedResponse deleteIndexResponse = indicesClient.delete(deleteIndexRequest, RequestOptions.DEFAULT);
+        checkResponseAcknowledgement(deleteIndexResponse, "Delete index call not acknowledged for indices " + indices);
+        LOG.info("Deleted indices {}. Response acknowledged: {}", indices, deleteIndexResponse.isAcknowledged());
+    }
+
+ private CreateIndexRequest constructCreateIndexRequest(String indexName) throws IOException {
+ CreateIndexRequest request = new CreateIndexRequest(indexName);
// provision settings
request.settings(Settings.builder()
@@ -179,11 +239,7 @@ class ElasticsearchIndexWriter implement
mappingBuilder.endObject();
request.mapping(mappingBuilder);
- String requestMsg = Strings.toString(request.toXContent(jsonBuilder(), EMPTY_PARAMS));
- CreateIndexResponse response = elasticsearchConnection.getClient().indices().create(request, RequestOptions.DEFAULT);
-
- LOG.info("Updated settings for index {} = {}. Response acknowledged: {}",
- indexDefinition.getRemoteIndexName(), requestMsg, response.isAcknowledged());
+ return request;
}
private class OakBulkProcessorListener implements BulkProcessor.Listener {
Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java?rev=1877724&r1=1877723&r2=1877724&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java Thu May 14 07:27:50 2020
@@ -84,7 +84,7 @@ class ElasticsearchIndexStatistics imple
@Override
public int numDocs() {
return statsCache.getUnchecked(
- new CountRequestDescriptor(elasticsearchConnection, indexDefinition.getRemoteIndexName(), null)
+ // Count through the alias so the statistics follow the index the alias currently points to; the null field
+ // presumably means "no field restriction" (confirm in CountRequestDescriptor).
+ // NOTE(review): the alias does not change on reindex, so a count cached for the previous index may be served
+ // until the cache entry expires; verify the statsCache expiry policy.
+ new CountRequestDescriptor(elasticsearchConnection, indexDefinition.getRemoteIndexAlias(), null)
);
}
@@ -95,7 +95,7 @@ class ElasticsearchIndexStatistics imple
@Override
public int getDocCountFor(String field) {
return statsCache.getUnchecked(
- new CountRequestDescriptor(elasticsearchConnection, indexDefinition.getRemoteIndexName(), field)
+ // Count through the alias so per-field statistics follow the index the alias currently points to.
+ // NOTE(review): as the alias is stable across reindexing, a cached per-field count may outlive the index it
+ // was computed from until the entry expires; verify the statsCache expiry policy.
+ new CountRequestDescriptor(elasticsearchConnection, indexDefinition.getRemoteIndexAlias(), field)
);
}
Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java?rev=1877724&r1=1877723&r2=1877724&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java Thu May 14 07:27:50 2020
@@ -37,7 +37,7 @@ public class ElasticsearchSearcher {
public SearchResponse search(ElasticsearchSearcherModel elasticsearchSearcherModel) throws IOException {
SearchSourceBuilder searchSourceBuilder = SearchSourceBuilderUtil.createSearchSourceBuilder(elasticsearchSearcherModel);
- SearchRequest request = new SearchRequest(indexNode.getDefinition().getRemoteIndexName())
+ SearchRequest request = new SearchRequest(indexNode.getDefinition().getRemoteIndexAlias())
.source(searchSourceBuilder);
return indexNode.getConnection().getClient().search(request, RequestOptions.DEFAULT);
@@ -51,7 +51,7 @@ public class ElasticsearchSearcher {
.storedField(FieldNames.PATH)
.size(batchSize);
- SearchRequest request = new SearchRequest(indexNode.getDefinition().getRemoteIndexName())
+ SearchRequest request = new SearchRequest(indexNode.getDefinition().getRemoteIndexAlias())
.source(searchSourceBuilder);
return indexNode.getConnection().getClient().search(request, RequestOptions.DEFAULT);
Modified: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java?rev=1877724&r1=1877723&r2=1877724&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java Thu May 14 07:27:50 2020
@@ -51,7 +51,7 @@ public class ElasticsearchIndexWriterTes
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- when(indexDefinitionMock.getRemoteIndexName()).thenReturn("test-index");
+ // The writer now addresses documents through the index alias, so that is the accessor to stub.
+ when(indexDefinitionMock.getRemoteIndexAlias()).thenReturn("test-index");
indexWriter = new ElasticsearchIndexWriter(elasticsearchConnectionMock, indexDefinitionMock, bulkProcessorMock);
}
Modified: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatisticsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatisticsTest.java?rev=1877724&r1=1877723&r2=1877724&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatisticsTest.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatisticsTest.java Thu May 14 07:27:50 2020
@@ -58,7 +58,7 @@ public class ElasticsearchIndexStatistic
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- when(indexDefinitionMock.getRemoteIndexName()).thenReturn("test-index");
+ // Count requests are now built against the index alias, so that is the accessor to stub.
+ when(indexDefinitionMock.getRemoteIndexAlias()).thenReturn("test-index");
when(elasticsearchConnectionMock.getClient()).thenReturn(elasticClientMock);
}