You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ng...@apache.org on 2020/04/21 12:08:44 UTC

svn commit: r1876791 - in /jackrabbit/oak/trunk/oak-search-elastic/src: main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ test/java/org/apache/jackrabbit/oak/plugins/...

Author: ngupta
Date: Tue Apr 21 12:08:44 2020
New Revision: 1876791

URL: http://svn.apache.org/viewvc?rev=1876791&view=rev
Log:
OAK-9015 | ElasticsearchIndexWriter#close method implementation should follow the underlying contract's doc

Modified:
    jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java
    jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java
    jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java

Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java?rev=1876791&r1=1876790&r2=1876791&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java Tue Apr 21 12:08:44 2020
@@ -44,6 +44,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchDocument.pathToId;
@@ -57,6 +59,7 @@ class ElasticsearchIndexWriter implement
     private final ElasticsearchIndexDefinition indexDefinition;
 
     private final BulkProcessor bulkProcessor;
+    private Optional<Boolean> indexUpdated = Optional.empty();
 
     ElasticsearchIndexWriter(@NotNull ElasticsearchConnection elasticsearchConnection,
                                        @NotNull ElasticsearchIndexDefinition indexDefinition) {
@@ -77,7 +80,7 @@ class ElasticsearchIndexWriter implement
     private BulkProcessor initBulkProcessor() {
         return BulkProcessor.builder((request, bulkListener) ->
                         elasticsearchConnection.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
-                new OakBulkProcessorLister())
+                new OakBulkProcessorListener())
                 .setBulkActions(indexDefinition.bulkActions)
                 .setBulkSize(new ByteSizeValue(indexDefinition.bulkSizeBytes))
                 .setFlushInterval(TimeValue.timeValueMillis(indexDefinition.bulkFlushIntervalMs))
@@ -104,9 +107,36 @@ class ElasticsearchIndexWriter implement
 
     @Override
     public boolean close(long timestamp) throws IOException {
-        // TODO : track index updates and return accordingly
-        // TODO : if/when we do async push, this is where to wait for those ops to complete
-        return false;
+        LOG.trace("Calling close on bulk processor {}", bulkProcessor);
+        bulkProcessor.close();
+        LOG.trace("Bulk Processor {} closed", bulkProcessor);
+
+        // bulkProcessor.close() calls the OakBulkProcessorListener.beforeBulk in a blocking manner
+        // indexUpdated would be unset there if it was false till now (not even a single update succeeded)
+        // in this case wait for some time for the last OakBulkProcessorListener.afterBulk to be called
+        // where indexUpdated can possibly be set to true, return false in case of timeout.
+        // We don't wait in case indexUpdated is already set (This would be if any of the previous flushes for this processor
+        // were successful i.e index was updated at least once)
+        final long start = System.currentTimeMillis();
+        long timeoutMillis = indexDefinition.bulkFlushIntervalMs * 5 ;
+        while (!indexUpdated.isPresent()) {
+            long lastAttempt = System.currentTimeMillis();
+            long elapsedTime = lastAttempt - start;
+            if (elapsedTime > timeoutMillis) {
+                // indexUpdated was not set till now, return false
+                LOG.trace("Timed out waiting for the bulk processor response. Returning indexUpdated = false");
+                return false;
+            } else {
+                try {
+                    LOG.trace("Waiting for afterBulk response...");
+                    Thread.sleep(100);
+                } catch (InterruptedException ex) {
+                    //
+                }
+            }
+        }
+        LOG.trace("Returning indexUpdated = {}", indexUpdated.get());
+        return indexUpdated.get();
     }
 
     // TODO: we need to check if the index already exists and in that case we have to figure out if there are
@@ -156,10 +186,16 @@ class ElasticsearchIndexWriter implement
                 indexDefinition.getRemoteIndexName(), requestMsg, response.isAcknowledged());
     }
 
