You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/12/01 03:29:32 UTC

[rocketmq-streams] branch main updated: fix(window) fix queuelist null pointer exception (#89)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new f288ad4  fix(window) fix queuelist null pointer exception (#89)
f288ad4 is described below

commit f288ad48b9977ad517c65d734876566856fb8588
Author: Heng Du <du...@apache.org>
AuthorDate: Wed Dec 1 11:29:28 2021 +0800

    fix(window) fix queuelist null pointer exception (#89)
---
 .../window/operator/AbstractShuffleWindow.java     |  8 ++--
 .../streams/window/shuffle/ShuffleChannel.java     | 50 +++++++++-------------
 2 files changed, 25 insertions(+), 33 deletions(-)

diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
index 22891a9..68def7a 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
@@ -49,6 +49,7 @@ public abstract class AbstractShuffleWindow extends AbstractWindow implements IA
             this.windowFireSource.init();
             this.windowFireSource.start(getFireReceiver());
             this.shuffleChannel = new ShuffleChannel(this);
+            this.shuffleChannel.init();
             windowCache.setBatchSize(5000);
             windowCache.setShuffleChannel(shuffleChannel);
         }
@@ -74,14 +75,15 @@ public abstract class AbstractShuffleWindow extends AbstractWindow implements IA
      * @param messages
      * @param instance
      */
-    public abstract void  shuffleCalculate(List<IMessage> messages, WindowInstance instance, String queueId);
+    public abstract void shuffleCalculate(List<IMessage> messages, WindowInstance instance, String queueId);
 
-     /**
+    /**
      * 触发window
      *
      * @param instance
      */
-    protected abstract int fireWindowInstance(WindowInstance instance, String queueId, Map<String, String> queueId2Offset);
+    protected abstract int fireWindowInstance(WindowInstance instance, String queueId,
+        Map<String, String> queueId2Offset);
 
     public abstract void clearCache(String queueId);
 }
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index a2a3fe4..a8de869 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -18,14 +18,18 @@ package org.apache.rocketmq.streams.window.shuffle;
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
 import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
 import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
@@ -34,33 +38,28 @@ import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
 import org.apache.rocketmq.streams.common.checkpoint.CheckPointState;
 import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.context.AbstractContext;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.common.context.MessageOffset;
 import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
 import org.apache.rocketmq.streams.common.topology.ChainPipeline;
 import org.apache.rocketmq.streams.common.topology.model.Pipeline;
 import org.apache.rocketmq.streams.common.utils.CollectionUtil;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.common.utils.TraceUtil;
 import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
 import org.apache.rocketmq.streams.window.debug.DebugWriter;
+import org.apache.rocketmq.streams.window.model.WindowCache;
+import org.apache.rocketmq.streams.window.model.WindowInstance;
 import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
 import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.common.context.AbstractContext;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.model.WindowCache;
 import org.apache.rocketmq.streams.window.operator.impl.WindowOperator.WindowRowOperator;
 import org.apache.rocketmq.streams.window.sqlcache.impl.SQLElement;
 import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
 
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
 /**
  * 负责处理分片
  */
@@ -73,12 +72,10 @@ public class ShuffleChannel extends AbstractSystemChannel {
     protected static final String SHUFFLE_MESSAGES = "SHUFFLE_MESSAGES";
     protected String MSG_OWNER = "MSG_OWNER";//消息所属的window
 
-
     private static final String SHUFFLE_TRACE_ID = "SHUFFLE_TRACE_ID";
 
     protected ShuffleCache shuffleCache;
 
-
     protected Map<String, ISplit> queueMap = new ConcurrentHashMap<>();
     protected List<ISplit> queueList;//所有的分片
 
@@ -97,18 +94,16 @@ public class ShuffleChannel extends AbstractSystemChannel {
         channelConfig.put(CHANNEL_PROPERTY_KEY_PREFIX, ConfigureFileKey.WINDOW_SHUFFLE_CHANNEL_PROPERTY_PREFIX);
         channelConfig.put(CHANNEL_TYPE, ConfigureFileKey.WINDOW_SHUFFLE_CHANNEL_TYPE);
 
-
         this.shuffleCache = new ShuffleCache(window);
         this.shuffleCache.init();
         this.shuffleCache.openAutoFlush();
 
-
     }
 
-    protected transient AtomicBoolean hasStart=new AtomicBoolean(false);
+    protected transient AtomicBoolean hasStart = new AtomicBoolean(false);
+
     @Override public void startChannel() {
-        if(hasStart.compareAndSet(false,true)){
-            init();
+        if (hasStart.compareAndSet(false, true)) {
             super.startChannel();
         }
 
@@ -117,7 +112,7 @@ public class ShuffleChannel extends AbstractSystemChannel {
     /**
      * init shuffle channel
      */
-    public void init(){
+    public void init() {
         this.consumer = createSource(window.getNameSpace(), window.getConfigureName());
 
         this.producer = createSink(window.getNameSpace(), window.getConfigureName());
@@ -205,7 +200,6 @@ public class ShuffleChannel extends AbstractSystemChannel {
                 DebugWriter.getDebugWriter(window.getConfigureName()).writeShuffleReceiveBeforeCache(window, msgs, queueId);
             }
 
-
             beforeBatchAdd(oriMessage, message);
 
             for (WindowInstance windowInstance : windowInstances) {
@@ -234,7 +228,6 @@ public class ShuffleChannel extends AbstractSystemChannel {
                 window.initWindowInstanceMaxSplitNum(windowInstance);
             }
 
-
         } else {
             for (String queueId : newSplitMessage.getSplitIds()) {
                 ShufflePartitionManager.getInstance().setSplitFinished(queueId);
@@ -304,7 +297,6 @@ public class ShuffleChannel extends AbstractSystemChannel {
         afterFlushCallback(oriMessage, context);
     }
 
-
     /**
      * if the message offset is old filter the repeate message
      *
@@ -379,7 +371,6 @@ public class ShuffleChannel extends AbstractSystemChannel {
         return pipeline.getSource().getNameSpace();
     }
 
-
     @Override
     public String getConfigureName() {
         return window.getConfigureName() + "_shuffle";
@@ -395,7 +386,6 @@ public class ShuffleChannel extends AbstractSystemChannel {
         return Pipeline.TYPE;
     }
 
-
     public ISplit getSplit(Integer index) {
         return queueList.get(index);
     }