You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:14 UTC

[rocketmq-streams] 15/16: make RocketMQWindowExample runnable

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit aadf7eaa042ba3cc6c89433bc74bc99752c1c149
Author: 维章 <un...@gmail.com>
AuthorDate: Tue May 31 10:51:56 2022 +0800

    make RocketMQWindowExample runnable
---
 .../apache/rocketmq/streams/common/utils/KryoUtil.java  |  4 ----
 .../rocketmq/streams/common/utils/SerializeUtil.java    |  6 ------
 .../streams/window/operator/AbstractShuffleWindow.java  | 17 ++---------------
 .../rocketmq/streams/window/shuffle/ShuffleChannel.java |  9 +++++++++
 4 files changed, 11 insertions(+), 25 deletions(-)

diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/KryoUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/KryoUtil.java
index 538cb027..d6fa1072 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/KryoUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/KryoUtil.java
@@ -50,10 +50,6 @@ public class KryoUtil {
             //不强制要求注册类(注册行为无法保证多个 JVM 内同一个类的注册编号相同;而且业务系统中大量的 Class 也难以一一注册)
             kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置
 
-            //Fix the NPE bug when deserializing Collections.
-//            ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
-//                .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-
             return kryo;
         }
     };
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SerializeUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SerializeUtil.java
index 7def6f7e..60691cc6 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SerializeUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SerializeUtil.java
@@ -33,7 +33,6 @@ import org.apache.rocketmq.streams.common.datatype.ArrayDataType;
 import org.apache.rocketmq.streams.common.datatype.DataType;
 import org.apache.rocketmq.streams.common.datatype.StringDataType;
 import org.apache.rocketmq.streams.common.interfaces.ISerialize;
-import org.nustaq.serialization.FSTConfiguration;
 
 public class SerializeUtil {
     private static final Log LOG = LogFactory.getLog(SerializeUtil.class);
@@ -44,11 +43,6 @@ public class SerializeUtil {
      * @return
      */
     public static byte[] serialize(Object object) {
-        if(ISerialize.class.isInstance(object)){
-//        byte[] bytes = conf.asByteArray(object);
-//        return bytes;
-           return KryoUtil.writeObjectToByteArray(object);
-        }
         DataType dataType = DataTypeUtil.getDataTypeFromClass(object.getClass());
         if (ArrayDataType.class.isInstance(dataType)) {
             int length = Array.getLength(object);
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
index ea3b923a..070a40f0 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
@@ -76,21 +76,7 @@ public abstract class AbstractShuffleWindow extends AbstractWindow {
 
     @Override
     public AbstractContext<IMessage> doMessage(IMessage message, AbstractContext context) {
-        if (hasCreated.get()==false||this.shuffleChannel==null) {
-            synchronized (this){
-                if(hasCreated.get()==false||this.shuffleChannel==null){
-                    this.windowFireSource = new WindowTrigger(this);
-                    this.windowFireSource.init();
-                    this.windowFireSource.start(getFireReceiver());
-                    this.shuffleChannel = new ShuffleChannel(this);
-                    this.shuffleChannel.init();
-                    windowCache.setBatchSize(5000);
-                    windowCache.setShuffleChannel(shuffleChannel);
-                    shuffleChannel.startChannel();
-                    hasCreated.set(true);
-                }
-            }
-        }
+        shuffleChannel.startChannel();
         return super.doMessage(message, context);
     }
 
@@ -99,6 +85,7 @@ public abstract class AbstractShuffleWindow extends AbstractWindow {
         Set<String> splitIds = new HashSet<>();
         splitIds.add(windowInstance.getSplitId());
         shuffleChannel.flush(splitIds);
+
         return doFireWindowInstance(windowInstance);
     }
 
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index 107f1ce5..a5c150a1 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -60,6 +60,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.rocketmq.streams.window.model.WindowCache.ORIGIN_MESSAGE_TRACE_ID;
@@ -110,6 +111,14 @@ public class ShuffleChannel extends AbstractSystemChannel {
 
     }
 
+    protected transient AtomicBoolean hasStart = new AtomicBoolean(false);
+
+    @Override
+    public void startChannel() {
+        if (hasStart.compareAndSet(false, true)) {
+            super.startChannel();
+        }
+    }
 
     /**
      * init shuffle channel