You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/10 09:01:37 UTC

[incubator-inlong] branch master updated: [INLONG-2435][Sort] Fix Sort-standalone UT Exceptions (#2436)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 739972c  [INLONG-2435][Sort] Fix Sort-standalone UT Exceptions (#2436)
739972c is described below

commit 739972c59b46b5cbe97a24595e4f18669167b779
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Feb 10 17:01:30 2022 +0800

    [INLONG-2435][Sort] Fix Sort-standalone UT Exceptions (#2436)
---
 .../TestDefaultEvent2IndexRequestHandler.java      |   7 +-
 .../sink/elasticsearch/TestEsCallbackListener.java | 117 ++++++++-------------
 .../sink/elasticsearch/TestEsChannelWorker.java    |  35 +++---
 .../sink/elasticsearch/TestEsOutputChannel.java    |  30 +++---
 .../sink/elasticsearch/TestEsSinkContext.java      |  14 ++-
 5 files changed, 94 insertions(+), 109 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
index 5177bf8..725b8f1 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
@@ -21,10 +21,12 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 /**
@@ -33,13 +35,16 @@ import org.powermock.modules.junit4.PowerMockRunner;
  */
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
+@PrepareForTest({MetricRegister.class})
 public class TestDefaultEvent2IndexRequestHandler {
 
     /**
      * test that ProfileEvent transform to EsIndexRequest
+     * 
+     * @throws Exception
      */
     @Test
-    public void test() {
+    public void test() throws Exception {
         LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
         EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
         ProfileEvent event = TestEsSinkContext.mockProfileEvent();
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
index 84441ef..da2bcb9 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.elasticsearch.action.DocWriteRequest.OpType;
 import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -38,13 +39,13 @@ import org.powermock.modules.junit4.PowerMockRunner;
  */
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
-@PrepareForTest({EsSinkFactory.class})
+@PrepareForTest({EsSinkFactory.class, MetricRegister.class})
 public class TestEsCallbackListener {
 
     private EsSinkContext context;
 
     @Before
-    public void before() {
+    public void before() throws Exception {
         LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
         this.context = TestEsSinkContext.mock(dispatchQueue);
     }
@@ -54,84 +55,54 @@ public class TestEsCallbackListener {
      */
     @Test
     public void testBeforeBulk() {
-        try {
-            // prepare
-            ProfileEvent event = TestEsSinkContext.mockProfileEvent();
-            EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
-            long executionId = 0;
-            BulkRequest bulkRequest = new BulkRequest();
-            bulkRequest.add(indexRequest);
-            // test
-            EsCallbackListener listener = new EsCallbackListener(context);
-            listener.beforeBulk(executionId, bulkRequest);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        // prepare
+        ProfileEvent event = TestEsSinkContext.mockProfileEvent();
+        EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
+        long executionId = 0;
+        BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.add(indexRequest);
+        // test
+        EsCallbackListener listener = new EsCallbackListener(context);
+        listener.beforeBulk(executionId, bulkRequest);
     }
 
     /**
      * testAfterSuccessBulk
+     * 
+     * @throws Exception
      */
     @Test
-    public void testAfterSuccessBulk() {
-        try {
-            // prepare
-            LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
-            EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
-            ProfileEvent event = TestEsSinkContext.mockProfileEvent();
-            EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
-            long executionId = 0;
-            EsCallbackListener listener = new EsCallbackListener(context);
-            // request
-            BulkRequest bulkRequest = new BulkRequest();
-            bulkRequest.add(indexRequest);
-            // success response
-            IndexResponse indexResponse = new IndexResponse();
-            BulkItemResponse itemResponse = new BulkItemResponse(0, OpType.INDEX, indexResponse);
-            BulkItemResponse[] responses = new BulkItemResponse[1];
-            responses[0] = itemResponse;
-            BulkResponse bulkResponse = new BulkResponse(responses, 0);
-            listener.afterBulk(executionId, bulkRequest, bulkResponse);
-
-            // fail resend
-            BulkItemResponse.Failure failure = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id",
-                    new Exception());
-            BulkItemResponse failResponse = new BulkItemResponse(0, OpType.INDEX, failure);
-            responses[0] = failResponse;
-            listener.afterBulk(executionId, bulkRequest, bulkResponse);
-
-            // failNull noResend
-            BulkItemResponse.Failure failureNull = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id",
-                    null);
-            BulkItemResponse failNullResponse = new BulkItemResponse(0, OpType.INDEX, failureNull);
-            responses[0] = failNullResponse;
-            listener.afterBulk(executionId, bulkRequest, bulkResponse);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
+    public void testAfterSuccessBulk() throws Exception {
+        // prepare
+        LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
+        EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
+        ProfileEvent event = TestEsSinkContext.mockProfileEvent();
+        EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
+        long executionId = 0;
+        EsCallbackListener listener = new EsCallbackListener(context);
+        // request
+        BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.add(indexRequest);
+        // success response
+        IndexResponse indexResponse = new IndexResponse();
+        BulkItemResponse itemResponse = new BulkItemResponse(0, OpType.INDEX, indexResponse);
+        BulkItemResponse[] responses = new BulkItemResponse[1];
+        responses[0] = itemResponse;
+        BulkResponse bulkResponse = new BulkResponse(responses, 0);
+        listener.afterBulk(executionId, bulkRequest, bulkResponse);
 
-    /**
-     * testAfterFailureBulk
-     */
-    @Test
-    public void testAfterFailureBulk() {
-        try {
-            // prepare
-            LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
-            EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
-            ProfileEvent event = TestEsSinkContext.mockProfileEvent();
-            EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
-            long executionId = 0;
-            EsCallbackListener listener = new EsCallbackListener(context);
-            // request
-            BulkRequest bulkRequest = new BulkRequest();
-            bulkRequest.add(indexRequest);
+        // fail resend
+        BulkItemResponse.Failure failure = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id",
+                new Exception());
+        BulkItemResponse failResponse = new BulkItemResponse(0, OpType.INDEX, failure);
+        responses[0] = failResponse;
+        listener.afterBulk(executionId, bulkRequest, bulkResponse);
 
-            // fail resend
-            listener.afterBulk(executionId, bulkRequest, new Exception());
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        // failNull noResend
+        BulkItemResponse.Failure failureNull = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id",
+                null);
+        BulkItemResponse failNullResponse = new BulkItemResponse(0, OpType.INDEX, failureNull);
+        responses[0] = failNullResponse;
+        listener.afterBulk(executionId, bulkRequest, bulkResponse);
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
index 5438bb5..c95b6b4 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.flume.Transaction;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,16 +35,18 @@ import org.powermock.modules.junit4.PowerMockRunner;
  */
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
-@PrepareForTest({EsSinkFactory.class})
+@PrepareForTest({EsSinkFactory.class, MetricRegister.class})
 public class TestEsChannelWorker {
 
     private EsSinkContext context;
 
     /**
      * before
+     * 
+     * @throws Exception
      */
     @Before
-    public void before() {
+    public void before() throws Exception {
         LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
         this.context = TestEsSinkContext.mock(dispatchQueue);
     }
@@ -53,21 +56,17 @@ public class TestEsChannelWorker {
      */
     @Test
     public void test() {
-        try {
-            // prepare
-            ProfileEvent event = TestEsSinkContext.mockProfileEvent();
-            Transaction tx = this.context.getChannel().getTransaction();
-            tx.begin();
-            this.context.getChannel().put(event);
-            tx.commit();
-            tx.close();
-            // test
-            EsChannelWorker worker = new EsChannelWorker(context, 0);
-            worker.doRun();
-            worker.start();
-            worker.close();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        // prepare
+        ProfileEvent event = TestEsSinkContext.mockProfileEvent();
+        Transaction tx = this.context.getChannel().getTransaction();
+        tx.begin();
+        this.context.getChannel().put(event);
+        tx.commit();
+        tx.close();
+        // test
+        EsChannelWorker worker = new EsChannelWorker(context, 0);
+        worker.doRun();
+        worker.start();
+        worker.close();
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
index cbee385..c1facdf 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
@@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any;
 
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
@@ -47,7 +48,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
  */
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
-@PrepareForTest({EsSinkFactory.class, RestHighLevelClient.class, ClusterClient.class, IndicesClient.class})
+@PrepareForTest({EsSinkFactory.class, RestHighLevelClient.class, ClusterClient.class, IndicesClient.class,
+        MetricRegister.class})
 public class TestEsOutputChannel {
 
     private RestHighLevelClient esClient;
@@ -103,22 +105,20 @@ public class TestEsOutputChannel {
 
     /**
      * test
+     * 
+     * @throws Exception
      */
     @Test
-    public void test() {
-        try {
-            LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
-            EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
-            EsOutputChannel output = new EsOutputChannel(context);
-            ProfileEvent event = TestEsSinkContext.mockProfileEvent();
-            EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
-            dispatchQueue.add(indexRequest);
-            output.init();
-            output.send();
-            output.close();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void test() throws Exception {
+        LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
+        EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
+        EsOutputChannel output = new EsOutputChannel(context);
+        ProfileEvent event = TestEsSinkContext.mockProfileEvent();
+        EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
+        dispatchQueue.add(indexRequest);
+        output.init();
+        output.send();
+        output.close();
     }
 
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
index 5733c03..55f97e8 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
 
 import java.nio.charset.Charset;
 import java.util.HashMap;
@@ -26,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
@@ -33,7 +35,9 @@ import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 /**
@@ -42,6 +46,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
  */
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
+@PrepareForTest({MetricRegister.class})
 public class TestEsSinkContext {
 
     public static final String TEST_INLONG_GROUP_ID = "0fc00000046";
@@ -53,8 +58,11 @@ public class TestEsSinkContext {
      * 
      * @param  dispatchQueue
      * @return
+     * @throws Exception
      */
-    public static EsSinkContext mock(LinkedBlockingQueue<EsIndexRequest> dispatchQueue) {
+    public static EsSinkContext mock(LinkedBlockingQueue<EsIndexRequest> dispatchQueue) throws Exception {
+        PowerMockito.mockStatic(MetricRegister.class);
+        PowerMockito.doNothing().when(MetricRegister.class, "register", any());
         Context context = CommonPropertiesHolder.getContext();
         String sinkName = CommonPropertiesHolder.getClusterId() + "Sink";
         context.put(SortTaskConfig.KEY_TASK_NAME, "sid_es_es-rmrv7g7a_v3");
@@ -93,9 +101,11 @@ public class TestEsSinkContext {
 
     /**
      * test
+     * 
+     * @throws Exception
      */
     @Test
-    public void test() {
+    public void test() throws Exception {
         LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
         EsSinkContext context = mock(dispatchQueue);
         assertEquals(10, context.getBulkSizeMb());