You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sh...@apache.org on 2022/09/12 09:34:32 UTC

[pulsar] branch master updated: [improve][test] Remove WhiteBox for ElasticSearchSinkTests (#17582)

This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new adb63d8433b [improve][test] Remove WhiteBox for ElasticSearchSinkTests (#17582)
adb63d8433b is described below

commit adb63d8433b322d276067e5771acdb4ffdfa64e2
Author: tison <wa...@gmail.com>
AuthorDate: Mon Sep 12 17:34:22 2022 +0800

    [improve][test] Remove WhiteBox for ElasticSearchSinkTests (#17582)
    
    Master Issue: #16912
    
    ### Documentation
    
    <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
    
    - [ ] `doc-required`
    (Your PR needs to update docs and you will update later)
    
    - [x] `doc-not-needed`
    (Please explain why)
    
    - [ ] `doc`
    (Your PR contains doc changes)
    
    - [ ] `doc-complete`
    (Docs have been already added)
---
 .../io/elasticsearch/ElasticSearchClient.java      |  5 +++
 .../pulsar/io/elasticsearch/ElasticSearchSink.java |  5 +++
 .../elastic/ElasticSearchJavaRestClient.java       | 35 +++++++++++---------
 .../opensearch/OpenSearchHighLevelRestClient.java  | 10 ++++++
 .../io/elasticsearch/ElasticSearchSinkTests.java   | 37 ++++++++++------------
 testmocks/pom.xml                                  |  5 ---
 6 files changed, 57 insertions(+), 40 deletions(-)

diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index 67b04529a77..e46795e2b88 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -256,6 +256,11 @@ public class ElasticSearchClient implements AutoCloseable {
         }
     }
 
+    @VisibleForTesting
+    void setClient(RestClient client) {
+        this.client = client;
+    }
+
     private void checkNotFailed() throws Exception {
         if (irrecoverableError.get() != null) {
             throw irrecoverableError.get();
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 4efd73f354d..6f029e1f2dc 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -102,6 +102,11 @@ public class ElasticSearchSink implements Sink<GenericObject> {
         }
     }
 
+    @VisibleForTesting
+    void setElasticsearchClient(ElasticSearchClient elasticsearchClient) {
+        this.elasticsearchClient = elasticsearchClient;
+    }
+
     @Override
     public void write(Record<GenericObject> record) throws Exception {
         if (!elasticsearchClient.isFailed()) {
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index 2166df22731..bd1efcb971b 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -40,6 +40,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.HttpHost;
 import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
@@ -52,9 +53,20 @@ import org.elasticsearch.client.RestClientBuilder;
 public class ElasticSearchJavaRestClient extends RestClient {
 
     private final ElasticsearchClient client;
-    private final ElasticsearchTransport transport;
     private final ObjectMapper objectMapper = new ObjectMapper();
-    private final BulkProcessor bulkProcessor;
+
+    private BulkProcessor bulkProcessor;
+    private ElasticsearchTransport transport;
+
+    @VisibleForTesting
+    public void setBulkProcessor(BulkProcessor bulkProcessor) {
+        this.bulkProcessor = bulkProcessor;
+    }
+
+    @VisibleForTesting
+    public void setTransport(ElasticsearchTransport transport) {
+        this.transport = transport;
+    }
 
     public ElasticSearchJavaRestClient(ElasticSearchConfig elasticSearchConfig,
                                        BulkProcessor.Listener bulkProcessorListener) {
@@ -77,7 +89,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
                 });
         transport = new RestClientTransport(builder.build(),
                 new JacksonJsonpMapper());
-        this.client = new ElasticsearchClient(transport);
+        client = new ElasticsearchClient(transport);
         if (elasticSearchConfig.isBulkEnabled()) {
             bulkProcessor = new ElasticBulkProcessor(elasticSearchConfig, client, bulkProcessorListener);
         } else {
@@ -112,7 +124,8 @@ public class ElasticSearchJavaRestClient extends RestClient {
             throw new IOException("Unable to create index, acknowledged: " + createIndexResponse.acknowledged()
                     + " shardsAcknowledged: " + createIndexResponse.shardsAcknowledged());
         } catch (ElasticsearchException ex) {
-            if (ex.response().error().type().contains("resource_already_exists_exception")) {
+            final String errorType = Objects.requireNonNull(ex.response().error().type());
+            if (errorType.contains("resource_already_exists_exception")) {
                 return false;
             }
             throw ex;
@@ -133,11 +146,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
                 .build();
 
         DeleteResponse deleteResponse = client.delete(req);
-        if (deleteResponse.result().equals(Result.Deleted) || deleteResponse.result().equals(Result.NotFound)) {
-            return true;
-        } else {
-            return false;
-        }
+        return deleteResponse.result().equals(Result.Deleted) || deleteResponse.result().equals(Result.NotFound);
     }
 
     @Override
@@ -150,11 +159,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
                 .build();
         final IndexResponse indexResponse = client.index(indexRequest);
 
-        if (indexResponse.result().equals(Result.Created) || indexResponse.result().equals(Result.Updated)) {
-            return true;
-        } else {
-            return false;
-        }
+        return indexResponse.result().equals(Result.Created) || indexResponse.result().equals(Result.Updated);
     }
 
     public SearchResponse<Map> search(String indexName) throws IOException {
@@ -200,7 +205,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
         try {
             transport.close();
         } catch (IOException e) {
-            log.warn("error while closing the client: {}", e);
+            log.warn("error while closing the client", e);
         }
     }
 
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
index 0a4629f24ad..df2597fdf70 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
@@ -339,6 +339,16 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro
         }
     }
 
+    @VisibleForTesting
+    public void setClient(RestHighLevelClient client) {
+        this.client = client;
+    }
+
+    @VisibleForTesting
+    public void setInternalBulkProcessor(org.opensearch.action.bulk.BulkProcessor internalBulkProcessor) {
+        this.internalBulkProcessor = internalBulkProcessor;
+    }
+
     @VisibleForTesting
     public RestHighLevelClient getClient() {
         return client;
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index b7da0f8c0fd..4975217c620 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -44,7 +44,6 @@ import java.util.List;
 import java.util.Locale;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
@@ -64,7 +63,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.opensearch.client.Node;
 import org.opensearch.client.RestHighLevelClient;
-import org.powermock.reflect.Whitebox;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testng.SkipException;
 import org.testng.annotations.AfterClass;
@@ -75,6 +73,7 @@ import org.testng.annotations.Test;
 
 
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
 
 public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
 
@@ -414,46 +413,44 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
 
     @Test
     public void testCloseClient() throws Exception {
-        final ElasticSearchSink sink = new ElasticSearchSink();
-        map.put("bulkEnabled", true);
-        try {
+        try (ElasticSearchSink sink = new ElasticSearchSink()) {
+            map.put("bulkEnabled", true);
             sink.open(map, mockSinkContext);
             final ElasticSearchClient elasticSearchClient = spy(sink.getElasticsearchClient());
             final RestClient restClient = spy(elasticSearchClient.getRestClient());
-            if (restClient instanceof ElasticSearchJavaRestClient) {
-                ElasticSearchJavaRestClient client = (ElasticSearchJavaRestClient) restClient;
+            if (restClient instanceof ElasticSearchJavaRestClient client) {
                 final BulkProcessor bulkProcessor = spy(restClient.getBulkProcessor());
                 final ElasticsearchTransport transport = spy(client.getTransport());
 
-                Whitebox.setInternalState(client, "transport", transport);
-                Whitebox.setInternalState(client, "bulkProcessor", bulkProcessor);
-                Whitebox.setInternalState(elasticSearchClient, "client", restClient);
-                Whitebox.setInternalState(sink, "elasticsearchClient", elasticSearchClient);
+                client.setTransport(transport);
+                client.setBulkProcessor(bulkProcessor);
+                elasticSearchClient.setClient(client);
+                sink.setElasticsearchClient(elasticSearchClient);
+
                 sink.close();
                 verify(transport).close();
                 verify(bulkProcessor).close();
                 verify(client).close();
                 verify(restClient).close();
 
-            } else {
-                OpenSearchHighLevelRestClient client = (OpenSearchHighLevelRestClient) restClient;
-
+            } else if (restClient instanceof OpenSearchHighLevelRestClient client) {
                 final org.opensearch.action.bulk.BulkProcessor internalBulkProcessor = spy(
                         client.getInternalBulkProcessor());
                 final RestHighLevelClient restHighLevelClient = spy(client.getClient());
 
-                Whitebox.setInternalState(client, "client", restHighLevelClient);
-                Whitebox.setInternalState(client, "internalBulkProcessor", internalBulkProcessor);
-                Whitebox.setInternalState(elasticSearchClient, "client", restClient);
-                Whitebox.setInternalState(sink, "elasticsearchClient", elasticSearchClient);
+                client.setClient(restHighLevelClient);
+                client.setInternalBulkProcessor(internalBulkProcessor);
+                elasticSearchClient.setClient(restClient);
+                sink.setElasticsearchClient(elasticSearchClient);
+
                 sink.close();
                 verify(restHighLevelClient).close();
                 verify(internalBulkProcessor).awaitClose(Mockito.anyLong(), Mockito.any(TimeUnit.class));
                 verify(client).close();
                 verify(restClient).close();
+            } else {
+                fail("restClient has unknown type: " + restClient.getClass().getCanonicalName());
             }
-        } finally {
-            sink.close();
         }
     }
 
diff --git a/testmocks/pom.xml b/testmocks/pom.xml
index d416bf7d781..452c98fbb60 100644
--- a/testmocks/pom.xml
+++ b/testmocks/pom.xml
@@ -73,11 +73,6 @@
       <artifactId>objenesis</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-reflect</artifactId>
-    </dependency>
-
   </dependencies>
 
   <build>