You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/10 11:03:16 UTC

[incubator-inlong] branch release-1.0.0 updated (e934aa1 -> 7355452)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch release-1.0.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git.


    from e934aa1  [INLONG-2414][Manager] Exclude test jar file during the apache-rat-plugin check (#2415)
     new 439f142  [INLONG-2435][Sort] Fix Sort-standalone UT Exceptions (#2436)
     new 7355452  [INLONG-2441] [Inlong-Audit] Modify the version of audit protobuf (#2444)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 inlong-agent/agent-common/pom.xml                  |   6 --
 inlong-audit/audit-common/pom.xml                  |   2 +-
 .../audit-common/src/main/proto/AuditApi.proto     |  46 ++++----
 inlong-audit/audit-sdk/pom.xml                     |   2 +-
 .../apache/inlong/audit/util/AuditDataTest.java    |  20 +++-
 .../TestDefaultEvent2IndexRequestHandler.java      |   7 +-
 .../sink/elasticsearch/TestEsCallbackListener.java | 117 ++++++++-------------
 .../sink/elasticsearch/TestEsChannelWorker.java    |  35 +++---
 .../sink/elasticsearch/TestEsOutputChannel.java    |  30 +++---
 .../sink/elasticsearch/TestEsSinkContext.java      |  14 ++-
 10 files changed, 135 insertions(+), 144 deletions(-)

[incubator-inlong] 02/02: [INLONG-2441] [Inlong-Audit] Modify the version of audit protobuf (#2444)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.0.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 7355452acef7ffe00540baa966f0ee2b6435ad90
Author: doleyzi <43...@users.noreply.github.com>
AuthorDate: Thu Feb 10 19:00:25 2022 +0800

    [INLONG-2441] [Inlong-Audit] Modify the version of audit protobuf (#2444)
---
 inlong-agent/agent-common/pom.xml                  |  6 ---
 inlong-audit/audit-common/pom.xml                  |  2 +-
 .../audit-common/src/main/proto/AuditApi.proto     | 46 +++++++++++-----------
 inlong-audit/audit-sdk/pom.xml                     |  2 +-
 .../apache/inlong/audit/util/AuditDataTest.java    | 20 ++++++++--
 5 files changed, 41 insertions(+), 35 deletions(-)

diff --git a/inlong-agent/agent-common/pom.xml b/inlong-agent/agent-common/pom.xml
index 067fd1d..3ba08a3 100755
--- a/inlong-agent/agent-common/pom.xml
+++ b/inlong-agent/agent-common/pom.xml
@@ -106,12 +106,6 @@
             <groupId>org.apache.inlong</groupId>
             <artifactId>audit-sdk</artifactId>
             <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
     </dependencies>
     <build>
diff --git a/inlong-audit/audit-common/pom.xml b/inlong-audit/audit-common/pom.xml
index 17364d6..001d8e0 100644
--- a/inlong-audit/audit-common/pom.xml
+++ b/inlong-audit/audit-common/pom.xml
@@ -54,7 +54,7 @@
                 <extensions>true</extensions>
                 <configuration>
                     <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
-                    <protocArtifact>com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier}</protocArtifact>
+                    <protocArtifact>com.google.protobuf:protoc:2.5.0:exe:${os.detected.classifier}</protocArtifact>
                 </configuration>
                 <executions>
                     <execution>
diff --git a/inlong-audit/audit-common/src/main/proto/AuditApi.proto b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
index ce98fcd..b5aa6a6 100644
--- a/inlong-audit/audit-common/src/main/proto/AuditApi.proto
+++ b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-syntax = "proto3";
+syntax = "proto2";
 
 package org.apache.inlong.audit.protocol;
 
@@ -26,11 +26,11 @@ message BaseCommand {
         AUDITREQUEST  = 2;
         AUDITREPLY    = 3;
     }
-    Type type                            = 1;
-    AuditRequest audit_request  = 2;
-    AuditReply audit_reply      = 3;
-    Ping ping                   = 4;
-    Pong pong                   = 5;
+    optional Type type                   = 1;
+    required AuditRequest audit_request  = 2;
+    optional AuditReply audit_reply      = 3;
+    optional Ping ping                   = 4;
+    optional Pong pong                   = 5;
 }
 
 message Ping {
@@ -40,27 +40,27 @@ message Pong {
 }
 
 message AuditRequest {
-  uint64 request_id = 1;
-  AuditMessageHeader msg_header = 2;
+  optional uint64 request_id = 1;
+  required AuditMessageHeader msg_header = 2;
   repeated AuditMessageBody msg_body = 3;
 }
 
 message AuditMessageHeader {
-  string ip = 1;
-  string docker_id = 2;
-  string thread_id = 3;
-  uint64 sdk_ts = 4;
-  uint64 packet_id = 5;
+  required string ip = 1;
+  optional string docker_id = 2;
+  optional string thread_id = 3;
+  required uint64 sdk_ts = 4;
+  required uint64 packet_id = 5;
 }
 
 message AuditMessageBody {
-  uint64 log_ts = 1;
-  string inlong_group_id= 2;
-  string inlong_stream_id= 3;
-  string audit_id = 4;
-  uint64 count = 5;
-  uint64 size = 6;
-  int64  delay = 7;
+  required uint64 log_ts = 1;
+  required string inlong_group_id= 2;
+  required string inlong_stream_id= 3;
+  required string audit_id = 4;
+  required uint64 count = 5;
+  required uint64 size = 6;
+  required int64  delay = 7;
 }
 
 message AuditReply {
@@ -69,7 +69,7 @@ message AuditReply {
     FAILED   = 1;
     DISASTER = 2;
   }
-  uint64 request_id = 1;
-  RSP_CODE rsp_code = 2;
-  string message = 3;
+  optional uint64 request_id = 1;
+  required RSP_CODE rsp_code = 2;
+  optional string message = 3;
 }
diff --git a/inlong-audit/audit-sdk/pom.xml b/inlong-audit/audit-sdk/pom.xml
index c30f206..767eb51 100644
--- a/inlong-audit/audit-sdk/pom.xml
+++ b/inlong-audit/audit-sdk/pom.xml
@@ -37,7 +37,7 @@
         <compiler.target>1.8</compiler.target>
         <netty.version>3.8.0.Final</netty.version>
         <junit.version>4.13</junit.version>
-        <protobuf.version>3.19.1</protobuf.version>
+        <protobuf.version>2.5.0</protobuf.version>
         <commons.version>3.0</commons.version>
         <skipTests>false</skipTests>
         <slf4j.version>1.7.25</slf4j.version>
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
index 1d4d688..82babec 100644
--- a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
@@ -37,10 +37,22 @@ public class AuditDataTest {
 
     @Test
     public void getDataByte() {
-        AuditApi.AuditMessageHeader header = AuditApi.AuditMessageHeader.newBuilder().setIp("127.0.0.1").build();
-        AuditApi.AuditMessageBody body = AuditApi.AuditMessageBody.newBuilder().setAuditId("1").build();
-        AuditApi.AuditRequest request = AuditApi.AuditRequest.newBuilder().setMsgHeader(header)
-                .addMsgBody(body).build();
+        AuditApi.AuditMessageHeader.Builder headerBuilder = AuditApi.AuditMessageHeader.newBuilder();
+        headerBuilder.setPacketId(1)
+                .setSdkTs(0)
+                .setThreadId("")
+                .setIp("")
+                .setDockerId("");
+        AuditApi.AuditMessageBody.Builder bodyBuilder = AuditApi.AuditMessageBody.newBuilder();
+        bodyBuilder.setAuditId("")
+                .setInlongGroupId("")
+                .setInlongStreamId("")
+                .setLogTs(0)
+                .setCount(0)
+                .setSize(0)
+                .setDelay(0);
+        AuditApi.AuditRequest request = AuditApi.AuditRequest.newBuilder().setMsgHeader(headerBuilder.build())
+                .addMsgBody(bodyBuilder.build()).build();
         AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.newBuilder().setAuditRequest(request).build();
         AuditData test = new AuditData(System.currentTimeMillis(), baseCommand);
         byte[] data = test.getDataByte();

[incubator-inlong] 01/02: [INLONG-2435][Sort] Fix Sort-standalone UT Exceptions (#2436)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.0.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 439f14285911702e3495e1dc61998fe86b8b6a61
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Feb 10 17:01:30 2022 +0800

    [INLONG-2435][Sort] Fix Sort-standalone UT Exceptions (#2436)
---
 .../TestDefaultEvent2IndexRequestHandler.java      |   7 +-
 .../sink/elasticsearch/TestEsCallbackListener.java | 117 ++++++++-------------
 .../sink/elasticsearch/TestEsChannelWorker.java    |  35 +++---
 .../sink/elasticsearch/TestEsOutputChannel.java    |  30 +++---
 .../sink/elasticsearch/TestEsSinkContext.java      |  14 ++-
 5 files changed, 94 insertions(+), 109 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
index 5177bf8..725b8f1 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java
@@ -21,10 +21,12 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 /**
@@ -33,13 +35,16 @@ import org.powermock.modules.junit4.PowerMockRunner;
  */
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
+@PrepareForTest({MetricRegister.class})
 public class TestDefaultEvent2IndexRequestHandler {
 
     /**
      * test that ProfileEvent transform to EsIndexRequest
+     * 
+     * @throws Exception
      */
     @Test
-    public void test() {
+    public void test() throws Exception {
         LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
         EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
         ProfileEvent event = TestEsSinkContext.mockProfileEvent();
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
index 84441ef..da2bcb9 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.elasticsearch.action.DocWriteRequest.OpType;
 import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -38,13 +39,13 @@ import org.powermock.modules.junit4.PowerMockRunner;
  */
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
-@PrepareForTest({EsSinkFactory.class})
+@PrepareForTest({EsSinkFactory.class, MetricRegister.class})
 public class TestEsCallbackListener {
 
     private EsSinkContext context;
 
     @Before
-    public void before() {
+    public void before() throws Exception {
         LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
         this.context = TestEsSinkContext.mock(dispatchQueue);
     }
@@ -54,84 +55,54 @@ public class TestEsCallbackListener {
      */
     @Test
     public void testBeforeBulk() {
-        try {
-            // prepare
-            ProfileEvent event = TestEsSinkContext.mockProfileEvent();
-            EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
-            long executionId = 0;
-            BulkRequest bulkRequest = new BulkRequest();
-            bulkRequest.add(indexRequest);
-            // test
-            EsCallbackListener listener = new EsCallbackListener(context);
-            listener.beforeBulk(executionId, bulkRequest);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        // prepare
+        ProfileEvent event = TestEsSinkContext.mockProfileEvent();
+        EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
+        long executionId = 0;
+        BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.add(indexRequest);
+        // test
+        EsCallbackListener listener = new EsCallbackListener(context);
+        listener.beforeBulk(executionId, bulkRequest);
     }
 
     /**
      * testAfterSuccessBulk
+     * 
+     * @throws Exception
      */
     @Test
-    public void testAfterSuccessBulk() {
-        try {
-            // prepare
-            LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
-            EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
-            ProfileEvent event = TestEsSinkContext.mockProfileEvent();
-            EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
-            long executionId = 0;
-            EsCallbackListener listener = new EsCallbackListener(context);
-            // request
-            BulkRequest bulkRequest = new BulkRequest();
-            bulkRequest.add(indexRequest);
-            // success response
-            IndexResponse indexResponse = new IndexResponse();
-            BulkItemResponse itemResponse = new BulkItemResponse(0, OpType.INDEX, indexResponse);
-            BulkItemResponse[] responses = new BulkItemResponse[1];
-            responses[0] = itemResponse;
-            BulkResponse bulkResponse = new BulkResponse(responses, 0);
-            listener.afterBulk(executionId, bulkRequest, bulkResponse);
-
-            // fail resend
-            BulkItemResponse.Failure failure = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id",
-                    new Exception());
-            BulkItemResponse failResponse = new BulkItemResponse(0, OpType.INDEX, failure);
-            responses[0] = failResponse;
-            listener.afterBulk(executionId, bulkRequest, bulkResponse);
-
-            // failNull noResend
-            BulkItemResponse.Failure failureNull = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id",
-                    null);
-            BulkItemResponse failNullResponse = new BulkItemResponse(0, OpType.INDEX, failureNull);
-            responses[0] = failNullResponse;
-            listener.afterBulk(executionId, bulkRequest, bulkResponse);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
+    public void testAfterSuccessBulk() throws Exception {
+        // prepare
+        LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
+        EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
+        ProfileEvent event = TestEsSinkContext.mockProfileEvent();
+        EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
+        long executionId = 0;
+        EsCallbackListener listener = new EsCallbackListener(context);
+        // request
+        BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.add(indexRequest);
+        // success response
+        IndexResponse indexResponse = new IndexResponse();
+        BulkItemResponse itemResponse = new BulkItemResponse(0, OpType.INDEX, indexResponse);
+        BulkItemResponse[] responses = new BulkItemResponse[1];
+        responses[0] = itemResponse;
+        BulkResponse bulkResponse = new BulkResponse(responses, 0);
+        listener.afterBulk(executionId, bulkRequest, bulkResponse);
 
-    /**
-     * testAfterFailureBulk
-     */
-    @Test
-    public void testAfterFailureBulk() {
-        try {
-            // prepare
-            LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
-            EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
-            ProfileEvent event = TestEsSinkContext.mockProfileEvent();
-            EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
-            long executionId = 0;
-            EsCallbackListener listener = new EsCallbackListener(context);
-            // request
-            BulkRequest bulkRequest = new BulkRequest();
-            bulkRequest.add(indexRequest);
+        // fail resend
+        BulkItemResponse.Failure failure = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id",
+                new Exception());
+        BulkItemResponse failResponse = new BulkItemResponse(0, OpType.INDEX, failure);
+        responses[0] = failResponse;
+        listener.afterBulk(executionId, bulkRequest, bulkResponse);
 
-            // fail resend
-            listener.afterBulk(executionId, bulkRequest, new Exception());
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        // failNull noResend
+        BulkItemResponse.Failure failureNull = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id",
+                null);
+        BulkItemResponse failNullResponse = new BulkItemResponse(0, OpType.INDEX, failureNull);
+        responses[0] = failNullResponse;
+        listener.afterBulk(executionId, bulkRequest, bulkResponse);
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
index 5438bb5..c95b6b4 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.flume.Transaction;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,16 +35,18 @@ import org.powermock.modules.junit4.PowerMockRunner;
  */
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
-@PrepareForTest({EsSinkFactory.class})
+@PrepareForTest({EsSinkFactory.class, MetricRegister.class})
 public class TestEsChannelWorker {
 
     private EsSinkContext context;
 
     /**
      * before
+     * 
+     * @throws Exception
      */
     @Before
-    public void before() {
+    public void before() throws Exception {
         LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
         this.context = TestEsSinkContext.mock(dispatchQueue);
     }
@@ -53,21 +56,17 @@ public class TestEsChannelWorker {
      */
     @Test
     public void test() {
-        try {
-            // prepare
-            ProfileEvent event = TestEsSinkContext.mockProfileEvent();
-            Transaction tx = this.context.getChannel().getTransaction();
-            tx.begin();
-            this.context.getChannel().put(event);
-            tx.commit();
-            tx.close();
-            // test
-            EsChannelWorker worker = new EsChannelWorker(context, 0);
-            worker.doRun();
-            worker.start();
-            worker.close();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        // prepare
+        ProfileEvent event = TestEsSinkContext.mockProfileEvent();
+        Transaction tx = this.context.getChannel().getTransaction();
+        tx.begin();
+        this.context.getChannel().put(event);
+        tx.commit();
+        tx.close();
+        // test
+        EsChannelWorker worker = new EsChannelWorker(context, 0);
+        worker.doRun();
+        worker.start();
+        worker.close();
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
index cbee385..c1facdf 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java
@@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any;
 
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
@@ -47,7 +48,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
  */
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
-@PrepareForTest({EsSinkFactory.class, RestHighLevelClient.class, ClusterClient.class, IndicesClient.class})
+@PrepareForTest({EsSinkFactory.class, RestHighLevelClient.class, ClusterClient.class, IndicesClient.class,
+        MetricRegister.class})
 public class TestEsOutputChannel {
 
     private RestHighLevelClient esClient;
@@ -103,22 +105,20 @@ public class TestEsOutputChannel {
 
     /**
      * test
+     * 
+     * @throws Exception
      */
     @Test
-    public void test() {
-        try {
-            LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
-            EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
-            EsOutputChannel output = new EsOutputChannel(context);
-            ProfileEvent event = TestEsSinkContext.mockProfileEvent();
-            EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
-            dispatchQueue.add(indexRequest);
-            output.init();
-            output.send();
-            output.close();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void test() throws Exception {
+        LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
+        EsSinkContext context = TestEsSinkContext.mock(dispatchQueue);
+        EsOutputChannel output = new EsOutputChannel(context);
+        ProfileEvent event = TestEsSinkContext.mockProfileEvent();
+        EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event);
+        dispatchQueue.add(indexRequest);
+        output.init();
+        output.send();
+        output.close();
     }
 
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
index 5733c03..55f97e8 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
 
 import java.nio.charset.Charset;
 import java.util.HashMap;
@@ -26,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
@@ -33,7 +35,9 @@ import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 /**
@@ -42,6 +46,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
  */
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
+@PrepareForTest({MetricRegister.class})
 public class TestEsSinkContext {
 
     public static final String TEST_INLONG_GROUP_ID = "0fc00000046";
@@ -53,8 +58,11 @@ public class TestEsSinkContext {
      * 
      * @param  dispatchQueue
      * @return
+     * @throws Exception
      */
-    public static EsSinkContext mock(LinkedBlockingQueue<EsIndexRequest> dispatchQueue) {
+    public static EsSinkContext mock(LinkedBlockingQueue<EsIndexRequest> dispatchQueue) throws Exception {
+        PowerMockito.mockStatic(MetricRegister.class);
+        PowerMockito.doNothing().when(MetricRegister.class, "register", any());
         Context context = CommonPropertiesHolder.getContext();
         String sinkName = CommonPropertiesHolder.getClusterId() + "Sink";
         context.put(SortTaskConfig.KEY_TASK_NAME, "sid_es_es-rmrv7g7a_v3");
@@ -93,9 +101,11 @@ public class TestEsSinkContext {
 
     /**
      * test
+     * 
+     * @throws Exception
      */
     @Test
-    public void test() {
+    public void test() throws Exception {
         LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>();
         EsSinkContext context = mock(dispatchQueue);
         assertEquals(10, context.getBulkSizeMb());