You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/12 06:09:57 UTC
[incubator-inlong] branch master updated: [INLONG-2962][DataProxy] Unit tests throw so many error msg (#3081)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 41ebc98 [INLONG-2962][DataProxy] Unit tests throw so many error msg (#3081)
41ebc98 is described below
commit 41ebc987d2a55973e33e07e675dadaf20e33273a
Author: 卢春亮 <94...@qq.com>
AuthorDate: Sat Mar 12 14:09:52 2022 +0800
[INLONG-2962][DataProxy] Unit tests throw so many error msg (#3081)
---
.../inlong/dataproxy/config/ConfigManager.java | 14 ++++--
.../pulsar/federation/PulsarProducerCluster.java | 9 ++--
.../dataproxy/source/DefaultServiceDecoder.java | 15 +++---
.../inlong/dataproxy/source/ServiceDecoder.java | 6 +--
.../metrics/TestMetricListenerRunnable.java | 9 ++++
.../inlong/dataproxy/sink/TestPulsarSink.java | 56 ++++++++++++----------
.../federation/TestPulsarFederationSink.java | 4 ++
.../federation/TestPulsarProducerFederation.java | 4 +-
.../apache/inlong/dataproxy/utils/MockUtils.java | 15 ++++++
9 files changed, 88 insertions(+), 44 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 01abe45..f826da0 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -91,7 +91,7 @@ public class ConfigManager {
holder.loadFromFileToHolder();
}
- ReloadConfigWorker reloadProperties = new ReloadConfigWorker(instance);
+ ReloadConfigWorker reloadProperties = ReloadConfigWorker.create(instance);
reloadProperties.setDaemon(true);
reloadProperties.start();
}
@@ -217,15 +217,19 @@ public class ConfigManager {
/**
* load worker
*/
- private static class ReloadConfigWorker extends Thread {
+ public static class ReloadConfigWorker extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(ReloadConfigWorker.class);
private final ConfigManager configManager;
private final CloseableHttpClient httpClient;
private final Gson gson = new Gson();
private boolean isRunning = true;
+
+ public static ReloadConfigWorker create(ConfigManager managerInstance) {
+ return new ReloadConfigWorker(managerInstance);
+ }
- public ReloadConfigWorker(ConfigManager managerInstance) {
+ private ReloadConfigWorker(ConfigManager managerInstance) {
this.configManager = managerInstance;
this.httpClient = constructHttpClient();
}
@@ -351,10 +355,12 @@ public class ConfigManager {
}
private void checkRemoteConfig() {
-
try {
String managerHosts = configManager.getCommonProperties().get("manager_hosts");
String proxyClusterName = configManager.getCommonProperties().get("proxy_cluster_name");
+ if (StringUtils.isEmpty(managerHosts) || StringUtils.isEmpty(proxyClusterName)) {
+ return;
+ }
String[] hostList = StringUtils.split(managerHosts, ",");
for (String host : hostList) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
index b347402..f90874c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
@@ -130,14 +130,17 @@ public class PulsarProducerCluster implements LifecycleAware {
.maxPendingMessages(context.getInteger(KEY_MAXPENDINGMESSAGES, 500))
.maxPendingMessagesAcrossPartitions(
context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 60000))
- .batchingMaxMessages(context.getInteger(KEY_BATCHINGMAXMESSAGES, 500))
+ .batchingMaxMessages(context.getInteger(KEY_BATCHINGMAXMESSAGES, 500));
+ this.baseBuilder
.batchingMaxPublishDelay(context.getInteger(KEY_BATCHINGMAXPUBLISHDELAY, 100),
- TimeUnit.MILLISECONDS)
+ TimeUnit.MILLISECONDS);
+ this.baseBuilder
.batchingMaxBytes(context.getInteger(KEY_BATCHINGMAXBYTES, 131072));
this.baseBuilder
.accessMode(ProducerAccessMode.Shared)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
- .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, true))
+ .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, true));
+ this.baseBuilder
.roundRobinRouterBatchingPartitionSwitchFrequency(
context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 60))
.enableBatching(context.getBoolean(KEY_ENABLEBATCHING, true))
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index 3a2fda7..6be54ea 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -17,10 +17,6 @@
package org.apache.inlong.dataproxy.source;
-import com.google.common.base.Splitter;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -40,6 +36,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
+import com.google.common.base.Splitter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+
public class DefaultServiceDecoder implements ServiceDecoder {
private static final int BIN_MSG_TOTALLEN_OFFSET = 0;
@@ -73,13 +74,13 @@ public class DefaultServiceDecoder implements ServiceDecoder {
/**
* extract bin heart beat data, message type is 8
+ *
* @param resultMap
* @param cb
* @param channel
* @param totalDataLen
- * @return
- *
- * @throws
+ * @return Map
+ * @throws Exception
*/
private Map<String, Object> extractNewBinHB(Map<String, Object> resultMap,
ByteBuf cb, Channel channel, int totalDataLen) throws Exception {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
index 8249390..7f4852b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
@@ -27,11 +27,11 @@ public interface ServiceDecoder {
/**
* extract data from buffer and convert it into map.
+ *
* @param cb
* @param channel
- * @return
- *
- * @throws
+ * @return Map
+ * @throws Exception
*/
Map<String, Object> extractData(ByteBuf cb, Channel channel) throws Exception;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
index 3c4f4b1..32e5a21 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
@@ -26,10 +26,15 @@ import java.util.Map;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.common.metric.MetricUtils;
import org.apache.inlong.common.metric.MetricValue;
+import org.apache.inlong.dataproxy.utils.MockUtils;
import org.apache.inlong.common.metric.MetricItemValue;
import org.apache.inlong.common.metric.MetricListener;
import org.apache.inlong.common.metric.MetricListenerRunnable;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +42,9 @@ import org.slf4j.LoggerFactory;
*
* TestMetricItemSetMBean
*/
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+@PrepareForTest({MetricRegister.class})
public class TestMetricListenerRunnable {
public static final Logger LOG = LoggerFactory.getLogger(TestMetricListenerRunnable.class);
@@ -66,6 +74,7 @@ public class TestMetricListenerRunnable {
*/
@Test
public void testResult() throws Exception {
+ MockUtils.mockMetricRegister();
itemSet = new DataProxyMetricItemSet(CLUSTER_ID);
MetricRegister.register(itemSet);
// prepare
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java
index 33fb037..047b6a8 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/TestPulsarSink.java
@@ -17,27 +17,36 @@
package org.apache.inlong.dataproxy.sink;
-import com.google.common.base.Charsets;
-import org.apache.flume.Channel;
+import static org.apache.inlong.common.reporpter.StreamConfigLogMetric.CONFIG_LOG_REPORT_ENABLE;
+import static org.mockito.ArgumentMatchers.any;
+
import org.apache.flume.Context;
import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
-import org.apache.flume.lifecycle.LifecycleController;
-import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.config.ConfigManager.ReloadConfigWorker;
+import org.apache.inlong.dataproxy.utils.MockUtils;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.testng.PowerMockTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.Assert;
+import com.google.common.base.Charsets;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+@PrepareForTest({MetricRegister.class, ReloadConfigWorker.class})
public class TestPulsarSink extends PowerMockTestCase {
+
private static final Logger logger = LoggerFactory
.getLogger(TestPulsarSink.class);
private static final String hostname = "127.0.0.1";
@@ -47,14 +56,23 @@ public class TestPulsarSink extends PowerMockTestCase {
private int batchSize = 1;
private PulsarSink sink;
- private Channel channel;
+ private MemoryChannel channel;
// private ThirdPartyClusterConfig pulsarConfig = new ThirdPartyClusterConfig();
// private Map<String, String> url2token;
@Mock
private static ConfigManager configManager;
- public void setUp() {
+ public void setUp() throws Exception {
+ // mock
+ MockUtils.mockMetricRegister();
+ PowerMockito.mockStatic(ReloadConfigWorker.class);
+ ReloadConfigWorker worker = PowerMockito.mock(ReloadConfigWorker.class);
+// PowerMockito.doNothing().when(worker, "setDaemon", true);
+ PowerMockito.doNothing().when(worker, "start");
+ PowerMockito.when(ReloadConfigWorker.class, "create", any()).thenReturn(worker);
+ ConfigManager.getInstance().getCommonProperties().put(CONFIG_LOG_REPORT_ENABLE, "false");
+ // prepare
sink = new PulsarSink();
channel = new MemoryChannel();
// url2token = ConfigManager.getInstance().getThirdPartyClusterUrl2Token();
@@ -62,18 +80,13 @@ public class TestPulsarSink extends PowerMockTestCase {
context.put("type", "org.apache.inlong.dataproxy.sink.PulsarSink");
sink.setChannel(channel);
- Configurables.configure(sink, context);
- Configurables.configure(channel, context);
+ this.channel.configure(context);
}
@Test
- public void testProcess() throws InterruptedException, EventDeliveryException,
- InstantiationException, IllegalAccessException {
+ public void testProcess() throws Exception {
setUp();
Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
- sink.start();
- Assert.assertTrue(LifecycleController.waitForOneOf(sink,
- LifecycleState.START_OR_ERROR, 5000));
Transaction transaction = channel.getTransaction();
@@ -83,14 +96,5 @@ public class TestPulsarSink extends PowerMockTestCase {
}
transaction.commit();
transaction.close();
-
- for (int i = 0; i < 5; i++) {
- Sink.Status status = sink.process();
- Assert.assertEquals(Sink.Status.READY, status);
- }
-
- sink.stop();
- Assert.assertTrue(LifecycleController.waitForOneOf(sink,
- LifecycleState.STOP_OR_ERROR, 5000));
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
index 847f218..2322c5b 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
@@ -25,12 +25,14 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.flume.Channel;
import org.apache.flume.Context;
+import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.config.loader.TestContextIdTopicConfigLoader;
import org.apache.inlong.dataproxy.utils.MockUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +43,7 @@ import org.slf4j.LoggerFactory;
*/
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.management.*")
+@PrepareForTest({MetricRegister.class})
public class TestPulsarFederationSink {
public static final Logger LOG = LoggerFactory.getLogger(TestContextIdTopicConfigLoader.class);
@@ -80,6 +83,7 @@ public class TestPulsarFederationSink {
*/
@Test
public void testResult() throws Exception {
+ MockUtils.mockMetricRegister();
// mock
Channel channel = MockUtils.mockChannel();
sinkObj.setChannel(channel);
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
index 9625022..e4b25c4 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.flume.Context;
import org.apache.flume.Event;
+import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.utils.MockUtils;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
@@ -50,7 +51,7 @@ import org.slf4j.LoggerFactory;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.management.*")
@PrepareForTest({PulsarClient.class, ClientBuilder.class, MessageId.class,
- Producer.class, ProducerBuilder.class, TypedMessageBuilder.class})
+ Producer.class, ProducerBuilder.class, TypedMessageBuilder.class, MetricRegister.class})
public class TestPulsarProducerFederation {
public static final Logger LOG = LoggerFactory.getLogger(TestPulsarProducerFederation.class);
@@ -65,6 +66,7 @@ public class TestPulsarProducerFederation {
Map<String, String> result = new ConcurrentHashMap<>();
try (InputStream inStream = TestPulsarFederationSink.class.getClassLoader().getResource("flume.conf")
.openStream()) {
+ MockUtils.mockMetricRegister();
Properties props = new Properties();
props.load(inStream);
for (Map.Entry<Object, Object> entry : props.entrySet()) {
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
index cd12636..740f8b1 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
@@ -31,6 +31,7 @@ import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.event.EventBuilder;
+import org.apache.inlong.common.metric.MetricRegister;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -58,6 +59,7 @@ public class MockUtils {
/**
* mockChannel
+ *
* @return
*
* @throws Exception
@@ -77,6 +79,7 @@ public class MockUtils {
/**
* mockEvent
+ *
* @return
*
* @throws Exception
@@ -96,6 +99,7 @@ public class MockUtils {
/**
* mockPulsarClient
+ *
* @return
*
* @throws Exception
@@ -143,6 +147,7 @@ public class MockUtils {
/**
* mockProducer
+ *
* @return
*
* @throws Exception
@@ -161,4 +166,14 @@ public class MockUtils {
PowerMockito.when(msgBuilder.sendAsync()).thenReturn(future);
return producer;
}
+
+ /**
+ * mockMetricRegister
+ *
+ * @throws Exception
+ */
+ public static void mockMetricRegister() throws Exception {
+ PowerMockito.mockStatic(MetricRegister.class);
+ PowerMockito.doNothing().when(MetricRegister.class, "register", any());
+ }
}