You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/05/31 14:52:04 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-168] Example
module: remove localhost IP configuration parameters (#108)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 8fbae0e [TUBEMQ-168] Example module: remove localhost IP configuration parameters (#108)
8fbae0e is described below
commit 8fbae0e13751bd4bbf0a49319adf5d15dd800eda
Author: viviel <37...@users.noreply.github.com>
AuthorDate: Sun May 31 22:51:56 2020 +0800
[TUBEMQ-168] Example module: remove localhost IP configuration parameters (#108)
---
.../tubemq/example/MAMessageProducerExample.java | 15 +++++++--------
.../tubemq/example/MessageConsumerExample.java | 21 +++++++--------------
.../tubemq/example/MessageProducerExample.java | 13 ++++++-------
.../tubemq/example/MessagePullConsumerExample.java | 21 ++++++++-------------
.../example/MessagePullSetConsumerExample.java | 19 +++++++------------
5 files changed, 35 insertions(+), 54 deletions(-)
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
index 24ed133..f76b077 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
@@ -79,27 +79,26 @@ public class MAMessageProducerExample {
private int keyCount = 0;
private int sentCount = 0;
- public MAMessageProducerExample(String localHost, String masterHostAndPort) throws Exception {
+ public MAMessageProducerExample(String masterHostAndPort) throws Exception {
this.filters.add("aaa");
this.filters.add("bbb");
- TubeClientConfig clientConfig = new TubeClientConfig(localHost, masterHostAndPort);
+ TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
this.sessionFactoryList.add(new TubeMultiSessionFactory(clientConfig));
}
}
public static void main(String[] args) {
- final String localHost = args[0];
- final String masterHostAndPort = args[1];
+ final String masterHostAndPort = args[0];
- final String topics = args[2];
+ final String topics = args[1];
final List<String> topicList = Arrays.asList(topics.split(","));
topicSet = new TreeSet<>(topicList);
- msgCount = Integer.parseInt(args[3]);
- producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[4]) : 10, MAX_PRODUCER_NUM);
+ msgCount = Integer.parseInt(args[2]);
+ producerCount = Math.min(args.length > 4 ? Integer.parseInt(args[3]) : 10, MAX_PRODUCER_NUM);
logger.info("MAMessageProducerExample.main started...");
@@ -115,7 +114,7 @@ public class MAMessageProducerExample {
sendData = dataBuffer.array();
try {
- MAMessageProducerExample messageProducer = new MAMessageProducerExample(localHost, masterHostAndPort);
+ MAMessageProducerExample messageProducer = new MAMessageProducerExample(masterHostAndPort);
messageProducer.startService();
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
index c031a16..1fa2e6f 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
@@ -61,13 +61,8 @@ public final class MessageConsumerExample {
private final PushMessageConsumer messageConsumer;
private final MessageSessionFactory messageSessionFactory;
- public MessageConsumerExample(
- String localHost,
- String masterHostAndPort,
- String group,
- int fetchCount
- ) throws Exception {
- ConsumerConfig consumerConfig = new ConsumerConfig(localHost, masterHostAndPort, group);
+ public MessageConsumerExample(String masterHostAndPort, String group, int fetchCount) throws Exception {
+ ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
if (fetchCount > 0) {
consumerConfig.setPushFetchThreadCnt(fetchCount);
@@ -77,14 +72,13 @@ public final class MessageConsumerExample {
}
public static void main(String[] args) {
- final String localHost = args[0];
- final String masterHostAndPort = args[1];
- final String topics = args[2];
- final String group = args[3];
- final int consumerCount = Integer.parseInt(args[4]);
+ final String masterHostAndPort = args[0];
+ final String topics = args[1];
+ final String group = args[2];
+ final int consumerCount = Integer.parseInt(args[3]);
int fetchCount = -1;
if (args.length > 5) {
- fetchCount = Integer.parseInt(args[5]);
+ fetchCount = Integer.parseInt(args[4]);
}
final Map<String, TreeSet<String>> topicTidsMap = new HashMap<>();
@@ -109,7 +103,6 @@ public final class MessageConsumerExample {
try {
for (int i = 0; i < consumerCount; i++) {
MessageConsumerExample messageConsumer = new MessageConsumerExample(
- localHost,
masterHostAndPort,
group,
startFetchCount
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java
index 2f12eb1..57d0ef0 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java
@@ -59,21 +59,20 @@ public final class MessageProducerExample {
private int keyCount = 0;
private int sentCount = 0;
- public MessageProducerExample(String localHost, String masterHostAndPort) throws Exception {
+ public MessageProducerExample(String masterHostAndPort) throws Exception {
filters.add("aaa");
filters.add("bbb");
- TubeClientConfig clientConfig = new TubeClientConfig(localHost, masterHostAndPort);
+ TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
this.messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
this.messageProducer = messageSessionFactory.createProducer();
}
public static void main(String[] args) {
- final String localHost = args[0];
- final String masterHostAndPort = args[1];
- final String topics = args[2];
+ final String masterHostAndPort = args[0];
+ final String topics = args[1];
final List<String> topicList = Arrays.asList(topics.split(","));
- final int count = Integer.parseInt(args[3]);
+ final int count = Integer.parseInt(args[2]);
String body = "This is a test message from single-session-factory.";
byte[] bodyBytes = StringUtils.getBytesUtf8(body);
@@ -84,7 +83,7 @@ public final class MessageProducerExample {
}
dataBuffer.flip();
try {
- MessageProducerExample messageProducer = new MessageProducerExample(localHost, masterHostAndPort);
+ MessageProducerExample messageProducer = new MessageProducerExample(masterHostAndPort);
messageProducer.publishTopics(topicList);
for (int i = 0; i < count; i++) {
for (String topic : topicList) {
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
index fa9c0a5..a431498 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
@@ -49,28 +49,23 @@ public final class MessagePullConsumerExample {
private final PullMessageConsumer messagePullConsumer;
private final MessageSessionFactory messageSessionFactory;
- public MessagePullConsumerExample(
- String localHost,
- String masterHostAndPort,
- String group
- ) throws Exception {
- ConsumerConfig consumerConfig = new ConsumerConfig(localHost, masterHostAndPort, group);
+ public MessagePullConsumerExample(String masterHostAndPort, String group) throws Exception {
+ ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
}
public static void main(String[] args) throws Throwable {
- final String localHost = args[0];
- final String masterHostAndPort = args[1];
- final String topics = args[2];
- final String group = args[3];
- final int consumeCount = Integer.parseInt(args[4]);
+ final String masterHostAndPort = args[0];
+ final String topics = args[1];
+ final String group = args[2];
+ final int consumeCount = Integer.parseInt(args[3]);
final MessagePullConsumerExample messageConsumer = new MessagePullConsumerExample(
- localHost,
masterHostAndPort,
- group);
+ group
+ );
final List<String> topicList = Arrays.asList(topics.split(","));
messageConsumer.subscribe(topicList);
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
index 849b8c9..5180c0b 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
@@ -52,22 +52,17 @@ public final class MessagePullSetConsumerExample {
private final PullMessageConsumer messagePullConsumer;
private final MessageSessionFactory messageSessionFactory;
- public MessagePullSetConsumerExample(
- String localHost,
- String masterHostAndPort,
- String group
- ) throws Exception {
- ConsumerConfig consumerConfig = new ConsumerConfig(localHost, masterHostAndPort, group);
+ public MessagePullSetConsumerExample(String masterHostAndPort, String group) throws Exception {
+ ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
}
public static void main(String[] args) {
- final String localhost = args[0];
- final String masterHostAndPort = args[1];
- final String topics = args[2];
- final String group = args[3];
- final int consumeCount = Integer.parseInt(args[4]);
+ final String masterHostAndPort = args[0];
+ final String topics = args[1];
+ final String group = args[2];
+ final int consumeCount = Integer.parseInt(args[3]);
final Map<String, Long> partOffsetMap = new ConcurrentHashMap<>();
partOffsetMap.put("123:test_1:0", 0L);
partOffsetMap.put("123:test_1:1", 0L);
@@ -85,7 +80,7 @@ public final class MessagePullSetConsumerExample {
try {
int getCount = consumeCount;
MessagePullSetConsumerExample messageConsumer =
- new MessagePullSetConsumerExample(localhost, masterHostAndPort, group);
+ new MessagePullSetConsumerExample(masterHostAndPort, group);
messageConsumer.subscribe(topicList, partOffsetMap);
// wait until the consumer is allocated parts