You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/02/11 02:03:01 UTC

[skywalking] branch master updated: Simplify the Zabbix UT, reduce use time (#6362)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 92d0465  Simplify the Zabbix UT, reduce use time (#6362)
92d0465 is described below

commit 92d0465cd839e22f859b0c9cfe7b2928d502ad2f
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Feb 11 10:02:37 2021 +0800

    Simplify the Zabbix UT, reduce use time (#6362)
---
 .../provider/protocol/ZabbixProtocolDecoder.java   |   2 +-
 .../receiver/zabbix/provider/ZabbixBaseTest.java   | 333 ++++++++-------------
 .../zabbix/provider/ZabbixMetricsTest.java         |  16 +-
 .../protocol/ZabbixProtocolHandlerTest.java        |  24 +-
 4 files changed, 145 insertions(+), 230 deletions(-)

diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolDecoder.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolDecoder.java
index e2d41d5..6fd100e 100644
--- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolDecoder.java
+++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolDecoder.java
@@ -106,7 +106,7 @@ public class ZabbixProtocolDecoder extends ByteToMessageDecoder {
     /**
      * Close connection if protocol error
      */
-    private void errorProtocol(ChannelHandlerContext context, ByteBuf byteBuf, String reason, Throwable ex) throws InterruptedException {
+    protected void errorProtocol(ChannelHandlerContext context, ByteBuf byteBuf, String reason, Throwable ex) throws InterruptedException {
         log.warn("Receive message is not Zabbix protocol, reason: {}", reason, ex);
         // Skip all content
         byteBuf.skipBytes(byteBuf.readableBytes());
diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixBaseTest.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixBaseTest.java
index f73314a..be5e073 100644
--- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixBaseTest.java
+++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixBaseTest.java
@@ -23,47 +23,43 @@ import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.socket.SocketChannel;
-import java.net.SocketTimeoutException;
-import lombok.SneakyThrows;
+import lombok.Getter;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.skywalking.apm.util.StringUtil;
-import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixErrorProtocolException;
 import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixProtocolDecoder;
+import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixProtocolEncoder;
 import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixProtocolHandler;
-import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixServer;
 import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixProtocolType;
 import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixRequest;
-import org.junit.After;
+import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixResponse;
 import org.junit.Assert;
 import org.junit.Before;
-import org.mockito.stubbing.Answer;
-import org.powermock.reflect.Whitebox;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
+import org.mockito.Spy;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 public abstract class ZabbixBaseTest {
-    private static final String TCP_HOST = "0.0.0.0";
-    private static final int TCP_PORT = 10051;
 
-    protected ZabbixServer zabbixServer;
-    protected SocketClient socketClient;
+    @Spy
+    private ChannelHandlerContext channelHandlerContext;
+
+    private List<Object> requests;
+    private List<Object> responses;
+
+    private ZabbixProtocolEncoderWrapper encoder;
+    private ZabbixProtocolDecoderWrapper decoder;
+    private ZabbixProtocolHandler handler;
     protected ZabbixMetrics zabbixMetrics;
 
     /**
@@ -72,40 +68,31 @@ public abstract class ZabbixBaseTest {
     protected abstract ZabbixMetrics buildZabbixMetrics() throws Exception;
 
     @Before
-    public void setupService() throws Throwable {
-        // Startup server
-        ZabbixModuleConfig config = new ZabbixModuleConfig();
-        config.setPort(TCP_PORT);
-        config.setHost(TCP_HOST);
+    public void setupMetrics() throws Throwable {
         zabbixMetrics = buildZabbixMetrics();
-        zabbixServer = new ZabbixServerWrapper(config, zabbixMetrics);
-        zabbixServer.start();
-    }
-
-    @After
-    public void cleanup() {
-        zabbixServer.stop();
+        requests = new ArrayList<>();
+        responses = new ArrayList<>();
+
+        encoder = new ZabbixProtocolEncoderWrapper();
+        decoder = new ZabbixProtocolDecoderWrapper();
+        handler = new ZabbixProtocolHandler(zabbixMetrics);
+        when(channelHandlerContext.writeAndFlush(any())).thenAnswer(invocationOnMock -> {
+            responses.add(invocationOnMock.getArgument(0));
+            return null;
+        });
+        ByteBufAllocator allocator = mock(ByteBufAllocator.class);
+        when(allocator.buffer(anyInt())).thenAnswer(invocationOnMock -> Unpooled.buffer(invocationOnMock.getArgument(0)));
+        when(channelHandlerContext.alloc()).thenReturn(allocator);
     }
 
     /**
      * Verify request error protocol
      */
     public void assertWriteErrorProtocol(byte[] data) throws Throwable {
-        startupSocketClient();
-        try {
-            socketClient.socket.getOutputStream().write(data);
-
-            for (int i = 0; i < 10; i++) {
-                // No response
-                if (socketClient.socket.getInputStream().available() == 0 && socketClient.socket.getInputStream().read() == -1) {
-                    return ;
-                }
-                TimeUnit.MILLISECONDS.sleep(500);
-            }
-
-            throw new IllegalStateException("Could not detect protocol error");
-        } finally {
-            stopSocketClient();
+        ZabbixProtocolDecoderWrapper decoder = new ZabbixProtocolDecoderWrapper();
+        decoder.decode(null, Unpooled.wrappedBuffer(data), null);
+        if (!decoder.isProtocolError()) {
+            throw new IllegalStateException("Could not detect need more input error");
         }
     }
 
@@ -113,33 +100,46 @@ public abstract class ZabbixBaseTest {
      * Assert need more input to server
      */
     public void assertNeedMoreInput(byte[] data) throws Throwable {
-        startupSocketClient();
-        try {
-            socketClient.socket.getOutputStream().write(data);
-
-            try {
-                for (int i = 0; i < 10; i++) {
-                    // No response
-                    if (socketClient.socket.getInputStream().available() == 0 && socketClient.socket.getInputStream().read() == -1) {
-                        return ;
-                    }
-                    TimeUnit.MILLISECONDS.sleep(100);
+        ZabbixProtocolDecoder decoder = spy(new ZabbixProtocolDecoder());
+        if (decoder.decodeToPayload(null, Unpooled.wrappedBuffer(data)) != null) {
+            throw new IllegalStateException("Could not detect need more input error");
+        }
+    }
+
+    /**
+     * Verify Active checks item names
+     */
+    public void assertZabbixActiveChecksResponse(int inx, String... itemNames) throws Exception {
+        ZabbixResponse response = (ZabbixResponse) responses.get(inx);
+
+        // Active Checks
+        Assert.assertEquals(itemNames.length, response.getActiveChecks().size());
+        for (String itemName : itemNames) {
+            boolean found = false;
+
+            for (final ZabbixResponse.ActiveChecks checks : response.getActiveChecks()) {
+                if (Objects.equals(checks.getKey(), itemName)) {
+                    Assert.assertTrue(checks.getDelay() > 0);
+                    Assert.assertTrue(checks.getLastlogsize() >= 0);
+                    Assert.assertTrue(checks.getMtime() >= 0);
+                    found = true;
                 }
-            } catch (SocketTimeoutException e) {
-                // Read timeout mean need more content
-                return;
             }
 
-            throw new IllegalStateException("Could not detect need more input error");
-        } finally {
-            stopSocketClient();
+            if (!found) {
+                throw new AssertionError("Could not found " + itemName + " in Active Checks response");
+            }
         }
+
+        encoder.encode(channelHandlerContext, response, null);
+        String respBody = decoder.decodeToPayload(channelHandlerContext, (ByteBuf) responses.get(inx + 1));
+        assertZabbixActiveChecksResponseWithEncoded(respBody, itemNames);
     }
 
     /**
-     * Verify Active checks item names
+     * Verify Active checks item names with encoded
      */
-    public void assertZabbixActiveChecksResponse(String body, String... itemNames) {
+    private void assertZabbixActiveChecksResponseWithEncoded(String body, String... itemNames) {
         Assert.assertNotNull(body);
         JsonElement bodyRoot = new Gson().fromJson(body, JsonElement.class);
         JsonObject rootObject = bodyRoot.getAsJsonObject();
@@ -173,16 +173,29 @@ public abstract class ZabbixBaseTest {
     /**
      * Verify Zabbix agent data response
      */
-    public void assertZabbixAgentDataResponse(String body) {
+    public void assertZabbixAgentDataResponse(int inx) throws Exception {
+        ZabbixResponse response = (ZabbixResponse) responses.get(inx);
+
+        // Agent data info
+        Assert.assertTrue(StringUtil.isNotEmpty(response.getAgentData().getInfo()));
+
+        encoder.encode(channelHandlerContext, response, null);
+        String respBody = decoder.decodeToPayload(channelHandlerContext, (ByteBuf) responses.get(inx + 1));
+        assertZabbixAgentDataResponseWithEncoded(respBody);
+    }
+
+    /**
+     * Verify Zabbix agent data response with encoded
+     */
+    public void assertZabbixAgentDataResponseWithEncoded(String body) {
         Assert.assertNotNull(body);
         JsonElement bodyRoot = new Gson().fromJson(body, JsonElement.class);
         JsonObject rootObject = bodyRoot.getAsJsonObject();
         // Basic response status
         Assert.assertEquals("success", rootObject.get("response").getAsString());
 
-        // Agent data info
+        // Agent data
         Assert.assertNotNull(rootObject.get("info"));
-        Assert.assertTrue(StringUtil.isNotEmpty(rootObject.get("info").getAsString()));
     }
 
     /**
@@ -229,159 +242,67 @@ public abstract class ZabbixBaseTest {
      * Verify zabbix request basic info
      */
     private ZabbixRequest assertZabbixRequestBasic(int inx, ZabbixProtocolType protocolType) {
-        List<ZabbixRequest> requests = socketClient.requests;
         Assert.assertNotNull(requests);
         Assert.assertTrue(requests.size() > inx);
-        ZabbixRequest request = requests.get(inx);
+        ZabbixRequest request = (ZabbixRequest) requests.get(inx);
         Assert.assertEquals(protocolType, request.getType());
         return request;
     }
 
-    /**
-     * Startup a new socket client to server
-     */
-    protected void startupSocketClient() throws Throwable {
-        socketClient = Optional.ofNullable(this.socketClient).orElseGet(SocketClient::new);
-        socketClient.startup();
+    public byte[] buildZabbixRequestData(String content) {
+        // Build header
+        byte[] payload = content.getBytes();
+        int payloadLength = payload.length;
+        byte[] header = new byte[] {
+            'Z', 'B', 'X', 'D', '\1',
+            (byte) (payloadLength & 0xFF),
+            (byte) (payloadLength >> 8 & 0xFF),
+            (byte) (payloadLength >> 16 & 0xFF),
+            (byte) (payloadLength >> 24 & 0xFF),
+            '\0', '\0', '\0', '\0'};
+
+        byte[] packet = new byte[header.length + payloadLength];
+        System.arraycopy(header, 0, packet, 0, header.length);
+        System.arraycopy(payload, 0, packet, header.length, payloadLength);
+
+        return packet;
     }
 
-    /**
-     * Close the client
-     */
-    protected void stopSocketClient() {
-        Optional.ofNullable(socketClient).ifPresent(SocketClient::stop);
-        socketClient = null;
-    }
-
-    /**
-     * Connect to receiver server
-     */
-    protected static class SocketClient {
-        private ZabbixProtocolHandler protocolHandler;
-        private Throwable spyHandlerException;
-        private Socket socket;
-        private List<ZabbixRequest> requests;
-
-        private void startup() throws Throwable {
-            if (socket != null) {
-                return;
-            }
-            socket = new Socket();
-            socket.setSoTimeout(2000);
-            socket.connect(new InetSocketAddress(TCP_HOST, TCP_PORT));
+    public void writeZabbixMessage(String message) throws Exception {
+        ArrayList<Object> data = new ArrayList<>();
+        decoder.decode(channelHandlerContext, Unpooled.wrappedBuffer(buildZabbixRequestData(message)), data);
+        requests.add(data.get(0));
 
-            // Waiting for connection
-            while (!socket.isConnected() || (protocolHandler == null && spyHandlerException == null)) {
-                TimeUnit.SECONDS.sleep(1);
-            }
-
-            if (spyHandlerException != null) {
-                throw spyHandlerException;
-            }
-
-            // Intercept message received
-            requests = new ArrayList<>();
-            doAnswer((Answer<Object>) invocationOnMock -> {
-                requests.add(invocationOnMock.getArgument(1));
-                return invocationOnMock.callRealMethod();
-            }).when(protocolHandler).channelRead0(any(), any());
-        }
-
-        @SneakyThrows
-        private void stop() {
-            if (socket != null && socket.isConnected()) {
-                socket.close();
-            }
-        }
-
-        public void writeZabbixMessage(String message) throws IOException {
-            this.socket.getOutputStream().write(buildZabbixRequestData(message));
-        }
-
-        public static byte[] buildZabbixRequestData(String content) {
-            // Build header
-            byte[] payload = content.getBytes();
-            int payloadLength = payload.length;
-            byte[] header = new byte[] {
-                'Z', 'B', 'X', 'D', '\1',
-                (byte) (payloadLength & 0xFF),
-                (byte) (payloadLength >> 8 & 0xFF),
-                (byte) (payloadLength >> 16 & 0xFF),
-                (byte) (payloadLength >> 24 & 0xFF),
-                '\0', '\0', '\0', '\0'};
-
-            byte[] packet = new byte[header.length + payloadLength];
-            System.arraycopy(header, 0, packet, 0, header.length);
-            System.arraycopy(payload, 0, packet, header.length, payloadLength);
-
-            return packet;
-        }
+        handler.channelRead0(channelHandlerContext, (ZabbixRequest) data.get(0));
+    }
 
-        /**
-         * Finding and spy the Zabbix handler
-         */
-        private void spyHandler(SocketChannel channel) {
-            Object tailContext = Whitebox.getInternalState(channel.pipeline(), "tail");
-            Object handlerContext = Whitebox.getInternalState(tailContext, "prev");
-            ZabbixProtocolHandler handler = spyHandler(handlerContext, ZabbixProtocolHandler.class);
-            if (handler == null) {
-                throw new IllegalStateException("Unnable to find Zabbix protocol handler");
-            }
-            protocolHandler = handler;
-        }
+    @Getter
+    private class ZabbixProtocolDecoderWrapper extends ZabbixProtocolDecoder {
+        private boolean protocolError;
 
-        private <T> T spyHandler(Object handlerContext, Class<T> handlerCls) {
-            if (handlerContext == null || handlerContext.getClass().getSimpleName().contains("HeadContext")) {
-                return null;
-            }
-            Object handler = Whitebox.getInternalState(handlerContext, "handler");
-            if (handler.getClass().equals(handlerCls)) {
-                Object realHandler = spy(handler);
-                Whitebox.setInternalState(handlerContext, "handler", realHandler);
-                return (T) realHandler;
-            } else {
-                return spyHandler(Whitebox.getInternalState(handlerContext, "prev"), handlerCls);
-            }
-        }
-
-        private byte[] readAllContent(InputStream inputStream) throws IOException {
-            ByteArrayOutputStream outputStream = new ByteArrayOutputStream(512);
-            byte[] buffer = new byte[512];
-            int len;
-            while ((len = inputStream.read(buffer)) > 0) {
-                outputStream.write(buffer, 0, len);
-                if (len != buffer.length) {
-                    break;
-                }
-            }
-            return outputStream.toByteArray();
+        @Override
+        public void decode(final ChannelHandlerContext channelHandlerContext,
+                              final ByteBuf byteBuf,
+                              final List<Object> list) throws Exception {
+            super.decode(channelHandlerContext, byteBuf, list);
         }
 
-        public String waitAndGetResponsePayload() throws InterruptedException, IOException, ZabbixErrorProtocolException {
-            ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class);
-            ByteBuf byteBuf = Unpooled.copiedBuffer(readAllContent(socket.getInputStream()));
-            return new ZabbixProtocolDecoder().decodeToPayload(channelHandlerContext, byteBuf);
+        @Override
+        protected void errorProtocol(final ChannelHandlerContext context,
+                                     final ByteBuf byteBuf,
+                                     final String reason,
+                                     final Throwable ex) throws InterruptedException {
+            protocolError = true;
         }
     }
 
-    /**
-     * Zabbix binder wrapper, support spy Zabbix message received data
-     */
-    private class ZabbixServerWrapper extends ZabbixServer {
-
-        public ZabbixServerWrapper(ZabbixModuleConfig config, ZabbixMetrics zabbixMetrics) {
-            super(config, zabbixMetrics);
-        }
-
+    @Getter
+    private class ZabbixProtocolEncoderWrapper extends ZabbixProtocolEncoder {
         @Override
-        public void initChannel(SocketChannel channel) {
-            super.initChannel(channel);
-
-            try {
-                socketClient.spyHandler(channel);
-            } catch (Throwable e) {
-                socketClient.spyHandlerException = e;
-            }
+        public void encode(final ChannelHandlerContext channelHandlerContext,
+                              final ZabbixResponse zabbixResponse,
+                              final List<Object> list) throws Exception {
+            super.encode(channelHandlerContext, zabbixResponse, list);
         }
     }
 
diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixMetricsTest.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixMetricsTest.java
index 6bd9354a..6766e71 100644
--- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixMetricsTest.java
+++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixMetricsTest.java
@@ -60,7 +60,7 @@ public class ZabbixMetricsTest extends ZabbixBaseTest {
     private List<AcceptableValue> values = new ArrayList<>();
 
     @Override
-    public void setupService() throws Throwable {
+    public void setupMetrics() throws Throwable {
         moduleProvider = Mockito.mock(CoreModuleProvider.class);
         moduleManager = Mockito.mock(ModuleManager.class);
 
@@ -82,7 +82,7 @@ public class ZabbixMetricsTest extends ZabbixBaseTest {
         map.put("avgHistogram", AvgHistogramFunction.class);
         map.put("avgHistogramPercentile", AvgHistogramPercentileFunction.class);
         Whitebox.setInternalState(meterSystem, "functionRegister", map);
-        super.setupService();
+        super.setupMetrics();
     }
 
     @Override
@@ -100,23 +100,20 @@ public class ZabbixMetricsTest extends ZabbixBaseTest {
 
     @Test
     public void testReceiveMetrics() throws Throwable {
-        startupSocketClient();
         // Verify Active Checks
-        socketClient.writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"test-01\"}");
-        String activeChecksRespData = socketClient.waitAndGetResponsePayload();
+        writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"test-01\"}");
         assertZabbixActiveChecksRequest(0, "test-01");
-        assertZabbixActiveChecksResponse(activeChecksRespData, "system.cpu.load[all,avg1]", "system.cpu.load[all,avg5]", "system.cpu.load[all,avg15]", "agent.hostname");
+        assertZabbixActiveChecksResponse(0, "system.cpu.load[all,avg1]", "system.cpu.load[all,avg5]", "system.cpu.load[all,avg15]", "agent.hostname");
 
         // Verify Agent data
-        socketClient.writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[" +
+        writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[" +
             "{\"host\":\"test-01\",\"key\":\"system.cpu.load[all,avg1]\",\"value\":\"1.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}," +
             "{\"host\":\"test-01\",\"key\":\"system.cpu.load[all,avg5]\",\"value\":\"2.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}," +
             "{\"host\":\"test-01\",\"key\":\"system.cpu.load[all,avg15]\",\"value\":\"3.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}," +
             "{\"host\":\"test-01\",\"key\":\"agent.hostname\",\"value\":\"test-01-hostname\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}" +
             "],\"clock\":1609588568,\"ns\":102244476}");
-        String agentDataRespData = socketClient.waitAndGetResponsePayload();
         assertZabbixAgentDataRequest(1, "test-01", "system.cpu.load[all,avg1]", "system.cpu.load[all,avg5]", "system.cpu.load[all,avg15]", "agent.hostname");
-        assertZabbixAgentDataResponse(agentDataRespData);
+        assertZabbixAgentDataResponse(2);
 
         // Verify meter system received data
         Assert.assertEquals(1, values.size());
@@ -130,6 +127,5 @@ public class ZabbixMetricsTest extends ZabbixBaseTest {
         Assert.assertEquals(1, avgLabeledFunction.getCount().get("avg1"), 0.0);
         Assert.assertEquals(1, avgLabeledFunction.getCount().get("avg5"), 0.0);
         Assert.assertEquals(1, avgLabeledFunction.getCount().get("avg15"), 0.0);
-        stopSocketClient();
     }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolHandlerTest.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolHandlerTest.java
index 749699f..a92c142 100644
--- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolHandlerTest.java
+++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolHandlerTest.java
@@ -47,20 +47,15 @@ public class ZabbixProtocolHandlerTest extends ZabbixBaseTest {
      */
     @Test
     public void testReceive() throws Throwable {
-        startupSocketClient();
         // Verify Active Checks
-        socketClient.writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"zabbix-test-agent\"}");
-        String activeChecksRespData = socketClient.waitAndGetResponsePayload();
+        writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"zabbix-test-agent\"}");
         assertZabbixActiveChecksRequest(0, "zabbix-test-agent");
-        assertZabbixActiveChecksResponse(activeChecksRespData, "system.cpu.load[all,avg15]");
+        assertZabbixActiveChecksResponse(0, "system.cpu.load[all,avg15]");
 
         // Verify Agent data
-        socketClient.writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[{\"host\":\"zabbix-test-agent\",\"key\":\"system.cpu.load[all,avg15]\",\"value\":\"1.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}],\"clock\":1609588568,\"ns\":102244476}");
-        String agentDataRespData = socketClient.waitAndGetResponsePayload();
+        writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[{\"host\":\"zabbix-test-agent\",\"key\":\"system.cpu.load[all,avg15]\",\"value\":\"1.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}],\"clock\":1609588568,\"ns\":102244476}");
         assertZabbixAgentDataRequest(1, "zabbix-test-agent", "system.cpu.load[all,avg15]");
-        assertZabbixAgentDataResponse(agentDataRespData);
-
-        stopSocketClient();
+        assertZabbixAgentDataResponse(2);
     }
 
     /**
@@ -69,7 +64,7 @@ public class ZabbixProtocolHandlerTest extends ZabbixBaseTest {
     @Test
     public void testErrorProtocol() throws Throwable {
         // Simple header
-        for (int i = 1; i < 5; i++) {
+        for (int i = 1; i < 4; i++) {
             assertNeedMoreInput(new byte[i]);
         }
 
@@ -80,10 +75,13 @@ public class ZabbixProtocolHandlerTest extends ZabbixBaseTest {
         assertWriteErrorProtocol(new byte[] {'Z', 'B', 'X', 'D', 2, 0, 0, 0, 0});
         assertWriteErrorProtocol(new byte[] {'Z', 'B', 'X', 'D', 2, 1, 0, 0, 0});
 
+        // Need more content
+        assertNeedMoreInput(new byte[] {'Z', 'B', 'X', 'D', 1, 5, 0, 0, 0, 1, 1, 1});
+
         // Empty data
-        assertWriteErrorProtocol(SocketClient.buildZabbixRequestData(""));
-        assertWriteErrorProtocol(SocketClient.buildZabbixRequestData("{}"));
-        assertWriteErrorProtocol(SocketClient.buildZabbixRequestData("{\"test\": 1}"));
+        assertWriteErrorProtocol(buildZabbixRequestData(""));
+        assertWriteErrorProtocol(buildZabbixRequestData("{}"));
+        assertWriteErrorProtocol(buildZabbixRequestData("{\"test\": 1}"));
     }
 
 }