You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:11 UTC
[incubator-tubemq] 13/49: [TUBEMQ-449]Adjust Example implementation
(#348)
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 8acad06b615063659219f8097de0478b76a10011
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon Dec 14 18:21:36 2020 +0800
[TUBEMQ-449]Adjust Example implementation (#348)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../org/apache/tubemq/corebase/utils/Tuple2.java | 53 +++++++
.../tubemq/example/MAMessageProducerExample.java | 158 +++++++++++----------
.../tubemq/example/MessageConsumerExample.java | 60 ++++----
.../tubemq/example/MessageProducerExample.java | 131 +++++++++--------
.../tubemq/example/MessagePullConsumerExample.java | 63 ++++----
.../example/MessagePullSetConsumerExample.java | 62 ++++----
.../tubemq/server/tools/cli/CliProducer.java | 30 ++--
7 files changed, 292 insertions(+), 265 deletions(-)
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
new file mode 100644
index 0000000..f5626f8
--- /dev/null
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.corebase.utils;
+
+public class Tuple2<T0, T1> {
+
+ /** Field 0 of the tuple. */
+ public T0 f0 = null;
+ /** Field 1 of the tuple. */
+ public T1 f1 = null;
+
+ /**
+ * Creates a new tuple where all fields are null.
+ */
+ public Tuple2() {
+
+ }
+
+ /**
+ * Creates a new tuple with field 0 specified.
+ *
+ * @param value0 The value for field 0
+ */
+ public Tuple2(T0 value0) {
+ this.f0 = value0;
+ }
+
+ /**
+ * Creates a new tuple and assigns the given values to the tuple's fields.
+ *
+ * @param value0 The value for field 0
+ * @param value1 The value for field 1
+ */
+ public Tuple2(T0 value0, T1 value1) {
+ this.f0 = value0;
+ this.f1 = value1;
+ }
+}
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
index f76b077..ea546c9 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
@@ -20,12 +20,10 @@ package org.apache.tubemq.example;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -41,8 +39,9 @@ import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentCallback;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;
-import org.apache.tubemq.corebase.TErrCodeConstants;
+import org.apache.tubemq.corebase.utils.MixedUtils;
import org.apache.tubemq.corebase.utils.ThreadUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,19 +51,24 @@ import org.slf4j.LoggerFactory;
* to improve throughput from client to broker.
*/
public class MAMessageProducerExample {
- private static final Logger logger = LoggerFactory.getLogger(MAMessageProducerExample.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(MAMessageProducerExample.class);
+ private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0);
+ private static final AtomicLong SENT_FAIL_COUNTER = new AtomicLong(0);
+ private static final AtomicLong SENT_EXCEPT_COUNTER = new AtomicLong(0);
+
private static final List<MessageProducer> PRODUCER_LIST = new ArrayList<>();
private static final int MAX_PRODUCER_NUM = 100;
private static final int SESSION_FACTORY_NUM = 10;
- private static Set<String> topicSet;
+ private static Map<String, TreeSet<String>> topicAndFiltersMap;
+ private static List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
private static int msgCount;
- private static int producerCount;
+ private static int clientCount;
private static byte[] sendData;
+ private static AtomicLong filterMsgCount = new AtomicLong(0);
- private final String[] arrayKey = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh"};
- private final Set<String> filters = new TreeSet<>();
private final Map<MessageProducer, Sender> producerMap = new HashMap<>();
private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
private final ExecutorService sendExecutorService =
@@ -76,13 +80,9 @@ public class MAMessageProducerExample {
});
private final AtomicInteger producerIndex = new AtomicInteger(0);
- private int keyCount = 0;
- private int sentCount = 0;
- public MAMessageProducerExample(String masterHostAndPort) throws Exception {
- this.filters.add("aaa");
- this.filters.add("bbb");
+ public MAMessageProducerExample(String masterHostAndPort) throws Exception {
TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
this.sessionFactoryList.add(new TubeMultiSessionFactory(clientConfig));
@@ -90,40 +90,51 @@ public class MAMessageProducerExample {
}
public static void main(String[] args) {
- final String masterHostAndPort = args[0];
-
+ // get call parameters
+ final String masterServers = args[0];
final String topics = args[1];
- final List<String> topicList = Arrays.asList(topics.split(","));
-
- topicSet = new TreeSet<>(topicList);
-
msgCount = Integer.parseInt(args[2]);
- producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
-
- logger.info("MAMessageProducerExample.main started...");
-
- final byte[] transmitData = StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
+ clientCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
+ topicAndFiltersMap = MixedUtils.parseTopicParam(topics);
+ // initialize the topic send rounds
+ for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
+ } else {
+ for (String filter : entry.getValue()) {
+ topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
+ }
+ }
+ }
+ // build message's body content
+ final byte[] transmitData =
+ StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
-
while (dataBuffer.hasRemaining()) {
int offset = dataBuffer.arrayOffset();
- dataBuffer.put(transmitData, offset, Math.min(dataBuffer.remaining(), transmitData.length));
+ dataBuffer.put(transmitData, offset,
+ Math.min(dataBuffer.remaining(), transmitData.length));
}
-
dataBuffer.flip();
sendData = dataBuffer.array();
+ // print started log
+ logger.info("MAMessageProducerExample.main started...");
try {
- MAMessageProducerExample messageProducer = new MAMessageProducerExample(masterHostAndPort);
-
+ // initial producer objects
+ MAMessageProducerExample messageProducer =
+ new MAMessageProducerExample(masterServers);
messageProducer.startService();
-
- while (SENT_SUCC_COUNTER.get() < msgCount * producerCount * topicSet.size()) {
- Thread.sleep(1000);
+ // wait until the sent message count reaches the required count
+ while (TOTAL_COUNTER.get() < msgCount * clientCount) {
+ logger.info("Sending, total messages is {}, filter messages is {}",
+ SENT_SUCC_COUNTER.get(), filterMsgCount.get());
+ Thread.sleep(5000);
}
+ logger.info("Finished, total messages is {}, filter messages is {}",
+ SENT_SUCC_COUNTER.get(), filterMsgCount.get());
messageProducer.producerMap.clear();
messageProducer.shutdown();
-
} catch (TubeClientException e) {
logger.error("TubeClientException: ", e);
} catch (Throwable e) {
@@ -137,7 +148,7 @@ public class MAMessageProducerExample {
}
private void startService() throws TubeClientException {
- for (int i = 0; i < producerCount; i++) {
+ for (int i = 0; i < clientCount; i++) {
PRODUCER_LIST.add(createProducer());
}
@@ -169,44 +180,44 @@ public class MAMessageProducerExample {
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
try {
- producer.publish(topicSet);
+ producer.publish(topicAndFiltersMap.keySet());
} catch (Throwable t) {
logger.error("publish exception: ", t);
}
- for (int i = 0; i < msgCount; i++) {
+ long sentCount = 0;
+ int roundIndex = 0;
+ int targetCnt = topicSendRounds.size();
+ while (msgCount < 0 || sentCount < msgCount) {
long millis = System.currentTimeMillis();
- for (String topic : topicSet) {
- try {
- Message message = new Message(topic, sendData);
- message.setAttrKeyVal("index", String.valueOf(1));
- message.setAttrKeyVal("dataTime", String.valueOf(millis));
-
- String keyCode = arrayKey[sentCount++ % arrayKey.length];
-
- // date format is accurate to minute, not to second
- message.putSystemHeader(keyCode, sdf.format(new Date(millis)));
- if (filters.contains(keyCode)) {
- keyCount++;
- }
-
- // next line sends message synchronously, which is not recommended
- //producer.sendMessage(message);
-
- // send message asynchronously, recommended
- producer.sendMessage(message, new DefaultSendCallback());
- } catch (Throwable e1) {
- logger.error("sendMessage exception: ", e1);
- }
-
- if (i % 5000 == 0) {
- ThreadUtils.sleep(3000);
- } else if (i % 4000 == 0) {
- ThreadUtils.sleep(2000);
- } else if (i % 2000 == 0) {
- ThreadUtils.sleep(800);
- } else if (i % 1000 == 0) {
- ThreadUtils.sleep(400);
- }
+ roundIndex = (int) (sentCount++ % targetCnt);
+ Tuple2<String, String> target = topicSendRounds.get(roundIndex);
+ Message message = new Message(target.f0, sendData);
+ message.setAttrKeyVal("index", String.valueOf(sentCount));
+ message.setAttrKeyVal("dataTime", String.valueOf(millis));
+ if (target.f1 != null) {
+ filterMsgCount.incrementAndGet();
+ message.putSystemHeader(target.f1, sdf.format(new Date(millis)));
+ }
+ try {
+ // next line sends message synchronously, which is not recommended
+ //producer.sendMessage(message);
+ // send message asynchronously, recommended
+ producer.sendMessage(message, new DefaultSendCallback());
+ } catch (Throwable e1) {
+ TOTAL_COUNTER.incrementAndGet();
+ SENT_EXCEPT_COUNTER.incrementAndGet();
+ logger.error("sendMessage exception: ", e1);
+ }
+ TOTAL_COUNTER.incrementAndGet();
+ // only for test, delay inflight message's count
+ if (sentCount % 5000 == 0) {
+ ThreadUtils.sleep(3000);
+ } else if (sentCount % 4000 == 0) {
+ ThreadUtils.sleep(2000);
+ } else if (sentCount % 2000 == 0) {
+ ThreadUtils.sleep(800);
+ } else if (sentCount % 1000 == 0) {
+ ThreadUtils.sleep(400);
}
}
try {
@@ -221,19 +232,18 @@ public class MAMessageProducerExample {
private class DefaultSendCallback implements MessageSentCallback {
@Override
public void onMessageSent(MessageSentResult result) {
+ TOTAL_COUNTER.incrementAndGet();
if (result.isSuccess()) {
- if (SENT_SUCC_COUNTER.incrementAndGet() % 1000 == 0) {
- logger.info("Send {} message, keyCount is {}", SENT_SUCC_COUNTER.get(), keyCount);
- }
+ SENT_SUCC_COUNTER.incrementAndGet();
} else {
- if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW) {
- logger.error("Send message failed!" + result.getErrMsg());
- }
+ SENT_FAIL_COUNTER.incrementAndGet();
}
}
@Override
public void onException(Throwable e) {
+ TOTAL_COUNTER.incrementAndGet();
+ SENT_EXCEPT_COUNTER.incrementAndGet();
logger.error("Send message error!", e);
}
}
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
index 0252d16..866be77 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
@@ -17,8 +17,6 @@
package org.apache.tubemq.example;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
@@ -35,6 +33,7 @@ import org.apache.tubemq.client.exception.TubeClientException;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corebase.utils.MixedUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,59 +54,48 @@ import org.slf4j.LoggerFactory;
*/
public final class MessageConsumerExample {
- private static final Logger logger = LoggerFactory.getLogger(MessageConsumerExample.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(MessageConsumerExample.class);
private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
private final PushMessageConsumer messageConsumer;
private final MessageSessionFactory messageSessionFactory;
- public MessageConsumerExample(String masterHostAndPort, String group, int fetchCount) throws Exception {
+ public MessageConsumerExample(String masterHostAndPort,
+ String group, int fetchThreadCnt) throws Exception {
ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
- if (fetchCount > 0) {
- consumerConfig.setPushFetchThreadCnt(fetchCount);
+ if (fetchThreadCnt > 0) {
+ consumerConfig.setPushFetchThreadCnt(fetchThreadCnt);
}
this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
this.messageConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
}
+
public static void main(String[] args) {
- final String masterHostAndPort = args[0];
+ final String masterServers = args[0];
final String topics = args[1];
final String group = args[2];
- final int consumerCount = Integer.parseInt(args[3]);
- int fetchCount = -1;
+ final int clientCount = Integer.parseInt(args[3]);
+ int threadCnt = -1;
if (args.length > 5) {
- fetchCount = Integer.parseInt(args[4]);
- }
- final Map<String, TreeSet<String>> topicTidsMap = new HashMap<>();
-
- String[] topicTidsList = topics.split(",");
- for (String topicTids : topicTidsList) {
- String[] topicTidStr = topicTids.split(":");
- TreeSet<String> tids = null;
- if (topicTidStr.length > 1) {
- String tidsStr = topicTidStr[1];
- String[] tidsSet = tidsStr.split(";");
- if (tidsSet.length > 0) {
- tids = new TreeSet<>(Arrays.asList(tidsSet));
- }
- }
- topicTidsMap.put(topicTidStr[0], tids);
+ threadCnt = Integer.parseInt(args[4]);
}
- final int startFetchCount = fetchCount;
- final ExecutorService executorService = Executors.newCachedThreadPool();
- for (int i = 0; i < consumerCount; i++) {
+ final int fetchThreadCnt = threadCnt;
+ final Map<String, TreeSet<String>> topicAndFiltersMap =
+ MixedUtils.parseTopicParam(topics);
+ final ExecutorService executorService =
+ Executors.newCachedThreadPool();
+ for (int i = 0; i < clientCount; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
- MessageConsumerExample messageConsumer = new MessageConsumerExample(
- masterHostAndPort,
- group,
- startFetchCount
- );
- messageConsumer.subscribe(topicTidsMap);
+ MessageConsumerExample messageConsumer =
+ new MessageConsumerExample(masterServers,
+ group, fetchThreadCnt);
+ messageConsumer.subscribe(topicAndFiltersMap);
} catch (Exception e) {
logger.error("Create consumer failed!", e);
}
@@ -126,8 +114,8 @@ public final class MessageConsumerExample {
msgRecvStats.stopStats();
}
- public void subscribe(Map<String, TreeSet<String>> topicTidsMap) throws TubeClientException {
- for (Map.Entry<String, TreeSet<String>> entry : topicTidsMap.entrySet()) {
+ public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException {
+ for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
MessageV2Listener messageListener = new DefaultMessageListener(entry.getKey());
messageConsumer.subscribe(entry.getKey(), entry.getValue(), messageListener);
}
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java
index 57d0ef0..7c29a63 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java
@@ -19,7 +19,7 @@ package org.apache.tubemq.example;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -36,7 +36,9 @@ import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentCallback;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corebase.utils.MixedUtils;
import org.apache.tubemq.corebase.utils.ThreadUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,32 +50,40 @@ import org.slf4j.LoggerFactory;
*/
public final class MessageProducerExample {
- private static final Logger logger = LoggerFactory.getLogger(MessageProducerExample.class);
- private static final ConcurrentHashMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
+ private static final Logger logger =
+ LoggerFactory.getLogger(MessageProducerExample.class);
+ private static final ConcurrentHashMap<String, AtomicLong> counterMap =
+ new ConcurrentHashMap<>();
- private final String[] arrayKey = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh"};
- private final Set<String> filters = new TreeSet<>();
private final MessageProducer messageProducer;
private final MessageSessionFactory messageSessionFactory;
- private int keyCount = 0;
- private int sentCount = 0;
-
- public MessageProducerExample(String masterHostAndPort) throws Exception {
- filters.add("aaa");
- filters.add("bbb");
-
- TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
+ public MessageProducerExample(String masterServers) throws Exception {
+ TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
this.messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
this.messageProducer = messageSessionFactory.createProducer();
}
public static void main(String[] args) {
- final String masterHostAndPort = args[0];
+ // get and initialize parameters
+ final String masterServers = args[0];
final String topics = args[1];
- final List<String> topicList = Arrays.asList(topics.split(","));
- final int count = Integer.parseInt(args[2]);
-
+ final long msgCount = Long.parseLong(args[2]);
+ final Map<String, TreeSet<String>> topicAndFiltersMap =
+ MixedUtils.parseTopicParam(topics);
+ // initial send target
+ final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
+ // initialize the topic send rounds
+ for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
+ } else {
+ for (String filter : entry.getValue()) {
+ topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
+ }
+ }
+ }
+ // initial sent data
String body = "This is a test message from single-session-factory.";
byte[] bodyBytes = StringUtils.getBytesUtf8(body);
final ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
@@ -82,32 +92,40 @@ public final class MessageProducerExample {
dataBuffer.put(bodyBytes, offset, Math.min(dataBuffer.remaining(), bodyBytes.length));
}
dataBuffer.flip();
+ // send messages
try {
- MessageProducerExample messageProducer = new MessageProducerExample(masterHostAndPort);
- messageProducer.publishTopics(topicList);
- for (int i = 0; i < count; i++) {
- for (String topic : topicList) {
- try {
- // next line sends message synchronously, which is not recommended
- // messageProducer.sendMessage(topic, body.getBytes());
-
- // send message asynchronously, recommended
- messageProducer.sendMessageAsync(
- i,
- topic,
- dataBuffer.array(),
- messageProducer.new DefaultSendCallback()
- );
- } catch (Throwable e1) {
- logger.error("Send Message throw exception ", e1);
- }
+ long sentCount = 0;
+ int roundIndex = 0;
+ int targetCnt = topicSendRounds.size();
+ MessageProducerExample messageProducer =
+ new MessageProducerExample(masterServers);
+ messageProducer.publishTopics(topicAndFiltersMap.keySet());
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
+ while (msgCount < 0 || sentCount < msgCount) {
+ roundIndex = (int) (sentCount++ % targetCnt);
+ Tuple2<String, String> target = topicSendRounds.get(roundIndex);
+ Message message = new Message(target.f0, body.getBytes());
+ long currTimeMillis = System.currentTimeMillis();
+ message.setAttrKeyVal("index", String.valueOf(sentCount));
+ message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
+ if (target.f1 != null) {
+ message.putSystemHeader(target.f1, sdf.format(new Date(currTimeMillis)));
}
-
- if (i % 20000 == 0) {
+ try {
+ // 1.1 next line sends message synchronously, which is not recommended
+ // messageProducer.sendMessage(message);
+ // 1.2 send message asynchronously, recommended
+ messageProducer.sendMessageAsync(message,
+ messageProducer.new DefaultSendCallback());
+ } catch (Throwable e1) {
+ logger.error("Send Message throw exception ", e1);
+ }
+ // only for test, delay inflight message's count
+ if (sentCount % 20000 == 0) {
ThreadUtils.sleep(4000);
- } else if (i % 10000 == 0) {
+ } else if (sentCount % 10000 == 0) {
ThreadUtils.sleep(2000);
- } else if (i % 2500 == 0) {
+ } else if (sentCount % 2500 == 0) {
ThreadUtils.sleep(300);
}
}
@@ -124,51 +142,33 @@ public final class MessageProducerExample {
}
}
- public void publishTopics(List<String> topicList) throws TubeClientException {
- this.messageProducer.publish(new TreeSet<>(topicList));
+ public void publishTopics(Set<String> topicSet) throws TubeClientException {
+ this.messageProducer.publish(topicSet);
}
/**
* Send message synchronous.
*/
- public void sendMessage(String topic, byte[] body) {
+ public void sendMessage(Message message) {
// date format is accurate to minute, not to second
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
- long currTimeMillis = System.currentTimeMillis();
- Message message = new Message(topic, body);
- message.setAttrKeyVal("index", String.valueOf(1));
- message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
- message.putSystemHeader("test", sdf.format(new Date(currTimeMillis)));
try {
MessageSentResult result = messageProducer.sendMessage(message);
if (!result.isSuccess()) {
- logger.error("Send message failed!" + result.getErrMsg());
+ logger.error("Sync-send message failed!" + result.getErrMsg());
}
} catch (TubeClientException | InterruptedException e) {
- logger.error("Send message failed!", e);
+ logger.error("Sync-send message failed!", e);
}
}
/**
* Send message asynchronous. More efficient and recommended.
*/
- public void sendMessageAsync(int id, String topic, byte[] body, MessageSentCallback callback) {
- Message message = new Message(topic, body);
-
- // date format is accurate to minute, not to second
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
- long currTimeMillis = System.currentTimeMillis();
- message.setAttrKeyVal("index", String.valueOf(1));
- String keyCode = arrayKey[sentCount++ % arrayKey.length];
- message.putSystemHeader(keyCode, sdf.format(new Date(currTimeMillis)));
- if (filters.contains(keyCode)) {
- keyCount++;
- }
+ public void sendMessageAsync(Message message, MessageSentCallback callback) {
try {
- message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
messageProducer.sendMessage(message, callback);
} catch (TubeClientException | InterruptedException e) {
- logger.error("Send message failed!", e);
+ logger.error("Async-send message failed!", e);
}
}
@@ -187,9 +187,8 @@ public final class MessageProducerExample {
currCount = tmpCount;
}
}
-
if (currCount.incrementAndGet() % 1000 == 0) {
- logger.info("Send " + topicName + " " + currCount.get() + " message, keyCount is " + keyCount);
+ logger.info("Send " + topicName + " " + currCount.get() + " message!");
}
} else {
logger.error("Send message failed!" + result.getErrMsg());
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
index 16c45b2..36fcaa2 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
@@ -17,9 +17,10 @@
package org.apache.tubemq.example;
-import java.util.Arrays;
+import static org.apache.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET;
import java.util.List;
import java.util.Map;
+import java.util.TreeSet;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumeOffsetInfo;
import org.apache.tubemq.client.consumer.ConsumePosition;
@@ -29,9 +30,11 @@ import org.apache.tubemq.client.exception.TubeClientException;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corebase.utils.MixedUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* This demo shows how to consume message by pull.
*
@@ -42,7 +45,8 @@ import org.slf4j.LoggerFactory;
*/
public final class MessagePullConsumerExample {
- private static final Logger logger = LoggerFactory.getLogger(MessagePullConsumerExample.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(MessagePullConsumerExample.class);
private static final MsgRecvStats msgRecvStats = new MsgRecvStats();
private final PullMessageConsumer messagePullConsumer;
@@ -56,39 +60,37 @@ public final class MessagePullConsumerExample {
}
public static void main(String[] args) throws Throwable {
- final String masterHostAndPort = args[0];
+ // get and initial parameters
+ final String masterServers = args[0];
final String topics = args[1];
final String group = args[2];
- final int consumeCount = Integer.parseInt(args[3]);
-
- final MessagePullConsumerExample messageConsumer = new MessagePullConsumerExample(
- masterHostAndPort,
- group
- );
-
- final List<String> topicList = Arrays.asList(topics.split(","));
- messageConsumer.subscribe(topicList);
- long startTime = System.currentTimeMillis();
-
+ final int msgCount = Integer.parseInt(args[3]);
+ final Map<String, TreeSet<String>> topicAndFiltersMap =
+ MixedUtils.parseTopicParam(topics);
+ // initial consumer object
+ final MessagePullConsumerExample messageConsumer =
+ new MessagePullConsumerExample(masterServers, group);
+ messageConsumer.subscribe(topicAndFiltersMap);
Thread[] fetchRunners = new Thread[3];
for (int i = 0; i < fetchRunners.length; i++) {
- fetchRunners[i] = new Thread(new FetchRequestRunner(messageConsumer, consumeCount));
+ fetchRunners[i] = new Thread(new FetchRequestRunner(messageConsumer, msgCount));
fetchRunners[i].setName("_fetch_runner_" + i);
}
-
+ // start the fetch threads
for (Thread thread : fetchRunners) {
thread.start();
}
-
- Thread statisticThread = new Thread(msgRecvStats, "Sent Statistic Thread");
+ // create and start the statistic thread
+ Thread statisticThread =
+ new Thread(msgRecvStats, "Sent Statistic Thread");
statisticThread.start();
}
- public void subscribe(List<String> topicList) throws TubeClientException {
- for (String topic : topicList) {
- messagePullConsumer.subscribe(topic, null);
+ public void subscribe(
+ Map<String, TreeSet<String>> topicAndFiltersMap) throws TubeClientException {
+ for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+ messagePullConsumer.subscribe(entry.getKey(), entry.getValue());
}
-
messagePullConsumer.completeSubscribe();
}
@@ -96,7 +98,8 @@ public final class MessagePullConsumerExample {
return messagePullConsumer.getMessage();
}
- public ConsumerResult confirmConsume(final String confirmContext, boolean isConsumed) throws TubeClientException {
+ public ConsumerResult confirmConsume(final String confirmContext,
+ boolean isConsumed) throws TubeClientException {
return messagePullConsumer.confirmConsume(confirmContext, isConsumed);
}
@@ -109,9 +112,9 @@ public final class MessagePullConsumerExample {
final MessagePullConsumerExample messageConsumer;
final int consumeCount;
- FetchRequestRunner(final MessagePullConsumerExample messageConsumer, int count) {
+ FetchRequestRunner(final MessagePullConsumerExample messageConsumer, int msgCount) {
this.messageConsumer = messageConsumer;
- this.consumeCount = count;
+ this.consumeCount = msgCount;
}
@Override
@@ -127,16 +130,10 @@ public final class MessagePullConsumerExample {
}
messageConsumer.confirmConsume(result.getConfirmContext(), true);
} else {
- if (!(result.getErrCode() == 400
- || result.getErrCode() == 404
- || result.getErrCode() == 405
- || result.getErrCode() == 406
- || result.getErrCode() == 407
- || result.getErrCode() == 408)) {
+ if (!IGNORE_ERROR_SET.contains(result.getErrCode())) {
logger.info(
"Receive messages errorCode is {}, Error message is {}",
- result.getErrCode(),
- result.getErrMsg());
+ result.getErrCode(), result.getErrMsg());
}
}
if (consumeCount > 0) {
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
index d3afeaa..3d49478 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
@@ -17,7 +17,7 @@
package org.apache.tubemq.example;
-import java.util.Arrays;
+import static org.apache.tubemq.corebase.TErrCodeConstants.IGNORE_ERROR_SET;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
@@ -34,9 +34,11 @@ import org.apache.tubemq.client.exception.TubeClientException;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corebase.utils.MixedUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* This demo shows how to reset offset on consuming. The main difference from {@link MessagePullConsumerExample}
* is that we call {@link PullMessageConsumer#completeSubscribe(String, int, boolean, Map)} instead of
@@ -45,24 +47,32 @@ import org.slf4j.LoggerFactory;
*/
public final class MessagePullSetConsumerExample {
- private static final Logger logger = LoggerFactory.getLogger(MessagePullSetConsumerExample.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(MessagePullSetConsumerExample.class);
private static final AtomicLong counter = new AtomicLong(0);
private final PullMessageConsumer messagePullConsumer;
private final MessageSessionFactory messageSessionFactory;
- public MessagePullSetConsumerExample(String masterHostAndPort, String group) throws Exception {
+ public MessagePullSetConsumerExample(String masterHostAndPort,
+ String group) throws Exception {
ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
}
public static void main(String[] args) {
- final String masterHostAndPort = args[0];
+ // get and initialize parameters
+ final String masterServers = args[0];
final String topics = args[1];
final String group = args[2];
- final int consumeCount = Integer.parseInt(args[3]);
- final Map<String, Long> partOffsetMap = new ConcurrentHashMap<>();
+ final int msgCount = Integer.parseInt(args[3]);
+ final Map<String, TreeSet<String>> topicAndFiltersMap =
+ MixedUtils.parseTopicParam(topics);
+ // initialize the reset-offset parameters
+ // (the offsets specified here are for demonstration only)
+ final Map<String, Long> partOffsetMap =
+ new ConcurrentHashMap<>();
partOffsetMap.put("123:test_1:0", 0L);
partOffsetMap.put("123:test_1:1", 0L);
partOffsetMap.put("123:test_1:2", 0L);
@@ -70,17 +80,15 @@ public final class MessagePullSetConsumerExample {
partOffsetMap.put("123:test_2:1", 350L);
partOffsetMap.put("123:test_2:2", 350L);
- final List<String> topicList = Arrays.asList(topics.split(","));
-
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(new Runnable() {
@Override
public void run() {
try {
- int getCount = consumeCount;
+ int getCount = msgCount;
MessagePullSetConsumerExample messageConsumer =
- new MessagePullSetConsumerExample(masterHostAndPort, group);
- messageConsumer.subscribe(topicList, partOffsetMap);
+ new MessagePullSetConsumerExample(masterServers, group);
+ messageConsumer.subscribe(topicAndFiltersMap, partOffsetMap);
// main logic of consuming
do {
ConsumerResult result = messageConsumer.getMessage();
@@ -125,19 +133,13 @@ public final class MessagePullSetConsumerExample {
confirmResult.getErrMsg());
}
} else {
- if (!(result.getErrCode() == 400
- || result.getErrCode() == 404
- || result.getErrCode() == 405
- || result.getErrCode() == 406
- || result.getErrCode() == 407
- || result.getErrCode() == 408)) {
+ if (!IGNORE_ERROR_SET.contains(result.getErrCode())) {
logger.info(
"Receive messages errorCode is {}, Error message is {}",
- result.getErrCode(),
- result.getErrMsg());
+ result.getErrCode(), result.getErrMsg());
}
}
- if (consumeCount >= 0) {
+ if (msgCount >= 0) {
if (--getCount <= 0) {
break;
}
@@ -157,24 +159,16 @@ public final class MessagePullSetConsumerExample {
}
}
- public void subscribe(
- List<String> topicList,
- Map<String, Long> partOffsetMap
- ) throws TubeClientException {
- TreeSet<String> filters = new TreeSet<>();
- filters.add("aaa");
- filters.add("bbb");
- for (String topic : topicList) {
- this.messagePullConsumer.subscribe(topic, filters);
+ public void subscribe(Map<String, TreeSet<String>> topicAndFiltersMap,
+ Map<String, Long> partOffsetMap) throws TubeClientException {
+ for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
+ messagePullConsumer.subscribe(entry.getKey(), entry.getValue());
}
String sessionKey = "test_reset2";
int consumerCount = 2;
boolean isSelectBig = false;
- messagePullConsumer.completeSubscribe(
- sessionKey,
- consumerCount,
- isSelectBig,
- partOffsetMap);
+ messagePullConsumer.completeSubscribe(sessionKey,
+ consumerCount, isSelectBig, partOffsetMap);
}
public ConsumerResult getMessage() throws TubeClientException {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
index 1b2f25a..f544829 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
@@ -42,6 +42,7 @@ import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.utils.MixedUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.ThreadUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.server.common.fielddef.CliArgDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +65,7 @@ public class CliProducer extends CliAbstractBase {
// sent data content
private static byte[] sentData;
private final Map<String, TreeSet<String>> topicAndFiltersMap = new HashMap<>();
- private final List<TupleValue> topicSendRounds = new ArrayList<>();
+ private final List<Tuple2<String, String>> topicSendRounds = new ArrayList<>();
private final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>();
private final Map<MessageProducer, MsgSender> producerMap = new HashMap<>();
// cli parameters
@@ -189,10 +190,10 @@ public class CliProducer extends CliAbstractBase {
// initial topic send round
for (Map.Entry<String, TreeSet<String>> entry: topicAndFiltersMap.entrySet()) {
if (entry.getValue().isEmpty()) {
- topicSendRounds.add(new TupleValue(entry.getKey()));
+ topicSendRounds.add(new Tuple2<String, String>(entry.getKey()));
} else {
for (String filter : entry.getValue()) {
- topicSendRounds.add(new TupleValue(entry.getKey(), filter));
+ topicSendRounds.add(new Tuple2<String, String>(entry.getKey(), filter));
}
}
}
@@ -266,11 +267,11 @@ public class CliProducer extends CliAbstractBase {
roundIndex = (int) (sentCount++ % topicAndCondCnt);
try {
long millis = System.currentTimeMillis();
- TupleValue tupleValue = topicSendRounds.get(roundIndex);
- Message message = new Message(tupleValue.topic, sentData);
- if (tupleValue.filter != null) {
+ Tuple2<String, String> target = topicSendRounds.get(roundIndex);
+ Message message = new Message(target.f0, sentData);
+ if (target.f1 != null) {
// if include filter, add filter item
- message.putSystemHeader(tupleValue.filter, sdf.format(new Date(millis)));
+ message.putSystemHeader(target.f1, sdf.format(new Date(millis)));
}
// use sync or async process
if (syncProduction) {
@@ -332,21 +333,6 @@ public class CliProducer extends CliAbstractBase {
}
}
- private static class TupleValue {
- public String topic = null;
- public String filter = null;
-
- public TupleValue(String topic) {
- this.topic = topic;
- }
-
- public TupleValue(String topic, String filter) {
- this.topic = topic;
- this.filter = filter;
- }
-
- }
-
public static void main(String[] args) {
CliProducer cliProducer = new CliProducer();
try {