You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:40 UTC

[42/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-219] Add batch example, closes apache/incubator-rocketmq#112

[ROCKETMQ-219] Add batch example, closes apache/incubator-rocketmq#112


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/f4be3bb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/f4be3bb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/f4be3bb9

Branch: refs/heads/develop
Commit: f4be3bb929c99b9066759c9f02dc6d24045088cb
Parents: 605103e
Author: dongeforever <zh...@yeah.net>
Authored: Thu Jun 8 11:13:24 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800

----------------------------------------------------------------------
 .../example/batch/SimpleBatchProducer.java      | 42 +++++++++
 .../example/batch/SplitBatchProducer.java       | 97 ++++++++++++++++++++
 2 files changed, 139 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f4be3bb9/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java
new file mode 100644
index 0000000..a8609e7
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.example.batch;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+/**
+ * Minimal example that sends a small list of messages with a single batch call.
+ */
+public class SimpleBatchProducer {
+
+    /**
+     * Starts a producer, sends three messages of one topic as a single batch and
+     * shuts the producer down again.
+     *
+     * @param args unused
+     * @throws Exception if starting the producer or sending the batch fails
+     */
+    public static void main(String[] args) throws Exception {
+        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
+        producer.start();
+        try {
+            //If you just send messages of no more than 1MiB at a time, it is easy to use batch
+            //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
+            String topic = "BatchTest";
+            List<Message> messages = new ArrayList<>();
+            messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
+            messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
+            messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
+
+            producer.send(messages);
+        } finally {
+            //Always release the started producer, even when the send fails
+            producer.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f4be3bb9/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java
new file mode 100644
index 0000000..8809a11
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.example.batch;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+public class SplitBatchProducer {
+
+    public static void main(String[] args) throws  Exception {
+
+        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
+        producer.start();
+
+        //large batch
+        String topic = "BatchTest";
+        List<Message> messages = new ArrayList<>(100 * 1000);
+        for (int i = 0; i < 100 * 1000; i++) {
+            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
+        }
+
+        //split the large batch into small ones:
+        ListSplitter splitter = new ListSplitter(messages);
+        while (splitter.hasNext()) {
+            List<Message>  listItem = splitter.next();
+            producer.send(listItem);
+        }
+    }
+
+}
+
+
+/**
+ * Iterator that walks a list of messages and hands it out as consecutive sub-lists
+ * whose estimated total size does not exceed a fixed limit.
+ *
+ * <p>The size of a message is estimated from the length of its topic, its body and
+ * its property keys and values, plus a fixed overhead. A single message whose estimate
+ * alone exceeds the limit is returned in a sub-list of its own instead of blocking the
+ * iteration.
+ */
+class ListSplitter implements Iterator<List<Message>> {
+    //Upper bound for the estimated size of one returned sub-list
+    private static final int SIZE_LIMIT = 1000 * 1000;
+    //Fixed amount added to every message estimate ("for log overhead")
+    private static final int LOG_OVERHEAD = 20;
+
+    private final List<Message> messages;
+    //Index of the first message that has not been returned yet
+    private int currIndex;
+
+    /**
+     * @param messages the list to split; sub-lists are taken from it with {@link List#subList}
+     */
+    public ListSplitter(List<Message> messages) {
+        this.messages = messages;
+    }
+
+    /**
+     * @return {@code true} while there are messages that have not been returned yet
+     */
+    @Override public boolean hasNext() {
+        return currIndex < messages.size();
+    }
+
+    /**
+     * Returns the next run of messages whose summed size estimate fits into the limit.
+     *
+     * @return a non-empty sub-list starting at the first message not yet returned
+     * @throws java.util.NoSuchElementException if all messages have already been returned
+     */
+    @Override public List<Message> next() {
+        if (!hasNext()) {
+            //Honor the Iterator contract instead of handing out an empty sub-list
+            throw new java.util.NoSuchElementException("No more messages to split");
+        }
+        int nextIndex = currIndex;
+        int totalSize = 0;
+        for (; nextIndex < messages.size(); nextIndex++) {
+            int tmpSize = estimateSize(messages.get(nextIndex));
+            if (tmpSize > SIZE_LIMIT) {
+                //it is unexpected that single message exceeds the sizeLimit
+                //here just let it go, otherwise it will block the splitting process
+                if (nextIndex - currIndex == 0) {
+                    //if the next sublist has no element, add this one and then break, otherwise just break
+                    nextIndex++;
+                }
+                break;
+            }
+            if (tmpSize + totalSize > SIZE_LIMIT) {
+                //this message would overflow the current sub-list; it starts the next one
+                break;
+            }
+            totalSize += tmpSize;
+        }
+        List<Message> subList = messages.subList(currIndex, nextIndex);
+        currIndex = nextIndex;
+        return subList;
+    }
+
+    /**
+     * Removal is not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override public void remove() {
+        throw new UnsupportedOperationException("Not allowed to remove");
+    }
+
+    //Estimates the size of one message: topic length + body length + property lengths + overhead
+    //NOTE(review): String.length() counts chars, not encoded bytes - confirm this is close enough for non-ASCII text
+    private static int estimateSize(Message message) {
+        int size = message.getTopic().length() + message.getBody().length;
+        Map<String, String> properties = message.getProperties();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            size += entry.getKey().length() + entry.getValue().length();
+        }
+        return size + LOG_OVERHEAD;
+    }
+}