You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to oak-commits@jackrabbit.apache.org by mk...@apache.org on 2020/05/14 07:27:50 UTC

svn commit: r1877724 - in /jackrabbit/oak/trunk/oak-search-elastic/src: main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ main/java/org/apache/jackrabbit/oak/plugins/...

Author: mkataria
Date: Thu May 14 07:27:50 2020
New Revision: 1877724

URL: http://svn.apache.org/viewvc?rev=1877724&view=rev
Log:
OAK-9053: Reindexing Strategy for ES indexes. Patch provided by: armverma

Modified:
    jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java
    jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java
    jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java
    jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java
    jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java
    jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatisticsTest.java

Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java?rev=1877724&r1=1877723&r2=1877724&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java Thu May 14 07:27:50 2020
@@ -19,6 +19,7 @@
 package org.apache.jackrabbit.oak.plugins.index.elasticsearch;
 
 import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 
@@ -63,11 +64,15 @@ public class ElasticsearchIndexDefinitio
     public final int bulkRetries;
     public final long bulkRetriesBackoff;
     private final String indexPrefix;
+    private final String remoteAlias;
 
     public ElasticsearchIndexDefinition(NodeState root, NodeState defn, String indexPath, String indexPrefix) {
         super(root, getIndexDefinitionState(defn), determineIndexFormatVersion(defn), determineUniqueId(defn), indexPath);
+        boolean isReindex = defn.getBoolean(IndexConstants.REINDEX_PROPERTY_NAME);
+        String indexSuffix = "-" + (getReindexCount() + (isReindex ? 1 : 0));
         this.indexPrefix = indexPrefix != null ? indexPrefix : "";
-        this.remoteIndexName = setupIndexName();
+        this.remoteAlias = setupAlias();
+        this.remoteIndexName = getESSafeIndexName(this.remoteAlias + indexSuffix);
         this.bulkActions = getOptionalValue(defn, BULK_ACTIONS, BULK_ACTIONS_DEFAULT);
         this.bulkSizeBytes = getOptionalValue(defn, BULK_SIZE_BYTES, BULK_SIZE_BYTES_DEFAULT);
         this.bulkFlushIntervalMs = getOptionalValue(defn, BULK_FLUSH_INTERVAL_MS, BULK_FLUSH_INTERVAL_MS_DEFAULT);
@@ -76,17 +81,27 @@ public class ElasticsearchIndexDefinitio
     }
 
     /**
+     * Returns the index alias on the Elasticsearch cluster. This alias should be used for any index related operations
+     * instead of accessing the index directly.
+     * @return the Elasticsearch index alias
+     */
+    public String getRemoteIndexAlias() {
+        return remoteAlias;
+    }
+
+    /**
      * Returns the index identifier on the Elasticsearch cluster. Notice this can be different from the value returned
-     * from {@code getIndexName}.
+     * from {@code getIndexName}. The index name shouldn't be used for index read or updates. Alias obtained from {@link #getRemoteIndexAlias()}
+     * should be used for such purposes.
      * @return the Elasticsearch index identifier
      */
     public String getRemoteIndexName() {
         return remoteIndexName;
     }
 
