You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sh...@apache.org on 2022/09/12 09:34:32 UTC
[pulsar] branch master updated: [improve][test] Remove WhiteBox for ElasticSearchSinkTests (#17582)
This is an automated email from the ASF dual-hosted git repository.
shoothzj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new adb63d8433b [improve][test] Remove WhiteBox for ElasticSearchSinkTests (#17582)
adb63d8433b is described below
commit adb63d8433b322d276067e5771acdb4ffdfa64e2
Author: tison <wa...@gmail.com>
AuthorDate: Mon Sep 12 17:34:22 2022 +0800
[improve][test] Remove WhiteBox for ElasticSearchSinkTests (#17582)
Master Issue: #16912
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc-required`
(Your PR needs to update docs and you will update later)
- [x] `doc-not-needed`
(Please explain why)
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
(Docs have been already added)
---
.../io/elasticsearch/ElasticSearchClient.java | 5 +++
.../pulsar/io/elasticsearch/ElasticSearchSink.java | 5 +++
.../elastic/ElasticSearchJavaRestClient.java | 35 +++++++++++---------
.../opensearch/OpenSearchHighLevelRestClient.java | 10 ++++++
.../io/elasticsearch/ElasticSearchSinkTests.java | 37 ++++++++++------------
testmocks/pom.xml | 5 ---
6 files changed, 57 insertions(+), 40 deletions(-)
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index 67b04529a77..e46795e2b88 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -256,6 +256,11 @@ public class ElasticSearchClient implements AutoCloseable {
}
}
+ @VisibleForTesting
+ void setClient(RestClient client) {
+ this.client = client;
+ }
+
private void checkNotFailed() throws Exception {
if (irrecoverableError.get() != null) {
throw irrecoverableError.get();
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 4efd73f354d..6f029e1f2dc 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -102,6 +102,11 @@ public class ElasticSearchSink implements Sink<GenericObject> {
}
}
+ @VisibleForTesting
+ void setElasticsearchClient(ElasticSearchClient elasticsearchClient) {
+ this.elasticsearchClient = elasticsearchClient;
+ }
+
@Override
public void write(Record<GenericObject> record) throws Exception {
if (!elasticsearchClient.isFailed()) {
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index 2166df22731..bd1efcb971b 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -40,6 +40,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Map;
+import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
@@ -52,9 +53,20 @@ import org.elasticsearch.client.RestClientBuilder;
public class ElasticSearchJavaRestClient extends RestClient {
private final ElasticsearchClient client;
- private final ElasticsearchTransport transport;
private final ObjectMapper objectMapper = new ObjectMapper();
- private final BulkProcessor bulkProcessor;
+
+ private BulkProcessor bulkProcessor;
+ private ElasticsearchTransport transport;
+
+ @VisibleForTesting
+ public void setBulkProcessor(BulkProcessor bulkProcessor) {
+ this.bulkProcessor = bulkProcessor;
+ }
+
+ @VisibleForTesting
+ public void setTransport(ElasticsearchTransport transport) {
+ this.transport = transport;
+ }
public ElasticSearchJavaRestClient(ElasticSearchConfig elasticSearchConfig,
BulkProcessor.Listener bulkProcessorListener) {
@@ -77,7 +89,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
});
transport = new RestClientTransport(builder.build(),
new JacksonJsonpMapper());
- this.client = new ElasticsearchClient(transport);
+ client = new ElasticsearchClient(transport);
if (elasticSearchConfig.isBulkEnabled()) {
bulkProcessor = new ElasticBulkProcessor(elasticSearchConfig, client, bulkProcessorListener);
} else {
@@ -112,7 +124,8 @@ public class ElasticSearchJavaRestClient extends RestClient {
throw new IOException("Unable to create index, acknowledged: " + createIndexResponse.acknowledged()
+ " shardsAcknowledged: " + createIndexResponse.shardsAcknowledged());
} catch (ElasticsearchException ex) {
- if (ex.response().error().type().contains("resource_already_exists_exception")) {
+ final String errorType = Objects.requireNonNull(ex.response().error().type());
+ if (errorType.contains("resource_already_exists_exception")) {
return false;
}
throw ex;
@@ -133,11 +146,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
.build();
DeleteResponse deleteResponse = client.delete(req);
- if (deleteResponse.result().equals(Result.Deleted) || deleteResponse.result().equals(Result.NotFound)) {
- return true;
- } else {
- return false;
- }
+ return deleteResponse.result().equals(Result.Deleted) || deleteResponse.result().equals(Result.NotFound);
}
@Override
@@ -150,11 +159,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
.build();
final IndexResponse indexResponse = client.index(indexRequest);
- if (indexResponse.result().equals(Result.Created) || indexResponse.result().equals(Result.Updated)) {
- return true;
- } else {
- return false;
- }
+ return indexResponse.result().equals(Result.Created) || indexResponse.result().equals(Result.Updated);
}
public SearchResponse<Map> search(String indexName) throws IOException {
@@ -200,7 +205,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
try {
transport.close();
} catch (IOException e) {
- log.warn("error while closing the client: {}", e);
+ log.warn("error while closing the client", e);
}
}
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
index 0a4629f24ad..df2597fdf70 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
@@ -339,6 +339,16 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro
}
}
+ @VisibleForTesting
+ public void setClient(RestHighLevelClient client) {
+ this.client = client;
+ }
+
+ @VisibleForTesting
+ public void setInternalBulkProcessor(org.opensearch.action.bulk.BulkProcessor internalBulkProcessor) {
+ this.internalBulkProcessor = internalBulkProcessor;
+ }
+
@VisibleForTesting
public RestHighLevelClient getClient() {
return client;
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index b7da0f8c0fd..4975217c620 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -44,7 +44,6 @@ import java.util.List;
import java.util.Locale;
import java.io.IOException;
import java.util.HashMap;
-import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@@ -64,7 +63,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.opensearch.client.Node;
import org.opensearch.client.RestHighLevelClient;
-import org.powermock.reflect.Whitebox;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
@@ -75,6 +73,7 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
@@ -414,46 +413,44 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
@Test
public void testCloseClient() throws Exception {
- final ElasticSearchSink sink = new ElasticSearchSink();
- map.put("bulkEnabled", true);
- try {
+ try (ElasticSearchSink sink = new ElasticSearchSink()) {
+ map.put("bulkEnabled", true);
sink.open(map, mockSinkContext);
final ElasticSearchClient elasticSearchClient = spy(sink.getElasticsearchClient());
final RestClient restClient = spy(elasticSearchClient.getRestClient());
- if (restClient instanceof ElasticSearchJavaRestClient) {
- ElasticSearchJavaRestClient client = (ElasticSearchJavaRestClient) restClient;
+ if (restClient instanceof ElasticSearchJavaRestClient client) {
final BulkProcessor bulkProcessor = spy(restClient.getBulkProcessor());
final ElasticsearchTransport transport = spy(client.getTransport());
- Whitebox.setInternalState(client, "transport", transport);
- Whitebox.setInternalState(client, "bulkProcessor", bulkProcessor);
- Whitebox.setInternalState(elasticSearchClient, "client", restClient);
- Whitebox.setInternalState(sink, "elasticsearchClient", elasticSearchClient);
+ client.setTransport(transport);
+ client.setBulkProcessor(bulkProcessor);
+ elasticSearchClient.setClient(client);
+ sink.setElasticsearchClient(elasticSearchClient);
+
sink.close();
verify(transport).close();
verify(bulkProcessor).close();
verify(client).close();
verify(restClient).close();
- } else {
- OpenSearchHighLevelRestClient client = (OpenSearchHighLevelRestClient) restClient;
-
+ } else if (restClient instanceof OpenSearchHighLevelRestClient client) {
final org.opensearch.action.bulk.BulkProcessor internalBulkProcessor = spy(
client.getInternalBulkProcessor());
final RestHighLevelClient restHighLevelClient = spy(client.getClient());
- Whitebox.setInternalState(client, "client", restHighLevelClient);
- Whitebox.setInternalState(client, "internalBulkProcessor", internalBulkProcessor);
- Whitebox.setInternalState(elasticSearchClient, "client", restClient);
- Whitebox.setInternalState(sink, "elasticsearchClient", elasticSearchClient);
+ client.setClient(restHighLevelClient);
+ client.setInternalBulkProcessor(internalBulkProcessor);
+ elasticSearchClient.setClient(restClient);
+ sink.setElasticsearchClient(elasticSearchClient);
+
sink.close();
verify(restHighLevelClient).close();
verify(internalBulkProcessor).awaitClose(Mockito.anyLong(), Mockito.any(TimeUnit.class));
verify(client).close();
verify(restClient).close();
+ } else {
+ fail("restClient has unknown type: " + restClient.getClass().getCanonicalName());
}
- } finally {
- sink.close();
}
}
diff --git a/testmocks/pom.xml b/testmocks/pom.xml
index d416bf7d781..452c98fbb60 100644
--- a/testmocks/pom.xml
+++ b/testmocks/pom.xml
@@ -73,11 +73,6 @@
<artifactId>objenesis</artifactId>
</dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-reflect</artifactId>
- </dependency>
-
</dependencies>
<build>