You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/05/16 08:48:36 UTC

[james-project] 21/23: JAMES-2719 Refactor delete by query on ES backend

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 45e3224f2187d0cbab7523d433c669bffa3f7973
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Thu May 16 11:46:00 2019 +0700

    JAMES-2719 Refactor delete by query on ES backend
---
 .../es/v6/DeleteByQueryActionListener.java         | 39 +++++++++++++
 .../backends/es/v6/DeleteByQueryPerformer.java     | 66 ----------------------
 .../james/backends/es/v6/ElasticSearchIndexer.java | 25 +++++---
 .../backends/es/v6/ElasticSearchIndexerTest.java   | 39 +++++++------
 4 files changed, 74 insertions(+), 95 deletions(-)

diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryActionListener.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryActionListener.java
new file mode 100644
index 0000000..d89a9ec
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryActionListener.java
@@ -0,0 +1,39 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeleteByQueryActionListener implements ActionListener<BulkByScrollResponse> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DeleteByQueryActionListener.class);
+
+    @Override
+    public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
+
+    }
+
+    @Override
+    public void onFailure(Exception e) {
+        LOGGER.warn("Error during the ES delete by query operation: ", e);
+    }
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java
deleted file mode 100644
index a912c0f..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.reindex.DeleteByQueryRequest;
-
-import com.google.common.annotations.VisibleForTesting;
-
-public class DeleteByQueryPerformer {
-    public static final TimeValue TIMEOUT = new TimeValue(60000);
-
-    private final RestHighLevelClient client;
-    private final ExecutorService executor;
-    private final int batchSize;
-    private final WriteAliasName aliasName;
-    private final TypeName typeName;
-
-    @VisibleForTesting
-    public DeleteByQueryPerformer(RestHighLevelClient client, ExecutorService executor, int batchSize, WriteAliasName aliasName, TypeName typeName) {
-        this.client = client;
-        this.executor = executor;
-        this.batchSize = batchSize;
-        this.aliasName = aliasName;
-        this.typeName = typeName;
-    }
-
-    public Future<Void> perform(QueryBuilder queryBuilder) {
-        return executor.submit(() -> doDeleteByQuery(queryBuilder));
-    }
-
-    protected Void doDeleteByQuery(QueryBuilder queryBuilder) throws IOException {
-        DeleteByQueryRequest request = new DeleteByQueryRequest(aliasName.getValue())
-            .setDocTypes(typeName.getValue())
-            .setScroll(TIMEOUT)
-            .setQuery(queryBuilder)
-            .setBatchSize(batchSize);
-
-        client.deleteByQuery(request, RequestOptions.DEFAULT);
-        return null;
-    }
-}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
index 492ec37..79adb69 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
@@ -21,8 +21,6 @@ package org.apache.james.backends.es.v6;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.StringUtils;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -34,8 +32,10 @@ import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.ValidationException;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,29 +45,30 @@ import com.google.common.base.Preconditions;
 public class ElasticSearchIndexer {
     private static final int DEBUG_MAX_LENGTH_CONTENT = 1000;
     private static final int DEFAULT_BATCH_SIZE = 100;
+    private static final TimeValue TIMEOUT = new TimeValue(60000);
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class);
 
     private final RestHighLevelClient client;
-    private final DeleteByQueryPerformer deleteByQueryPerformer;
     private final AliasName aliasName;
     private final TypeName typeName;
+    private final int batchSize;
 