-    private String setupIndexName() {
+    private String setupAlias() {
         // TODO: implement advanced remote index name strategy that takes into account multiple tenants and re-index process
-        return getESSafeIndexName(indexPrefix + "." + getIndexPath() + "-" + getReindexCount());
+        return getESSafeIndexName(indexPrefix + "." + getIndexPath());
     }
 
     /**

Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java?rev=1877724&r1=1877723&r2=1877724&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java Thu May 14 07:27:50 2020
@@ -21,6 +21,9 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
 import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
@@ -28,9 +31,13 @@ import org.elasticsearch.action.bulk.Bul
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.GetAliasesResponse;
+import org.elasticsearch.client.IndicesClient;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.indices.CreateIndexRequest;
 import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -44,8 +51,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchDocument.pathToId;
@@ -92,7 +100,7 @@ class ElasticsearchIndexWriter implement
 
     @Override
     public void updateDocument(String path, ElasticsearchDocument doc) throws IOException {
-        IndexRequest request = new IndexRequest(indexDefinition.getRemoteIndexName())
+        IndexRequest request = new IndexRequest(indexDefinition.getRemoteIndexAlias())
                 .id(pathToId(path))
                 .source(doc.build(), XContentType.JSON);
         bulkProcessor.add(request);
@@ -100,7 +108,7 @@ class ElasticsearchIndexWriter implement
 
     @Override
     public void deleteDocuments(String path) throws IOException {
-        DeleteRequest request = new DeleteRequest(indexDefinition.getRemoteIndexName())
+        DeleteRequest request = new DeleteRequest(indexDefinition.getRemoteIndexAlias())
                 .id(pathToId(path));
         bulkProcessor.add(request);
     }
@@ -142,7 +150,59 @@ class ElasticsearchIndexWriter implement
     // TODO: we need to check if the index already exists and in that case we have to figure out if there are
     // "breaking changes" in the index definition
     protected void provisionIndex() throws IOException {
-        CreateIndexRequest request = new CreateIndexRequest(indexDefinition.getRemoteIndexName());
+
+        IndicesClient indicesClient = elasticsearchConnection.getClient().indices();
+        final String indexName = indexDefinition.getRemoteIndexName();
+
+        CreateIndexRequest createIndexRequest = constructCreateIndexRequest(indexName);
+        String requestMsg = Strings.toString(createIndexRequest.toXContent(jsonBuilder(), EMPTY_PARAMS));
+        CreateIndexResponse response = indicesClient.create(createIndexRequest, RequestOptions.DEFAULT);
+        checkResponseAcknowledgement(response, "Create index call not acknowledged for index " + indexName);
+
+        LOG.info("Updated settings for index {} = {}. Response acknowledged: {}",
+                indexDefinition.getRemoteIndexAlias(), requestMsg, response.isAcknowledged());
+
+
+        GetAliasesRequest getAliasesRequest = new GetAliasesRequest(indexDefinition.getRemoteIndexAlias());
+        GetAliasesResponse aliasesResponse = indicesClient.getAlias(getAliasesRequest, RequestOptions.DEFAULT);
+        Map<String, Set<AliasMetaData>> aliases = aliasesResponse.getAliases();
+        IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
+        for (String oldIndexName : aliases.keySet()) {
+            IndicesAliasesRequest.AliasActions removeAction = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE);
+            removeAction.index(oldIndexName).alias(indexDefinition.getRemoteIndexAlias());
+            indicesAliasesRequest.addAliasAction(removeAction);
+        }
+        IndicesAliasesRequest.AliasActions addAction = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD);
+        addAction.index(indexName).alias(indexDefinition.getRemoteIndexAlias());
+        indicesAliasesRequest.addAliasAction(addAction);
+        AcknowledgedResponse updateAliasResponse = indicesClient.updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT);
+        checkResponseAcknowledgement(updateAliasResponse, "Update alias call not acknowledged for alias "
+                + indexDefinition.getRemoteIndexAlias());
+        LOG.info("Updated alias {} to index {}. Response acknowledged: {}", indexDefinition.getRemoteIndexAlias(),
+                indexName, updateAliasResponse.isAcknowledged());
+        deleteOldIndices(indicesClient, aliases.keySet());
+    }
+
+    private void checkResponseAcknowledgement(AcknowledgedResponse response, String exceptionMessage) {
+        if (!response.isAcknowledged()) {
+            throw new IllegalStateException(exceptionMessage);
+        }
+    }
+
+    private void deleteOldIndices(IndicesClient indicesClient, Set<String> indices) throws IOException {
+        if (indices.size() == 0)
+            return;
+        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest();
+        for (String oldIndexName : indices) {
+            deleteIndexRequest.indices(oldIndexName);
+        }
+        AcknowledgedResponse deleteIndexResponse = indicesClient.delete(deleteIndexRequest, RequestOptions.DEFAULT);
+        checkResponseAcknowledgement(deleteIndexResponse, "Delete index call not acknowledged for indices " + indices);
+        LOG.info("Deleted indices {}. Response acknowledged: {}", indices.toString(), deleteIndexResponse.isAcknowledged());
+    }
+
+    private CreateIndexRequest constructCreateIndexRequest(String indexName) throws IOException {
+        CreateIndexRequest request = new CreateIndexRequest(indexName);
 
         // provision settings
         request.settings(Settings.builder()
@@ -179,11 +239,7 @@ class ElasticsearchIndexWriter implement
         mappingBuilder.endObject();
         request.mapping(mappingBuilder);
 
-        String requestMsg = Strings.toString(request.toXContent(jsonBuilder(), EMPTY_PARAMS));
-        CreateIndexResponse response = elasticsearchConnection.getClient().indices().create(request, RequestOptions.DEFAULT);
-
-        LOG.info("Updated settings for index {} = {}. Response acknowledged: {}",
-                indexDefinition.getRemoteIndexName(), requestMsg, response.isAcknowledged());
+        return request;
     }
 
     private class OakBulkProcessorListener implements BulkProcessor.Listener {

Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java?rev=1877724&r1=1877723&r2=1877724&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java Thu May 14 07:27:50 2020
@@ -84,7 +84,7 @@ class ElasticsearchIndexStatistics imple
     @Override
     public int numDocs() {
         return statsCache.getUnchecked(
-                new CountRequestDescriptor(elasticsearchConnection, indexDefinition.getRemoteIndexName(), null)
+                new CountRequestDescriptor(elasticsearchConnection, indexDefinition.getRemoteIndexAlias(), null)
         );
     }
 
@@ -95,7 +95,7 @@ class ElasticsearchIndexStatistics imple
     @Override
     public int getDocCountFor(String field) {
         return statsCache.getUnchecked(
-                new CountRequestDescriptor(elasticsearchConnection, indexDefinition.getRemoteIndexName(), field)
+                new CountRequestDescriptor(elasticsearchConnection, indexDefinition.getRemoteIndexAlias(), field)
         );
     }
 

Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java?rev=1877724&r1=1877723&r2=1877724&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java Thu May 14 07:27:50 2020
@@ -37,7 +37,7 @@ public class ElasticsearchSearcher {
     public SearchResponse search(ElasticsearchSearcherModel elasticsearchSearcherModel) throws IOException {
         SearchSourceBuilder searchSourceBuilder = SearchSourceBuilderUtil.createSearchSourceBuilder(elasticsearchSearcherModel);
 
-        SearchRequest request = new SearchRequest(indexNode.getDefinition().getRemoteIndexName())
+        SearchRequest request = new SearchRequest(indexNode.getDefinition().getRemoteIndexAlias())
                 .source(searchSourceBuilder);
 
         return indexNode.getConnection().getClient().search(request, RequestOptions.DEFAULT);
@@ -51,7 +51,7 @@ public class ElasticsearchSearcher {
                 .storedField(FieldNames.PATH)
                 .size(batchSize);
 
-        SearchRequest request = new SearchRequest(indexNode.getDefinition().getRemoteIndexName())
+        SearchRequest request = new SearchRequest(indexNode.getDefinition().getRemoteIndexAlias())
                 .source(searchSourceBuilder);
 
         return indexNode.getConnection().getClient().search(request, RequestOptions.DEFAULT);

Modified: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java?rev=1877724&r1=1877723&r2=1877724&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java Thu May 14 07:27:50 2020
@@ -51,7 +51,7 @@ public class ElasticsearchIndexWriterTes
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        when(indexDefinitionMock.getRemoteIndexName()).thenReturn("test-index");
+        when(indexDefinitionMock.getRemoteIndexAlias()).thenReturn("test-index");
         indexWriter = new ElasticsearchIndexWriter(elasticsearchConnectionMock, indexDefinitionMock, bulkProcessorMock);
     }
 

Modified: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatisticsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatisticsTest.java?rev=1877724&r1=1877723&r2=1877724&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatisticsTest.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatisticsTest.java Thu May 14 07:27:50 2020
@@ -58,7 +58,7 @@ public class ElasticsearchIndexStatistic
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        when(indexDefinitionMock.getRemoteIndexName()).thenReturn("test-index");
+        when(indexDefinitionMock.getRemoteIndexAlias()).thenReturn("test-index");
         when(elasticsearchConnectionMock.getClient()).thenReturn(elasticClientMock);
     }