You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/28 16:02:03 UTC
[incubator-inlong] branch master updated: [INLONG-2371][Bug][InLong-Dataproxy] monitorIndex should not use msgid for key, it affects performance (#2372)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 53c46b0 [INLONG-2371][Bug][InLong-Dataproxy] monitorIndex should not use msgid for key, it affects performance (#2372)
53c46b0 is described below
commit 53c46b01e2a093e08fc1ffc7b33d4707bc496192
Author: baomingyu <ba...@163.com>
AuthorDate: Sat Jan 29 00:01:59 2022 +0800
[INLONG-2371][Bug][InLong-Dataproxy] monitorIndex should not use msgid for key, it affects performance (#2372)
---
.../org/apache/inlong/dataproxy/sink/PulsarSink.java | 13 +++++++------
.../dataproxy/sink/pulsar/PulsarClientService.java | 19 ++++++++++++-------
.../inlong/dataproxy/source/ServerMessageHandler.java | 4 +++-
3 files changed, 22 insertions(+), 14 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index e846265..4b68162 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -202,14 +202,14 @@ public class PulsarSink extends AbstractSink implements Configurable,
/*
* stat pulsar performance
*/
- System.out.println("pulsarPerformanceTask!!!!!!");
+ logger.info("PulsarPerformanceTask!!!!!!");
scheduledExecutorService.scheduleWithFixedDelay(pulsarPerformanceTask, 0L,
PRINT_INTERVAL, TimeUnit.SECONDS);
}
public PulsarSink() {
super();
- logger.debug("new instance of PulsarSink!");
+ logger.info("new instance of PulsarSink!");
}
/**
@@ -312,7 +312,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
if (isNewMetricOn) {
monitorIndex = new MonitorIndex("Pulsar_Sink", statIntervalSec, maxMonitorCnt);
}
- monitorIndexExt = new MonitorIndexExt("Data_proxy_monitors#" + this.getName(),
+ monitorIndexExt = new MonitorIndexExt("Pulsar_Sink_monitors#" + this.getName(),
statIntervalSec, maxMonitorCnt);
}
@@ -471,7 +471,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
newbase.append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
.append(streamId).append(SEPARATOR).append(nodeIp)
.append(SEPARATOR).append(NetworkUtils.getLocalIp())
- .append(SEPARATOR).append(msgId).append(SEPARATOR)
+ .append(SEPARATOR).append(SEPARATOR)
.append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
long messageSize = event.getBody().length;
@@ -673,8 +673,9 @@ public class PulsarSink extends AbstractSink implements Configurable,
topic = event.getHeaders().get(TOPIC);
}
}
- logger.debug("Event is {}, topic = {} ",event, topic);
-
+ if (logger.isDebugEnabled()) {
+ logger.debug("Event is {}, topic = {} ",event, topic);
+ }
if (event == null) {
continue;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index b988d90..be8abcd 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -41,6 +41,8 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.shade.io.netty.util.NettyRuntime;
+import org.apache.pulsar.shade.io.netty.util.internal.SystemPropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,7 +73,9 @@ public class PulsarClientService {
private static String MAX_BATCHING_MESSAGES = "max_batching_messages";
private static String RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = "retry_interval_when_send_error_ms";
- private static int DEFAULT_PULSAR_IO_THREADS = 1;
+ private static int DEFAULT_PULSAR_IO_THREADS = Math.max(1, SystemPropertyUtil
+ .getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
+
private static int DEFAULT_CONNECTIONS_PRE_BROKER = 1;
private static int DEFAULT_SEND_TIMEOUT_MILL = 30 * 1000;
private static int DEFAULT_CLIENT_TIMEOUT_SECOND = 30;
@@ -194,7 +198,6 @@ public class PulsarClientService {
streamId = event.getHeaders().get(AttributeConstants.INAME);
}
proMap.put(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
- logger.debug("producer send msg!");
TopicProducerInfo forCallBackP = producer;
forCallBackP.getProducer().newMessage().properties(proMap).value(event.getBody())
.sendAsync().thenAccept((msgId) -> {
@@ -222,23 +225,25 @@ public class PulsarClientService {
pulsarClients = new ArrayList<PulsarClient>();
for (int i = 0; i < pulsarServerUrls.length; i++) {
try {
- logger.debug("index = {}, url = {}", i, pulsarServerUrls[i]);
+ if (logger.isDebugEnabled()) {
+ logger.debug("index = {}, url = {}", i, pulsarServerUrls[i]);
+ }
PulsarClient client = initPulsarClient(pulsarServerUrls[i]);
pulsarClients.add(client);
callBack.handleCreateClientSuccess(pulsarServerUrls[i]);
} catch (PulsarClientException e) {
callBack.handleCreateClientException(pulsarServerUrls[i]);
- logger.error("create connnection error in metasink, "
+ logger.error("create connnection error in pulsar sink, "
+ "maybe pulsar master set error, please re-check.url{}, ex1 {}",
pulsarServerUrls[i],
- e.getMessage());
+ e);
} catch (Throwable e) {
callBack.handleCreateClientException(pulsarServerUrls[i]);
- logger.error("create connnection error in metasink, "
+ logger.error("create connnection error in pulsar sink, "
+ "maybe pulsar master set error/shutdown in progress, please "
+ "re-check. url{}, ex2 {}",
pulsarServerUrls[i],
- e.getMessage());
+ e);
}
}
if (pulsarClients.size() == 0) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 5c430d4..baef74b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -724,7 +724,9 @@ public class ServerMessageHandler extends SimpleChannelHandler {
topic = configManager.getTopicProperties().get(groupId);
}
}
- logger.debug("Get topic by groupId = {} , streamId = {}", groupId, streamId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Get topic by groupId = {} , streamId = {}", groupId, streamId);
+ }
return topic;
}