You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:14 UTC
[rocketmq-streams] 15/16: make RocketMQWindowExample runnable
This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit aadf7eaa042ba3cc6c89433bc74bc99752c1c149
Author: 维章 <un...@gmail.com>
AuthorDate: Tue May 31 10:51:56 2022 +0800
make RocketMQWindowExample runnable
---
.../apache/rocketmq/streams/common/utils/KryoUtil.java | 4 ----
.../rocketmq/streams/common/utils/SerializeUtil.java | 6 ------
.../streams/window/operator/AbstractShuffleWindow.java | 17 ++---------------
.../rocketmq/streams/window/shuffle/ShuffleChannel.java | 9 +++++++++
4 files changed, 11 insertions(+), 25 deletions(-)
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/KryoUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/KryoUtil.java
index 538cb027..d6fa1072 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/KryoUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/KryoUtil.java
@@ -50,10 +50,6 @@ public class KryoUtil {
//不强制要求注册类(注册行为无法保证多个 JVM 内同一个类的注册编号相同;而且业务系统中大量的 Class 也难以一一注册)
kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置
- //Fix the NPE bug when deserializing Collections.
-// ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
-// .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-
return kryo;
}
};
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SerializeUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SerializeUtil.java
index 7def6f7e..60691cc6 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SerializeUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SerializeUtil.java
@@ -33,7 +33,6 @@ import org.apache.rocketmq.streams.common.datatype.ArrayDataType;
import org.apache.rocketmq.streams.common.datatype.DataType;
import org.apache.rocketmq.streams.common.datatype.StringDataType;
import org.apache.rocketmq.streams.common.interfaces.ISerialize;
-import org.nustaq.serialization.FSTConfiguration;
public class SerializeUtil {
private static final Log LOG = LogFactory.getLog(SerializeUtil.class);
@@ -44,11 +43,6 @@ public class SerializeUtil {
* @return
*/
public static byte[] serialize(Object object) {
- if(ISerialize.class.isInstance(object)){
-// byte[] bytes = conf.asByteArray(object);
-// return bytes;
- return KryoUtil.writeObjectToByteArray(object);
- }
DataType dataType = DataTypeUtil.getDataTypeFromClass(object.getClass());
if (ArrayDataType.class.isInstance(dataType)) {
int length = Array.getLength(object);
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
index ea3b923a..070a40f0 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
@@ -76,21 +76,7 @@ public abstract class AbstractShuffleWindow extends AbstractWindow {
@Override
public AbstractContext<IMessage> doMessage(IMessage message, AbstractContext context) {
- if (hasCreated.get()==false||this.shuffleChannel==null) {
- synchronized (this){
- if(hasCreated.get()==false||this.shuffleChannel==null){
- this.windowFireSource = new WindowTrigger(this);
- this.windowFireSource.init();
- this.windowFireSource.start(getFireReceiver());
- this.shuffleChannel = new ShuffleChannel(this);
- this.shuffleChannel.init();
- windowCache.setBatchSize(5000);
- windowCache.setShuffleChannel(shuffleChannel);
- shuffleChannel.startChannel();
- hasCreated.set(true);
- }
- }
- }
+ shuffleChannel.startChannel();
return super.doMessage(message, context);
}
@@ -99,6 +85,7 @@ public abstract class AbstractShuffleWindow extends AbstractWindow {
Set<String> splitIds = new HashSet<>();
splitIds.add(windowInstance.getSplitId());
shuffleChannel.flush(splitIds);
+
return doFireWindowInstance(windowInstance);
}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index 107f1ce5..a5c150a1 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -60,6 +60,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.rocketmq.streams.window.model.WindowCache.ORIGIN_MESSAGE_TRACE_ID;
@@ -110,6 +111,14 @@ public class ShuffleChannel extends AbstractSystemChannel {
}
+ protected transient AtomicBoolean hasStart = new AtomicBoolean(false);
+
+ @Override
+ public void startChannel() {
+ if (hasStart.compareAndSet(false, true)) {
+ super.startChannel();
+ }
+ }
/**
* init shuffle channel