You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/05/16 08:48:36 UTC
[james-project] 21/23: JAMES-2719 Refactor delete by query on ES
backend
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 45e3224f2187d0cbab7523d433c669bffa3f7973
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Thu May 16 11:46:00 2019 +0700
JAMES-2719 Refactor delete by query on ES backend
---
.../es/v6/DeleteByQueryActionListener.java | 39 +++++++++++++
.../backends/es/v6/DeleteByQueryPerformer.java | 66 ----------------------
.../james/backends/es/v6/ElasticSearchIndexer.java | 25 +++++---
.../backends/es/v6/ElasticSearchIndexerTest.java | 39 +++++++------
4 files changed, 74 insertions(+), 95 deletions(-)
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryActionListener.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryActionListener.java
new file mode 100644
index 0000000..d89a9ec
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryActionListener.java
@@ -0,0 +1,39 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeleteByQueryActionListener implements ActionListener<BulkByScrollResponse> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DeleteByQueryActionListener.class);
+
+ @Override
+ public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
+
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ LOGGER.warn("Error during the ES delete by query operation: ", e);
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java
deleted file mode 100644
index a912c0f..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.reindex.DeleteByQueryRequest;
-
-import com.google.common.annotations.VisibleForTesting;
-
-public class DeleteByQueryPerformer {
- public static final TimeValue TIMEOUT = new TimeValue(60000);
-
- private final RestHighLevelClient client;
- private final ExecutorService executor;
- private final int batchSize;
- private final WriteAliasName aliasName;
- private final TypeName typeName;
-
- @VisibleForTesting
- public DeleteByQueryPerformer(RestHighLevelClient client, ExecutorService executor, int batchSize, WriteAliasName aliasName, TypeName typeName) {
- this.client = client;
- this.executor = executor;
- this.batchSize = batchSize;
- this.aliasName = aliasName;
- this.typeName = typeName;
- }
-
- public Future<Void> perform(QueryBuilder queryBuilder) {
- return executor.submit(() -> doDeleteByQuery(queryBuilder));
- }
-
- protected Void doDeleteByQuery(QueryBuilder queryBuilder) throws IOException {
- DeleteByQueryRequest request = new DeleteByQueryRequest(aliasName.getValue())
- .setDocTypes(typeName.getValue())
- .setScroll(TIMEOUT)
- .setQuery(queryBuilder)
- .setBatchSize(batchSize);
-
- client.deleteByQuery(request, RequestOptions.DEFAULT);
- return null;
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
index 492ec37..79adb69 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
@@ -21,8 +21,6 @@ package org.apache.james.backends.es.v6;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -34,8 +32,10 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.ValidationException;
+import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,29 +45,30 @@ import com.google.common.base.Preconditions;
public class ElasticSearchIndexer {
private static final int DEBUG_MAX_LENGTH_CONTENT = 1000;
private static final int DEFAULT_BATCH_SIZE = 100;
+ private static final TimeValue TIMEOUT = new TimeValue(60000);
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class);
private final RestHighLevelClient client;
- private final DeleteByQueryPerformer deleteByQueryPerformer;
private final AliasName aliasName;
private final TypeName typeName;
+ private final int batchSize;
- public ElasticSearchIndexer(RestHighLevelClient client, ExecutorService executor,
+ public ElasticSearchIndexer(RestHighLevelClient client,
WriteAliasName aliasName,
TypeName typeName) {
- this(client, executor, aliasName, typeName, DEFAULT_BATCH_SIZE);
+ this(client, aliasName, typeName, DEFAULT_BATCH_SIZE);
}
@VisibleForTesting
- public ElasticSearchIndexer(RestHighLevelClient client, ExecutorService executor,
+ public ElasticSearchIndexer(RestHighLevelClient client,
WriteAliasName aliasName,
TypeName typeName,
int batchSize) {
this.client = client;
- this.deleteByQueryPerformer = new DeleteByQueryPerformer(client, executor, batchSize, aliasName, typeName);
this.aliasName = aliasName;
this.typeName = typeName;
+ this.batchSize = batchSize;
}
public IndexResponse index(String id, String content) throws IOException {
@@ -112,8 +113,14 @@ public class ElasticSearchIndexer {
}
}
- public Future<Void> deleteAllMatchingQuery(QueryBuilder queryBuilder) {
- return deleteByQueryPerformer.perform(queryBuilder);
+ public void deleteAllMatchingQuery(QueryBuilder queryBuilder) {
+ DeleteByQueryRequest request = new DeleteByQueryRequest(aliasName.getValue())
+ .setDocTypes(typeName.getValue())
+ .setScroll(TIMEOUT)
+ .setQuery(queryBuilder)
+ .setBatchSize(batchSize);
+
+ client.deleteByQueryAsync(request, RequestOptions.DEFAULT, new DeleteByQueryActionListener());
}
private void checkArgument(String content) {
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java
index fc2219e..08f02a2 100644
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java
@@ -21,11 +21,10 @@ package org.apache.james.backends.es.v6;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
-import java.util.concurrent.Executors;
-
-import org.apache.james.util.concurrent.NamedThreadFactory;
+import org.awaitility.Duration;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
@@ -55,9 +54,7 @@ public class ElasticSearchIndexerTest {
.useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME)
.createIndexAndAliases(getESClient());
- testee = new ElasticSearchIndexer(getESClient(),
- Executors.newSingleThreadExecutor(NamedThreadFactory.withClassName(getClass())),
- ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
+ testee = new ElasticSearchIndexer(getESClient(), ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
}
private RestHighLevelClient getESClient() {
@@ -150,16 +147,17 @@ public class ElasticSearchIndexerTest {
testee.index(messageId, content);
elasticSearch.awaitForElasticSearch();
- testee.deleteAllMatchingQuery(termQuery("property", "1")).get();
+ testee.deleteAllMatchingQuery(termQuery("property", "1"));
elasticSearch.awaitForElasticSearch();
try (RestHighLevelClient client = getESClient()) {
- SearchResponse searchResponse = client.search(
- new SearchRequest(INDEX_NAME.getValue())
- .types(TYPE_NAME.getValue())
- .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
- RequestOptions.DEFAULT);
- assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
+ await().atMost(Duration.TEN_SECONDS)
+ .until(() -> client.search(
+ new SearchRequest(INDEX_NAME.getValue())
+ .types(TYPE_NAME.getValue())
+ .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
+ RequestOptions.DEFAULT)
+ .getHits().getTotalHits() == 0);
}
}
@@ -181,16 +179,17 @@ public class ElasticSearchIndexerTest {
testee.index(messageId3, content3);
elasticSearch.awaitForElasticSearch();
- testee.deleteAllMatchingQuery(termQuery("property", "1")).get();
+ testee.deleteAllMatchingQuery(termQuery("property", "1"));
elasticSearch.awaitForElasticSearch();
try (RestHighLevelClient client = getESClient()) {
- SearchResponse searchResponse = client.search(
- new SearchRequest(INDEX_NAME.getValue())
- .types(TYPE_NAME.getValue())
- .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
- RequestOptions.DEFAULT);
- assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+ await().atMost(Duration.TEN_SECONDS)
+ .until(() -> client.search(
+ new SearchRequest(INDEX_NAME.getValue())
+ .types(TYPE_NAME.getValue())
+ .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
+ RequestOptions.DEFAULT)
+ .getHits().getTotalHits() == 1);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org