You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/12 06:09:57 UTC

[incubator-inlong] branch master updated: [INLONG-2962][DataProxy] Unit tests throw so many error msg (#3081)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 41ebc98  [INLONG-2962][DataProxy] Unit tests throw so many error msg (#3081)
41ebc98 is described below

commit 41ebc987d2a55973e33e07e675dadaf20e33273a
Author: 卢春亮 <94...@qq.com>
AuthorDate: Sat Mar 12 14:09:52 2022 +0800

    [INLONG-2962][DataProxy] Unit tests throw so many error msg (#3081)
---
 .../inlong/dataproxy/config/ConfigManager.java     | 14 ++++--
 .../pulsar/federation/PulsarProducerCluster.java   |  9 ++--
 .../dataproxy/source/DefaultServiceDecoder.java    | 15 +++---
 .../inlong/dataproxy/source/ServiceDecoder.java    |  6 +--
 .../metrics/TestMetricListenerRunnable.java        |  9 ++++
 .../inlong/dataproxy/sink/TestPulsarSink.java      | 56 ++++++++++++----------
 .../federation/TestPulsarFederationSink.java       |  4 ++
 .../federation/TestPulsarProducerFederation.java   |  4 +-
 .../apache/inlong/dataproxy/utils/MockUtils.java   | 15 ++++++
 9 files changed, 88 insertions(+), 44 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 01abe45..f826da0 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -91,7 +91,7 @@ public class ConfigManager {
 
                     holder.loadFromFileToHolder();
                 }
-                ReloadConfigWorker reloadProperties = new ReloadConfigWorker(instance);
+                ReloadConfigWorker reloadProperties = ReloadConfigWorker.create(instance);
                 reloadProperties.setDaemon(true);
                 reloadProperties.start();
             }
@@ -217,15 +217,19 @@ public class ConfigManager {
     /**
      * load worker
      */
-    private static class ReloadConfigWorker extends Thread {
+    public static class ReloadConfigWorker extends Thread {
 
         private static final Logger LOG = LoggerFactory.getLogger(ReloadConfigWorker.class);
         private final ConfigManager configManager;
         private final CloseableHttpClient httpClient;
         private final Gson gson = new Gson();
         private boolean isRunning = true;
+        
+        public static ReloadConfigWorker create(ConfigManager managerInstance) {
+            return new ReloadConfigWorker(managerInstance);
+        }
 
-        public ReloadConfigWorker(ConfigManager managerInstance) {
+        private ReloadConfigWorker(ConfigManager managerInstance) {
             this.configManager = managerInstance;
             this.httpClient = constructHttpClient();
         }
@@ -351,10 +355,12 @@ public class ConfigManager {
         }
 
         private void checkRemoteConfig() {
-
             try {
                 String managerHosts = configManager.getCommonProperties().get("manager_hosts");
                 String proxyClusterName = configManager.getCommonProperties().get("proxy_cluster_name");
+                if (StringUtils.isEmpty(managerHosts) || StringUtils.isEmpty(proxyClusterName)) {
+                    return;
+                }
                 String[] hostList = StringUtils.split(managerHosts, ",");
                 for (String host : hostList) {
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
index b347402..f90874c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
@@ -130,14 +130,17 @@ public class PulsarProducerCluster implements LifecycleAware {
                     .maxPendingMessages(context.getInteger(KEY_MAXPENDINGMESSAGES, 500))
                     .maxPendingMessagesAcrossPartitions(
                             context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 60000))
-                    .batchingMaxMessages(context.getInteger(KEY_BATCHINGMAXMESSAGES, 500))
+                    .batchingMaxMessages(context.getInteger(KEY_BATCHINGMAXMESSAGES, 500));
+            this.baseBuilder
                     .batchingMaxPublishDelay(context.getInteger(KEY_BATCHINGMAXPUBLISHDELAY, 100),
-                            TimeUnit.MILLISECONDS)
+                            TimeUnit.MILLISECONDS);
+            this.baseBuilder
                     .batchingMaxBytes(context.getInteger(KEY_BATCHINGMAXBYTES, 131072));
             this.baseBuilder
                     .accessMode(ProducerAccessMode.Shared)
                     .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
-                    .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, true))
+                    .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, true));
+            this.baseBuilder
                     .roundRobinRouterBatchingPartitionSwitchFrequency(
                             context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 60))
                     .enableBatching(context.getBoolean(KEY_ENABLEBATCHING, true))
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index 3a2fda7..6be54ea 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -17,10 +17,6 @@
 
 package org.apache.inlong.dataproxy.source;
 
-import com.google.common.base.Splitter;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -40,6 +36,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xerial.snappy.Snappy;
 
+import com.google.common.base.Splitter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+
 public class DefaultServiceDecoder implements ServiceDecoder {
 
     private static final int BIN_MSG_TOTALLEN_OFFSET = 0;
@@ -73,13 +74,13 @@ public class DefaultServiceDecoder implements ServiceDecoder {
 
     /**
      * extract bin heart beat data, message type is 8
+     * 
      * @param resultMap
      * @param cb
      * @param channel
      * @param totalDataLen
-     * @return
-     *
-     * @throws
+     * @return Map
+     * @throws Exception
      */
     private Map<String, Object> extractNewBinHB(Map<String, Object> resultMap,
             ByteBuf cb, Channel channel, int totalDataLen) throws Exception {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
index 8249390..7f4852b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
@@ -27,11 +27,11 @@ public interface ServiceDecoder {
 
     /**
      * extract data from buffer and convert it into map.
+     * 
      * @param cb
      * @param channel
-     * @return
-     *
-     * @throws
+     * @return Map
+     * @throws Exception
      */
     Map<String, Object> extractData(ByteBuf cb, Channel channel) throws Exception;
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
index 3c4f4b1..32e5a21 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
@@ -26,10 +26,15 @@ import java.util.Map;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.common.metric.MetricUtils;
 import org.apache.inlong.common.metric.MetricValue;
+import org.apache.inlong.dataproxy.utils.MockUtils;
 import org.apache.inlong.common.metric.MetricItemValue;
 import org.apache.inlong.common.metric.MetricListener;
 import org.apache.inlong.common.metric.MetricListenerRunnable;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +42,9 @@ import org.slf4j.LoggerFactory;
  *
  * TestMetricItemSetMBean
  */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+@PrepareForTest({MetricRegister.class})
 public class TestMetricListenerRunnable {
 
     public static final Logger LOG = LoggerFactory.getLogger(TestMetricListenerRunnable.class);
@@ -66,6 +74,7 @@ public class TestMetricListenerRunnable {
      */
     @Test
     public void testResult() throws Exception {
+        MockUtils.mockMetricRegister();
         itemSet = new DataProxyMetricItemSet(CLUSTER_ID);
         MetricRegister.register(itemSet);
         // prepare
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java
index 33fb037..047b6a8 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java
@@ -17,27 +17,36 @@
 
 package org.apache.inlong.dataproxy.sink;
 
-import com.google.common.base.Charsets;
-import org.apache.flume.Channel;
+import static org.apache.inlong.common.reporpter.StreamConfigLogMetric.CONFIG_LOG_REPORT_ENABLE;
+import static org.mockito.ArgumentMatchers.any;
+
 import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
-import org.apache.flume.lifecycle.LifecycleController;
-import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.config.ConfigManager.ReloadConfigWorker;
+import org.apache.inlong.dataproxy.utils.MockUtils;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.modules.testng.PowerMockTestCase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.Assert;
 
+import com.google.common.base.Charsets;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+@PrepareForTest({MetricRegister.class, ReloadConfigWorker.class})
 public class TestPulsarSink extends PowerMockTestCase {
+
     private static final Logger logger = LoggerFactory
             .getLogger(TestPulsarSink.class);
     private static final String hostname = "127.0.0.1";
@@ -47,14 +56,23 @@ public class TestPulsarSink extends PowerMockTestCase {
     private int batchSize = 1;
 
     private PulsarSink sink;
-    private Channel channel;
+    private MemoryChannel channel;
 //    private ThirdPartyClusterConfig pulsarConfig = new ThirdPartyClusterConfig();
 //    private Map<String, String> url2token;
 
     @Mock
     private static ConfigManager configManager;
 
-    public void setUp() {
+    public void setUp() throws Exception {
+        // mock
+        MockUtils.mockMetricRegister();
+        PowerMockito.mockStatic(ReloadConfigWorker.class);
+        ReloadConfigWorker worker = PowerMockito.mock(ReloadConfigWorker.class);
+//        PowerMockito.doNothing().when(worker, "setDaemon", true);
+        PowerMockito.doNothing().when(worker, "start");
+        PowerMockito.when(ReloadConfigWorker.class, "create", any()).thenReturn(worker);
+        ConfigManager.getInstance().getCommonProperties().put(CONFIG_LOG_REPORT_ENABLE, "false");
+        // prepare
         sink = new PulsarSink();
         channel = new MemoryChannel();
 //        url2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
@@ -62,18 +80,13 @@ public class TestPulsarSink extends PowerMockTestCase {
         context.put("type", "org.apache.inlong.dataproxy.sink.PulsarSink");
         sink.setChannel(channel);
 
-        Configurables.configure(sink, context);
-        Configurables.configure(channel, context);
+        this.channel.configure(context);
     }
 
     @Test
-    public void testProcess() throws InterruptedException, EventDeliveryException,
-            InstantiationException, IllegalAccessException {
+    public void testProcess() throws Exception {
         setUp();
         Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
-        sink.start();
-        Assert.assertTrue(LifecycleController.waitForOneOf(sink,
-                LifecycleState.START_OR_ERROR, 5000));
 
         Transaction transaction = channel.getTransaction();
 
@@ -83,14 +96,5 @@ public class TestPulsarSink extends PowerMockTestCase {
         }
         transaction.commit();
         transaction.close();
-
-        for (int i = 0; i < 5; i++) {
-            Sink.Status status = sink.process();
-            Assert.assertEquals(Sink.Status.READY, status);
-        }
-
-        sink.stop();
-        Assert.assertTrue(LifecycleController.waitForOneOf(sink,
-                LifecycleState.STOP_OR_ERROR, 5000));
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
index 847f218..2322c5b 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
@@ -25,12 +25,14 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
+import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.dataproxy.config.loader.TestContextIdTopicConfigLoader;
 import org.apache.inlong.dataproxy.utils.MockUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +43,7 @@ import org.slf4j.LoggerFactory;
  */
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
+@PrepareForTest({MetricRegister.class})
 public class TestPulsarFederationSink {
 
     public static final Logger LOG = LoggerFactory.getLogger(TestContextIdTopicConfigLoader.class);
@@ -80,6 +83,7 @@ public class TestPulsarFederationSink {
      */
     @Test
     public void testResult() throws Exception {
+        MockUtils.mockMetricRegister();
         // mock
         Channel channel = MockUtils.mockChannel();
         sinkObj.setChannel(channel);
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
index 9625022..e4b25c4 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.dataproxy.utils.MockUtils;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageId;
@@ -50,7 +51,7 @@ import org.slf4j.LoggerFactory;
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
 @PrepareForTest({PulsarClient.class, ClientBuilder.class, MessageId.class,
-        Producer.class, ProducerBuilder.class, TypedMessageBuilder.class})
+        Producer.class, ProducerBuilder.class, TypedMessageBuilder.class, MetricRegister.class})
 public class TestPulsarProducerFederation {
 
     public static final Logger LOG = LoggerFactory.getLogger(TestPulsarProducerFederation.class);
@@ -65,6 +66,7 @@ public class TestPulsarProducerFederation {
         Map<String, String> result = new ConcurrentHashMap<>();
         try (InputStream inStream = TestPulsarFederationSink.class.getClassLoader().getResource("flume.conf")
                 .openStream()) {
+            MockUtils.mockMetricRegister();
             Properties props = new Properties();
             props.load(inStream);
             for (Map.Entry<Object, Object> entry : props.entrySet()) {
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
index cd12636..740f8b1 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
@@ -31,6 +31,7 @@ import org.apache.flume.Channel;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 import org.apache.flume.event.EventBuilder;
+import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -58,6 +59,7 @@ public class MockUtils {
 
     /**
      * mockChannel
+     * 
      * @return
      *
      * @throws Exception
@@ -77,6 +79,7 @@ public class MockUtils {
 
     /**
      * mockEvent
+     * 
      * @return
      *
      * @throws Exception
@@ -96,6 +99,7 @@ public class MockUtils {
 
     /**
      * mockPulsarClient
+     * 
      * @return
      *
      * @throws Exception
@@ -143,6 +147,7 @@ public class MockUtils {
 
     /**
      * mockProducer
+     * 
      * @return
      *
      * @throws Exception
@@ -161,4 +166,14 @@ public class MockUtils {
         PowerMockito.when(msgBuilder.sendAsync()).thenReturn(future);
         return producer;
     }
+
+    /**
+     * mockMetricRegister
+     * 
+     * @throws Exception
+     */
+    public static void mockMetricRegister() throws Exception {
+        PowerMockito.mockStatic(MetricRegister.class);
+        PowerMockito.doNothing().when(MetricRegister.class, "register", any());
+    }
 }