You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/28 16:02:03 UTC

[incubator-inlong] branch master updated: [INLONG-2371][Bug][InLong-Dataproxy] monitorIndex should not use msgid for key, it affects performance (#2372)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 53c46b0  [INLONG-2371][Bug][InLong-Dataproxy] monitorIndex should not use msgid for key, it affects performance (#2372)
53c46b0 is described below

commit 53c46b01e2a093e08fc1ffc7b33d4707bc496192
Author: baomingyu <ba...@163.com>
AuthorDate: Sat Jan 29 00:01:59 2022 +0800

    [INLONG-2371][Bug][InLong-Dataproxy] monitorIndex should not use msgid for key, it affects performance (#2372)
---
 .../org/apache/inlong/dataproxy/sink/PulsarSink.java  | 13 +++++++------
 .../dataproxy/sink/pulsar/PulsarClientService.java    | 19 ++++++++++++-------
 .../inlong/dataproxy/source/ServerMessageHandler.java |  4 +++-
 3 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index e846265..4b68162 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -202,14 +202,14 @@ public class PulsarSink extends AbstractSink implements Configurable,
         /*
          * stat pulsar performance
          */
-        System.out.println("pulsarPerformanceTask!!!!!!");
+        logger.info("PulsarPerformanceTask!!!!!!");
         scheduledExecutorService.scheduleWithFixedDelay(pulsarPerformanceTask, 0L,
                 PRINT_INTERVAL, TimeUnit.SECONDS);
     }
 
     public PulsarSink() {
         super();
-        logger.debug("new instance of PulsarSink!");
+        logger.info("new instance of PulsarSink!");
     }
 
     /**
@@ -312,7 +312,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
             if (isNewMetricOn) {
                 monitorIndex = new MonitorIndex("Pulsar_Sink", statIntervalSec, maxMonitorCnt);
             }
-            monitorIndexExt = new MonitorIndexExt("Data_proxy_monitors#" + this.getName(),
+            monitorIndexExt = new MonitorIndexExt("Pulsar_Sink_monitors#" + this.getName(),
                     statIntervalSec, maxMonitorCnt);
         }
 
@@ -471,7 +471,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
                     newbase.append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
                             .append(streamId).append(SEPARATOR).append(nodeIp)
                             .append(SEPARATOR).append(NetworkUtils.getLocalIp())
-                            .append(SEPARATOR).append(msgId).append(SEPARATOR)
+                            .append(SEPARATOR).append(SEPARATOR)
                             .append(event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
 
                     long messageSize = event.getBody().length;
@@ -673,8 +673,9 @@ public class PulsarSink extends AbstractSink implements Configurable,
                             topic = event.getHeaders().get(TOPIC);
                         }
                     }
-                    logger.debug("Event is {}, topic = {} ",event, topic);
-
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Event is {}, topic = {} ",event, topic);
+                    }
                     if (event == null) {
                         continue;
                     }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index b988d90..be8abcd 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -41,6 +41,8 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.shade.io.netty.util.NettyRuntime;
+import org.apache.pulsar.shade.io.netty.util.internal.SystemPropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,7 +73,9 @@ public class PulsarClientService {
     private static String MAX_BATCHING_MESSAGES = "max_batching_messages";
     private static String RETRY_INTERVAL_WHEN_SEND_ERROR_MILL = "retry_interval_when_send_error_ms";
 
-    private static int DEFAULT_PULSAR_IO_THREADS = 1;
+    private static int DEFAULT_PULSAR_IO_THREADS = Math.max(1, SystemPropertyUtil
+            .getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
+
     private static int DEFAULT_CONNECTIONS_PRE_BROKER = 1;
     private static int DEFAULT_SEND_TIMEOUT_MILL = 30 * 1000;
     private static int DEFAULT_CLIENT_TIMEOUT_SECOND = 30;
@@ -194,7 +198,6 @@ public class PulsarClientService {
             streamId = event.getHeaders().get(AttributeConstants.INAME);
         }
         proMap.put(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-        logger.debug("producer send msg!");
         TopicProducerInfo forCallBackP = producer;
         forCallBackP.getProducer().newMessage().properties(proMap).value(event.getBody())
                 .sendAsync().thenAccept((msgId) -> {
@@ -222,23 +225,25 @@ public class PulsarClientService {
         pulsarClients = new ArrayList<PulsarClient>();
         for (int i = 0; i < pulsarServerUrls.length; i++) {
             try {
-                logger.debug("index = {}, url = {}", i, pulsarServerUrls[i]);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("index = {}, url = {}", i, pulsarServerUrls[i]);
+                }
                 PulsarClient client = initPulsarClient(pulsarServerUrls[i]);
                 pulsarClients.add(client);
                 callBack.handleCreateClientSuccess(pulsarServerUrls[i]);
             } catch (PulsarClientException e) {
                 callBack.handleCreateClientException(pulsarServerUrls[i]);
-                logger.error("create connnection error in metasink, "
+                logger.error("create connnection error in pulsar sink, "
                         + "maybe pulsar master set error, please re-check.url{}, ex1 {}",
                         pulsarServerUrls[i],
-                        e.getMessage());
+                        e);
             } catch (Throwable e) {
                 callBack.handleCreateClientException(pulsarServerUrls[i]);
-                logger.error("create connnection error in metasink, "
+                logger.error("create connnection error in pulsar sink, "
                                 + "maybe pulsar master set error/shutdown in progress, please "
                                 + "re-check. url{}, ex2 {}",
                         pulsarServerUrls[i],
-                        e.getMessage());
+                        e);
             }
         }
         if (pulsarClients.size() == 0) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 5c430d4..baef74b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -724,7 +724,9 @@ public class ServerMessageHandler extends SimpleChannelHandler {
                 topic = configManager.getTopicProperties().get(groupId);
             }
         }
-        logger.debug("Get topic by groupId = {} , streamId = {}", groupId, streamId);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Get topic by groupId = {} , streamId = {}", groupId, streamId);
+        }
         return topic;
     }