You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/08 05:03:38 UTC
[incubator-inlong] branch INLONG-1739 updated: [INLONG-1760]
Optimize the realization of class MessageProducerExample (#1762)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-1739
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-1739 by this push:
new 61611cd [INLONG-1760] Optimize the realization of class MessageProducerExample (#1762)
61611cd is described below
commit 61611cd4d8572c7b675363448e6c3defd9aa6db0
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Nov 8 13:03:30 2021 +0800
[INLONG-1760] Optimize the realization of class MessageProducerExample (#1762)
---
.../inlong/tubemq/corebase/utils/MixedUtils.java | 32 ++++
.../tubemq/example/MessageProducerExample.java | 208 ++++++++-------------
.../tubemq/example/MessagePullConsumerExample.java | 8 +-
.../example/MessagePullSetConsumerExample.java | 9 +-
.../tubemq/example/MessagePushConsumerExample.java | 11 +-
...{MsgRecvStats.java => MsgSendReceiveStats.java} | 38 ++--
6 files changed, 154 insertions(+), 152 deletions(-)
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
index eb432d5..18c4ca1 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/MixedUtils.java
@@ -18,7 +18,9 @@
package org.apache.inlong.tubemq.corebase.utils;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeSet;
@@ -78,6 +80,24 @@ public class MixedUtils {
return topicAndFiltersMap;
}
+ // build the topic and filter item pair carried in the message
+ public static List<Tuple2<String, String>> buildTopicFilterTupleList(
+ Map<String, TreeSet<String>> topicAndFiltersMap) {
+ // initial send target
+ List<Tuple2<String, String>> topicFilterTuples = new ArrayList<>();
+ // initial topic send round
+ for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ topicFilterTuples.add(new Tuple2<>(entry.getKey()));
+ } else {
+ for (String filter : entry.getValue()) {
+ topicFilterTuples.add(new Tuple2<>(entry.getKey(), filter));
+ }
+ }
+ }
+ return topicFilterTuples;
+ }
+
public static byte[] buildTestData(int bodySize) {
final byte[] transmitData =
StringUtils.getBytesUtf8("This is a test data!");
@@ -91,6 +111,18 @@ public class MixedUtils {
return dataBuffer.array();
}
+ public static void coolSending(long msgSentCount) {
+ if (msgSentCount % 5000 == 0) {
+ ThreadUtils.sleep(3000);
+ } else if (msgSentCount % 4000 == 0) {
+ ThreadUtils.sleep(2000);
+ } else if (msgSentCount % 2000 == 0) {
+ ThreadUtils.sleep(800);
+ } else if (msgSentCount % 1000 == 0) {
+ ThreadUtils.sleep(400);
+ }
+ }
+
// get the middle data between min, max, and data
public static int mid(int data, int min, int max) {
return Math.max(min, Math.min(max, data));
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
index 18462c7..bb4de3c 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessageProducerExample.java
@@ -17,17 +17,12 @@
package org.apache.inlong.tubemq.example;
-import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.codec.binary.StringUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
@@ -37,7 +32,6 @@ import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
-import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,144 +46,104 @@ public final class MessageProducerExample {
private static final Logger logger =
LoggerFactory.getLogger(MessageProducerExample.class);
- private static final ConcurrentHashMap<String, AtomicLong> counterMap =
- new ConcurrentHashMap<>();
+ private static final MsgSendReceiveStats msgSendStats =
+ new MsgSendReceiveStats(true);
- private final MessageProducer messageProducer;
- private final MessageSessionFactory messageSessionFactory;
+ private static MessageSessionFactory sessionFactory;
+ private static MessageProducer messageProducer;
- public MessageProducerExample(String masterServers) throws Exception {
- TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
- this.messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
- this.messageProducer = messageSessionFactory.createProducer();
- }
-
- public static void main(String[] args) {
- // get and initial parameters
+ // The 1st parameter masterServers format is master1_ip:port[,master2_ip:port],
+ // the master address(es) to connect to;
+ // The 2nd parameter pubTopicAndFilterItems format is topic_1[[:filterCond_1.1[;filterCond_1.2]][,topic_2]]
+ // the topic(s) (and filter condition set) to publish to.
+ // The 3rd parameter msgCount is the message amount that needs to be sent
+ // The 4th parameter pkgSize is the message's body size that needs to be sent
+ public static void main(String[] args) throws Throwable {
+ // 1. get and initial parameters
final String masterServers = args[0];
- final String topics = args[1];
+ final String pubTopicAndFilterItems = args[1];
final long msgCount = Long.parseLong(args[2]);
- final Map<String, TreeSet<String>> topicAndFiltersMap =
- MixedUtils.parseTopicParam(topics);
- // initial send target
- final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
- // initial topic send round
- for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
- if (entry.getValue().isEmpty()) {
- topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
- } else {
- for (String filter : entry.getValue()) {
- topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
- }
- }
+ int pkgSize = 1024;
+ if (args.length > 3) {
+ pkgSize = MixedUtils.mid(Integer.parseInt(args[3]), 1, 1024 * 1024);
}
- // initial sent data
- String body = "This is a test message from single-session-factory.";
- byte[] bodyBytes = StringUtils.getBytesUtf8(body);
- final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
- while (dataBuffer.hasRemaining()) {
- int offset = dataBuffer.arrayOffset();
- dataBuffer.put(bodyBytes, offset, Math.min(dataBuffer.remaining(), bodyBytes.length));
- }
- dataBuffer.flip();
- // send messages
- try {
- long sentCount = 0;
- int roundIndex = 0;
- int targetCnt = topicSendRounds.size();
- MessageProducerExample messageProducer =
- new MessageProducerExample(masterServers);
- messageProducer.publishTopics(topicAndFiltersMap.keySet());
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
- while (msgCount < 0 || sentCount < msgCount) {
- roundIndex = (int) (sentCount++ % targetCnt);
- Tuple2<String, String> target = topicSendRounds.get(roundIndex);
- Message message = new Message(target.getF0(), dataBuffer.array());
- long currTimeMillis = System.currentTimeMillis();
- message.setAttrKeyVal("index", String.valueOf(sentCount));
- message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
- if (target.getF1() != null) {
- message.putSystemHeader(target.getF1(), sdf.format(new Date(currTimeMillis)));
- }
- try {
- // 1.1 next line sends message synchronously, which is not recommended
- // messageProducer.sendMessage(message);
- // 1.2 send message asynchronously, recommended
- messageProducer.sendMessageAsync(message,
- messageProducer.new DefaultSendCallback());
- } catch (Throwable e1) {
- logger.error("Send Message throw exception ", e1);
- }
- // only for test, delay inflight message's count
- if (sentCount % 20000 == 0) {
- ThreadUtils.sleep(4000);
- } else if (sentCount % 10000 == 0) {
- ThreadUtils.sleep(2000);
- } else if (sentCount % 2500 == 0) {
- ThreadUtils.sleep(300);
- }
- }
- ThreadUtils.sleep(20000);
- for (Map.Entry<String, AtomicLong> entry : counterMap.entrySet()) {
- logger.info(
- "********* Current {} Message sent count is {}",
- entry.getKey(),
- entry.getValue().get()
- );
+ final Map<String, TreeSet<String>> topicAndFiltersMap =
+ MixedUtils.parseTopicParam(pubTopicAndFilterItems);
+
+ // 2. initial configure, session factory object, and producer object
+ TubeClientConfig clientConfig =
+ new TubeClientConfig(masterServers);
+ sessionFactory = new TubeSingleSessionFactory(clientConfig);
+ messageProducer = sessionFactory.createProducer();
+ messageProducer.publish(topicAndFiltersMap.keySet());
+
+ // 3. initial and statistic thread
+ Thread statisticThread =
+ new Thread(msgSendStats, "Sent Statistic Thread");
+ statisticThread.start();
+
+ // 4. build the content of the message to be sent
+ // include message body, attributes, and time information template
+ final byte[] dataBuffer =
+ MixedUtils.buildTestData(pkgSize);
+ List<Tuple2<String, String>> buildTopicFilterTuples =
+ MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
+
+ // 5. send message to server
+ long sentCount = 0;
+ int roundIndex = 0;
+ int targetCnt = buildTopicFilterTuples.size();
+ while (msgCount < 0 || sentCount < msgCount) {
+ // 5.1 Rotate to get the attribute information to be sent
+ roundIndex = (int) (sentCount++ % targetCnt);
+ Tuple2<String, String> target = buildTopicFilterTuples.get(roundIndex);
+
+ // 5.2 build message to be sent
+ Message message = new Message(target.getF0(), dataBuffer);
+ long currTimeMillis = System.currentTimeMillis();
+ message.setAttrKeyVal("index", String.valueOf(sentCount));
+ message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
+ if (target.getF1() != null) {
+ // 5.2.1 add filter attribute information
+ message.putSystemHeader(target.getF1(), sdf.format(new Date(currTimeMillis)));
}
- } catch (Throwable e) {
- e.printStackTrace();
- }
- }
- public void publishTopics(Set<String> topicSet) throws TubeClientException {
- this.messageProducer.publish(topicSet);
- }
+ // 5.3 send message
+ try {
+ // 5.3.1 Send data asynchronously, recommended
+ messageProducer.sendMessage(message, new DefaultSendCallback());
- /**
- * Send message synchronous.
- */
- public void sendMessage(Message message) {
- // date format is accurate to minute, not to second
- try {
- MessageSentResult result = messageProducer.sendMessage(message);
- if (!result.isSuccess()) {
- logger.error("Sync-send message failed!" + result.getErrMsg());
+ // 5.3.2 Send message synchronous, not recommended
+ // MessageSentResult result = messageProducer.sendMessage(message);
+ // if (!result.isSuccess()) {
+ // logger.error("Sync-send message failed!" + result.getErrMsg());
+ // }
+ } catch (TubeClientException | InterruptedException e) {
+ logger.error("Async-send message failed!", e);
}
- } catch (TubeClientException | InterruptedException e) {
- logger.error("Sync-send message failed!", e);
- }
- }
- /**
- * Send message asynchronous. More efficient and recommended.
- */
- public void sendMessageAsync(Message message, MessageSentCallback callback) {
- try {
- messageProducer.sendMessage(message, callback);
- } catch (TubeClientException | InterruptedException e) {
- logger.error("Async-send message failed!", e);
+ // 5.4 Cool sending
+ // Attention: only used in the test link, to solve the problem of
+ // frequent sending failures caused by insufficient test resources.
+ MixedUtils.coolSending(sentCount);
}
+
+ // 6. Clean up resources and exit the service after the task is completed
+ // Attention: TubeMQ client is suitable for serving as a resident service,
+ // not suitable for creating Producer objects message by message
+ messageProducer.shutdown();
+ sessionFactory.shutdown();
+ msgSendStats.stopStats();
+ logger.info("The message sending task has been completed!");
}
- private class DefaultSendCallback implements MessageSentCallback {
+ private static class DefaultSendCallback implements MessageSentCallback {
@Override
public void onMessageSent(MessageSentResult result) {
if (result.isSuccess()) {
- String topicName = result.getMessage().getTopic();
-
- AtomicLong currCount = counterMap.get(topicName);
- if (currCount == null) {
- AtomicLong tmpCount = new AtomicLong(0);
- currCount = counterMap.putIfAbsent(topicName, tmpCount);
- if (currCount == null) {
- currCount = tmpCount;
- }
- }
- if (currCount.incrementAndGet() % 1000 == 0) {
- logger.info("Send " + topicName + " " + currCount.get() + " message!");
- }
+ msgSendStats.addMsgCount(result.getMessage().getTopic(), 1);
} else {
logger.error("Send message failed!" + result.getErrMsg());
}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
index 4e9674b..bf51d9e 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullConsumerExample.java
@@ -46,7 +46,8 @@ public final class MessagePullConsumerExample {
private static final Logger logger =
LoggerFactory.getLogger(MessagePullConsumerExample.class);
- private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
+ private static final MsgSendReceiveStats msgRcvStats =
+ new MsgSendReceiveStats(false);
private static PullMessageConsumer pullConsumer;
private static MessageSessionFactory sessionFactory;
@@ -126,7 +127,7 @@ public final class MessagePullConsumerExample {
// 4.2.1 process message if getMessage() return success
List<Message> messageList = csmResult.getMessageList();
if (messageList != null && !messageList.isEmpty()) {
- msgRecvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
+ msgRcvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
}
// 4.2.1.1 confirm consume result
// Notice:
@@ -171,7 +172,7 @@ public final class MessagePullConsumerExample {
// 6. initial and statistic thread
Thread statisticThread =
- new Thread(msgRecvStats, "Sent Statistic Thread");
+ new Thread(msgRcvStats, "Receive Statistic Thread");
statisticThread.start();
// 7. Resource cleanup when exiting the service
@@ -183,6 +184,5 @@ public final class MessagePullConsumerExample {
// 7.3 shutdown statistic thread
// msgRecvStats.stopStats();
}
-
}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
index 2654e75..2bc7080 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePullSetConsumerExample.java
@@ -51,7 +51,8 @@ public final class MessagePullSetConsumerExample {
private static final Logger logger =
LoggerFactory.getLogger(MessagePullSetConsumerExample.class);
// Statistic object
- private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
+ private static final MsgSendReceiveStats msgRcvStats =
+ new MsgSendReceiveStats(false);
// The map of the master cluster and Multiple session factory
// There may be multiple consumers in the same process and the topic sets subscribed
// by different consumers are in different clusters. In this case,
@@ -82,7 +83,7 @@ public final class MessagePullSetConsumerExample {
// 2. initial and statistic thread
Thread statisticThread =
- new Thread(msgRecvStats, "Sent Statistic Thread");
+ new Thread(msgRcvStats, "Receive Statistic Thread");
statisticThread.start();
// 3. Start the consumer group for the first consumption
@@ -129,9 +130,9 @@ public final class MessagePullSetConsumerExample {
thread.join();
}
- logger.info("The second consumption begin!");
// 4. Start the consumer group for the second consumption
// 4.1 set the boostrap Offset, here we set consumption from 0
+ logger.info("The second consumption begin!");
String sessionKeySec = "test_consume_Second";
int sourceCountSec = 1;
ConcurrentHashMap<String, Long> partOffsetMapSec =
@@ -229,7 +230,7 @@ public final class MessagePullSetConsumerExample {
// 2.1 process message if getMessage() return success
List<Message> messageList = csmResult.getMessageList();
if (messageList != null && !messageList.isEmpty()) {
- msgRecvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
+ msgRcvStats.addMsgCount(csmResult.getTopicName(), messageList.size());
}
// 2.1.2 store the offset of processing message
// the offset returned by GetMessage() represents the initial offset of this request
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
index 0db4acd..94c8d15 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MessagePushConsumerExample.java
@@ -49,7 +49,8 @@ import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
*/
public final class MessagePushConsumerExample {
- private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
+ private static final MsgSendReceiveStats msgRcvStats =
+ new MsgSendReceiveStats(false);
private static MessageSessionFactory sessionFactory;
private static final Map<String, PushMessageConsumer> consumerMap = new HashMap<>();
@@ -99,15 +100,15 @@ public final class MessagePushConsumerExample {
MessageListener messageListener = new DefaultMessageListener(entry.getKey());
consumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
}
- // 3.3. start consumer
+ // 3.3 start consumer
consumer.completeSubscribe();
- // 3.4. store consumer object
+ // 3.4 store consumer object
consumerMap.put(consumer.getConsumerId(), consumer);
}
// 4. initial and statistic thread
Thread statisticThread =
- new Thread(msgRecvStats, "Sent Statistic Thread");
+ new Thread(msgRcvStats, "Receive Statistic Thread");
statisticThread.start();
// 5. Resource cleanup when exiting the service
@@ -136,7 +137,7 @@ public final class MessagePushConsumerExample {
@Override
public void receiveMessages(PeerInfo peerInfo, List<Message> messages) {
if (messages != null && !messages.isEmpty()) {
- msgRecvStats.addMsgCount(this.topic, messages.size());
+ msgRcvStats.addMsgCount(this.topic, messages.size());
}
}
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgSendReceiveStats.java
similarity index 67%
rename from inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
rename to inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgSendReceiveStats.java
index f418052..cfc8a55 100644
--- a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgRecvStats.java
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/MsgSendReceiveStats.java
@@ -28,17 +28,20 @@ import org.slf4j.LoggerFactory;
/**
* This demo shows how to collect and report message received statistics.
*/
-public class MsgRecvStats implements Runnable {
- private static final Logger logger =
- LoggerFactory.getLogger(MsgRecvStats.class);
- private static final ConcurrentHashMap<String, AtomicLong> counterMap =
- new ConcurrentHashMap<>();
- private static final ConcurrentHashMap<String, AtomicLong> befCountMap =
- new ConcurrentHashMap<>();
- private AtomicBoolean isStarted = new AtomicBoolean(true);
+public class MsgSendReceiveStats implements Runnable {
+ private final boolean isProducer;
+ private static final Logger logger = LoggerFactory.getLogger(MsgSendReceiveStats.class);
+ private static final ConcurrentHashMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
+ private static final ConcurrentHashMap<String, AtomicLong> befCountMap = new ConcurrentHashMap<>();
+ private final AtomicBoolean isStarted = new AtomicBoolean(true);
+
+ public MsgSendReceiveStats(boolean isProducer) {
+ this.isProducer = isProducer;
+ }
@Override
public void run() {
+ // Indicator output every 30 seconds
while (isStarted.get()) {
try {
for (Map.Entry<String, AtomicLong> entry : counterMap.entrySet()) {
@@ -51,10 +54,13 @@ public class MsgRecvStats implements Runnable {
befCount = tmpCount;
}
}
- // output received statistic information
- logger.info("********* Current {} Message receive count is {}, dlt is {}",
- new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
- // archive historical statistic data
+ if (isProducer) {
+ logger.info("********* Current {} Message sent count is {}, dlt is {}",
+ new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
+ } else {
+ logger.info("********* Current {} Message received count is {}, dlt is {}",
+ new Object[]{entry.getKey(), currCount, (currCount - befCount.get())});
+ }
befCountMap.get(entry.getKey()).set(currCount);
}
} catch (Throwable t) {
@@ -74,6 +80,14 @@ public class MsgRecvStats implements Runnable {
currCount = tmpCount;
}
}
+ // Indicator output every 1000
+ if (currCount.addAndGet(msgCnt) % 1000 == 0) {
+ if (isProducer) {
+ logger.info("Sent " + topicName + " messages:" + currCount.get());
+ } else {
+ logger.info("Received " + topicName + " messages:" + currCount.get());
+ }
+ }
}
}