You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:13:55 UTC
[04/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java b/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java
deleted file mode 100644
index 2e91e34..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionCheckListenerImpl.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.transaction;
-
-import com.alibaba.rocketmq.client.producer.LocalTransactionState;
-import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-/**
- * Example {@link TransactionCheckListener} that answers a transaction check
- * with a simulated outcome chosen from a running invocation counter.
- */
-public class TransactionCheckListenerImpl implements TransactionCheckListener {
-    /** Number of checks handled so far; the first check observes 0. */
-    private final AtomicInteger transactionIndex = new AtomicInteger(0);
-
-
-    /**
-     * Logs the checked message and returns a simulated local transaction state.
-     *
-     * <p>The counter value is tested in this order: a multiple of 6 raises an
-     * exception, a multiple of 5 rolls back, a multiple of 4 commits, and any
-     * other value reports {@code UNKNOW}.
-     *
-     * @param msg the message being checked
-     * @return the simulated state for this invocation
-     * @throws RuntimeException when the counter value is a multiple of 6
-     */
-    @Override
-    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
-        // Pass the message as an argument instead of splicing it into the format
-        // string: a '%' in the message text would otherwise be parsed as a
-        // conversion specifier and make printf throw.
-        System.out.printf("server checking TrMsg %s%n", msg);
-
-        int value = transactionIndex.getAndIncrement();
-        if ((value % 6) == 0) {
-            throw new RuntimeException("Could not find db");
-        } else if ((value % 5) == 0) {
-            return LocalTransactionState.ROLLBACK_MESSAGE;
-        } else if ((value % 4) == 0) {
-            return LocalTransactionState.COMMIT_MESSAGE;
-        }
-
-        return LocalTransactionState.UNKNOW;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java b/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java
deleted file mode 100644
index cda523a..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionExecuterImpl.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.transaction;
-
-import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
-import com.alibaba.rocketmq.client.producer.LocalTransactionState;
-import com.alibaba.rocketmq.common.message.Message;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Example {@link LocalTransactionExecuter} whose result is picked from a
- * running invocation counter rather than from real local work.
- */
-public class TransactionExecuterImpl implements LocalTransactionExecuter {
-    /** Invocation counter; the first call observes 1. */
-    private AtomicInteger transactionIndex = new AtomicInteger(1);
-
-
-    /**
-     * Returns a simulated local transaction state for this invocation.
-     *
-     * @param msg the message being sent (not inspected)
-     * @param arg caller-supplied argument (not inspected)
-     * @return rollback for multiples of 5, commit for other multiples of 4,
-     *         {@code UNKNOW} otherwise
-     */
-    @Override
-    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
-        final int sequence = transactionIndex.getAndIncrement();
-
-        // The counter starts at 1 and only increments, so this guard can only
-        // fire if the int counter wraps around to 0.
-        if (sequence == 0) {
-            throw new RuntimeException("Could not find db");
-        }
-        if (sequence % 5 == 0) {
-            return LocalTransactionState.ROLLBACK_MESSAGE;
-        }
-        if (sequence % 4 == 0) {
-            return LocalTransactionState.COMMIT_MESSAGE;
-        }
-        return LocalTransactionState.UNKNOW;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java
deleted file mode 100644
index 2c4745f..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/transaction/TransactionProducer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.transaction;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
-import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-
-import java.io.UnsupportedEncodingException;
-
-/**
- * Example that sends 100 transactional messages and then stays alive so that
- * transaction checks can still be answered.
- */
-public class TransactionProducer {
-    /**
-     * Configures and starts the transactional producer, sends the messages,
-     * waits, and finally shuts the producer down.
-     *
-     * @param args unused
-     * @throws MQClientException if the producer fails to start
-     * @throws InterruptedException if the thread is interrupted while sleeping
-     */
-    public static void main(String[] args) throws MQClientException, InterruptedException {
-        TransactionCheckListener checkListener = new TransactionCheckListenerImpl();
-
-        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
-        producer.setCheckThreadPoolMinSize(2);
-        producer.setCheckThreadPoolMaxSize(2);
-        producer.setCheckRequestHoldMax(2000);
-        producer.setTransactionCheckListener(checkListener);
-        producer.start();
-
-        final String[] tags = {"TagA", "TagB", "TagC", "TagD", "TagE"};
-        final TransactionExecuterImpl executer = new TransactionExecuterImpl();
-
-        int sent = 0;
-        while (sent < 100) {
-            try {
-                byte[] body = ("Hello RocketMQ " + sent).getBytes(RemotingHelper.DEFAULT_CHARSET);
-                String tag = tags[sent % tags.length];
-                Message message = new Message("TopicTest", tag, "KEY" + sent, body);
-
-                SendResult result = producer.sendMessageInTransaction(message, executer, null);
-                System.out.printf("%s%n", result);
-
-                Thread.sleep(10);
-            } catch (MQClientException e) {
-                e.printStackTrace();
-            } catch (UnsupportedEncodingException e) {
-                e.printStackTrace();
-            }
-            sent++;
-        }
-
-        // Keep the process alive: 100000 one-second sleeps before shutting down.
-        int remainingSeconds = 100000;
-        while (remainingSeconds > 0) {
-            Thread.sleep(1000);
-            remainingSeconds--;
-        }
-        producer.shutdown();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
new file mode 100644
index 0000000..1fbb8a4
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.benchmark;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Benchmark consumer: subscribes to a topic and periodically prints consume
+ * throughput and latency figures.
+ */
+public class Consumer {
+
+    /**
+     * Parses the command line, starts the statistics timers and the push consumer.
+     *
+     * @param args command-line arguments; see {@link #buildCommandlineOptions(Options)}
+     * @throws MQClientException if subscribing or starting the consumer fails
+     */
+    public static void main(String[] args) throws MQClientException {
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkConsumer", args, buildCommandlineOptions(options), new PosixParser());
+        if (null == commandLine) {
+            System.exit(-1);
+        }
+
+        final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
+        final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer";
+        final String isPrefixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true";
+        String group = groupPrefix;
+        if (Boolean.parseBoolean(isPrefixEnable)) {
+            // Append a small time-derived number to the configured group name.
+            group = groupPrefix + "_" + Long.toString(System.currentTimeMillis() % 100);
+        }
+
+        System.out.printf("topic %s group %s prefix %s%n", topic, group, isPrefixEnable);
+
+        final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
+
+        final Timer timer = new Timer("BenchmarkTimerThread", true);
+
+        final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
+
+        // Every second: record a snapshot and keep only the 10 most recent ones.
+        timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
+                if (snapshotList.size() > 10) {
+                    snapshotList.removeFirst();
+                }
+            }
+        }, 1000, 1000);
+
+        // Every ten seconds: print figures computed from the oldest and newest snapshot.
+        timer.scheduleAtFixedRate(new TimerTask() {
+            private void printStats() {
+                if (snapshotList.size() >= 10) {
+                    Long[] begin = snapshotList.getFirst();
+                    Long[] end = snapshotList.getLast();
+
+                    // Snapshot layout: [0] timestamp, [1] received count, [2] born-to-consumer
+                    // RT total, [3] store-to-consumer RT total, [4] and [5] the matching maxima.
+                    final long consumeTps =
+                        (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L);
+                    final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]);
+                    final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]);
+
+                    System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
+                        consumeTps, averageB2CRT, averageS2CRT, end[4], end[5]
+                    );
+                }
+            }
+
+
+            @Override
+            public void run() {
+                try {
+                    this.printStats();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, 10000, 10000);
+
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
+        consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        // Apply -n the same way the benchmark Producer does; previously the option
+        // had no effect here.
+        // NOTE(review): assumes -n is declared by ServerUtil.buildCommandlineOptions,
+        // as the Producer's use of it implies - confirm.
+        if (commandLine.hasOption('n')) {
+            consumer.setNamesrvAddr(commandLine.getOptionValue('n'));
+        }
+
+        consumer.subscribe(topic, "*");
+
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                ConsumeConcurrentlyContext context) {
+                // Only the first message of the delivered list is measured.
+                MessageExt msg = msgs.get(0);
+                long now = System.currentTimeMillis();
+
+                statsBenchmarkConsumer.getReceiveMessageTotalCount().incrementAndGet();
+
+                long born2ConsumerRT = now - msg.getBornTimestamp();
+                statsBenchmarkConsumer.getBorn2ConsumerTotalRT().addAndGet(born2ConsumerRT);
+
+                long store2ConsumerRT = now - msg.getStoreTimestamp();
+                statsBenchmarkConsumer.getStore2ConsumerTotalRT().addAndGet(store2ConsumerRT);
+
+                compareAndSetMax(statsBenchmarkConsumer.getBorn2ConsumerMaxRT(), born2ConsumerRT);
+
+                compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT);
+
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+
+        consumer.start();
+
+        System.out.printf("Consumer Started.%n");
+    }
+
+    /**
+     * Adds the consumer-specific options (-t, -g, -p) to {@code options}.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance
+     */
+    public static Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "group", true, "Consumer group name, Default: benchmark_consumer");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+
+        // The help text used to repeat the -g description and state "Default: false",
+        // while main() treats a missing -p as "true".
+        opt = new Option("p", "group prefix enable", true,
+            "Append a numeric suffix to the consumer group name, Default: true");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+
+    /**
+     * Raises {@code target} to {@code value} if {@code value} is larger, retrying
+     * the compare-and-set until it succeeds or {@code target} is no longer smaller.
+     *
+     * @param target the running maximum
+     * @param value the candidate value
+     */
+    public static void compareAndSetMax(final AtomicLong target, final long value) {
+        long prev = target.get();
+        while (value > prev) {
+            if (target.compareAndSet(prev, value)) {
+                break;
+            }
+
+            prev = target.get();
+        }
+    }
+}
+
+
+/**
+ * Counters collected by the benchmark consumer. Each counter is exposed
+ * directly as its {@link AtomicLong} so callers update it in place.
+ */
+class StatsBenchmarkConsumer {
+    private final AtomicLong receiveMessageTotalCount = new AtomicLong(0L);
+
+    private final AtomicLong born2ConsumerTotalRT = new AtomicLong(0L);
+
+    private final AtomicLong store2ConsumerTotalRT = new AtomicLong(0L);
+
+    private final AtomicLong born2ConsumerMaxRT = new AtomicLong(0L);
+
+    private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L);
+
+
+    /**
+     * Captures the current counter values.
+     *
+     * @return a six-element array: [0] current time in milliseconds, [1] received
+     *         count, [2] born-to-consumer RT total, [3] store-to-consumer RT total,
+     *         [4] born-to-consumer max RT, [5] store-to-consumer max RT
+     */
+    public Long[] createSnapshot() {
+        final Long[] values = new Long[6];
+        values[0] = System.currentTimeMillis();
+        values[1] = receiveMessageTotalCount.get();
+        values[2] = born2ConsumerTotalRT.get();
+        values[3] = store2ConsumerTotalRT.get();
+        values[4] = born2ConsumerMaxRT.get();
+        values[5] = store2ConsumerMaxRT.get();
+        return values;
+    }
+
+
+    /** @return the counter of received messages */
+    public AtomicLong getReceiveMessageTotalCount() {
+        return this.receiveMessageTotalCount;
+    }
+
+
+    /** @return the accumulated born-to-consumer response time */
+    public AtomicLong getBorn2ConsumerTotalRT() {
+        return this.born2ConsumerTotalRT;
+    }
+
+
+    /** @return the accumulated store-to-consumer response time */
+    public AtomicLong getStore2ConsumerTotalRT() {
+        return this.store2ConsumerTotalRT;
+    }
+
+
+    /** @return the largest born-to-consumer response time recorded */
+    public AtomicLong getBorn2ConsumerMaxRT() {
+        return this.born2ConsumerMaxRT;
+    }
+
+
+    /** @return the largest store-to-consumer response time recorded */
+    public AtomicLong getStore2ConsumerMaxRT() {
+        return this.store2ConsumerMaxRT;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
new file mode 100644
index 0000000..3b13f94
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.benchmark;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedList;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Producer {
+ public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
+
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser());
+ if (null == commandLine) {
+ System.exit(-1);
+ }
+
+ final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
+ final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
+ final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128;
+ final boolean keyEnable = commandLine.hasOption('k') ? Boolean.parseBoolean(commandLine.getOptionValue('k')) : false;
+
+ System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable);
+
+ final Logger log = ClientLogger.getLog();
+
+ final Message msg = buildMessage(messageSize, topic);
+
+ final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
+
+ final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer();
+
+ final Timer timer = new Timer("BenchmarkTimerThread", true);
+
+ final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
+
+ timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ snapshotList.addLast(statsBenchmark.createSnapshot());
+ if (snapshotList.size() > 10) {
+ snapshotList.removeFirst();
+ }
+ }
+ }, 1000, 1000);
+
+ timer.scheduleAtFixedRate(new TimerTask() {
+ private void printStats() {
+ if (snapshotList.size() >= 10) {
+ Long[] begin = snapshotList.getFirst();
+ Long[] end = snapshotList.getLast();
+
+ final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
+ final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
+
+ System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n",
+ sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
+ }
+ }
+
+
+ @Override
+ public void run() {
+ try {
+ this.printStats();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }, 10000, 10000);
+
+ final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer");
+ producer.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ if (commandLine.hasOption('n')) {
+ String ns = commandLine.getOptionValue('n');
+ producer.setNamesrvAddr(ns);
+ }
+
+ producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
+
+ producer.start();
+
+ for (int i = 0; i < threadCount; i++) {
+ sendThreadPool.execute(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ final long beginTimestamp = System.currentTimeMillis();
+ if (keyEnable) {
+ msg.setKeys(String.valueOf(beginTimestamp / 1000));
+ }
+ producer.send(msg);
+ statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
+ statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
+ final long currentRT = System.currentTimeMillis() - beginTimestamp;
+ statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
+ long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
+ while (currentRT > prevMaxRT) {
+ boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
+ if (updated)
+ break;
+
+ prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
+ }
+ } catch (RemotingException e) {
+ statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+ log.error("[BENCHMARK_PRODUCER] Send Exception", e);
+
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e1) {
+ }
+ } catch (InterruptedException e) {
+ statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e1) {
+ }
+ } catch (MQClientException e) {
+ statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+ log.error("[BENCHMARK_PRODUCER] Send Exception", e);
+ } catch (MQBrokerException e) {
+ statsBenchmark.getReceiveResponseFailedCount().incrementAndGet();
+ log.error("[BENCHMARK_PRODUCER] Send Exception", e);
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e1) {
+ }
+ }
+ }
+ }
+ });
+ }
+ }
+
+ public static Options buildCommandlineOptions(final Options options) {
+ Option opt = new Option("w", "threadCount", true, "Thread count, Default: 64");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("s", "messageSize", true, "Message Size, Default: 128");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("k", "keyEnable", true, "Message Key Enable, Default: false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ private static Message buildMessage(final int messageSize, final String topic) throws UnsupportedEncodingException {
+ Message msg = new Message();
+ msg.setTopic(topic);
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < messageSize; i += 10) {
+ sb.append("hello baby");
+ }
+
+ msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+ return msg;
+ }
+}
+
+
+/**
+ * Counters collected by the benchmark producer. Each counter is exposed
+ * directly as its {@link AtomicLong} so callers update it in place.
+ */
+class StatsBenchmarkProducer {
+    private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
+
+    private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
+
+    private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L);
+
+    private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L);
+
+    private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
+
+    private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
+
+
+    /**
+     * Captures the current counter values. The max RT counter is not included.
+     *
+     * @return a six-element array: [0] current time in milliseconds, [1] send
+     *         successes, [2] send failures, [3] response successes, [4] response
+     *         failures, [5] total time of successful sends
+     */
+    public Long[] createSnapshot() {
+        final Long[] values = new Long[6];
+        values[0] = System.currentTimeMillis();
+        values[1] = sendRequestSuccessCount.get();
+        values[2] = sendRequestFailedCount.get();
+        values[3] = receiveResponseSuccessCount.get();
+        values[4] = receiveResponseFailedCount.get();
+        values[5] = sendMessageSuccessTimeTotal.get();
+        return values;
+    }
+
+
+    /** @return the counter of successful send requests */
+    public AtomicLong getSendRequestSuccessCount() {
+        return this.sendRequestSuccessCount;
+    }
+
+
+    /** @return the counter of failed send requests */
+    public AtomicLong getSendRequestFailedCount() {
+        return this.sendRequestFailedCount;
+    }
+
+
+    /** @return the counter of successful responses */
+    public AtomicLong getReceiveResponseSuccessCount() {
+        return this.receiveResponseSuccessCount;
+    }
+
+
+    /** @return the counter of failed responses */
+    public AtomicLong getReceiveResponseFailedCount() {
+        return this.receiveResponseFailedCount;
+    }
+
+
+    /** @return the accumulated time of successful sends */
+    public AtomicLong getSendMessageSuccessTimeTotal() {
+        return this.sendMessageSuccessTimeTotal;
+    }
+
+
+    /** @return the largest send response time recorded */
+    public AtomicLong getSendMessageMaxRT() {
+        return this.sendMessageMaxRT;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
new file mode 100644
index 0000000..43f159b
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.benchmark;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.client.producer.*;
+
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedList;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Benchmark for transactional sends: worker threads send the same message in
+ * a loop while a timer prints throughput, latency and check-back counts.
+ */
+public class TransactionProducer {
+    private static int threadCount;
+    private static int messageSize;
+    private static boolean ischeck;
+    private static boolean ischeckffalse;
+
+
+    /**
+     * Reads up to four positional arguments and starts the benchmark.
+     *
+     * @param args [0] thread count (default 32), [1] message size (default 2048),
+     *             [2] whether the executer reports {@code UNKNOW} (default false),
+     *             [3] whether the check listener rolls back (default false)
+     * @throws MQClientException if the producer fails to start
+     * @throws UnsupportedEncodingException if the message body cannot be encoded
+     */
+    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
+        final int argCount = args.length;
+        threadCount = argCount >= 1 ? Integer.parseInt(args[0]) : 32;
+        messageSize = argCount >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
+        ischeck = argCount >= 3 && Boolean.parseBoolean(args[2]);
+        ischeckffalse = argCount >= 4 && Boolean.parseBoolean(args[3]);
+
+        final Message msg = buildMessage(messageSize);
+
+        final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
+
+        final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer();
+
+        final Timer timer = new Timer("BenchmarkTimerThread", true);
+
+        final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
+
+        // Every second: record a snapshot and trim the history to 10 entries.
+        timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                snapshotList.addLast(statsBenchmark.createSnapshot());
+                while (snapshotList.size() > 10) {
+                    snapshotList.removeFirst();
+                }
+            }
+        }, 1000, 1000);
+
+        // Every ten seconds: print figures computed from the oldest and newest snapshot.
+        timer.scheduleAtFixedRate(new TimerTask() {
+            private void printStats() {
+                if (snapshotList.size() < 10) {
+                    return;
+                }
+                final Long[] oldest = snapshotList.getFirst();
+                final Long[] newest = snapshotList.getLast();
+
+                // Snapshot layout: [0] timestamp, [2] send failures, [3] successful
+                // responses, [4] response failures, [5] total RT, [6] check count.
+                final long sendTps =
+                    (long) (((newest[3] - oldest[3]) / (double) (newest[0] - oldest[0])) * 1000L);
+                final double averageRT = (newest[5] - oldest[5]) / (double) (newest[3] - oldest[3]);
+
+                System.out.printf(
+                    "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
+                    sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, newest[2], newest[4], newest[6]);
+            }
+
+
+            @Override
+            public void run() {
+                try {
+                    this.printStats();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, 10000, 10000);
+
+        final TransactionCheckListener transactionCheckListener =
+            new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark);
+        final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
+        producer.setInstanceName(Long.toString(System.currentTimeMillis()));
+        producer.setTransactionCheckListener(transactionCheckListener);
+        producer.setDefaultTopicQueueNums(1000);
+        producer.start();
+
+        final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
+
+        // The send loop keeps no per-thread state, so one task instance is
+        // submitted once per worker thread.
+        final Runnable sendLoop = new Runnable() {
+            @Override
+            public void run() {
+                final AtomicLong maxRT = statsBenchmark.getSendMessageMaxRT();
+                while (true) {
+                    try {
+                        final long beginTimestamp = System.currentTimeMillis();
+                        final SendResult sendResult =
+                            producer.sendMessageInTransaction(msg, tranExecuter, null);
+                        if (sendResult != null) {
+                            statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
+                            statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
+                        }
+
+                        // Elapsed time is accumulated whether or not a result came back.
+                        final long currentRT = System.currentTimeMillis() - beginTimestamp;
+                        statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
+
+                        long observed = maxRT.get();
+                        while (currentRT > observed) {
+                            if (maxRT.compareAndSet(observed, currentRT)) {
+                                break;
+                            }
+
+                            observed = maxRT.get();
+                        }
+                    } catch (MQClientException e) {
+                        statsBenchmark.getSendRequestFailedCount().incrementAndGet();
+                    }
+                }
+            }
+        };
+        for (int worker = 0; worker < threadCount; worker++) {
+            sendThreadPool.execute(sendLoop);
+        }
+    }
+
+
+    /**
+     * Builds the message that every worker sends to topic "BenchmarkTest". The
+     * body is "hello baby" repeated, so its length is {@code messageSize}
+     * rounded up to a multiple of ten characters.
+     */
+    private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException {
+        final StringBuilder body = new StringBuilder();
+        int written = 0;
+        while (written < messageSize) {
+            body.append("hello baby");
+            written += 10;
+        }
+
+        final Message message = new Message();
+        message.setTopic("BenchmarkTest");
+        message.setBody(body.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+        return message;
+    }
+}
+
+
+/**
+ * Benchmark {@link LocalTransactionExecuter} with a fixed answer chosen at
+ * construction time.
+ */
+class TransactionExecuterBImpl implements LocalTransactionExecuter {
+
+    private boolean ischeck;
+
+
+    /**
+     * @param ischeck when {@code true}, every local transaction reports
+     *                {@code UNKNOW}; otherwise every one reports commit
+     */
+    public TransactionExecuterBImpl(boolean ischeck) {
+        this.ischeck = ischeck;
+    }
+
+
+    /**
+     * Returns the configured state; neither argument is inspected.
+     */
+    @Override
+    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
+        return ischeck ? LocalTransactionState.UNKNOW : LocalTransactionState.COMMIT_MESSAGE;
+    }
+}
+
+
+/**
+ * Benchmark {@link TransactionCheckListener} that counts every check and
+ * answers with a fixed state chosen at construction time.
+ */
+class TransactionCheckListenerBImpl implements TransactionCheckListener {
+    private boolean ischeckffalse;
+    private StatsBenchmarkTProducer statsBenchmarkTProducer;
+
+
+    /**
+     * @param ischeckffalse when {@code true}, every check answers rollback;
+     *                      otherwise every check answers commit
+     * @param statsBenchmarkTProducer statistics holder whose check counter is incremented
+     */
+    public TransactionCheckListenerBImpl(boolean ischeckffalse,
+        StatsBenchmarkTProducer statsBenchmarkTProducer) {
+        this.statsBenchmarkTProducer = statsBenchmarkTProducer;
+        this.ischeckffalse = ischeckffalse;
+    }
+
+
+    /**
+     * Increments the check counter and returns the configured state; the
+     * message itself is not inspected.
+     */
+    @Override
+    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
+        statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
+        return ischeckffalse
+            ? LocalTransactionState.ROLLBACK_MESSAGE
+            : LocalTransactionState.COMMIT_MESSAGE;
+    }
+}
+
+
+/**
+ * Counters collected by the transactional benchmark producer. Each counter is
+ * exposed directly as its {@link AtomicLong} so callers update it in place.
+ */
+class StatsBenchmarkTProducer {
+    private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
+
+    private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
+
+    private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L);
+
+    private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L);
+
+    private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
+
+    private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
+
+    private final AtomicLong checkRequestSuccessCount = new AtomicLong(0L);
+
+
+    /**
+     * Captures the current counter values. The max RT counter is not included.
+     *
+     * @return a seven-element array: [0] current time in milliseconds, [1] send
+     *         successes, [2] send failures, [3] response successes, [4] response
+     *         failures, [5] total time of sends, [6] transaction checks handled
+     */
+    public Long[] createSnapshot() {
+        final Long[] values = new Long[7];
+        values[0] = System.currentTimeMillis();
+        values[1] = sendRequestSuccessCount.get();
+        values[2] = sendRequestFailedCount.get();
+        values[3] = receiveResponseSuccessCount.get();
+        values[4] = receiveResponseFailedCount.get();
+        values[5] = sendMessageSuccessTimeTotal.get();
+        values[6] = checkRequestSuccessCount.get();
+        return values;
+    }
+
+
+    /** @return the counter of successful send requests */
+    public AtomicLong getSendRequestSuccessCount() {
+        return this.sendRequestSuccessCount;
+    }
+
+
+    /** @return the counter of failed send requests */
+    public AtomicLong getSendRequestFailedCount() {
+        return this.sendRequestFailedCount;
+    }
+
+
+    /** @return the counter of successful responses */
+    public AtomicLong getReceiveResponseSuccessCount() {
+        return this.receiveResponseSuccessCount;
+    }
+
+
+    /** @return the counter of failed responses */
+    public AtomicLong getReceiveResponseFailedCount() {
+        return this.receiveResponseFailedCount;
+    }
+
+
+    /** @return the accumulated send time */
+    public AtomicLong getSendMessageSuccessTimeTotal() {
+        return this.sendMessageSuccessTimeTotal;
+    }
+
+
+    /** @return the largest send response time recorded */
+    public AtomicLong getSendMessageMaxRT() {
+        return this.sendMessageMaxRT;
+    }
+
+
+    /** @return the counter of transaction checks handled */
+    public AtomicLong getCheckRequestSuccessCount() {
+        return this.checkRequestSuccessCount;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
new file mode 100644
index 0000000..aa62a1e
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.broadcast;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+import java.util.List;
+
+ /** Example push consumer: broadcast-mode subscription to TopicTest that prints every delivered batch. */
+ public class PushConsumer {
+
+     /** Configures the consumer, registers the printing listener and starts it. */
+     public static void main(String[] args) throws InterruptedException, MQClientException {
+         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
+         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+         consumer.setMessageModel(MessageModel.BROADCASTING);
+         consumer.subscribe("TopicTest", "TagA || TagC || TagD");
+
+         consumer.registerMessageListener(new MessageListenerConcurrently() {
+             @Override
+             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                 ConsumeConcurrentlyContext context) {
+                 // Constant format string: message text must never be parsed as printf
+                 // directives, otherwise a '%' inside a message makes printf throw.
+                 System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
+                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+             }
+         });
+
+         consumer.start();
+         System.out.printf("Broadcast Consumer Started.%n");
+     }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
new file mode 100644
index 0000000..d0a41f1
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.filter;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+ /** Example consumer that passes the source text of MessageFilterImpl to subscribe() and prints each batch. */
+ public class Consumer {
+     /** Loads the filter source, subscribes to TopicFilter7 with it and starts consuming. */
+     public static void main(String[] args) throws InterruptedException, MQClientException {
+         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
+
+         // The filter source is read from a fixed local path; point it at MessageFilterImpl.java.
+         String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
+         consumer.subscribe("TopicFilter7", "org.apache.rocketmq.example.filter.MessageFilterImpl", filterCode);
+
+         consumer.registerMessageListener(new MessageListenerConcurrently() {
+             @Override
+             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                 ConsumeConcurrentlyContext context) {
+                 // Constant format string: a '%' in the message text would otherwise be parsed as a directive.
+                 System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
+                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+             }
+         });
+
+         consumer.start();
+         System.out.printf("Consumer Started.%n");
+     }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
new file mode 100644
index 0000000..d58c28d
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.filter;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+ /** Example producer that sends 6000000 messages to TopicFilter7, each tagged with a SequenceId property. */
+ public class Producer {
+     public static void main(String[] args) throws MQClientException, InterruptedException {
+         DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+         producer.start();
+
+         try {
+             int sequence = 0;
+             while (sequence < 6000000) {
+                 byte[] payload = "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET);
+                 Message msg = new Message("TopicFilter7", "TagA", "OrderID001", payload);
+                 msg.putUserProperty("SequenceId", String.valueOf(sequence));
+                 System.out.printf("%s%n", producer.send(msg));
+                 sequence++;
+             }
+         } catch (Exception e) {
+             // Any failure ends the send loop; the producer is still shut down below.
+             e.printStackTrace();
+         }
+         producer.shutdown();
+     }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java
new file mode 100644
index 0000000..a6a3aca
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.operation;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+ /** Command-line push consumer that can hand back every second batch for redelivery. */
+ public class Consumer {
+
+     /** Parses the options, subscribes to the requested topic and starts consuming. */
+     public static void main(String[] args) throws InterruptedException, MQClientException {
+         CommandLine commandLine = buildCommandline(args);
+         if (commandLine != null) {
+             String group = commandLine.getOptionValue('g');
+             String topic = commandLine.getOptionValue('t');
+             String subscription = commandLine.getOptionValue('s');
+             // Parsed once here rather than on every delivered batch.
+             final boolean returnFailedHalf = Boolean.parseBoolean(commandLine.getOptionValue('f'));
+
+             DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
+             consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+             consumer.subscribe(topic, subscription);
+
+             consumer.registerMessageListener(new MessageListenerConcurrently() {
+                 AtomicLong consumeTimes = new AtomicLong(0);
+
+                 @Override
+                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                     ConsumeConcurrentlyContext context) {
+                     long currentTimes = this.consumeTimes.incrementAndGet();
+                     System.out.printf("%-8d %s%n", currentTimes, msgs);
+                     // With -f true, every even-numbered batch is returned as RECONSUME_LATER.
+                     if (returnFailedHalf && (currentTimes % 2) == 0) {
+                         return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                     }
+                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                 }
+             });
+
+             consumer.start();
+             System.out.printf("Consumer Started.%n");
+         }
+     }
+
+     /** Builds the option set and parses args; prints usage and returns null on -h or on a parse error. */
+     public static CommandLine buildCommandline(String[] args) {
+         final Options options = new Options();
+         Option opt = new Option("h", "help", false, "Print help");
+         opt.setRequired(false);
+         options.addOption(opt);
+
+         opt = new Option("g", "consumerGroup", true, "Consumer Group Name");
+         opt.setRequired(true);
+         options.addOption(opt);
+
+         opt = new Option("t", "topic", true, "Topic Name");
+         opt.setRequired(true);
+         options.addOption(opt);
+
+         opt = new Option("s", "subscription", true, "subscription");
+         opt.setRequired(false);
+         options.addOption(opt);
+
+         opt = new Option("f", "returnFailedHalf", true, "return failed result, for half message");
+         opt.setRequired(true);
+         options.addOption(opt);
+
+         PosixParser parser = new PosixParser();
+         HelpFormatter hf = new HelpFormatter();
+         hf.setWidth(110);
+         CommandLine commandLine = null;
+         try {
+             commandLine = parser.parse(options, args);
+             if (commandLine.hasOption('h')) {
+                 hf.printHelp("consumer", options, true);
+                 return null;
+             }
+         } catch (ParseException e) {
+             hf.printHelp("consumer", options, true);
+             return null;
+         }
+         return commandLine;
+     }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java
new file mode 100644
index 0000000..54e256b
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.operation;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.commons.cli.*;
+
+ /** Command-line producer that sends a fixed number of numbered messages to one topic. */
+ public class Producer {
+     /** Parses the options, sends msgCount messages and shuts the producer down once it has been started. */
+     public static void main(String[] args) throws MQClientException, InterruptedException {
+         CommandLine commandLine = buildCommandline(args);
+         if (commandLine != null) {
+             String group = commandLine.getOptionValue('g');
+             String topic = commandLine.getOptionValue('t');
+             String tags = commandLine.getOptionValue('a');
+             String keys = commandLine.getOptionValue('k');
+             // Parse once, before start(): a malformed count must not leave a started producer behind.
+             final int msgCount = Integer.parseInt(commandLine.getOptionValue('c'));
+
+             DefaultMQProducer producer = new DefaultMQProducer(group);
+             producer.setInstanceName(Long.toString(System.currentTimeMillis()));
+             producer.start();
+             try {
+                 for (int i = 0; i < msgCount; i++) {
+                     try {
+                         Message msg = new Message(topic, tags, keys,
+                             ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
+                         SendResult sendResult = producer.send(msg);
+                         System.out.printf("%-8d %s%n", i, sendResult);
+                     } catch (Exception e) {
+                         e.printStackTrace();
+                         Thread.sleep(1000);
+                     }
+                 }
+             } finally {
+                 // Reached even when the back-off sleep above is interrupted.
+                 producer.shutdown();
+             }
+         }
+     }
+
+     /** Builds the option set and parses args; prints usage and returns null on -h or on a parse error. */
+     public static CommandLine buildCommandline(String[] args) {
+         final Options options = new Options();
+         Option opt = new Option("h", "help", false, "Print help");
+         opt.setRequired(false);
+         options.addOption(opt);
+
+         opt = new Option("g", "producerGroup", true, "Producer Group Name");
+         opt.setRequired(true);
+         options.addOption(opt);
+
+         opt = new Option("t", "topic", true, "Topic Name");
+         opt.setRequired(true);
+         options.addOption(opt);
+
+         opt = new Option("a", "tags", true, "Tags Name");
+         opt.setRequired(true);
+         options.addOption(opt);
+
+         opt = new Option("k", "keys", true, "Keys Name");
+         opt.setRequired(true);
+         options.addOption(opt);
+
+         opt = new Option("c", "msgCount", true, "Message Count");
+         opt.setRequired(true);
+         options.addOption(opt);
+
+         PosixParser parser = new PosixParser();
+         HelpFormatter hf = new HelpFormatter();
+         hf.setWidth(110);
+         CommandLine commandLine = null;
+         try {
+             commandLine = parser.parse(options, args);
+             if (commandLine.hasOption('h')) {
+                 hf.printHelp("producer", options, true);
+                 return null;
+             }
+         } catch (ParseException e) {
+             hf.printHelp("producer", options, true);
+             return null;
+         }
+         return commandLine;
+     }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java
new file mode 100644
index 0000000..7ddfbf7
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.ordermessage;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+ /** Example orderly consumer that answers with different statuses depending on a running batch count. */
+ public class Consumer {
+     /** Subscribes to TopicTest, registers the orderly listener and starts consuming. */
+     public static void main(String[] args) throws MQClientException {
+         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
+         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+         consumer.subscribe("TopicTest", "TagA || TagC || TagD");
+
+         consumer.registerMessageListener(new MessageListenerOrderly() {
+             AtomicLong consumeTimes = new AtomicLong(0);
+
+             @Override
+             public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+                 context.setAutoCommit(false);
+                 // Constant format string: a '%' in the message text would otherwise be parsed as a directive.
+                 System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
+                 // Read the counter once: repeated get() calls may observe increments from other invocations.
+                 long times = this.consumeTimes.incrementAndGet();
+                 if ((times % 2) == 0) {
+                     return ConsumeOrderlyStatus.SUCCESS;
+                 } else if ((times % 3) == 0) {
+                     return ConsumeOrderlyStatus.ROLLBACK;
+                 } else if ((times % 4) == 0) {
+                     // NOTE(review): unreachable as ordered - every multiple of 4 already matched the % 2 test.
+                     return ConsumeOrderlyStatus.COMMIT;
+                 } else if ((times % 5) == 0) {
+                     context.setSuspendCurrentQueueTimeMillis(3000);
+                     return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                 }
+                 return ConsumeOrderlyStatus.SUCCESS;
+             }
+         });
+
+         consumer.start();
+         System.out.printf("Consumer Started.%n");
+     }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java
new file mode 100644
index 0000000..84c1da4
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.ordermessage;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+ /** Example producer that routes each message to a queue chosen from its order id. */
+ public class Producer {
+     /** Sends 100 messages, selecting the queue by orderId, and always shuts the producer down. */
+     public static void main(String[] args) throws UnsupportedEncodingException {
+         // Created outside the try block so that the finally clause can release it.
+         MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+         try {
+             producer.start();
+             String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
+             for (int i = 0; i < 100; i++) {
+                 int orderId = i % 10;
+                 Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
+                     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
+                 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
+                     @Override
+                     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                         Integer id = (Integer) arg;
+                         int index = id % mqs.size();
+                         return mqs.get(index);
+                     }
+                 }, orderId);
+                 System.out.printf("%s%n", sendResult);
+             }
+         } catch (MQClientException e) {
+             e.printStackTrace();
+         } catch (RemotingException e) {
+             e.printStackTrace();
+         } catch (MQBrokerException e) {
+             e.printStackTrace();
+         } catch (InterruptedException e) {
+             e.printStackTrace();
+         } finally {
+             producer.shutdown();
+         }
+     }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
new file mode 100644
index 0000000..43566f0
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.quickstart;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+ /** Quick-start push consumer that subscribes to every tag of TopicTest and prints each batch. */
+ public class Consumer {
+     /** Configures the consumer, registers the printing listener and starts it. */
+     public static void main(String[] args) throws InterruptedException, MQClientException {
+         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
+         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+         consumer.subscribe("TopicTest", "*");
+
+         consumer.registerMessageListener(new MessageListenerConcurrently() {
+             @Override
+             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                 ConsumeConcurrentlyContext context) {
+                 // Constant format string: message text must never be parsed as printf
+                 // directives, otherwise a '%' inside a message makes printf throw.
+                 System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
+                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+             }
+         });
+
+         consumer.start();
+         System.out.printf("Consumer Started.%n");
+     }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
new file mode 100644
index 0000000..f6bd5df
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.quickstart;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+ /** Quick-start producer that synchronously sends 1000 numbered messages to TopicTest. */
+ public class Producer {
+     /** Starts the producer, sends the messages and always shuts the producer down. */
+     public static void main(String[] args) throws MQClientException, InterruptedException {
+         DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+         producer.start();
+
+         try {
+             for (int i = 0; i < 1000; i++) {
+                 try {
+                     Message msg = new Message("TopicTest", "TagA",
+                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
+                     SendResult sendResult = producer.send(msg);
+                     System.out.printf("%s%n", sendResult);
+                 } catch (Exception e) {
+                     e.printStackTrace();
+                     // Back off for a second before moving on to the next message.
+                     Thread.sleep(1000);
+                 }
+             }
+         } finally {
+             // Reached even when the back-off sleep above is interrupted, so the
+             // started producer is never left running.
+             producer.shutdown();
+         }
+     }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
new file mode 100644
index 0000000..68dbb67
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.UnsupportedEncodingException;
+
+
+ /** Example producer that sends messages asynchronously and waits briefly for callbacks before shutdown. */
+ public class AsyncProducer {
+     public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
+         DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
+         producer.start();
+         producer.setRetryTimesWhenSendAsyncFailed(0);
+         final java.util.concurrent.CountDownLatch pending = new java.util.concurrent.CountDownLatch(10000000); // one per send below
+         for (int i = 0; i < 10000000; i++) {
+             try {
+                 final int index = i;
+                 Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188",
+                     "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+                 producer.send(msg, new SendCallback() {
+                     @Override
+                     public void onSuccess(SendResult sendResult) {
+                         pending.countDown();
+                         System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
+                     }
+                     @Override
+                     public void onException(Throwable e) {
+                         pending.countDown();
+                         System.out.printf("%-10d Exception %s %n", index, e);
+                         e.printStackTrace();
+                     }
+                 });
+             } catch (Exception e) {
+                 e.printStackTrace();
+             }
+         }
+         pending.await(5, java.util.concurrent.TimeUnit.SECONDS); // bounded wait so in-flight callbacks can finish
+         producer.shutdown();
+     }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java b/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java
new file mode 100644
index 0000000..2b4ce23
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.simple;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.TreeMap;
+
+
+ /** Holder for a sorted map from a Long key to a MessageExt. */
+ public class CachedQueue {
+     private final TreeMap<Long, MessageExt> msgCachedTable = new TreeMap<Long, MessageExt>();
+
+     /** Returns the backing map itself (not a copy), so callers mutate this instance's state. */
+     public TreeMap<Long, MessageExt> getMsgCachedTable() {
+         return this.msgCachedTable;
+     }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
new file mode 100644
index 0000000..b035d57
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+
+public class Producer {
+ public static void main(String[] args) throws MQClientException, InterruptedException {
+
+ DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+
+ producer.start();
+
+ for (int i = 0; i < 10000000; i++)
+ try {
+ {
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "OrderID188",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ SendResult sendResult = producer.send(msg);
+ System.out.printf("%s%n", sendResult);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ producer.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
new file mode 100644
index 0000000..8c9ba15
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Example that pulls TopicTest1 queue by queue, keeping each queue's next offset in memory.
+ */
+public class PullConsumer {
+    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<MessageQueue, Long>();
+
+    public static void main(String[] args) throws MQClientException {
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
+        consumer.start();
+
+        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
+        for (MessageQueue mq : mqs) {
+            // mq is passed as an argument: concatenated into the format string, any '%'
+            // in its text would be parsed as a conversion and make printf throw.
+            System.out.printf("Consume from the queue: %s%n", mq);
+            SINGLE_MQ:
+            while (true) {
+                try {
+                    PullResult pullResult =
+                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
+                    System.out.printf("%s%n", pullResult);
+                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
+                    switch (pullResult.getPullStatus()) {
+                        case NO_NEW_MSG:
+                            // Stop pulling this queue and continue with the next one.
+                            break SINGLE_MQ;
+                        case FOUND:
+                        case NO_MATCHED_MSG:
+                        case OFFSET_ILLEGAL:
+                        default:
+                            break;
+                    }
+                } catch (Exception e) {
+                    // The failure is only reported; the loop then retries the same queue.
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        consumer.shutdown();
+    }
+
+    /** Returns the next offset recorded for {@code mq}, or 0 when none is recorded yet. */
+    private static long getMessageQueueOffset(MessageQueue mq) {
+        Long offset = OFFSET_TABLE.get(mq);
+        return offset != null ? offset : 0;
+    }
+
+    /** Records {@code offset} as the next offset to pull from {@code mq}. */
+    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
+        OFFSET_TABLE.put(mq, offset);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java
new file mode 100644
index 0000000..c2d7468
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/** Example: one pullBlockIfNotFound call on a hard-coded queue, printing elapsed time and result. */
+public class PullConsumerTest {
+    public static void main(String[] args) throws MQClientException {
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
+        consumer.start();
+
+        try {
+            MessageQueue target = new MessageQueue();
+            target.setQueueId(0);
+            target.setTopic("TopicTest3");
+            target.setBrokerName("vivedeMacBook-Pro.local");
+
+            long startMillis = System.currentTimeMillis();
+            PullResult result = consumer.pullBlockIfNotFound(target, null, 26L, 32);
+            long elapsedMillis = System.currentTimeMillis() - startMillis;
+            System.out.printf("%s%n", elapsedMillis);
+            System.out.printf("%s%n", result);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        consumer.shutdown();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
new file mode 100644
index 0000000..d38d679
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.simple;
+
+import org.apache.rocketmq.client.consumer.MQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullTaskCallback;
+import org.apache.rocketmq.client.consumer.PullTaskContext;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+
+/**
+ * Example that registers a pull task for TopicTest1 with an MQPullConsumerScheduleService.
+ */
+public class PullScheduleService {
+    public static void main(String[] args) throws MQClientException {
+        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
+        scheduleService.setMessageModel(MessageModel.CLUSTERING);
+
+        PullTaskCallback pullTask = new PullTaskCallback() {
+            /** Pulls from {@code mq} at its fetched offset, prints the result, stores the next offset. */
+            @Override
+            public void doPullTask(MessageQueue mq, PullTaskContext context) {
+                MQPullConsumer consumer = context.getPullConsumer();
+                try {
+                    // A negative fetched offset is replaced by 0.
+                    long offset = Math.max(consumer.fetchConsumeOffset(mq, false), 0L);
+
+                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
+                    System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                        case NO_MATCHED_MSG:
+                        case NO_NEW_MSG:
+                        case OFFSET_ILLEGAL:
+                        default:
+                            // No status is treated specially in this example.
+                            break;
+                    }
+                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
+
+                    // Sets the context's next-pull delay to 100 (milliseconds, per the setter name).
+                    context.setPullNextDelayTimeMillis(100);
+                } catch (Exception e) {
+                    // Any failure is only printed.
+                    e.printStackTrace();
+                }
+            }
+        };
+        scheduleService.registerPullTaskCallback("TopicTest1", pullTask);
+
+        scheduleService.start();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
new file mode 100644
index 0000000..5929aff
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/** Example push consumer that prints every batch of messages received from Jodie_topic_1023. */
+public class PushConsumer {
+    public static void main(String[] args) throws InterruptedException, MQClientException {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
+        consumer.subscribe("Jodie_topic_1023", "*");
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            /** Prints the received batch and always reports CONSUME_SUCCESS. */
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                // Thread name and messages are passed as arguments, never as part of the
+                // format string, so a '%' in either cannot make printf throw.
+                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+
+        consumer.start();
+        System.out.printf("Consumer Started.%n");
+    }
+}