-    public ElasticSearchIndexer(RestHighLevelClient client, ExecutorService executor,
+    public ElasticSearchIndexer(RestHighLevelClient client,
                                 WriteAliasName aliasName,
                                 TypeName typeName) {
-        this(client, executor, aliasName, typeName, DEFAULT_BATCH_SIZE);
+        this(client, aliasName, typeName, DEFAULT_BATCH_SIZE);
     }
 
     @VisibleForTesting
-    public ElasticSearchIndexer(RestHighLevelClient client, ExecutorService executor,
+    public ElasticSearchIndexer(RestHighLevelClient client,
                                 WriteAliasName aliasName,
                                 TypeName typeName,
                                 int batchSize) {
         this.client = client;
-        this.deleteByQueryPerformer = new DeleteByQueryPerformer(client, executor, batchSize, aliasName, typeName);
         this.aliasName = aliasName;
         this.typeName = typeName;
+        this.batchSize = batchSize;
     }
 
     public IndexResponse index(String id, String content) throws IOException {
@@ -112,8 +113,14 @@ public class ElasticSearchIndexer {
         }
     }
 
-    public Future<Void> deleteAllMatchingQuery(QueryBuilder queryBuilder) {
-        return deleteByQueryPerformer.perform(queryBuilder);
+    public void deleteAllMatchingQuery(QueryBuilder queryBuilder) {
+        DeleteByQueryRequest request = new DeleteByQueryRequest(aliasName.getValue())
+            .setDocTypes(typeName.getValue())
+            .setScroll(TIMEOUT)
+            .setQuery(queryBuilder)
+            .setBatchSize(batchSize);
+
+        client.deleteByQueryAsync(request, RequestOptions.DEFAULT, new DeleteByQueryActionListener());
     }
 
     private void checkArgument(String content) {
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java
index fc2219e..08f02a2 100644
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java
@@ -21,11 +21,10 @@ package org.apache.james.backends.es.v6;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
-import java.util.concurrent.Executors;
-
-import org.apache.james.util.concurrent.NamedThreadFactory;
+import org.awaitility.Duration;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.RequestOptions;
@@ -55,9 +54,7 @@ public class ElasticSearchIndexerTest {
             .useIndex(INDEX_NAME)
             .addAlias(ALIAS_NAME)
             .createIndexAndAliases(getESClient());
-        testee = new ElasticSearchIndexer(getESClient(),
-            Executors.newSingleThreadExecutor(NamedThreadFactory.withClassName(getClass())),
-            ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
+        testee = new ElasticSearchIndexer(getESClient(), ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
     }
 
     private RestHighLevelClient getESClient() {
@@ -150,16 +147,17 @@ public class ElasticSearchIndexerTest {
         testee.index(messageId, content);
         elasticSearch.awaitForElasticSearch();
         
-        testee.deleteAllMatchingQuery(termQuery("property", "1")).get();
+        testee.deleteAllMatchingQuery(termQuery("property", "1"));
         elasticSearch.awaitForElasticSearch();
         
         try (RestHighLevelClient client = getESClient()) {
-            SearchResponse searchResponse = client.search(
-                new SearchRequest(INDEX_NAME.getValue())
-                    .types(TYPE_NAME.getValue())
-                    .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
-                RequestOptions.DEFAULT);
-            assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
+            await().atMost(Duration.TEN_SECONDS)
+                .until(() -> client.search(
+                        new SearchRequest(INDEX_NAME.getValue())
+                            .types(TYPE_NAME.getValue())
+                            .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
+                        RequestOptions.DEFAULT)
+                    .getHits().getTotalHits() == 0);
         }
     }
 
@@ -181,16 +179,17 @@ public class ElasticSearchIndexerTest {
         testee.index(messageId3, content3);
         elasticSearch.awaitForElasticSearch();
 
-        testee.deleteAllMatchingQuery(termQuery("property", "1")).get();
+        testee.deleteAllMatchingQuery(termQuery("property", "1"));
         elasticSearch.awaitForElasticSearch();
         
         try (RestHighLevelClient client = getESClient()) {
-            SearchResponse searchResponse = client.search(
-                new SearchRequest(INDEX_NAME.getValue())
-                    .types(TYPE_NAME.getValue())
-                    .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
-                RequestOptions.DEFAULT);
-            assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+            await().atMost(Duration.TEN_SECONDS)
+                .until(() -> client.search(
+                    new SearchRequest(INDEX_NAME.getValue())
+                        .types(TYPE_NAME.getValue())
+                        .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
+                    RequestOptions.DEFAULT)
+                    .getHits().getTotalHits() == 1);
         }
     }
     


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org