You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/10 11:03:17 UTC
[incubator-inlong] 01/02: [INLONG-2435][Sort] Fix Sort-standalone UT Exceptions (#2436)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.0.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 439f14285911702e3495e1dc61998fe86b8b6a61
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Feb 10 17:01:30 2022 +0800
[INLONG-2435][Sort] Fix Sort-standalone UT Exceptions (#2436)
---
.../TestDefaultEvent2IndexRequestHandler.java | 7 +-
.../sink/elasticsearch/TestEsCallbackListener.java | 117 ++++++++-------------
.../sink/elasticsearch/TestEsChannelWorker.java | 35 +++---
.../sink/elasticsearch/TestEsOutputChannel.java | 30 +++---
.../sink/elasticsearch/TestEsSinkContext.java | 14 ++-
5 files changed, 94 insertions(+), 109 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
index 5177bf8..725b8f1 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
@@ -21,10 +21,12 @@ import static org.junit.Assert.assertEquals;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
@@ -33,13 +35,16 @@ import org.powermock.modules.junit4.PowerMockRunner;
*/
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.management.*")
+@PrepareForTest({MetricRegister.class})
public class TestDefaultEvent2IndexRequestHandler {
/**
* test that ProfileEvent transform to EsIndexRequest
+ *
+ * @throws Exception
*/
@Test
- public void test() {
+ public void test() throws Exception {
LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
ProfileEvent event = TestEsSinkContext.mockProfileEvent();
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
index 84441ef..da2bcb9 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -38,13 +39,13 @@ import org.powermock.modules.junit4.PowerMockRunner;
*/
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.management.*")
-@PrepareForTest({EsSinkFactory.class})
+@PrepareForTest({EsSinkFactory.class, MetricRegister.class})
public class TestEsCallbackListener {
private EsSinkContext context;
@Before
- public void before() {
+ public void before() throws Exception {
LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
this.context = TestEsSinkContext.mock(dispatchQueue);
}
@@ -54,84 +55,54 @@ public class TestEsCallbackListener {
*/
@Test
public void testBeforeBulk() {
- try {
- // prepare
- ProfileEvent event = TestEsSinkContext.mockProfileEvent();
- EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
- long executionId = 0;
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.add(indexRequest);
- // test
- EsCallbackListener listener = new EsCallbackListener(context);
- listener.beforeBulk(executionId, bulkRequest);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ // prepare
+ ProfileEvent event = TestEsSinkContext.mockProfileEvent();
+ EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
+ long executionId = 0;
+ BulkRequest bulkRequest = new BulkRequest();
+ bulkRequest.add(indexRequest);
+ // test
+ EsCallbackListener listener = new EsCallbackListener(context);
+ listener.beforeBulk(executionId, bulkRequest);
}
/**
* testAfterSuccessBulk
+ *
+ * @throws Exception
*/
@Test
- public void testAfterSuccessBulk() {
- try {
- // prepare
- LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
- EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
- ProfileEvent event = TestEsSinkContext.mockProfileEvent();
- EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
- long executionId = 0;
- EsCallbackListener listener = new EsCallbackListener(context);
- // request
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.add(indexRequest);
- // success response
- IndexResponse indexResponse = new IndexResponse();
- BulkItemResponse itemResponse = new BulkItemResponse(0, OpType.INDEX, indexResponse);
- BulkItemResponse[] responses = new BulkItemResponse[1];
- responses[0] = itemResponse;
- BulkResponse bulkResponse = new BulkResponse(responses, 0);
- listener.afterBulk(executionId, bulkRequest, bulkResponse);
-
- // fail resend
- BulkItemResponse.Failure failure = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id",
- new Exception());
- BulkItemResponse failResponse = new BulkItemResponse(0, OpType.INDEX, failure);
- responses[0] = failResponse;
- listener.afterBulk(executionId, bulkRequest, bulkResponse);
-
- // failNull noResend
- BulkItemResponse.Failure failureNull = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id",
- null);
- BulkItemResponse failNullResponse = new BulkItemResponse(0, OpType.INDEX, failureNull);
- responses[0] = failNullResponse;
- listener.afterBulk(executionId, bulkRequest, bulkResponse);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+ public void testAfterSuccessBulk() throws Exception {
+ // prepare
+ LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
+ EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
+ ProfileEvent event = TestEsSinkContext.mockProfileEvent();
+ EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
+ long executionId = 0;
+ EsCallbackListener listener = new EsCallbackListener(context);
+ // request
+ BulkRequest bulkRequest = new BulkRequest();
+ bulkRequest.add(indexRequest);
+ // success response
+ IndexResponse indexResponse = new IndexResponse();
+ BulkItemResponse itemResponse = new BulkItemResponse(0, OpType.INDEX, indexResponse);
+ BulkItemResponse[] responses = new BulkItemResponse[1];
+ responses[0] = itemResponse;
+ BulkResponse bulkResponse = new BulkResponse(responses, 0);
+ listener.afterBulk(executionId, bulkRequest, bulkResponse);
- /**
- * testAfterFailureBulk
- */
- @Test
- public void testAfterFailureBulk() {
- try {
- // prepare
- LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
- EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
- ProfileEvent event = TestEsSinkContext.mockProfileEvent();
- EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
- long executionId = 0;
- EsCallbackListener listener = new EsCallbackListener(context);
- // request
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.add(indexRequest);
+ // fail resend
+ BulkItemResponse.Failure failure = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id",
+ new Exception());
+ BulkItemResponse failResponse = new BulkItemResponse(0, OpType.INDEX, failure);
+ responses[0] = failResponse;
+ listener.afterBulk(executionId, bulkRequest, bulkResponse);
- // fail resend
- listener.afterBulk(executionId, bulkRequest, new Exception());
- } catch (Exception e) {
- e.printStackTrace();
- }
+ // failNull noResend
+ BulkItemResponse.Failure failureNull = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id",
+ null);
+ BulkItemResponse failNullResponse = new BulkItemResponse(0, OpType.INDEX, failureNull);
+ responses[0] = failNullResponse;
+ listener.afterBulk(executionId, bulkRequest, bulkResponse);
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
index 5438bb5..c95b6b4 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flume.Transaction;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.junit.Before;
import org.junit.Test;
@@ -34,16 +35,18 @@ import org.powermock.modules.junit4.PowerMockRunner;
*/
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.management.*")
-@PrepareForTest({EsSinkFactory.class})
+@PrepareForTest({EsSinkFactory.class, MetricRegister.class})
public class TestEsChannelWorker {
private EsSinkContext context;
/**
* before
+ *
+ * @throws Exception
*/
@Before
- public void before() {
+ public void before() throws Exception {
LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
this.context = TestEsSinkContext.mock(dispatchQueue);
}
@@ -53,21 +56,17 @@ public class TestEsChannelWorker {
*/
@Test
public void test() {
- try {
- // prepare
- ProfileEvent event = TestEsSinkContext.mockProfileEvent();
- Transaction tx = this.context.getChannel().getTransaction();
- tx.begin();
- this.context.getChannel().put(event);
- tx.commit();
- tx.close();
- // test
- EsChannelWorker worker = new EsChannelWorker(context, 0);
- worker.doRun();
- worker.start();
- worker.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
+ // prepare
+ ProfileEvent event = TestEsSinkContext.mockProfileEvent();
+ Transaction tx = this.context.getChannel().getTransaction();
+ tx.begin();
+ this.context.getChannel().put(event);
+ tx.commit();
+ tx.close();
+ // test
+ EsChannelWorker worker = new EsChannelWorker(context, 0);
+ worker.doRun();
+ worker.start();
+ worker.close();
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
index cbee385..c1facdf 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
@@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
@@ -47,7 +48,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
*/
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.management.*")
-@PrepareForTest({EsSinkFactory.class, RestHighLevelClient.class, ClusterClient.class, IndicesClient.class})
+@PrepareForTest({EsSinkFactory.class, RestHighLevelClient.class, ClusterClient.class, IndicesClient.class,
+ MetricRegister.class})
public class TestEsOutputChannel {
private RestHighLevelClient esClient;
@@ -103,22 +105,20 @@ public class TestEsOutputChannel {
/**
* test
+ *
+ * @throws Exception
*/
@Test
- public void test() {
- try {
- LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
- EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
- EsOutputChannel output = new EsOutputChannel(context);
- ProfileEvent event = TestEsSinkContext.mockProfileEvent();
- EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
- dispatchQueue.add(indexRequest);
- output.init();
- output.send();
- output.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
+ public void test() throws Exception {
+ LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
+ EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
+ EsOutputChannel output = new EsOutputChannel(context);
+ ProfileEvent event = TestEsSinkContext.mockProfileEvent();
+ EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
+ dispatchQueue.add(indexRequest);
+ output.init();
+ output.send();
+ output.close();
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
index 5733c03..55f97e8 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
import java.nio.charset.Charset;
import java.util.HashMap;
@@ -26,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flume.Channel;
import org.apache.flume.Context;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
@@ -33,7 +35,9 @@ import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
@@ -42,6 +46,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
*/
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.management.*")
+@PrepareForTest({MetricRegister.class})
public class TestEsSinkContext {
public static final String TEST_INLONG_GROUP_ID = "0fc00000046";
@@ -53,8 +58,11 @@ public class TestEsSinkContext {
*
* @param dispatchQueue
* @return
+ * @throws Exception
*/
- public static EsSinkContext mock(LinkedBlockingQueue<EsIndexRequest> dispatchQueue) {
+ public static EsSinkContext mock(LinkedBlockingQueue<EsIndexRequest> dispatchQueue) throws Exception {
+ PowerMockito.mockStatic(MetricRegister.class);
+ PowerMockito.doNothing().when(MetricRegister.class, "register", any());
Context context = CommonPropertiesHolder.getContext();
String sinkName = CommonPropertiesHolder.getClusterId() + "Sink";
context.put(SortTaskConfig.KEY_TASK_NAME, "sid_es_es-rmrv7g7a_v3");
@@ -93,9 +101,11 @@ public class TestEsSinkContext {
/**
* test
+ *
+ * @throws Exception
*/
@Test
- public void test() {
+ public void test() throws Exception {
LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
EsSinkContext context = mock(dispatchQueue);
assertEquals(10, context.getBulkSizeMb());