You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:13:56 UTC
[05/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java
deleted file mode 100644
index 7150513..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Consumer.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.example.benchmark;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.srvutil.ServerUtil;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class Consumer {
-
- public static void main(String[] args) throws MQClientException {
- Options options = ServerUtil.buildCommandlineOptions(new Options());
- CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkConsumer", args, buildCommandlineOptions(options), new PosixParser());
- if (null == commandLine) {
- System.exit(-1);
- }
-
- final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
- final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer";
- final String isPrefixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true";
- String group = groupPrefix;
- if (Boolean.parseBoolean(isPrefixEnable)) {
- group = groupPrefix + "_" + Long.toString(System.currentTimeMillis() % 100);
- }
-
- System.out.printf("topic %s group %s prefix %s%n", topic, group, isPrefixEnable);
-
- final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
-
- final Timer timer = new Timer("BenchmarkTimerThread", true);
-
- final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
-
- timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
- if (snapshotList.size() > 10) {
- snapshotList.removeFirst();
- }
- }
- }, 1000, 1000);
-
- timer.scheduleAtFixedRate(new TimerTask() {
- private void printStats() {
- if (snapshotList.size() >= 10) {
- Long[] begin = snapshotList.getFirst();
- Long[] end = snapshotList.getLast();
-
- final long consumeTps =
- (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L);
- final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]);
- final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]);
-
- System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
- consumeTps, averageB2CRT, averageS2CRT, end[4], end[5]
- );
- }
- }
-
-
- @Override
- public void run() {
- try {
- this.printStats();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, 10000, 10000);
-
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
- consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
-
- consumer.subscribe(topic, "*");
-
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- MessageExt msg = msgs.get(0);
- long now = System.currentTimeMillis();
-
- statsBenchmarkConsumer.getReceiveMessageTotalCount().incrementAndGet();
-
- long born2ConsumerRT = now - msg.getBornTimestamp();
- statsBenchmarkConsumer.getBorn2ConsumerTotalRT().addAndGet(born2ConsumerRT);
-
- long store2ConsumerRT = now - msg.getStoreTimestamp();
- statsBenchmarkConsumer.getStore2ConsumerTotalRT().addAndGet(store2ConsumerRT);
-
- compareAndSetMax(statsBenchmarkConsumer.getBorn2ConsumerMaxRT(), born2ConsumerRT);
-
- compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT);
-
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- consumer.start();
-
- System.out.printf("Consumer Started.%n");
- }
-
- public static Options buildCommandlineOptions(final Options options) {
- Option opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest");
- opt.setRequired(false);
- options.addOption(opt);
-
- opt = new Option("g", "group", true, "Consumer group name, Default: benchmark_consumer");
- opt.setRequired(false);
- options.addOption(opt);
-
-
- opt = new Option("p", "group prefix enable", true, "Consumer group name, Default: false");
- opt.setRequired(false);
- options.addOption(opt);
-
- return options;
- }
-
-
- public static void compareAndSetMax(final AtomicLong target, final long value) {
- long prev = target.get();
- while (value > prev) {
- boolean updated = target.compareAndSet(prev, value);
- if (updated)
- break;
-
- prev = target.get();
- }
- }
-}
-
-
-class StatsBenchmarkConsumer {
- private final AtomicLong receiveMessageTotalCount = new AtomicLong(0L);
-
- private final AtomicLong born2ConsumerTotalRT = new AtomicLong(0L);
-
- private final AtomicLong store2ConsumerTotalRT = new AtomicLong(0L);
-
- private final AtomicLong born2ConsumerMaxRT = new AtomicLong(0L);
-
- private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L);
-
-
- public Long[] createSnapshot() {
- Long[] snap = new Long[]{
- System.currentTimeMillis(),
- this.receiveMessageTotalCount.get(),
- this.born2ConsumerTotalRT.get(),
- this.store2ConsumerTotalRT.get(),
- this.born2ConsumerMaxRT.get(),
- this.store2ConsumerMaxRT.get(),
- };
-
- return snap;
- }
-
-
- public AtomicLong getReceiveMessageTotalCount() {
- return receiveMessageTotalCount;
- }
-
-
- public AtomicLong getBorn2ConsumerTotalRT() {
- return born2ConsumerTotalRT;
- }
-
-
- public AtomicLong getStore2ConsumerTotalRT() {
- return store2ConsumerTotalRT;
- }
-
-
- public AtomicLong getBorn2ConsumerMaxRT() {
- return born2ConsumerMaxRT;
- }
-
-
- public AtomicLong getStore2ConsumerMaxRT() {
- return store2ConsumerMaxRT;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java b/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java
deleted file mode 100644
index b0351c6..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/benchmark/Producer.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.benchmark;
-
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import com.alibaba.rocketmq.srvutil.ServerUtil;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.slf4j.Logger;
-
-import java.io.UnsupportedEncodingException;
-import java.util.LinkedList;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class Producer {
- public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
-
- Options options = ServerUtil.buildCommandlineOptions(new Options());
- CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser());
- if (null == commandLine) {
- System.exit(-1);
- }
-
- final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
- final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
- final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128;
- final boolean keyEnable = commandLine.hasOption('k') ? Boolean.parseBoolean(commandLine.getOptionValue('k')) : false;
-
- System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable);
-
- final Logger log = ClientLogger.getLog();
-
- final Message msg = buildMessage(messageSize, topic);
-
- final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
-
- final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer();
-
- final Timer timer = new Timer("BenchmarkTimerThread", true);
-
- final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
-
- timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- snapshotList.addLast(statsBenchmark.createSnapshot());
- if (snapshotList.size() > 10) {
- snapshotList.removeFirst();
- }
- }
- }, 1000, 1000);
-
- timer.scheduleAtFixedRate(new TimerTask() {
- private void printStats() {
- if (snapshotList.size() >= 10) {
- Long[] begin = snapshotList.getFirst();
- Long[] end = snapshotList.getLast();
-
- final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
- final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
-
- System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n",
- sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
- }
- }
-
-
- @Override
- public void run() {
- try {
- this.printStats();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, 10000, 10000);
-
- final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer");
- producer.setInstanceName(Long.toString(System.currentTimeMillis()));
-
- if (commandLine.hasOption('n')) {
- String ns = commandLine.getOptionValue('n');
- producer.setNamesrvAddr(ns);
- }
-
- producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
-
- producer.start();
-
- for (int i = 0; i < threadCount; i++) {
- sendThreadPool.execute(new Runnable() {
- @Override
- public void run() {
- while (true) {
- try {
- final long beginTimestamp = System.currentTimeMillis();
- if (keyEnable) {
- msg.setKeys(String.valueOf(beginTimestamp / 1000));
- }
- producer.send(msg);
- statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
- statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
- final long currentRT = System.currentTimeMillis() - beginTimestamp;
- statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
- long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
- while (currentRT > prevMaxRT) {
- boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
- if (updated)
- break;
-
- prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
- }
- } catch (RemotingException e) {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
- log.error("[BENCHMARK_PRODUCER] Send Exception", e);
-
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e1) {
- }
- } catch (InterruptedException e) {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e1) {
- }
- } catch (MQClientException e) {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
- log.error("[BENCHMARK_PRODUCER] Send Exception", e);
- } catch (MQBrokerException e) {
- statsBenchmark.getReceiveResponseFailedCount().incrementAndGet();
- log.error("[BENCHMARK_PRODUCER] Send Exception", e);
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e1) {
- }
- }
- }
- }
- });
- }
- }
-
- public static Options buildCommandlineOptions(final Options options) {
- Option opt = new Option("w", "threadCount", true, "Thread count, Default: 64");
- opt.setRequired(false);
- options.addOption(opt);
-
- opt = new Option("s", "messageSize", true, "Message Size, Default: 128");
- opt.setRequired(false);
- options.addOption(opt);
-
- opt = new Option("k", "keyEnable", true, "Message Key Enable, Default: false");
- opt.setRequired(false);
- options.addOption(opt);
-
- opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest");
- opt.setRequired(false);
- options.addOption(opt);
-
- return options;
- }
-
- private static Message buildMessage(final int messageSize, final String topic) throws UnsupportedEncodingException {
- Message msg = new Message();
- msg.setTopic(topic);
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < messageSize; i += 10) {
- sb.append("hello baby");
- }
-
- msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
-
- return msg;
- }
-}
-
-
-class StatsBenchmarkProducer {
- private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
-
- private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
-
- private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L);
-
- private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L);
-
- private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
-
- private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
-
-
- public Long[] createSnapshot() {
- Long[] snap = new Long[]{
- System.currentTimeMillis(),
- this.sendRequestSuccessCount.get(),
- this.sendRequestFailedCount.get(),
- this.receiveResponseSuccessCount.get(),
- this.receiveResponseFailedCount.get(),
- this.sendMessageSuccessTimeTotal.get(),
- };
-
- return snap;
- }
-
-
- public AtomicLong getSendRequestSuccessCount() {
- return sendRequestSuccessCount;
- }
-
-
- public AtomicLong getSendRequestFailedCount() {
- return sendRequestFailedCount;
- }
-
-
- public AtomicLong getReceiveResponseSuccessCount() {
- return receiveResponseSuccessCount;
- }
-
-
- public AtomicLong getReceiveResponseFailedCount() {
- return receiveResponseFailedCount;
- }
-
-
- public AtomicLong getSendMessageSuccessTimeTotal() {
- return sendMessageSuccessTimeTotal;
- }
-
-
- public AtomicLong getSendMessageMaxRT() {
- return sendMessageMaxRT;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java
deleted file mode 100644
index 3dffd2f..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/benchmark/TransactionProducer.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.example.benchmark;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.producer.*;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-
-import java.io.UnsupportedEncodingException;
-import java.util.LinkedList;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class TransactionProducer {
- private static int threadCount;
- private static int messageSize;
- private static boolean ischeck;
- private static boolean ischeckffalse;
-
-
- public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
- threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
- messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
- ischeck = args.length >= 3 ? Boolean.parseBoolean(args[2]) : false;
- ischeckffalse = args.length >= 4 ? Boolean.parseBoolean(args[3]) : false;
-
- final Message msg = buildMessage(messageSize);
-
- final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
-
- final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer();
-
- final Timer timer = new Timer("BenchmarkTimerThread", true);
-
- final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
-
- timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- snapshotList.addLast(statsBenchmark.createSnapshot());
- while (snapshotList.size() > 10) {
- snapshotList.removeFirst();
- }
- }
- }, 1000, 1000);
-
- timer.scheduleAtFixedRate(new TimerTask() {
- private void printStats() {
- if (snapshotList.size() >= 10) {
- Long[] begin = snapshotList.getFirst();
- Long[] end = snapshotList.getLast();
-
- final long sendTps =
- (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
- final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
-
- System.out.printf(
- "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
- sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4], end[6]);
- }
- }
-
-
- @Override
- public void run() {
- try {
- this.printStats();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, 10000, 10000);
-
- final TransactionCheckListener transactionCheckListener =
- new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark);
- final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
- producer.setInstanceName(Long.toString(System.currentTimeMillis()));
- producer.setTransactionCheckListener(transactionCheckListener);
- producer.setDefaultTopicQueueNums(1000);
- producer.start();
-
- final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
-
- for (int i = 0; i < threadCount; i++) {
- sendThreadPool.execute(new Runnable() {
- @Override
- public void run() {
- while (true) {
- try {
- // Thread.sleep(1000);
- final long beginTimestamp = System.currentTimeMillis();
- SendResult sendResult =
- producer.sendMessageInTransaction(msg, tranExecuter, null);
- if (sendResult != null) {
- statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
- statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
- }
-
- final long currentRT = System.currentTimeMillis() - beginTimestamp;
- statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
- long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
- while (currentRT > prevMaxRT) {
- boolean updated =
- statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT,
- currentRT);
- if (updated)
- break;
-
- prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
- }
- } catch (MQClientException e) {
- statsBenchmark.getSendRequestFailedCount().incrementAndGet();
- }
- }
- }
- });
- }
- }
-
-
- private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException {
- Message msg = new Message();
- msg.setTopic("BenchmarkTest");
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < messageSize; i += 10) {
- sb.append("hello baby");
- }
-
- msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
-
- return msg;
- }
-}
-
-
-class TransactionExecuterBImpl implements LocalTransactionExecuter {
-
- private boolean ischeck;
-
-
- public TransactionExecuterBImpl(boolean ischeck) {
- this.ischeck = ischeck;
- }
-
-
- @Override
- public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
- if (ischeck) {
- return LocalTransactionState.UNKNOW;
- }
- return LocalTransactionState.COMMIT_MESSAGE;
- }
-}
-
-
-class TransactionCheckListenerBImpl implements TransactionCheckListener {
- private boolean ischeckffalse;
- private StatsBenchmarkTProducer statsBenchmarkTProducer;
-
-
- public TransactionCheckListenerBImpl(boolean ischeckffalse,
- StatsBenchmarkTProducer statsBenchmarkTProducer) {
- this.ischeckffalse = ischeckffalse;
- this.statsBenchmarkTProducer = statsBenchmarkTProducer;
- }
-
-
- @Override
- public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
- statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
- if (ischeckffalse) {
-
- return LocalTransactionState.ROLLBACK_MESSAGE;
- }
-
- return LocalTransactionState.COMMIT_MESSAGE;
- }
-}
-
-
-class StatsBenchmarkTProducer {
- private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
-
- private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
-
- private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L);
-
- private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L);
-
- private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
-
- private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
-
- private final AtomicLong checkRequestSuccessCount = new AtomicLong(0L);
-
-
- public Long[] createSnapshot() {
- Long[] snap = new Long[]{
- System.currentTimeMillis(),
- this.sendRequestSuccessCount.get(),
- this.sendRequestFailedCount.get(),
- this.receiveResponseSuccessCount.get(),
- this.receiveResponseFailedCount.get(),
- this.sendMessageSuccessTimeTotal.get(),
- this.checkRequestSuccessCount.get()};
-
- return snap;
- }
-
-
- public AtomicLong getSendRequestSuccessCount() {
- return sendRequestSuccessCount;
- }
-
-
- public AtomicLong getSendRequestFailedCount() {
- return sendRequestFailedCount;
- }
-
-
- public AtomicLong getReceiveResponseSuccessCount() {
- return receiveResponseSuccessCount;
- }
-
-
- public AtomicLong getReceiveResponseFailedCount() {
- return receiveResponseFailedCount;
- }
-
-
- public AtomicLong getSendMessageSuccessTimeTotal() {
- return sendMessageSuccessTimeTotal;
- }
-
-
- public AtomicLong getSendMessageMaxRT() {
- return sendMessageMaxRT;
- }
-
-
- public AtomicLong getCheckRequestSuccessCount() {
- return checkRequestSuccessCount;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java
deleted file mode 100644
index 6cc6238..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/broadcast/PushConsumer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.broadcast;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-
-import java.util.List;
-
-public class PushConsumer {
-
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
-
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
- consumer.setMessageModel(MessageModel.BROADCASTING);
-
- consumer.subscribe("TopicTest", "TagA || TagC || TagD");
-
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- consumer.start();
- System.out.printf("Broadcast Consumer Started.%n");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java b/example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java
deleted file mode 100644
index 104e6d9..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/filter/Consumer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.filter;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-
-
-public class Consumer {
-
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
-
- String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
- consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl",
- filterCode);
-
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- consumer.start();
-
- System.out.printf("Consumer Started.%n");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java b/example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java
deleted file mode 100644
index 04251fa..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/filter/Producer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.filter;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-
-public class Producer {
- public static void main(String[] args) throws MQClientException, InterruptedException {
- DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
- producer.start();
-
- try {
- for (int i = 0; i < 6000000; i++) {
- Message msg = new Message("TopicFilter7",
- "TagA",
- "OrderID001",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
-
- msg.putUserProperty("SequenceId", String.valueOf(i));
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- producer.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java b/example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java
deleted file mode 100644
index f6ba067..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/operation/Consumer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.operation;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-public class Consumer {
-
- public static void main(String[] args) throws InterruptedException, MQClientException {
- CommandLine commandLine = buildCommandline(args);
- if (commandLine != null) {
- String group = commandLine.getOptionValue('g');
- String topic = commandLine.getOptionValue('t');
- String subscription = commandLine.getOptionValue('s');
- final String returnFailedHalf = commandLine.getOptionValue('f');
-
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
- consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
-
- consumer.subscribe(topic, subscription);
-
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- AtomicLong consumeTimes = new AtomicLong(0);
-
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- long currentTimes = this.consumeTimes.incrementAndGet();
- System.out.printf("%-8d %s%n", currentTimes, msgs);
- if (Boolean.parseBoolean(returnFailedHalf)) {
- if ((currentTimes % 2) == 0) {
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- consumer.start();
-
- System.out.printf("Consumer Started.%n");
- }
- }
-
- public static CommandLine buildCommandline(String[] args) {
- final Options options = new Options();
- Option opt = new Option("h", "help", false, "Print help");
- opt.setRequired(false);
- options.addOption(opt);
-
- opt = new Option("g", "consumerGroup", true, "Consumer Group Name");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("t", "topic", true, "Topic Name");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("s", "subscription", true, "subscription");
- opt.setRequired(false);
- options.addOption(opt);
-
- opt = new Option("f", "returnFailedHalf", true, "return failed result, for half message");
- opt.setRequired(true);
- options.addOption(opt);
-
- PosixParser parser = new PosixParser();
- HelpFormatter hf = new HelpFormatter();
- hf.setWidth(110);
- CommandLine commandLine = null;
- try {
- commandLine = parser.parse(options, args);
- if (commandLine.hasOption('h')) {
- hf.printHelp("producer", options, true);
- return null;
- }
- } catch (ParseException e) {
- hf.printHelp("producer", options, true);
- return null;
- }
-
- return commandLine;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java b/example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java
deleted file mode 100644
index 816e3e8..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/operation/Producer.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.operation;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import org.apache.commons.cli.*;
-
-public class Producer {
-
- public static void main(String[] args) throws MQClientException, InterruptedException {
- CommandLine commandLine = buildCommandline(args);
- if (commandLine != null) {
- String group = commandLine.getOptionValue('g');
- String topic = commandLine.getOptionValue('t');
- String tags = commandLine.getOptionValue('a');
- String keys = commandLine.getOptionValue('k');
- String msgCount = commandLine.getOptionValue('c');
-
- DefaultMQProducer producer = new DefaultMQProducer(group);
- producer.setInstanceName(Long.toString(System.currentTimeMillis()));
-
- producer.start();
-
- for (int i = 0; i < Integer.parseInt(msgCount); i++) {
- try {
- Message msg = new Message(
- topic,
- tags,
- keys,
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
- SendResult sendResult = producer.send(msg);
- System.out.printf("%-8d %s%n", i, sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- Thread.sleep(1000);
- }
- }
-
- producer.shutdown();
- }
- }
-
- public static CommandLine buildCommandline(String[] args) {
- final Options options = new Options();
- Option opt = new Option("h", "help", false, "Print help");
- opt.setRequired(false);
- options.addOption(opt);
-
- opt = new Option("g", "producerGroup", true, "Producer Group Name");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("t", "topic", true, "Topic Name");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("a", "tags", true, "Tags Name");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("k", "keys", true, "Keys Name");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("c", "msgCount", true, "Message Count");
- opt.setRequired(true);
- options.addOption(opt);
-
- PosixParser parser = new PosixParser();
- HelpFormatter hf = new HelpFormatter();
- hf.setWidth(110);
- CommandLine commandLine = null;
- try {
- commandLine = parser.parse(options, args);
- if (commandLine.hasOption('h')) {
- hf.printHelp("producer", options, true);
- return null;
- }
- } catch (ParseException e) {
- hf.printHelp("producer", options, true);
- return null;
- }
-
- return commandLine;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java b/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java
deleted file mode 100644
index 7b5f657..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Consumer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.ordermessage;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-public class Consumer {
-
- public static void main(String[] args) throws MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
-
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
- consumer.subscribe("TopicTest", "TagA || TagC || TagD");
-
- consumer.registerMessageListener(new MessageListenerOrderly() {
- AtomicLong consumeTimes = new AtomicLong(0);
-
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- context.setAutoCommit(false);
- System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
- this.consumeTimes.incrementAndGet();
- if ((this.consumeTimes.get() % 2) == 0) {
- return ConsumeOrderlyStatus.SUCCESS;
- } else if ((this.consumeTimes.get() % 3) == 0) {
- return ConsumeOrderlyStatus.ROLLBACK;
- } else if ((this.consumeTimes.get() % 4) == 0) {
- return ConsumeOrderlyStatus.COMMIT;
- } else if ((this.consumeTimes.get() % 5) == 0) {
- context.setSuspendCurrentQueueTimeMillis(3000);
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
-
- return ConsumeOrderlyStatus.SUCCESS;
- }
- });
-
- consumer.start();
- System.out.printf("Consumer Started.%n");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java b/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java
deleted file mode 100644
index 609aa62..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/ordermessage/Producer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.ordermessage;
-
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
-import com.alibaba.rocketmq.client.producer.MQProducer;
-import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-
-public class Producer {
- public static void main(String[] args) throws UnsupportedEncodingException {
- try {
- MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
- producer.start();
-
- String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
- for (int i = 0; i < 100; i++) {
- int orderId = i % 10;
- Message msg =
- new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
- SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- Integer id = (Integer) arg;
- int index = id % mqs.size();
- return mqs.get(index);
- }
- }, orderId);
-
- System.out.printf("%s%n", sendResult);
- }
-
- producer.shutdown();
- } catch (MQClientException e) {
- e.printStackTrace();
- } catch (RemotingException e) {
- e.printStackTrace();
- } catch (MQBrokerException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java
deleted file mode 100644
index adac497..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Consumer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.quickstart;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-
-public class Consumer {
-
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
-
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
- consumer.subscribe("TopicTest", "*");
-
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- consumer.start();
- System.out.printf("Consumer Started.%n");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java b/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java
deleted file mode 100644
index fb5dbea..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/quickstart/Producer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.quickstart;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
-import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
-import com.alibaba.rocketmq.client.producer.LocalTransactionState;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-
-public class Producer {
- public static void main(String[] args) throws MQClientException, InterruptedException {
- DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
- producer.start();
-
- for (int i = 0; i < 1000; i++) {
- try {
- Message msg = new Message("TopicTest",
- "TagA",
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
- );
- SendResult sendResult = producer.send(msg);
- LocalTransactionExecuter tranExecuter = new LocalTransactionExecuter() {
- @Override
- public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
- return null;
- }
- };
- System.out.printf("%s%n", sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- Thread.sleep(1000);
- }
- }
- producer.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java
deleted file mode 100644
index 1a8f07e..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/simple/AsyncProducer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.simple;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
-import com.alibaba.rocketmq.client.producer.SendCallback;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-
-import java.io.UnsupportedEncodingException;
-
-
-public class AsyncProducer {
- public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
-
- DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
- producer.start();
- producer.setRetryTimesWhenSendAsyncFailed(0);
-
- for (int i = 0; i < 10000000; i++) {
- try {
- final int index = i;
- Message msg = new Message("Jodie_topic_1023",
- "TagA",
- "OrderID188",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
- producer.send(msg, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
- }
-
- @Override
- public void onException(Throwable e) {
- System.out.printf("%-10d Exception %s %n", index, e);
- e.printStackTrace();
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- producer.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java
deleted file mode 100644
index 7beb064..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/simple/CachedQueue.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.example.simple;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.TreeMap;
-
-
-public class CachedQueue {
- private final TreeMap<Long, MessageExt> msgCachedTable = new TreeMap<Long, MessageExt>();
-
-
- public TreeMap<Long, MessageExt> getMsgCachedTable() {
- return msgCachedTable;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java
deleted file mode 100644
index e0010d4..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.simple;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-
-
-public class Producer {
- public static void main(String[] args) throws MQClientException, InterruptedException {
-
- DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
-
- producer.start();
-
- for (int i = 0; i < 10000000; i++)
- try {
- {
- Message msg = new Message("TopicTest",
- "TagA",
- "OrderID188",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- producer.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java
deleted file mode 100644
index 6245769..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumer.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.simple;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
-import com.alibaba.rocketmq.client.consumer.PullResult;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-public class PullConsumer {
- private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
-
-
- public static void main(String[] args) throws MQClientException {
- DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
-
- consumer.start();
-
- Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
- for (MessageQueue mq : mqs) {
- System.out.printf("Consume from the queue: " + mq + "%n");
- SINGLE_MQ:
- while (true) {
- try {
- PullResult pullResult =
- consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
- System.out.printf("%s%n", pullResult);
- putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
- switch (pullResult.getPullStatus()) {
- case FOUND:
- break;
- case NO_MATCHED_MSG:
- break;
- case NO_NEW_MSG:
- break SINGLE_MQ;
- case OFFSET_ILLEGAL:
- break;
- default:
- break;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- consumer.shutdown();
- }
-
- private static long getMessageQueueOffset(MessageQueue mq) {
- Long offset = OFFSE_TABLE.get(mq);
- if (offset != null)
- return offset;
-
- return 0;
- }
-
- private static void putMessageQueueOffset(MessageQueue mq, long offset) {
- OFFSE_TABLE.put(mq, offset);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java
deleted file mode 100644
index 25d668c..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/simple/PullConsumerTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.simple;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
-import com.alibaba.rocketmq.client.consumer.PullResult;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-public class PullConsumerTest {
- public static void main(String[] args) throws MQClientException {
- DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
- consumer.start();
-
- try {
- MessageQueue mq = new MessageQueue();
- mq.setQueueId(0);
- mq.setTopic("TopicTest3");
- mq.setBrokerName("vivedeMacBook-Pro.local");
-
- long offset = 26;
-
- long beginTime = System.currentTimeMillis();
- PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32);
- System.out.printf("%s%n", System.currentTimeMillis() - beginTime);
- System.out.printf("%s%n", pullResult);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- consumer.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java
deleted file mode 100644
index 0c86cf8..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/simple/PullScheduleService.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.example.simple;
-
-import com.alibaba.rocketmq.client.consumer.MQPullConsumer;
-import com.alibaba.rocketmq.client.consumer.MQPullConsumerScheduleService;
-import com.alibaba.rocketmq.client.consumer.PullResult;
-import com.alibaba.rocketmq.client.consumer.PullTaskCallback;
-import com.alibaba.rocketmq.client.consumer.PullTaskContext;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-
-
-public class PullScheduleService {
-
- public static void main(String[] args) throws MQClientException {
- final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
-
- scheduleService.setMessageModel(MessageModel.CLUSTERING);
- scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {
-
- @Override
- public void doPullTask(MessageQueue mq, PullTaskContext context) {
- MQPullConsumer consumer = context.getPullConsumer();
- try {
-
- long offset = consumer.fetchConsumeOffset(mq, false);
- if (offset < 0)
- offset = 0;
-
- PullResult pullResult = consumer.pull(mq, "*", offset, 32);
- System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
- switch (pullResult.getPullStatus()) {
- case FOUND:
- break;
- case NO_MATCHED_MSG:
- break;
- case NO_NEW_MSG:
- case OFFSET_ILLEGAL:
- break;
- default:
- break;
- }
- consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
-
-
- context.setPullNextDelayTimeMillis(100);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
-
- scheduleService.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java
deleted file mode 100644
index 5628ced..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.simple;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-
-
-public class PushConsumer {
-
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
- consumer.subscribe("Jodie_topic_1023", "*");
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- /**
-
- */
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.out.printf("Consumer Started.%n");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java
deleted file mode 100644
index fc6bacd..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/simple/RandomAsyncCommit.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.example.simple;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class RandomAsyncCommit {
- private final ConcurrentHashMap<MessageQueue, CachedQueue> mqCachedTable =
- new ConcurrentHashMap<MessageQueue, CachedQueue>();
-
-
- public void putMessages(final MessageQueue mq, final List<MessageExt> msgs) {
- CachedQueue cachedQueue = this.mqCachedTable.get(mq);
- if (null == cachedQueue) {
- cachedQueue = new CachedQueue();
- this.mqCachedTable.put(mq, cachedQueue);
- }
- for (MessageExt msg : msgs) {
- cachedQueue.getMsgCachedTable().put(msg.getQueueOffset(), msg);
- }
- }
-
-
- public void removeMessage(final MessageQueue mq, long offset) {
- CachedQueue cachedQueue = this.mqCachedTable.get(mq);
- if (null != cachedQueue) {
- cachedQueue.getMsgCachedTable().remove(offset);
- }
- }
-
-
- public long commitableOffset(final MessageQueue mq) {
- CachedQueue cachedQueue = this.mqCachedTable.get(mq);
- if (null != cachedQueue) {
- return cachedQueue.getMsgCachedTable().firstKey();
- }
-
- return -1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java b/example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java
deleted file mode 100644
index 68347a6..0000000
--- a/example/src/main/java/com/alibaba/rocketmq/example/simple/TestProducer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.example.simple;
-
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-
-public class TestProducer {
- public static void main(String[] args) throws MQClientException, InterruptedException {
- DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
- producer.start();
-
- for (int i = 0; i < 1; i++)
- try {
- {
- Message msg = new Message("TopicTest1",
- "TagA",
- "key113",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
-
- QueryResult queryMessage =
- producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis());
- for (MessageExt m : queryMessage.getMessageList()) {
- System.out.printf("%s%n", m);
- }
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- producer.shutdown();
- }
-}