You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/12/01 03:29:32 UTC
[rocketmq-streams] branch main updated: fix(window) fix queuelist null pointer exception (#89)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new f288ad4 fix(window) fix queuelist null pointer exception (#89)
f288ad4 is described below
commit f288ad48b9977ad517c65d734876566856fb8588
Author: Heng Du <du...@apache.org>
AuthorDate: Wed Dec 1 11:29:28 2021 +0800
fix(window) fix queuelist null pointer exception (#89)
---
.../window/operator/AbstractShuffleWindow.java | 8 ++--
.../streams/window/shuffle/ShuffleChannel.java | 50 +++++++++-------------
2 files changed, 25 insertions(+), 33 deletions(-)
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
index 22891a9..68def7a 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
@@ -49,6 +49,7 @@ public abstract class AbstractShuffleWindow extends AbstractWindow implements IA
this.windowFireSource.init();
this.windowFireSource.start(getFireReceiver());
this.shuffleChannel = new ShuffleChannel(this);
+ this.shuffleChannel.init();
windowCache.setBatchSize(5000);
windowCache.setShuffleChannel(shuffleChannel);
}
@@ -74,14 +75,15 @@ public abstract class AbstractShuffleWindow extends AbstractWindow implements IA
* @param messages
* @param instance
*/
- public abstract void shuffleCalculate(List<IMessage> messages, WindowInstance instance, String queueId);
+ public abstract void shuffleCalculate(List<IMessage> messages, WindowInstance instance, String queueId);
- /**
+ /**
* 触发window
*
* @param instance
*/
- protected abstract int fireWindowInstance(WindowInstance instance, String queueId, Map<String, String> queueId2Offset);
+ protected abstract int fireWindowInstance(WindowInstance instance, String queueId,
+ Map<String, String> queueId2Offset);
public abstract void clearCache(String queueId);
}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index a2a3fe4..a8de869 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -18,14 +18,18 @@ package org.apache.rocketmq.streams.window.shuffle;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
@@ -34,33 +38,28 @@ import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointState;
import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.context.AbstractContext;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.topology.model.Pipeline;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.common.utils.TraceUtil;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
+import org.apache.rocketmq.streams.window.model.WindowCache;
+import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.common.context.AbstractContext;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.operator.impl.WindowOperator.WindowRowOperator;
import org.apache.rocketmq.streams.window.sqlcache.impl.SQLElement;
import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
/**
* 负责处理分片
*/
@@ -73,12 +72,10 @@ public class ShuffleChannel extends AbstractSystemChannel {
protected static final String SHUFFLE_MESSAGES = "SHUFFLE_MESSAGES";
protected String MSG_OWNER = "MSG_OWNER";//消息所属的window
-
private static final String SHUFFLE_TRACE_ID = "SHUFFLE_TRACE_ID";
protected ShuffleCache shuffleCache;
-
protected Map<String, ISplit> queueMap = new ConcurrentHashMap<>();
protected List<ISplit> queueList;//所有的分片
@@ -97,18 +94,16 @@ public class ShuffleChannel extends AbstractSystemChannel {
channelConfig.put(CHANNEL_PROPERTY_KEY_PREFIX, ConfigureFileKey.WINDOW_SHUFFLE_CHANNEL_PROPERTY_PREFIX);
channelConfig.put(CHANNEL_TYPE, ConfigureFileKey.WINDOW_SHUFFLE_CHANNEL_TYPE);
-
this.shuffleCache = new ShuffleCache(window);
this.shuffleCache.init();
this.shuffleCache.openAutoFlush();
-
}
- protected transient AtomicBoolean hasStart=new AtomicBoolean(false);
+ protected transient AtomicBoolean hasStart = new AtomicBoolean(false);
+
@Override public void startChannel() {
- if(hasStart.compareAndSet(false,true)){
- init();
+ if (hasStart.compareAndSet(false, true)) {
super.startChannel();
}
@@ -117,7 +112,7 @@ public class ShuffleChannel extends AbstractSystemChannel {
/**
* init shuffle channel
*/
- public void init(){
+ public void init() {
this.consumer = createSource(window.getNameSpace(), window.getConfigureName());
this.producer = createSink(window.getNameSpace(), window.getConfigureName());
@@ -205,7 +200,6 @@ public class ShuffleChannel extends AbstractSystemChannel {
DebugWriter.getDebugWriter(window.getConfigureName()).writeShuffleReceiveBeforeCache(window, msgs, queueId);
}
-
beforeBatchAdd(oriMessage, message);
for (WindowInstance windowInstance : windowInstances) {
@@ -234,7 +228,6 @@ public class ShuffleChannel extends AbstractSystemChannel {
window.initWindowInstanceMaxSplitNum(windowInstance);
}
-
} else {
for (String queueId : newSplitMessage.getSplitIds()) {
ShufflePartitionManager.getInstance().setSplitFinished(queueId);
@@ -304,7 +297,6 @@ public class ShuffleChannel extends AbstractSystemChannel {
afterFlushCallback(oriMessage, context);
}
-
/**
* if the message offset is old, filter out the repeated message
*
@@ -379,7 +371,6 @@ public class ShuffleChannel extends AbstractSystemChannel {
return pipeline.getSource().getNameSpace();
}
-
@Override
public String getConfigureName() {
return window.getConfigureName() + "_shuffle";
@@ -395,7 +386,6 @@ public class ShuffleChannel extends AbstractSystemChannel {
return Pipeline.TYPE;
}
-
/**
 * Returns the split stored at the given position of {@code queueList}.
 *
 * @param index zero-based position in {@code queueList}; it is auto-unboxed for
 *              {@code List.get(int)}, so a null value raises a NullPointerException
 * @return the {@link ISplit} held at that position
 */
public ISplit getSplit(Integer index) {
// NOTE(review): queueList has no initializer where it is declared in this view; presumably
// it is filled during init() -- confirm it cannot still be null when this is called.
return queueList.get(index);
}