You are viewing a plain-text version of this content. The canonical link to it was not preserved in this rendering.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:13 UTC
[rocketmq-streams] 14/16: remove mini batch
This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit 24b5bf902a1f1faa15b1eabd0df6c44d0d9f9d07
Author: 维章 <un...@gmail.com>
AuthorDate: Mon May 30 15:39:02 2022 +0800
remove mini batch
---
.../apache/rocketmq/streams/sink/RocketMQSink.java | 30 ++--
.../window/minibatch/MiniBatchMsgCache.java | 58 -------
.../rocketmq/streams/window/model/WindowCache.java | 182 +++++++++++++--------
.../window/operator/AbstractShuffleWindow.java | 1 -
.../streams/window/shuffle/ShuffleChannel.java | 2 -
5 files changed, 128 insertions(+), 145 deletions(-)
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
index 1de92e31..d702053e 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
@@ -241,30 +241,22 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
}
@Override
- public List<ISplit<?, ?>> getSplitList() {
+ public List<ISplit<?,?>> getSplitList() {
initProducer();
- List<ISplit<?, ?>> messageQueues = new ArrayList<>();
- List<MessageQueue> metaqQueueSet = new ArrayList<>();
+ List<ISplit<?,?>> messageQueues = new ArrayList<>();
try {
- if (messageQueues == null || messageQueues.size() == 0) {
- try {
- metaqQueueSet = producer.fetchPublishMessageQueues(topic);
- }catch (Exception e) {
- producer.send(new Message(topic, "test", "test".getBytes(StandardCharsets.UTF_8)));
- metaqQueueSet = producer.fetchPublishMessageQueues(topic);
- }
- List<ISplit<?, ?>> queueList = new ArrayList<>();
- for (MessageQueue queue : metaqQueueSet) {
- RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue);
- queueList.add(rocketMQMessageQueue);
+ List<MessageQueue> messageQueueSet = producer.fetchPublishMessageQueues(topic);
+ List<ISplit<?,?>> queueList = new ArrayList<>();
+ for (MessageQueue queue : messageQueueSet) {
+ RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue);
+ queueList.add(rocketMQMessageQueue);
- }
- queueList.sort((Comparator<ISplit>) Comparable::compareTo);
- messageQueues = queueList;
}
- } catch (Exception e) {
- throw new RuntimeException(e);
+ queueList.sort((Comparator<ISplit>) Comparable::compareTo);
+ messageQueues = queueList;
+ } catch (MQClientException e) {
+ return messageQueues;
}
return messageQueues;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java
deleted file mode 100644
index a36309e2..00000000
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.window.minibatch;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
-import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
-import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
-import org.apache.rocketmq.streams.common.channel.split.ISplit;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.topology.shuffle.IShuffleKeyGenerator;
-import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
-
-public class MiniBatchMsgCache extends AbstractMultiSplitMessageCache<Pair<ISplit,IMessage>> {
- public static String SHUFFLE_KEY="shuffle_key";
-
-
-
- protected transient IShuffleKeyGenerator shuffleKeyGenerator;
- protected transient AbstractShuffleWindow window;
-
-
-
-
- public MiniBatchMsgCache(
- IMessageFlushCallBack<Pair<ISplit,IMessage>> flushCallBack, IShuffleKeyGenerator shuffleKeyGenerator,
- AbstractShuffleWindow window) {
- super(flushCallBack);
- this.shuffleKeyGenerator=shuffleKeyGenerator;
- this.window=window;
- }
-
-
- @Override protected String createSplitId(Pair<ISplit, IMessage> msg) {
- return msg.getLeft().getQueueId();
- }
-
- @Override protected MessageCache createMessageCache() {
- ShuffleMessageCache messageCache=new ShuffleMessageCache(this.flushCallBack);
- messageCache.setWindow(window);
- messageCache.setShuffleKeyGenerator(shuffleKeyGenerator);
- return messageCache;
- }
-}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
index c53a50ff..8ce1232f 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
@@ -19,9 +19,14 @@ package org.apache.rocketmq.streams.window.model;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
@@ -29,30 +34,26 @@ import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
+import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.topology.model.IWindow;
-import org.apache.rocketmq.streams.common.topology.shuffle.IShuffleKeyGenerator;
import org.apache.rocketmq.streams.common.utils.CompressUtil;
-import org.apache.rocketmq.streams.window.minibatch.MiniBatchMsgCache;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.rocketmq.streams.window.debug.DebugWriter;
import org.apache.rocketmq.streams.window.shuffle.ShuffleChannel;
-import org.apache.rocketmq.streams.window.util.ShuffleUtil;
/**
* 缓存数据,flush时,刷新完成数据落盘
*/
-public abstract class WindowCache extends
- AbstractSink implements IWindow.IWindowCheckpoint {
-
+public abstract class WindowCache extends AbstractSink implements IWindow.IWindowCheckpoint {
private static final Log LOG = LogFactory.getLog(WindowCache.class);
- public static final String SPLIT_SIGN = "##";
public static final String IS_COMPRESSION_MSG = "_is_compress_msg";
public static final String COMPRESSION_MSG_DATA = "_compress_msg";
- public static final String MSG_FROM_SOURCE = "msg_from_source";
public static final String ORIGIN_OFFSET = "origin_offset";
public static final String ORIGIN_QUEUE_ID = "origin_queue_id";
@@ -66,94 +67,105 @@ public abstract class WindowCache extends
public static final String SHUFFLE_KEY = "SHUFFLE_KEY";
public static final String ORIGIN_MESSAGE_TRACE_ID = "origin_request_id";
+
protected transient boolean isWindowTest = false;
+
protected transient AtomicLong COUNT = new AtomicLong(0);
/**
* 分片转发channel
*/
protected transient ShuffleChannel shuffleChannel;
- public void initMiniBatch() {
- shuffleMsgCache = new MiniBatchMsgCache(new WindowCache.MutilMsgMergerAndCompressFlushCallBack(),(IShuffleKeyGenerator) shuffleChannel.getWindow(),shuffleChannel.getWindow());
-
-
- shuffleMsgCache.openAutoFlush();
- }
-
-// protected transient AtomicLong insertCount=new AtomicLong(0);
-// protected transient AtomicLong shuffleCount=new AtomicLong(0);
-// protected transient AtomicLong SUM=new AtomicLong(0);
-
- protected class MutilMsgMergerAndCompressFlushCallBack implements IMessageFlushCallBack<Pair<ISplit, IMessage>> {
+ protected class ShuffleMsgCache extends AbstractMultiSplitMessageCache<Pair<ISplit, JSONObject>> {
+
+ public ShuffleMsgCache() {
+ super(new IMessageFlushCallBack<Pair<ISplit, JSONObject>>() {
+ @Override
+ public boolean flushMessage(List<Pair<ISplit, JSONObject>> messages) {
+ if (messages == null || messages.size() == 0) {
+ return true;
+ }
+ ISplit split = messages.get(0).getLeft();
+ JSONObject jsonObject = messages.get(0).getRight();
+ JSONArray allMsgs = shuffleChannel.getMsgs(jsonObject);
+ for (int i = 1; i < messages.size(); i++) {
+ Pair<ISplit, JSONObject> pair = messages.get(i);
+ JSONObject msg = pair.getRight();
+ JSONArray jsonArray = shuffleChannel.getMsgs(msg);
+ if (jsonArray != null) {
+ allMsgs.addAll(jsonArray);
+ }
+ }
+ JSONObject zipJsonObject = new JSONObject();
+ zipJsonObject.put(COMPRESSION_MSG_DATA, CompressUtil.gZip(jsonObject.toJSONString()));
+ zipJsonObject.put(IS_COMPRESSION_MSG, true);
+ shuffleChannel.getProducer().batchAdd(new Message(zipJsonObject), split);
+ shuffleChannel.getProducer().flush(split.getQueueId());
+ return true;
+ }
+ });
+ }
@Override
- public boolean flushMessage(List<Pair<ISplit, IMessage>> messages) {
- if (messages == null || messages.size() == 0) {
- return true;
- }
- long start=System.currentTimeMillis();
- ISplit split = messages.get(0).getLeft();
- JSONArray allMsgs =new JSONArray();
- long sum=0;
- for (int i = 0; i < messages.size(); i++) {
- Pair<ISplit, IMessage> pair = messages.get(i);
- IMessage message = pair.getRight();
- allMsgs.add(message.getMessageBody());
- // sum=SUM.addAndGet(message.getMessageBody().getLong("total"));
- }
- //System.out.println("before shuffle sum is "+sum);
- JSONObject jsonObject=shuffleChannel.createMsg(allMsgs,split);
-// JSONObject zipJsonObject = new JSONObject();
-// zipJsonObject.put(COMPRESSION_MSG_DATA, CompressUtil.gZip(jsonObject.toJSONString()));
-// zipJsonObject.put(IS_COMPRESSION_MSG, true);
- shuffleChannel.getProducer().batchAdd(new Message(jsonObject), split);
- shuffleChannel.getProducer().flush(split.getQueueId());
- long cost=System.currentTimeMillis()-start;
- // shuffleCount.addAndGet(cost);
- return true;
+ protected String createSplitId(Pair<ISplit, JSONObject> msg) {
+ return msg.getLeft().getQueueId();
}
}
-
- protected transient MiniBatchMsgCache shuffleMsgCache ;
+ protected transient ShuffleMsgCache shuffleMsgCache = new ShuffleMsgCache();
@Override
protected boolean initConfigurable() {
-
+ shuffleMsgCache = new ShuffleMsgCache();
+ shuffleMsgCache.setBatchSize(1000);
+ shuffleMsgCache.setAutoFlushSize(100);
+ shuffleMsgCache.setAutoFlushTimeGap(1000);
+ shuffleMsgCache.openAutoFlush();
isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
return super.initConfigurable();
}
@Override
protected boolean batchInsert(List<IMessage> messageList) {
- long start=System.currentTimeMillis();
- for (IMessage msg : messageList) {
- String shuffleKey = generateShuffleKey(msg);
- IMessage message= ShuffleUtil.createShuffleMsg(msg,shuffleKey);
- if(message==null){
- continue;
+ Map<Integer, JSONArray> shuffleMap = translateToShuffleMap(messageList);
+ if (shuffleMap != null && shuffleMap.size() > 0) {
+ Set<String> splitIds = new HashSet<>();
+
+ for (Map.Entry<Integer, JSONArray> entry : shuffleMap.entrySet()) {
+ ISplit split = shuffleChannel.getSplit(entry.getKey());
+ JSONObject msg = shuffleChannel.createMsg(entry.getValue(), split);
+
+ shuffleMsgCache.addCache(new MutablePair<>(split, msg));
+ List<IMessage> messages = new ArrayList<>();
+ splitIds.add(split.getQueueId());
+
+ if (DebugWriter.getDebugWriter(shuffleChannel.getWindow().getConfigureName()).isOpenDebug()) {
+ JSONArray jsonArray = entry.getValue();
+ for (int i = 0; i < jsonArray.size(); i++) {
+ Message message = new Message(jsonArray.getJSONObject(i));
+ message.getHeader().setQueueId(jsonArray.getJSONObject(i).getString(ORIGIN_QUEUE_ID));
+ message.getHeader().setOffset(jsonArray.getJSONObject(i).getLong(ORIGIN_OFFSET));
+ messages.add(message);
+
+ }
+ DebugWriter.getDebugWriter(shuffleChannel.getWindow().getConfigureName()).writeWindowCache(shuffleChannel.getWindow(), messages, split.getQueueId());
+ }
+
}
- addPropertyToMessage(msg, message.getMessageBody());
- Integer index = shuffleChannel.hash(shuffleKey);
- ISplit split = shuffleChannel.getSplit(index);
- shuffleMsgCache.addCache(new MutablePair(split, message));
+
}
if (isWindowTest) {
long count = COUNT.addAndGet(messageList.size());
System.out.println(shuffleChannel.getWindow().getConfigureName() + " send shuffle msg count is " + count);
shuffleMsgCache.flush();
}
- long cost=System.currentTimeMillis()-start;
- //shuffleCount.addAndGet(cost);
return true;
}
@Override
public void finishBatchMsg(BatchFinishMessage batchFinishMessage) {
- long start=System.currentTimeMillis();
if (shuffleChannel != null && shuffleChannel.getProducer() != null) {
- this.flush();
- shuffleMsgCache.flush();
+ shuffleChannel.getProducer().flush();
for (ISplit split : shuffleChannel.getQueueList()) {
IMessage message = batchFinishMessage.getMsg().deepCopy();
message.getMessageBody().put(ORIGIN_QUEUE_ID, message.getHeader().getQueueId());
@@ -161,11 +173,51 @@ public abstract class WindowCache extends
}
shuffleChannel.getProducer().flush();
}
- // System.out.println("insert cost is "+insertCount.get()+" shuffle cost is "+shuffleCount.get()+" finish batch cost is "+(System.currentTimeMillis()-start));
}
+ /**
+ * 对接收的消息按照不同shuffle key进行分组
+ *
+ * @param messages
+ * @return
+ */
+ protected Map<Integer, JSONArray> translateToShuffleMap(List<IMessage> messages) {
+ Map<Integer, JSONArray> shuffleMap = new HashMap<>();
+ for (IMessage msg : messages) {
+ if (msg.getHeader().isSystemMessage()) {
+ continue;
+ }
+ String shuffleKey = generateShuffleKey(msg);
+ if (StringUtil.isEmpty(shuffleKey)) {
+ shuffleKey = "<null>";
+ LOG.debug("there is no group by value in message! " + msg.getMessageBody().toString());
+ //continue;
+ }
+ Integer index = shuffleChannel.hash(shuffleKey);
+ JSONObject body = msg.getMessageBody();
+ String offset = msg.getHeader().getOffset();
+ String queueId = msg.getHeader().getQueueId();
+
+ body.put(ORIGIN_OFFSET, offset);
+ body.put(ORIGIN_QUEUE_ID, queueId);
+ body.put(ORIGIN_QUEUE_IS_LONG, msg.getHeader().getMessageOffset().isLongOfMainOffset());
+ body.put(ORIGIN_MESSAGE_HEADER, JSONObject.toJSONString(msg.getHeader()));
+ body.put(ORIGIN_MESSAGE_TRACE_ID, msg.getHeader().getTraceId());
+ body.put(SHUFFLE_KEY, shuffleKey);
+
+ addPropertyToMessage(msg, body);
+
+ JSONArray jsonArray = shuffleMap.get(index);
+ if (jsonArray == null) {
+ jsonArray = new JSONArray();
+ shuffleMap.put(index, jsonArray);
+ }
+ jsonArray.add(body);
+ }
+ return shuffleMap;
+ }
/**
* 根据message生成shuffle key
@@ -195,8 +247,8 @@ public abstract class WindowCache extends
return shuffleChannel;
}
- public MiniBatchMsgCache getShuffleMsgCache() {
- return this.shuffleMsgCache;
+ public ShuffleMsgCache getShuffleMsgCache() {
+ return shuffleMsgCache;
}
public void setShuffleChannel(ShuffleChannel shuffleChannel) {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
index 7c78f478..ea3b923a 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
@@ -86,7 +86,6 @@ public abstract class AbstractShuffleWindow extends AbstractWindow {
this.shuffleChannel.init();
windowCache.setBatchSize(5000);
windowCache.setShuffleChannel(shuffleChannel);
- windowCache.initMiniBatch();
shuffleChannel.startChannel();
hasCreated.set(true);
}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index d834af41..107f1ce5 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -47,7 +47,6 @@ import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.common.utils.TraceUtil;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
-import org.apache.rocketmq.streams.window.minibatch.MiniBatchMsgCache;
import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
@@ -61,7 +60,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.rocketmq.streams.window.model.WindowCache.ORIGIN_MESSAGE_TRACE_ID;