You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:13 UTC

[rocketmq-streams] 14/16: remove mini batch

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit 24b5bf902a1f1faa15b1eabd0df6c44d0d9f9d07
Author: 维章 <un...@gmail.com>
AuthorDate: Mon May 30 15:39:02 2022 +0800

    remove mini batch
---
 .../apache/rocketmq/streams/sink/RocketMQSink.java |  30 ++--
 .../window/minibatch/MiniBatchMsgCache.java        |  58 -------
 .../rocketmq/streams/window/model/WindowCache.java | 182 +++++++++++++--------
 .../window/operator/AbstractShuffleWindow.java     |   1 -
 .../streams/window/shuffle/ShuffleChannel.java     |   2 -
 5 files changed, 128 insertions(+), 145 deletions(-)

diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
index 1de92e31..d702053e 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
@@ -241,30 +241,22 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
     }
 
     @Override
-    public List<ISplit<?, ?>> getSplitList() {
+    public List<ISplit<?,?>> getSplitList() {
         initProducer();
-        List<ISplit<?, ?>> messageQueues = new ArrayList<>();
-        List<MessageQueue> metaqQueueSet = new ArrayList<>();
+        List<ISplit<?,?>> messageQueues = new ArrayList<>();
         try {
 
-            if (messageQueues == null || messageQueues.size() == 0) {
-                try {
-                    metaqQueueSet = producer.fetchPublishMessageQueues(topic);
-                }catch (Exception e) {
-                    producer.send(new Message(topic, "test", "test".getBytes(StandardCharsets.UTF_8)));
-                    metaqQueueSet = producer.fetchPublishMessageQueues(topic);
-                }
-                List<ISplit<?, ?>> queueList = new ArrayList<>();
-                for (MessageQueue queue : metaqQueueSet) {
-                    RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue);
-                    queueList.add(rocketMQMessageQueue);
+            List<MessageQueue> messageQueueSet = producer.fetchPublishMessageQueues(topic);
+            List<ISplit<?,?>> queueList = new ArrayList<>();
+            for (MessageQueue queue : messageQueueSet) {
+                RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue);
+                queueList.add(rocketMQMessageQueue);
 
-                }
-                queueList.sort((Comparator<ISplit>) Comparable::compareTo);
-                messageQueues = queueList;
             }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+            queueList.sort((Comparator<ISplit>) Comparable::compareTo);
+            messageQueues = queueList;
+        } catch (MQClientException e) {
+            return messageQueues;
         }
 
         return messageQueues;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java
deleted file mode 100644
index a36309e2..00000000
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.window.minibatch;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
-import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
-import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
-import org.apache.rocketmq.streams.common.channel.split.ISplit;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.topology.shuffle.IShuffleKeyGenerator;
-import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
-
-public class MiniBatchMsgCache extends AbstractMultiSplitMessageCache<Pair<ISplit,IMessage>> {
-    public static String SHUFFLE_KEY="shuffle_key";
-
-
-
-    protected transient IShuffleKeyGenerator shuffleKeyGenerator;
-    protected transient AbstractShuffleWindow window;
-
-
-
-
-    public MiniBatchMsgCache(
-        IMessageFlushCallBack<Pair<ISplit,IMessage>> flushCallBack, IShuffleKeyGenerator shuffleKeyGenerator,
-        AbstractShuffleWindow window) {
-        super(flushCallBack);
-        this.shuffleKeyGenerator=shuffleKeyGenerator;
-        this.window=window;
-    }
-
-
-    @Override protected String createSplitId(Pair<ISplit, IMessage> msg) {
-        return msg.getLeft().getQueueId();
-    }
-
-    @Override protected MessageCache createMessageCache() {
-        ShuffleMessageCache messageCache=new ShuffleMessageCache(this.flushCallBack);
-        messageCache.setWindow(window);
-        messageCache.setShuffleKeyGenerator(shuffleKeyGenerator);
-        return messageCache;
-    }
-}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
index c53a50ff..8ce1232f 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
@@ -19,9 +19,14 @@ package org.apache.rocketmq.streams.window.model;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.logging.Log;
@@ -29,30 +34,26 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
+import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.common.topology.model.IWindow;
-import org.apache.rocketmq.streams.common.topology.shuffle.IShuffleKeyGenerator;
 import org.apache.rocketmq.streams.common.utils.CompressUtil;
-import org.apache.rocketmq.streams.window.minibatch.MiniBatchMsgCache;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.rocketmq.streams.window.debug.DebugWriter;
 import org.apache.rocketmq.streams.window.shuffle.ShuffleChannel;
-import org.apache.rocketmq.streams.window.util.ShuffleUtil;
 
 /**
  * 缓存数据,flush时,刷新完成数据落盘
  */
-public abstract class WindowCache extends
-    AbstractSink implements IWindow.IWindowCheckpoint {
-
+public abstract class WindowCache extends AbstractSink implements IWindow.IWindowCheckpoint {
     private static final Log LOG = LogFactory.getLog(WindowCache.class);
 
-    public static final String SPLIT_SIGN = "##";
 
     public static final String IS_COMPRESSION_MSG = "_is_compress_msg";
     public static final String COMPRESSION_MSG_DATA = "_compress_msg";
-    public static final String MSG_FROM_SOURCE = "msg_from_source";
     public static final String ORIGIN_OFFSET = "origin_offset";
 
     public static final String ORIGIN_QUEUE_ID = "origin_queue_id";
@@ -66,94 +67,105 @@ public abstract class WindowCache extends
     public static final String SHUFFLE_KEY = "SHUFFLE_KEY";
 
     public static final String ORIGIN_MESSAGE_TRACE_ID = "origin_request_id";
+
     protected transient boolean isWindowTest = false;
+
     protected transient AtomicLong COUNT = new AtomicLong(0);
     /**
      * 分片转发channel
      */
     protected transient ShuffleChannel shuffleChannel;
 
-    public void initMiniBatch() {
-        shuffleMsgCache  = new MiniBatchMsgCache(new WindowCache.MutilMsgMergerAndCompressFlushCallBack(),(IShuffleKeyGenerator) shuffleChannel.getWindow(),shuffleChannel.getWindow());
-
-
-        shuffleMsgCache.openAutoFlush();
-    }
-
-//    protected transient AtomicLong insertCount=new AtomicLong(0);
-//    protected transient AtomicLong shuffleCount=new AtomicLong(0);
-//    protected transient AtomicLong SUM=new AtomicLong(0);
-
-    protected class MutilMsgMergerAndCompressFlushCallBack implements IMessageFlushCallBack<Pair<ISplit, IMessage>> {
+    protected class ShuffleMsgCache extends AbstractMultiSplitMessageCache<Pair<ISplit, JSONObject>> {
+
+        public ShuffleMsgCache() {
+            super(new IMessageFlushCallBack<Pair<ISplit, JSONObject>>() {
+                @Override
+                public boolean flushMessage(List<Pair<ISplit, JSONObject>> messages) {
+                    if (messages == null || messages.size() == 0) {
+                        return true;
+                    }
+                    ISplit split = messages.get(0).getLeft();
+                    JSONObject jsonObject = messages.get(0).getRight();
+                    JSONArray allMsgs = shuffleChannel.getMsgs(jsonObject);
+                    for (int i = 1; i < messages.size(); i++) {
+                        Pair<ISplit, JSONObject> pair = messages.get(i);
+                        JSONObject msg = pair.getRight();
+                        JSONArray jsonArray = shuffleChannel.getMsgs(msg);
+                        if (jsonArray != null) {
+                            allMsgs.addAll(jsonArray);
+                        }
+                    }
+                    JSONObject zipJsonObject = new JSONObject();
+                    zipJsonObject.put(COMPRESSION_MSG_DATA, CompressUtil.gZip(jsonObject.toJSONString()));
+                    zipJsonObject.put(IS_COMPRESSION_MSG, true);
+                    shuffleChannel.getProducer().batchAdd(new Message(zipJsonObject), split);
+                    shuffleChannel.getProducer().flush(split.getQueueId());
+                    return true;
+                }
+            });
+        }
 
         @Override
-        public boolean flushMessage(List<Pair<ISplit, IMessage>> messages) {
-            if (messages == null || messages.size() == 0) {
-                return true;
-            }
-            long start=System.currentTimeMillis();
-            ISplit split = messages.get(0).getLeft();
-            JSONArray allMsgs =new JSONArray();
-            long sum=0;
-            for (int i = 0; i < messages.size(); i++) {
-                Pair<ISplit, IMessage> pair = messages.get(i);
-                IMessage message = pair.getRight();
-                allMsgs.add(message.getMessageBody());
-               // sum=SUM.addAndGet(message.getMessageBody().getLong("total"));
-            }
-            //System.out.println("before shuffle sum is "+sum);
-            JSONObject jsonObject=shuffleChannel.createMsg(allMsgs,split);
-//            JSONObject zipJsonObject = new JSONObject();
-//            zipJsonObject.put(COMPRESSION_MSG_DATA, CompressUtil.gZip(jsonObject.toJSONString()));
-//            zipJsonObject.put(IS_COMPRESSION_MSG, true);
-            shuffleChannel.getProducer().batchAdd(new Message(jsonObject), split);
-            shuffleChannel.getProducer().flush(split.getQueueId());
-            long cost=System.currentTimeMillis()-start;
-           // shuffleCount.addAndGet(cost);
-            return true;
+        protected String createSplitId(Pair<ISplit, JSONObject> msg) {
+            return msg.getLeft().getQueueId();
         }
     }
 
-
-    protected transient MiniBatchMsgCache shuffleMsgCache ;
+    protected transient ShuffleMsgCache shuffleMsgCache = new ShuffleMsgCache();
 
     @Override
     protected boolean initConfigurable() {
-
+        shuffleMsgCache = new ShuffleMsgCache();
+        shuffleMsgCache.setBatchSize(1000);
+        shuffleMsgCache.setAutoFlushSize(100);
+        shuffleMsgCache.setAutoFlushTimeGap(1000);
+        shuffleMsgCache.openAutoFlush();
         isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
         return super.initConfigurable();
     }
 
     @Override
     protected boolean batchInsert(List<IMessage> messageList) {
-        long start=System.currentTimeMillis();
-        for (IMessage msg : messageList) {
-            String shuffleKey = generateShuffleKey(msg);
-            IMessage message= ShuffleUtil.createShuffleMsg(msg,shuffleKey);
-            if(message==null){
-                continue;
+        Map<Integer, JSONArray> shuffleMap = translateToShuffleMap(messageList);
+        if (shuffleMap != null && shuffleMap.size() > 0) {
+            Set<String> splitIds = new HashSet<>();
+
+            for (Map.Entry<Integer, JSONArray> entry : shuffleMap.entrySet()) {
+                ISplit split = shuffleChannel.getSplit(entry.getKey());
+                JSONObject msg = shuffleChannel.createMsg(entry.getValue(), split);
+
+                shuffleMsgCache.addCache(new MutablePair<>(split, msg));
+                List<IMessage> messages = new ArrayList<>();
+                splitIds.add(split.getQueueId());
+
+                if (DebugWriter.getDebugWriter(shuffleChannel.getWindow().getConfigureName()).isOpenDebug()) {
+                    JSONArray jsonArray = entry.getValue();
+                    for (int i = 0; i < jsonArray.size(); i++) {
+                        Message message = new Message(jsonArray.getJSONObject(i));
+                        message.getHeader().setQueueId(jsonArray.getJSONObject(i).getString(ORIGIN_QUEUE_ID));
+                        message.getHeader().setOffset(jsonArray.getJSONObject(i).getLong(ORIGIN_OFFSET));
+                        messages.add(message);
+
+                    }
+                    DebugWriter.getDebugWriter(shuffleChannel.getWindow().getConfigureName()).writeWindowCache(shuffleChannel.getWindow(), messages, split.getQueueId());
+                }
+
             }
-            addPropertyToMessage(msg, message.getMessageBody());
-            Integer index = shuffleChannel.hash(shuffleKey);
-            ISplit split = shuffleChannel.getSplit(index);
-            shuffleMsgCache.addCache(new MutablePair(split, message));
+
         }
         if (isWindowTest) {
             long count = COUNT.addAndGet(messageList.size());
             System.out.println(shuffleChannel.getWindow().getConfigureName() + " send shuffle msg count is " + count);
             shuffleMsgCache.flush();
         }
-        long cost=System.currentTimeMillis()-start;
-        //shuffleCount.addAndGet(cost);
         return true;
     }
 
     @Override
     public void finishBatchMsg(BatchFinishMessage batchFinishMessage) {
-        long start=System.currentTimeMillis();
         if (shuffleChannel != null && shuffleChannel.getProducer() != null) {
-            this.flush();
-            shuffleMsgCache.flush();
+            shuffleChannel.getProducer().flush();
             for (ISplit split : shuffleChannel.getQueueList()) {
                 IMessage message = batchFinishMessage.getMsg().deepCopy();
                 message.getMessageBody().put(ORIGIN_QUEUE_ID, message.getHeader().getQueueId());
@@ -161,11 +173,51 @@ public abstract class WindowCache extends
             }
             shuffleChannel.getProducer().flush();
         }
-       // System.out.println("insert cost is "+insertCount.get()+" shuffle cost is "+shuffleCount.get()+"  finish batch cost is "+(System.currentTimeMillis()-start));
 
     }
 
+    /**
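+     * Groups the received messages by the shuffle index that each message's shuffle key hashes to.
+     *
+     * @param messages messages to group; system messages are skipped
+     * @return map from shuffle index to the JSON array of message bodies assigned to that index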
+     */
+    protected Map<Integer, JSONArray> translateToShuffleMap(List<IMessage> messages) {
+        Map<Integer, JSONArray> shuffleMap = new HashMap<>();
+        for (IMessage msg : messages) {
+            if (msg.getHeader().isSystemMessage()) {
+                continue;
+            }
+            String shuffleKey = generateShuffleKey(msg);
+            if (StringUtil.isEmpty(shuffleKey)) {
+                shuffleKey = "<null>";
+                LOG.debug("there is no group by value in message! " + msg.getMessageBody().toString());
+                //continue;
+            }
+            Integer index = shuffleChannel.hash(shuffleKey);
+            JSONObject body = msg.getMessageBody();
+            String offset = msg.getHeader().getOffset();
+            String queueId = msg.getHeader().getQueueId();
+
+            body.put(ORIGIN_OFFSET, offset);
+            body.put(ORIGIN_QUEUE_ID, queueId);
+            body.put(ORIGIN_QUEUE_IS_LONG, msg.getHeader().getMessageOffset().isLongOfMainOffset());
+            body.put(ORIGIN_MESSAGE_HEADER, JSONObject.toJSONString(msg.getHeader()));
+            body.put(ORIGIN_MESSAGE_TRACE_ID, msg.getHeader().getTraceId());
+            body.put(SHUFFLE_KEY, shuffleKey);
+
+            addPropertyToMessage(msg, body);
+
+            JSONArray jsonArray = shuffleMap.get(index);
+            if (jsonArray == null) {
+                jsonArray = new JSONArray();
+                shuffleMap.put(index, jsonArray);
+            }
+            jsonArray.add(body);
 
+        }
+        return shuffleMap;
+    }
 
     /**
      * 根据message生成shuffle key
@@ -195,8 +247,8 @@ public abstract class WindowCache extends
         return shuffleChannel;
     }
 
-    public MiniBatchMsgCache getShuffleMsgCache() {
-        return this.shuffleMsgCache;
+    public ShuffleMsgCache getShuffleMsgCache() {
+        return shuffleMsgCache;
     }
 
     public void setShuffleChannel(ShuffleChannel shuffleChannel) {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
index 7c78f478..ea3b923a 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
@@ -86,7 +86,6 @@ public abstract class AbstractShuffleWindow extends AbstractWindow {
                     this.shuffleChannel.init();
                     windowCache.setBatchSize(5000);
                     windowCache.setShuffleChannel(shuffleChannel);
-                    windowCache.initMiniBatch();
                     shuffleChannel.startChannel();
                     hasCreated.set(true);
                 }
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index d834af41..107f1ce5 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -47,7 +47,6 @@ import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.common.utils.TraceUtil;
 import org.apache.rocketmq.streams.window.debug.DebugWriter;
-import org.apache.rocketmq.streams.window.minibatch.MiniBatchMsgCache;
 import org.apache.rocketmq.streams.window.model.WindowCache;
 import org.apache.rocketmq.streams.window.model.WindowInstance;
 import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
@@ -61,7 +60,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.rocketmq.streams.window.model.WindowCache.ORIGIN_MESSAGE_TRACE_ID;