You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ng...@apache.org on 2020/04/21 12:08:44 UTC
svn commit: r1876791 - in /jackrabbit/oak/trunk/oak-search-elastic/src:
main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/
test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/
test/java/org/apache/jackrabbit/oak/plugins/...
Author: ngupta
Date: Tue Apr 21 12:08:44 2020
New Revision: 1876791
URL: http://svn.apache.org/viewvc?rev=1876791&view=rev
Log:
OAK-9015 | ElasticsearchIndexWriter#close method implementation should follow the underlying contract's doc
Modified:
jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java
jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java
Modified: jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java?rev=1876791&r1=1876790&r2=1876791&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java Tue Apr 21 12:08:44 2020
@@ -44,6 +44,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchDocument.pathToId;
@@ -57,6 +59,7 @@ class ElasticsearchIndexWriter implement
private final ElasticsearchIndexDefinition indexDefinition;
private final BulkProcessor bulkProcessor;
+ private Optional<Boolean> indexUpdated = Optional.empty();
ElasticsearchIndexWriter(@NotNull ElasticsearchConnection elasticsearchConnection,
@NotNull ElasticsearchIndexDefinition indexDefinition) {
@@ -77,7 +80,7 @@ class ElasticsearchIndexWriter implement
private BulkProcessor initBulkProcessor() {
return BulkProcessor.builder((request, bulkListener) ->
elasticsearchConnection.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
- new OakBulkProcessorLister())
+ new OakBulkProcessorListener())
.setBulkActions(indexDefinition.bulkActions)
.setBulkSize(new ByteSizeValue(indexDefinition.bulkSizeBytes))
.setFlushInterval(TimeValue.timeValueMillis(indexDefinition.bulkFlushIntervalMs))
@@ -104,9 +107,36 @@ class ElasticsearchIndexWriter implement
@Override
public boolean close(long timestamp) throws IOException {
- // TODO : track index updates and return accordingly
- // TODO : if/when we do async push, this is where to wait for those ops to complete
- return false;
+ LOG.trace("Calling close on bulk processor {}", bulkProcessor);
+ bulkProcessor.close();
+ LOG.trace("Bulk Processor {} closed", bulkProcessor);
+
+ // bulkProcessor.close() calls the OakBulkProcessorListener.beforeBulk in a blocking manner
+ // indexUpdated would be unset there if it was false till now (not even a single update succeeded)
+ // in this case wait for some time for the last OakBulkProcessorListener.afterBulk to be called
+ // where indexUpdated can possibly be set to true, return false in case of timeout.
+ // We don't wait in case indexUpdated is already set (This would be if any of the previous flushes for this processor
+ // were successful i.e index was updated at least once)
+ final long start = System.currentTimeMillis();
+ long timeoutMillis = indexDefinition.bulkFlushIntervalMs * 5 ;
+ while (!indexUpdated.isPresent()) {
+ long lastAttempt = System.currentTimeMillis();
+ long elapsedTime = lastAttempt - start;
+ if (elapsedTime > timeoutMillis) {
+ // indexUpdated was not set till now, return false
+ LOG.trace("Timed out waiting for the bulk processor response. Returning indexUpdated = false");
+ return false;
+ } else {
+ try {
+ LOG.trace("Waiting for afterBulk response...");
+ Thread.sleep(100);
+ } catch (InterruptedException ex) {
+ //
+ }
+ }
+ }
+ LOG.trace("Returning indexUpdated = {}", indexUpdated.get());
+ return indexUpdated.get();
}
// TODO: we need to check if the index already exists and in that case we have to figure out if there are
@@ -156,10 +186,16 @@ class ElasticsearchIndexWriter implement
indexDefinition.getRemoteIndexName(), requestMsg, response.isAcknowledged());
}
- private static class OakBulkProcessorLister implements BulkProcessor.Listener {
+ private class OakBulkProcessorListener implements BulkProcessor.Listener {
@Override
public void beforeBulk(long executionId, BulkRequest bulkRequest) {
+ if (indexUpdated.isPresent() && !indexUpdated.get()) {
+ // Reset the state only if it's false
+ // If it's true that means index was updated at least once by this processor
+ // and we can return true for indexUpdated.
+ indexUpdated = Optional.empty();
+ }
LOG.info("Sending bulk with id {} -> {}", executionId, bulkRequest.getDescription());
if (LOG.isTraceEnabled()) {
LOG.trace("Bulk Requests: \n{}", bulkRequest.requests()
@@ -185,14 +221,31 @@ class ElasticsearchIndexWriter implement
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
LOG.error("Bulk item with id {} failed", failure.getId(), failure.getCause());
+ } else {
+ // Set indexUpdated to true even if 1 item was updated successfully
+ indexUpdated = Optional.of(true);
}
}
+ // Only set indexUpdated to false if it's unset
+ // If set and true, that means index was updated at least once by this processor.
+ // If set and false, no need to do anything
+ if (!indexUpdated.isPresent()) {
+ indexUpdated = Optional.of(false);
+ }
+ } else {
+ indexUpdated = Optional.of(true);
}
}
@Override
public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
- LOG.error("Bulk with id {} thrown an error", executionId, throwable);
+ // Only set indexUpdated to false if it's unset
+ // If set and true, that means index was updated at least once by this processor.
+ // If set and false, no need to do anything
+ if (!indexUpdated.isPresent()) {
+ indexUpdated = Optional.of(false);
+ }
+ LOG.error("Bulk with id {} threw an error", executionId, throwable);
}
}
}
Modified: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java?rev=1876791&r1=1876790&r2=1876791&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java Tue Apr 21 12:08:44 2020
@@ -107,6 +107,40 @@ public class ElasticsearchPropertyIndexT
}
@Test
+ public void testBulkProcessorFlushLimit() throws Exception {
+ setIndex("test1", createIndex("propa"));
+
+ Tree test = root.getTree("/").addChild("test");
+ for (int i = 1; i < 249; i++) {
+ test.addChild("a" + i).setProperty("propa", "foo" + i);
+ }
+ root.commit();
+
+ // 250 is the default flush limit for bulk processor, and we added just less than 250 nodes
+ // So once the index writer is closed, the bulk processor would be closed and all the 248 entries should be flushed.
+ // Make sure that the last entry is indexed correctly.
+ String propaQuery = "select [jcr:path] from [nt:base] where [propa] = 'foo248'";
+ assertEventually(() -> {
+ assertThat(explain(propaQuery), containsString("elasticsearch:test1"));
+
+ assertQuery(propaQuery, singletonList("/test/a248"));
+ });
+
+ // Now we test for 250 < nodes < 500
+
+ for (int i = 250 ; i < 300 ; i ++) {
+ test.addChild("a" + i).setProperty("propa", "foo" + i);
+ }
+ root.commit();
+ String propaQuery2 = "select [jcr:path] from [nt:base] where [propa] = 'foo299'";
+ assertEventually(() -> {
+ assertThat(explain(propaQuery2), containsString("elasticsearch:test1"));
+
+ assertQuery(propaQuery2, singletonList("/test/a299"));
+ });
+ }
+
+ @Test
public void indexSelection() throws Exception {
setIndex("test1", createIndex("propa", "propb"));
setIndex("test2", createIndex("propc"));
@@ -219,4 +253,5 @@ public class ElasticsearchPropertyIndexT
private static void assertEventually(Runnable r) {
ElasticsearchTestUtils.assertEventually(r, BULK_FLUSH_INTERVAL_MS_DEFAULT * 3);
}
+
}
Modified: jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java?rev=1876791&r1=1876790&r2=1876791&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java (original)
+++ jackrabbit/oak/trunk/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java Tue Apr 21 12:08:44 2020
@@ -92,4 +92,10 @@ public class ElasticsearchIndexWriterTes
verify(bulkProcessorMock, times(2)).add(acDeleteRequest.capture());
}
+ @Test
+ public void closeBulkProcessor() throws IOException {
+ indexWriter.close(System.currentTimeMillis());
+ verify(bulkProcessorMock).close();
+ }
+
}