-    private static class OakBulkProcessorLister implements BulkProcessor.Listener {
+    private class OakBulkProcessorListener implements BulkProcessor.Listener {
 
         @Override
         public void beforeBulk(long executionId, BulkRequest bulkRequest) {
+            if (indexUpdated.isPresent() && !indexUpdated.get()) {
+                // Reset the state only if it's false
+                // If it's true that means index was updated at least once by this processor
+                // and we can return true for indexUpdated.
+                indexUpdated = Optional.empty();
+            }
             LOG.info("Sending bulk with id {} -> {}", executionId, bulkRequest.getDescription());
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Bulk Requests: \n{}", bulkRequest.requests()
@@ -185,14 +221,31 @@ class ElasticsearchIndexWriter implement
                     if (bulkItemResponse.isFailed()) {
                         BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                         LOG.error("Bulk item with id {} failed", failure.getId(), failure.getCause());
+                    } else {
+                        // Set indexUpdated to true even if 1 item was updated successfully
+                        indexUpdated = Optional.of(true);
                     }
                 }
+                // Only set indexUpdated to false if it's unset
+                // If set and true, that means index was updated at least once by this processor.
+                // If set and false, no need to do anything
+                if (!indexUpdated.isPresent()) {
+                    indexUpdated = Optional.of(false);
+                }
+            } else {
+                indexUpdated = Optional.of(true);
             }
         }
 
         @Override
         public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
-            LOG.error("Bulk with id {} thrown an error", executionId, throwable);
+            // Only set indexUpdated to false if it's unset
+            // If set and true, that means index was updated at least once by this processor.
+            // If set and false, no need to do anything
+            if (!indexUpdated.isPresent()) {
+                indexUpdated = Optional.of(false);
+            }
+            LOG.error("Bulk with id {} threw an error", executionId, throwable);
         }
     }
 }

Modified: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java?rev=1876791&r1=1876790&r2=1876791&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java Tue Apr 21 12:08:44 2020
@@ -107,6 +107,40 @@ public class ElasticsearchPropertyIndexT
     }
 
     @Test
+    public void testBulkProcessorFlushLimit() throws Exception {
+        setIndex("test1", createIndex("propa"));
+
+        Tree test = root.getTree("/").addChild("test");
+        for (int i = 1; i < 249; i++) {
+            test.addChild("a" + i).setProperty("propa", "foo" + i);
+        }
+        root.commit();
+
+        // 250 is the default flush limit for bulk processor, and we added just less than 250 nodes
+        // So once the index writer is closed, the bulk processor would be closed and all the 248 entries should be flushed.
+        // Make sure that the last entry is indexed correctly.
+        String propaQuery = "select [jcr:path] from [nt:base] where [propa] = 'foo248'";
+        assertEventually(() -> {
+            assertThat(explain(propaQuery), containsString("elasticsearch:test1"));
+
+            assertQuery(propaQuery, singletonList("/test/a248"));
+        });
+
+        // Now we test for 250 < nodes < 500
+
+        for (int i = 250 ; i < 300 ; i ++) {
+            test.addChild("a" + i).setProperty("propa", "foo" + i);
+        }
+        root.commit();
+        String propaQuery2 = "select [jcr:path] from [nt:base] where [propa] = 'foo299'";
+        assertEventually(() -> {
+            assertThat(explain(propaQuery2), containsString("elasticsearch:test1"));
+
+            assertQuery(propaQuery2, singletonList("/test/a299"));
+        });
+    }
+
+    @Test
     public void indexSelection() throws Exception {
         setIndex("test1", createIndex("propa", "propb"));
         setIndex("test2", createIndex("propc"));
@@ -219,4 +253,5 @@ public class ElasticsearchPropertyIndexT
     private static void assertEventually(Runnable r) {
         ElasticsearchTestUtils.assertEventually(r, BULK_FLUSH_INTERVAL_MS_DEFAULT * 3);
     }
+
 }

Modified: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java?rev=1876791&r1=1876790&r2=1876791&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java Tue Apr 21 12:08:44 2020
@@ -92,4 +92,10 @@ public class ElasticsearchIndexWriterTes
         verify(bulkProcessorMock, times(2)).add(acDeleteRequest.capture());
     }
 
+    @Test
+    public void closeBulkProcessor() throws IOException {
+        indexWriter.close(System.currentTimeMillis());
+        verify(bulkProcessorMock).close();
+    }
+
 }