You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/09/07 01:37:21 UTC
[rocketmq-streams] branch main updated: a runnable window example
reading data from rocketmq. (#54)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new a897b47 a runnable window example reading data from rocketmq. (#54)
a897b47 is described below
commit a897b47f801457220bb99eb35fb0717cd896a692
Author: Ni Ze <31...@users.noreply.github.com>
AuthorDate: Tue Sep 7 09:37:16 2021 +0800
a runnable window example reading data from rocketmq. (#54)
Co-authored-by: nize <un...@gmail.com>
---
.../rocketmq/streams/RocketMQChannelBuilder.java | 36 ++--
.../apache/rocketmq/streams/sink/RocketMQSink.java | 111 +++++++------
.../rocketmq/streams/source/RocketMQSource.java | 2 +-
.../streams/client/source/DataStreamSource.java | 4 +-
.../streams/client/transform/DataStream.java | 2 +-
.../common/channel/impl/OutputPrintChannel.java | 2 +-
.../channel/sinkcache/impl/MessageCache.java | 1 -
.../topology/stages/AbstractWindowStage.java | 13 +-
.../rocketmqsource/RocketMQSourceExample2.java | 27 ++-
...ceExample2.java => RocketMQSourceExample3.java} | 24 ++-
...SourceExample2.java => RocketmqWindowTest.java} | 31 +++-
.../streams/window/operator/AbstractWindow.java | 3 +-
.../window/shuffle/AbstractSystemChannel.java | 11 +-
.../streams/window/shuffle/ShuffleCache.java | 4 +-
.../streams/window/shuffle/ShuffleChannel.java | 185 ++++++++++-----------
.../rocketmq/streams/window/sqlcache/SQLCache.java | 110 ++++++------
.../streams/window/state/impl/WindowValue.java | 13 +-
17 files changed, 300 insertions(+), 279 deletions(-)
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
index 6014cf3..012a06e 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.streams;
import com.alibaba.fastjson.JSONObject;
import com.google.auto.service.AutoService;
+
import java.util.Properties;
+
import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
@@ -31,32 +33,32 @@ import org.apache.rocketmq.streams.sink.RocketMQSink;
import org.apache.rocketmq.streams.source.RocketMQSource;
@AutoService(IChannelBuilder.class)
-@ServiceName(value = RocketMQChannelBuilder.TYPE, aliasName = "RocketMQSource",name="metaq")
+@ServiceName(value = RocketMQChannelBuilder.TYPE, aliasName = "RocketMQSource", name = "metaq")
public class RocketMQChannelBuilder extends AbstractSupportShuffleChannelBuilder {
public static final String TYPE = "rocketmq";
@Override
public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
- RocketMQSource rocketMQSource = (RocketMQSource) ConfigurableUtil.create(RocketMQSource.class.getName(),namespace,name,createFormatProperty(properties),null);
+ RocketMQSource rocketMQSource = (RocketMQSource) ConfigurableUtil.create(RocketMQSource.class.getName(), namespace, name, createFormatProperty(properties), null);
return rocketMQSource;
}
- protected JSONObject createFormatProperty(Properties properties){
- JSONObject formatProperties=new JSONObject();
- for(Object object:properties.keySet()){
- String key=(String)object;
+ protected JSONObject createFormatProperty(Properties properties) {
+ JSONObject formatProperties = new JSONObject();
+ for (Object object : properties.keySet()) {
+ String key = (String) object;
if ("type".equals(key)) {
continue;
}
- formatProperties.put(key,properties.getProperty(key));
+ formatProperties.put(key, properties.getProperty(key));
}
- IChannelBuilder.formatPropertiesName(formatProperties,properties,"topic","topic");
- IChannelBuilder.formatPropertiesName(formatProperties,properties,"tags","tag");
- IChannelBuilder.formatPropertiesName(formatProperties,properties,"maxThread","thread.max.count");
- IChannelBuilder.formatPropertiesName(formatProperties,properties,"pullIntervalMs","pullIntervalMs");
- IChannelBuilder.formatPropertiesName(formatProperties,properties,"offsetTime","offsetTime");
- IChannelBuilder.formatPropertiesName(formatProperties,properties,"namesrvAddr","namesrvAddr");
+ IChannelBuilder.formatPropertiesName(formatProperties, properties, "topic", "topic");
+ IChannelBuilder.formatPropertiesName(formatProperties, properties, "tags", "tag");
+ IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "thread.max.count");
+ IChannelBuilder.formatPropertiesName(formatProperties, properties, "pullIntervalMs", "pullIntervalMs");
+ IChannelBuilder.formatPropertiesName(formatProperties, properties, "offsetTime", "offsetTime");
+ IChannelBuilder.formatPropertiesName(formatProperties, properties, "namesrvAddr", "namesrvAddr");
if (properties.getProperty("group") != null) {
String group = properties.getProperty("group");
if (group.startsWith("GID_")) {
@@ -78,16 +80,16 @@ public class RocketMQChannelBuilder extends AbstractSupportShuffleChannelBuilder
@Override
public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
- RocketMQSink rocketMQSink = (RocketMQSink) ConfigurableUtil.create(RocketMQSink.class.getName(),namespace,name,createFormatProperty(properties),null);
+ RocketMQSink rocketMQSink = (RocketMQSink) ConfigurableUtil.create(RocketMQSink.class.getName(), namespace, name, createFormatProperty(properties), null);
return rocketMQSink;
}
@Override
public ISink createBySource(ISource pipelineSource) {
- RocketMQSource source = (RocketMQSource)pipelineSource;
- String topic = source.getTopic();
+ RocketMQSource source = (RocketMQSource) pipelineSource;
RocketMQSink sink = new RocketMQSink();
- sink.setTopic(topic);
+ sink.setNamesrvAddr(source.getNamesrvAddr());
+ sink.setTopic(source.getTopic());
sink.setTags(source.getTags());
return sink;
}
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
index 773633e..a8293ec 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -44,21 +45,27 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
public class RocketMQSink extends AbstractSupportShuffleSink {
- protected static final Log LOG = LogFactory.getLog(RocketMQSink.class);
+ private static final Log LOG = LogFactory.getLog(RocketMQSink.class);
@ENVDependence
- protected String tags = "*";
+ private String tags = "*";
- protected String topic;
- protected String groupName;
+ private String topic;
+ private String groupName;
- private transient List<DefaultMQPushConsumer> consumers=new ArrayList<>();
- protected transient DefaultMQProducer producer;
+ private transient List<DefaultMQPushConsumer> consumers = new ArrayList<>();
+ private transient DefaultMQProducer producer;
- protected Long pullIntervalMs;
- protected String namesrvAddr;
+ private Long pullIntervalMs;
+ private String namesrvAddr;
+ public RocketMQSink() {
+ }
- public RocketMQSink(){}
+ public RocketMQSink(String namesrvAddr, String topic, String groupName) {
+ this.namesrvAddr = namesrvAddr;
+ this.topic = topic;
+ this.groupName = groupName;
+ }
@Override
@@ -81,45 +88,45 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
initProducer();
try {
- Map<String,List<Message>> msgsByQueueId=new HashMap<>();// group by queueId, if the message not contains queue info ,the set default string as default queueId
- Map<String, MessageQueue> messageQueueMap=new HashMap<>();//if has queue id in message, save the map for queueid 2 messagequeeue
- String defaultQueueId="<null>";//message is not contains queue ,use default
- for(IMessage msg:messages){
+ Map<String, List<Message>> msgsByQueueId = new HashMap<>();// group by queueId, if the message not contains queue info ,the set default string as default queueId
+ Map<String, MessageQueue> messageQueueMap = new HashMap<>();//if has queue id in message, save the map for queueid 2 messagequeeue
+ String defaultQueueId = "<null>";//message is not contains queue ,use default
+ for (IMessage msg : messages) {
ISplit<RocketMQMessageQueue, MessageQueue> channelQueue = getSplit(msg);
- String queueId=defaultQueueId;
+ String queueId = defaultQueueId;
if (channelQueue != null) {
- queueId=channelQueue.getQueueId();
- RocketMQMessageQueue metaqMessageQueue=(RocketMQMessageQueue)channelQueue;
- messageQueueMap.put(queueId,metaqMessageQueue.getQueue());
+ queueId = channelQueue.getQueueId();
+ RocketMQMessageQueue metaqMessageQueue = (RocketMQMessageQueue) channelQueue;
+ messageQueueMap.put(queueId, metaqMessageQueue.getQueue());
}
- List<Message> messageList=msgsByQueueId.get(queueId);
- if(messageList==null){
- messageList=new ArrayList<>();
- msgsByQueueId.put(queueId,messageList);
+ List<Message> messageList = msgsByQueueId.get(queueId);
+ if (messageList == null) {
+ messageList = new ArrayList<>();
+ msgsByQueueId.put(queueId, messageList);
}
messageList.add(new Message(topic, tags, null, msg.getMessageBody().toJSONString().getBytes("UTF-8")));
}
- List<Message> messageList=msgsByQueueId.get(defaultQueueId);
- if(messageList!=null){
- for(Message message:messageList){
+ List<Message> messageList = msgsByQueueId.get(defaultQueueId);
+ if (messageList != null) {
+ for (Message message : messageList) {
producer.sendOneway(message);
}
messageQueueMap.remove(defaultQueueId);
}
- if(messageQueueMap.size()<=0){
+ if (messageQueueMap.size() <= 0) {
return true;
}
- for(String queueId:msgsByQueueId.keySet()){
- messageList=msgsByQueueId.get(queueId);
- for(Message message:messageList){
- MessageQueue queue=messageQueueMap.get(queueId);
- producer.send(message,queue);
+ for (String queueId : msgsByQueueId.keySet()) {
+ messageList = msgsByQueueId.get(queueId);
+ for (Message message : messageList) {
+ MessageQueue queue = messageQueueMap.get(queueId);
+ producer.send(message, queue);
}
}
- }catch (Exception e){
+ } catch (Exception e) {
e.printStackTrace();
- throw new RuntimeException("batch insert error ",e);
+ throw new RuntimeException("batch insert error ", e);
}
return true;
@@ -127,19 +134,21 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
protected void initProducer() {
- if(producer==null){
- synchronized (this){
- if(producer==null){
+ if (producer == null) {
+ synchronized (this) {
+ if (producer == null) {
destroy();
producer = new DefaultMQProducer(groupName + "producer", true, null);
try {
- if (this.namesrvAddr != null && !"".equalsIgnoreCase(this.namesrvAddr)) {
- producer.setNamesrvAddr(this.namesrvAddr);
+ if (this.namesrvAddr == null || "".equals(this.namesrvAddr)) {
+ throw new RuntimeException("namesrvAddr can not be null.");
}
+
+ producer.setNamesrvAddr(this.namesrvAddr);
producer.start();
} catch (Exception e) {
setInitSuccess(false);
- throw new RuntimeException("创建队列失败," + topic + ",msg=" + e.getMessage(), e);
+ throw new RuntimeException("create producer failed," + topic + ",msg=" + e.getMessage(), e);
}
}
}
@@ -151,7 +160,7 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
if (producer != null) {
try {
producer.shutdown();
- producer=null;
+ producer = null;
} catch (Throwable t) {
if (LOG.isWarnEnabled()) {
LOG.warn(t.getMessage(), t);
@@ -160,14 +169,6 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
}
}
- public static void main(String[] args) {
- String topic = "shuffle_TOPIC_DIPPER_SYSTEM_MSG_6_namespace_name1";
- RocketMQSink metaqSink = new RocketMQSink();
- metaqSink.setTopic(topic);
- metaqSink.setSplitNum(5);
- metaqSink.init();
- System.out.println(metaqSink.getSplitList().size());
- }
@Override
public void destroy() {
@@ -195,7 +196,7 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
}
}
- //defaultMQAdminExt.createTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC, topic, splitNum, 1);
+
defaultMQAdminExt.createTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC, topic, splitNum, 1);
} catch (Exception e) {
e.printStackTrace();
@@ -209,7 +210,7 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
@Override
public List<ISplit> getSplitList() {
initProducer();
- List<ISplit> messageQueues=new ArrayList<>();
+ List<ISplit> messageQueues = new ArrayList<>();
try {
if (messageQueues == null || messageQueues.size() == 0) {
@@ -223,7 +224,7 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
Collections.sort(queueList);
messageQueues = queueList;
}
- }catch (Exception e){
+ } catch (Exception e) {
throw new RuntimeException(e);
}
@@ -232,13 +233,13 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
@Override
public int getSplitNum() {
- List<ISplit> splits=getSplitList();
- if(splits==null||splits.size()==0){
+ List<ISplit> splits = getSplitList();
+ if (splits == null || splits.size() == 0) {
return 0;
}
- Set<Integer> splitNames=new HashSet<>();
- for(ISplit split:splits){
- MessageQueue messageQueue= (MessageQueue)split.getQueue();
+ Set<Integer> splitNames = new HashSet<>();
+ for (ISplit split : splits) {
+ MessageQueue messageQueue = (MessageQueue) split.getQueue();
splitNames.add(messageQueue.getQueueId());
}
return splitNames.size();
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 2d35f80..b45df86 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -291,7 +291,7 @@ public class RocketMQSource extends AbstractSupportOffsetResetSource {
ConcurrentMap<MessageQueue, AtomicLong> offsetTable = ReflectUtil.getDeclaredField(this, "offsetTable");
DebugWriter.getInstance(getTopic()).writeSaveOffset(mq, offsetTable.get(mq));
}
- LOG.info("the queue Id is " + new RocketMQMessageQueue(mq).getQueueId() + ",rocketmq start save offset,the save time is " + DateUtil.getCurrentTimeString());
+// LOG.info("the queue Id is " + new RocketMQMessageQueue(mq).getQueueId() + ",rocketmq start save offset,the save time is " + DateUtil.getCurrentTimeString());
super.updateConsumeOffsetToBroker(mq, offset, isOneway);
}
};
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
index 7797832..e30acdb 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
@@ -23,7 +23,9 @@ import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.source.RocketMQSource;
-public class DataStreamSource {
+import java.io.Serializable;
+
+public class DataStreamSource implements Serializable {
protected PipelineBuilder mainPipelineBuilder;
public DataStreamSource(String namespace, String pipelineName) {
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
index 22d139a..f9260ea 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
@@ -135,7 +135,7 @@ public class DataStream implements Serializable {
if (result instanceof JSONObject) {
subMessage=new Message((JSONObject)t);
} else {
- subMessage=new Message(new UserDefinedMessage(result));
+ subMessage=new Message(new UserDefinedMessage(t));
}
splitMessages.add(subMessage);
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
index 1656c65..bd7029f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
@@ -30,7 +30,7 @@ public class OutputPrintChannel extends AbstractSink {
@Override
protected boolean batchInsert(List<IMessage> messages) {
for (IMessage msg : messages) {
- System.out.println(msg.getMessageBody().toJSONString());
+ System.out.println(msg.getMessageValue());
}
return false;
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
index b5171bc..04dc08f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
@@ -42,7 +42,6 @@ public class MessageCache<R> implements IMessageCache<R> {
protected volatile int autoFlushSize=300;
protected volatile int autoFlushTimeGap=1000;
- protected transient AtomicBoolean LOCK=new AtomicBoolean(false);
public MessageCache(IMessageFlushCallBack<R> flushCallBack) {
this.flushCallBack = flushCallBack;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
index e5dcc6c..0d71b6b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
@@ -61,21 +61,10 @@ public abstract class AbstractWindowStage<T extends IMessage> extends ChainStage
@Override
public void addNewSplit(IMessage message, AbstractContext context, NewSplitMessage newSplitMessage) {
-
- //do nothigh
}
@Override
public void removeSplit(IMessage message, AbstractContext context, RemoveSplitMessage removeSplitMessage) {
- //if(message.getHeader().isNeedFlush()){
- // if(message.getHeader().getCheckpointQueueIds()!=null&&message.getHeader().getCheckpointQueueIds().size()>0){
- // window.getWindowCache().flush(message.getHeader().getCheckpointQueueIds());
- // }else {
- // Set<String> queueIds=new HashSet<>();
- // queueIds.add(message.getHeader().getQueueId());
- // window.getWindowCache().flush(queueIds);
- // }
- //
- //}
+
}
@Override
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
index 387152b..c9d9bda 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
@@ -18,12 +18,16 @@ package org.apache.rocketmq.streams.examples.rocketmqsource;
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.source.RocketMQSource;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.client.transform.window.WindowInfo;
+
+import java.util.Arrays;
public class RocketMQSourceExample2 {
public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
- public static final String RMQ_TOPIC = "topic_tiger_0901_01";
- public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-10";
+ public static final String RMQ_TOPIC = "NormalTestTopic";
+ public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-01";
public static final String TAGS = "*";
public static void main(String[] args) {
@@ -34,9 +38,26 @@ public class RocketMQSourceExample2 {
RMQ_CONSUMER_GROUP_NAME,
false,
NAMESRV_ADDRESS)
+ .forEach((message)->{
+ System.out.println("forEach: before===========");
+ System.out.println("forEach: "+message);
+ System.out.println("forEach: after===========");
+ })
.map(message -> message)
+ .filter((value) -> {
+ System.out.println("filter: ===========");
+ String messageValue = (String)value;
+ return !messageValue.contains("RocketMQ");
+ })
+ .flatMap((message)->{
+ String value = (String) message;
+ String[] result = value.split(" ");
+ return Arrays.asList(result);
+ })
.toPrint(1)
.start();
}
+
+
}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
similarity index 70%
copy from rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
copy to rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
index 387152b..1af6206 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
@@ -14,16 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.streams.examples.rocketmqsource;
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.source.RocketMQSource;
-public class RocketMQSourceExample2 {
+import java.util.Arrays;
+
+public class RocketMQSourceExample3 {
public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
- public static final String RMQ_TOPIC = "topic_tiger_0901_01";
- public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-10";
+ public static final String RMQ_TOPIC = "NormalTestTopic";
+ public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-03";
public static final String TAGS = "*";
public static void main(String[] args) {
@@ -34,9 +36,23 @@ public class RocketMQSourceExample2 {
RMQ_CONSUMER_GROUP_NAME,
false,
NAMESRV_ADDRESS)
+ .forEach((message) -> {
+ System.out.println("forEach: " + message);
+ })
.map(message -> message)
+ .filter((value) -> {
+ String messageValue = (String) value;
+ return messageValue.contains("RocketMQ");
+ })
+ .flatMap((message) -> {
+ String value = (String) message;
+ String[] result = value.split(" ");
+ return Arrays.asList(result);
+ })
.toPrint(1)
.start();
}
+
+
}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
similarity index 55%
copy from rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
copy to rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
index 387152b..a50d6cb 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
@@ -14,16 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.streams.examples.rocketmqsource;
+import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.source.RocketMQSource;
+import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-public class RocketMQSourceExample2 {
+public class RocketmqWindowTest {
public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
- public static final String RMQ_TOPIC = "topic_tiger_0901_01";
- public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-10";
+ public static final String RMQ_TOPIC = "NormalTestTopic";
+ public static final String RMQ_CONSUMER_GROUP_NAME = "group-03";
public static final String TAGS = "*";
public static void main(String[] args) {
@@ -34,9 +38,26 @@ public class RocketMQSourceExample2 {
RMQ_CONSUMER_GROUP_NAME,
false,
NAMESRV_ADDRESS)
- .map(message -> message)
+ .filter((message) -> {
+ try {
+ JSONObject.parseObject((String) message);
+ } catch (Throwable t) {
+ // if can not convert to json, discard it.because all operator are base on json.
+ return true;
+ }
+ return false;
+ })
+ .map(message -> JSONObject.parseObject((String) message))
+ .window(TumblingWindow.of(Time.seconds(1)))
+ .groupBy("ProjectName", "LogStore")
+ .count("total")
+ .waterMark(1)
+ .setLocalStorageOnly(true)
+ .toDataSteam()
.toPrint(1)
+ .with(WindowStrategy.highPerformance())
.start();
}
+
}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
index aea126e..6b05827 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
@@ -619,8 +619,7 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
}
@Override
- public void setFireReceiver(
- PiplineRecieverAfterCurrentNode fireReceiver) {
+ public void setFireReceiver(PiplineRecieverAfterCurrentNode fireReceiver) {
this.fireReceiver = fireReceiver;
}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
index 6d1df78..09b6555 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
@@ -77,22 +77,22 @@ public abstract class AbstractSystemChannel implements IConfigurableIdentificati
synchronized (this) {
if (!hasCreateShuffleChannel) {
ISource piplineSource = pipeline.getSource();
- ServiceLoaderComponent serviceLoaderComponent = ComponentCreator.getComponent(
- IChannelBuilder.class.getName(), ServiceLoaderComponent.class);
+ ServiceLoaderComponent serviceLoaderComponent = ComponentCreator.getComponent(IChannelBuilder.class.getName(), ServiceLoaderComponent.class);
+
IChannelBuilder builder = (IChannelBuilder)serviceLoaderComponent.loadService(piplineSource.getClass().getSimpleName());
if (builder == null) {
throw new RuntimeException("can not create shuffle channel, not find channel builder " + piplineSource.toJson());
}
- if (!IShuffleChannelBuilder.class.isInstance(builder)) {
+ if (!(builder instanceof IShuffleChannelBuilder)) {
throw new RuntimeException("can not create shuffle channel, builder not imp IShuffleChannelBuilder " + piplineSource.toJson());
}
IShuffleChannelBuilder shuffleChannelBuilder = (IShuffleChannelBuilder)builder;
ISink sink = shuffleChannelBuilder.createBySource(piplineSource);
- if (!MemoryChannel.class.isInstance(sink) && !AbstractSupportShuffleSink.class.isInstance(sink)) {
+ if (!(sink instanceof MemoryChannel) && !(sink instanceof AbstractSupportShuffleSink)) {
throw new RuntimeException("can not create shuffle channel, sink not extends AbstractSupportShuffleSink " + piplineSource.toJson());
}
ISource source = null;
- if (MemoryChannel.class.isInstance(sink)) {
+ if (sink instanceof MemoryChannel) {
MemoryCache memoryCache = new MemoryCache();
memoryCache.setNameSpace(createShuffleChannelNameSpace(pipeline));
memoryCache.setConfigureName(createShuffleChannelName(pipeline));
@@ -108,6 +108,7 @@ public abstract class AbstractSystemChannel implements IConfigurableIdentificati
putDynamicPropertyValue(new HashSet<>(), properties);
AbstractSupportShuffleSink shuffleSink = (AbstractSupportShuffleSink)sink;
+ //todo 为什么这里的分区数量要和源头topic的分区数量一直?
shuffleSink.setSplitNum(getShuffleSplitCount(shuffleSink));
shuffleSink.setNameSpace(createShuffleChannelNameSpace(pipeline));
shuffleSink.setConfigureName(createShuffleChannelName(pipeline));
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java
index e1f3e62..1ea07b3 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java
@@ -85,14 +85,14 @@ public class ShuffleCache extends WindowCache {
*/
protected void saveSplitProgress(String queueId, List<IMessage> messages) {
Map<String,String> queueId2OrigOffset=new HashMap<>();
- Set<String> oriQueueIds=new HashSet<>();
+// Set<String> oriQueueIds=new HashSet<>();
Boolean isLong=false;
for(IMessage message:messages){
isLong=message.getMessageBody().getBoolean(ORIGIN_QUEUE_IS_LONG);
String oriQueueId = message.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID);
String oriOffset = message.getMessageBody().getString(WindowCache.ORIGIN_OFFSET);
queueId2OrigOffset.put(oriQueueId,oriOffset);
- oriQueueIds.add(oriQueueId);
+// oriQueueIds.add(oriQueueId);
}
Map<String,WindowMaxValue> windowMaxValueMap=window.getWindowMaxValueManager().saveMaxOffset(isLong,window.getConfigureName(),queueId,queueId2OrigOffset);
window.getSqlCache().addCache(new SplitSQLElement(queueId,ORMUtil.createBatchReplacetSQL(new ArrayList<>(windowMaxValueMap.values()))));
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index 922de4b..cb6cb4e 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
@@ -76,7 +77,6 @@ public class ShuffleChannel extends AbstractSystemChannel {
protected ShuffleCache shuffleCache;
-
protected Map<String, ISplit> queueMap = new ConcurrentHashMap<>();
protected List<ISplit> queueList;//所有的分片
@@ -87,28 +87,28 @@ public class ShuffleChannel extends AbstractSystemChannel {
/**
* 每个分片,已经确定处理的最大offset
*/
- protected transient Map<String,String> split2MaxOffsets=new HashMap<>();
+ protected transient Map<String, String> split2MaxOffsets = new HashMap<>();
public ShuffleChannel(AbstractShuffleWindow window) {
this.window = window;
channelConfig = new HashMap<>();
channelConfig.put(CHANNEL_PROPERTY_KEY_PREFIX, ConfigureFileKey.WINDOW_SHUFFLE_CHANNEL_PROPERTY_PREFIX);
channelConfig.put(CHANNEL_TYPE, ConfigureFileKey.WINDOW_SHUFFLE_CHANNEL_TYPE);
- this.consumer = createSource(window.getNameSpace(),window.getConfigureName());
+ this.consumer = createSource(window.getNameSpace(), window.getConfigureName());
- this.producer = createSink(window.getNameSpace(),window.getConfigureName());
- if(this.consumer==null||this.producer==null){
+ this.producer = createSink(window.getNameSpace(), window.getConfigureName());
+ if (this.consumer == null || this.producer == null) {
autoCreateShuffleChannel(window.getFireReceiver().getPipeline());
}
- if(this.consumer instanceof AbstractSource){
- ((AbstractSource)this.consumer).setJsonData(true);
+ if (this.consumer instanceof AbstractSource) {
+ ((AbstractSource) this.consumer).setJsonData(true);
}
this.shuffleCache = new ShuffleCache(window);
this.shuffleCache.init();
this.shuffleCache.openAutoFlush();
- if (producer!=null&&(queueList == null || queueList.size() == 0) ){
+ if (producer != null && (queueList == null || queueList.size() == 0)) {
queueList = producer.getSplitList();
Map<String, ISplit> tmp = new ConcurrentHashMap<>();
for (ISplit queue : queueList) {
@@ -128,24 +128,25 @@ public class ShuffleChannel extends AbstractSystemChannel {
* @return
*/
- protected transient AtomicLong COUNT=new AtomicLong(0);
+ protected transient AtomicLong COUNT = new AtomicLong(0);
+
@Override
public Object doMessage(IMessage oriMessage, AbstractContext context) {
if (oriMessage.getHeader().isSystemMessage()) {
- doSystemMessage(oriMessage,context);
+ doSystemMessage(oriMessage, context);
return null;
}
/**
* 过滤不是这个window的消息,一个shuffle通道,可能多个window共享,这里过滤掉非本window的消息
*/
- boolean isFilter=filterNotOwnerMessage(oriMessage);
- if(isFilter){
+ boolean isFilter = filterNotOwnerMessage(oriMessage);
+ if (isFilter) {
return null;
}
- String queueId=oriMessage.getHeader().getQueueId();
+ String queueId = oriMessage.getHeader().getQueueId();
JSONArray messages = oriMessage.getMessageBody().getJSONArray(SHUFFLE_MESSAGES);
- if(messages==null){
+ if (messages == null) {
return null;
}
@@ -154,23 +155,23 @@ public class ShuffleChannel extends AbstractSystemChannel {
TraceUtil.debug(traceId, "shuffle message in", "received message size:" + messages.size());
}
- for (Object obj: messages) {
+ for (Object obj : messages) {
IMessage message = new Message((JSONObject) obj);
message.getHeader().setQueueId(queueId);
window.updateMaxEventTime(message);
- if(isRepeateMessage(message,queueId)){
+ if (isRepeateMessage(message, queueId)) {
continue;
}
- List<WindowInstance> windowInstances=window.queryOrCreateWindowInstance(message,queueId);
- if(windowInstances==null||windowInstances.size()==0){
+ List<WindowInstance> windowInstances = window.queryOrCreateWindowInstance(message, queueId);
+ if (windowInstances == null || windowInstances.size() == 0) {
LOG.warn("the message is out of window instance, the message is discard");
continue;
}
- for(WindowInstance windowInstance:windowInstances){
+ for (WindowInstance windowInstance : windowInstances) {
String windowInstanceId = windowInstance.createWindowInstanceId();
//new instance, not need load data from remote
- if(windowInstance.isNewWindowInstance()){
- window.getSqlCache().addCache(new SQLElement(windowInstance.getSplitId(),windowInstanceId, ORMUtil.createBatchReplacetSQL(windowInstance)));
+ if (windowInstance.isNewWindowInstance()) {
+ window.getSqlCache().addCache(new SQLElement(windowInstance.getSplitId(), windowInstanceId, ORMUtil.createBatchReplacetSQL(windowInstance)));
windowInstance.setNewWindowInstance(false);
ShufflePartitionManager.getInstance().setWindowInstanceFinished(windowInstance.createWindowInstanceId());
}
@@ -178,21 +179,17 @@ public class ShuffleChannel extends AbstractSystemChannel {
message.getMessageBody().put(WindowInstance.class.getSimpleName(), windowInstances);
message.getMessageBody().put(AbstractWindow.class.getSimpleName(), window);
- long count=COUNT.incrementAndGet();
-// if(count>25000){
-// System.out.println("shufffle reciever is "+count);
-// }
- if(DebugWriter.getDebugWriter(window.getConfigureName()).isOpenDebug()){
- List<IMessage> msgs=new ArrayList<>();
+ if (DebugWriter.getDebugWriter(window.getConfigureName()).isOpenDebug()) {
+ List<IMessage> msgs = new ArrayList<>();
msgs.add(message);
- DebugWriter.getDebugWriter(window.getConfigureName()).writeShuffleReceiveBeforeCache(window,msgs,queueId);
+ DebugWriter.getDebugWriter(window.getConfigureName()).writeShuffleReceiveBeforeCache(window, msgs, queueId);
}
- beforeBatchAdd(oriMessage,message);
+ beforeBatchAdd(oriMessage, message);
- for(WindowInstance windowInstance:windowInstances){
+ for (WindowInstance windowInstance : windowInstances) {
window.getWindowFireSource().updateWindowInstanceLastUpdateTime(windowInstance);
}
shuffleCache.batchAdd(message);
@@ -203,38 +200,39 @@ public class ShuffleChannel extends AbstractSystemChannel {
@Override
public void addNewSplit(IMessage message, AbstractContext context, NewSplitMessage newSplitMessage) {
- this.currentQueueIds=newSplitMessage.getCurrentSplitIds();
+ this.currentQueueIds = newSplitMessage.getCurrentSplitIds();
loadSplitProgress(newSplitMessage);
- List<WindowInstance> allWindowInstances=WindowInstance.queryAllWindowInstance(DateUtil.getCurrentTimeString(),window,newSplitMessage.getSplitIds());
- if(CollectionUtil.isNotEmpty(allWindowInstances)){
- Map<String,Set<WindowInstance>> queueId2WindowInstances=new HashMap<>();
- for(WindowInstance windowInstance:allWindowInstances){
+ List<WindowInstance> allWindowInstances = WindowInstance.queryAllWindowInstance(DateUtil.getCurrentTimeString(), window, newSplitMessage.getSplitIds());
+ if (CollectionUtil.isNotEmpty(allWindowInstances)) {
+ Map<String, Set<WindowInstance>> queueId2WindowInstances = new HashMap<>();
+ for (WindowInstance windowInstance : allWindowInstances) {
windowInstance.setNewWindowInstance(false);
- window.getWindowInstanceMap().putIfAbsent(windowInstance.createWindowInstanceTriggerId(),windowInstance);
- window.getWindowFireSource().registFireWindowInstanceIfNotExist(windowInstance,window);
- String queueId=windowInstance.getSplitId();
- window.getStorage().loadSplitData2Local(queueId,windowInstance.createWindowInstanceId(),window.getWindowBaseValueClass(),new WindowRowOperator(windowInstance,queueId,window));
+ window.getWindowInstanceMap().putIfAbsent(windowInstance.createWindowInstanceTriggerId(), windowInstance);
+ window.getWindowFireSource().registFireWindowInstanceIfNotExist(windowInstance, window);
+ String queueId = windowInstance.getSplitId();
+ window.getStorage().loadSplitData2Local(queueId, windowInstance.createWindowInstanceId(), window.getWindowBaseValueClass(), new WindowRowOperator(windowInstance, queueId, window));
window.initWindowInstanceMaxSplitNum(windowInstance);
}
- }else {
- for(String queueId:newSplitMessage.getSplitIds()){
+ } else {
+ for (String queueId : newSplitMessage.getSplitIds()) {
ShufflePartitionManager.getInstance().setSplitFinished(queueId);
}
}
- window.getFireReceiver().doMessage(message,context);
+ window.getFireReceiver().doMessage(message, context);
}
/**
* load ori split consume offset
+ *
* @param newSplitMessage
*/
protected void loadSplitProgress(NewSplitMessage newSplitMessage) {
- for(String queueId:newSplitMessage.getSplitIds()){
- Map<String,String> result=window.getWindowMaxValueManager().loadOffsets(window.getConfigureName(),queueId);
- if(result!=null){
+ for (String queueId : newSplitMessage.getSplitIds()) {
+ Map<String, String> result = window.getWindowMaxValueManager().loadOffsets(window.getConfigureName(), queueId);
+ if (result != null) {
this.split2MaxOffsets.putAll(result);
}
}
@@ -242,10 +240,10 @@ public class ShuffleChannel extends AbstractSystemChannel {
@Override
public void removeSplit(IMessage message, AbstractContext context, RemoveSplitMessage removeSplitMessage) {
- this.currentQueueIds=removeSplitMessage.getCurrentSplitIds();
- Set<String> queueIds=removeSplitMessage.getSplitIds();
- if(queueIds!=null){
- for(String queueId:queueIds){
+ this.currentQueueIds = removeSplitMessage.getCurrentSplitIds();
+ Set<String> queueIds = removeSplitMessage.getSplitIds();
+ if (queueIds != null) {
+ for (String queueId : queueIds) {
ShufflePartitionManager.getInstance().setSplitInValidate(queueId);
window.clearCache(queueId);
@@ -253,55 +251,57 @@ public class ShuffleChannel extends AbstractSystemChannel {
window.getWindowMaxValueManager().removeKeyPrefixFromLocalCache(queueIds);
//window.getWindowFireSource().removeSplit(queueIds);
}
- window.getFireReceiver().doMessage(message,context);
+ window.getFireReceiver().doMessage(message, context);
}
@Override
public void checkpoint(IMessage message, AbstractContext context, CheckPointMessage checkPointMessage) {
- if(message.getHeader().isNeedFlush()){
+ if (message.getHeader().isNeedFlush()) {
this.flush(message.getHeader().getCheckpointQueueIds());
window.getSqlCache().flush(message.getHeader().getCheckpointQueueIds());
}
- CheckPointState checkPointState= new CheckPointState();
+ CheckPointState checkPointState = new CheckPointState();
checkPointState.setQueueIdAndOffset(this.shuffleCache.getFinishedQueueIdAndOffsets(checkPointMessage));
checkPointMessage.reply(checkPointState);
}
/**
* do system message
+ *
* @param oriMessage
* @param context
*/
protected void doSystemMessage(IMessage oriMessage, AbstractContext context) {
- ISystemMessage systemMessage=oriMessage.getSystemMessage();
- if(systemMessage instanceof CheckPointMessage){
- this.checkpoint(oriMessage, context,(CheckPointMessage)systemMessage);
- }else if(systemMessage instanceof NewSplitMessage){
- this.addNewSplit(oriMessage,context,(NewSplitMessage)systemMessage);
- }else if(systemMessage instanceof RemoveSplitMessage){
- this.removeSplit(oriMessage,context,(RemoveSplitMessage)systemMessage);
- }else {
- throw new RuntimeException("can not support this system message "+systemMessage.getClass().getName());
+ ISystemMessage systemMessage = oriMessage.getSystemMessage();
+ if (systemMessage instanceof CheckPointMessage) {
+ this.checkpoint(oriMessage, context, (CheckPointMessage) systemMessage);
+ } else if (systemMessage instanceof NewSplitMessage) {
+ this.addNewSplit(oriMessage, context, (NewSplitMessage) systemMessage);
+ } else if (systemMessage instanceof RemoveSplitMessage) {
+ this.removeSplit(oriMessage, context, (RemoveSplitMessage) systemMessage);
+ } else {
+ throw new RuntimeException("can not support this system message " + systemMessage.getClass().getName());
}
- afterFlushCallback(oriMessage,context);
+ afterFlushCallback(oriMessage, context);
}
/**
* if the message offset is old filter the repeate message
+ *
* @param message
* @param queueId
* @return
*/
protected boolean isRepeateMessage(IMessage message, String queueId) {
- boolean isOrigOffsetLong=message.getMessageBody().getBoolean(WindowCache.ORIGIN_QUEUE_IS_LONG);
+ boolean isOrigOffsetLong = message.getMessageBody().getBoolean(WindowCache.ORIGIN_QUEUE_IS_LONG);
String oriQueueId = message.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID);
String oriOffset = message.getMessageBody().getString(WindowCache.ORIGIN_OFFSET);
- String key=MapKeyUtil.createKey(window.getConfigureName(),queueId,oriQueueId);
- String offset=this.split2MaxOffsets.get(key);
- if(offset!=null){
- MessageOffset messageOffset=new MessageOffset(oriOffset,isOrigOffsetLong);
- if(!messageOffset.greateThan(offset)){
+ String key = MapKeyUtil.createKey(window.getConfigureName(), queueId, oriQueueId);
+ String offset = this.split2MaxOffsets.get(key);
+ if (offset != null) {
+ MessageOffset messageOffset = new MessageOffset(oriOffset, isOrigOffsetLong);
+ if (!messageOffset.greateThan(offset)) {
System.out.println("the message offset is old, the message is discard ");
return true;
}
@@ -315,13 +315,13 @@ public class ShuffleChannel extends AbstractSystemChannel {
}
@Override
- protected void putDynamicPropertyValue(Set<String> dynamiPropertySet,Properties properties){
- String groupName="groupName";
- if(!dynamiPropertySet.contains(groupName)){
- properties.put(groupName,getDynamicPropertyValue());
+ protected void putDynamicPropertyValue(Set<String> dynamiPropertySet, Properties properties) {
+ String groupName = "groupName";
+ if (!dynamiPropertySet.contains(groupName)) {
+ properties.put(groupName, getDynamicPropertyValue());
}
- if(!dynamiPropertySet.contains("tags")){
- properties.put("tags",getDynamicPropertyValue());
+ if (!dynamiPropertySet.contains("tags")) {
+ properties.put("tags", getDynamicPropertyValue());
}
}
@@ -335,7 +335,7 @@ public class ShuffleChannel extends AbstractSystemChannel {
@Override
protected String createShuffleTopic(String topic, ChainPipeline pipeline) {
return "shuffle_" + topic + "_" + pipeline.getSource().getNameSpace().replaceAll("\\.", "_") + "_" + pipeline
- .getConfigureName().replaceAll("\\.", "_").replaceAll(";", "_");
+ .getConfigureName().replaceAll("\\.", "_").replaceAll(";", "_");
}
/**
@@ -361,11 +361,9 @@ public class ShuffleChannel extends AbstractSystemChannel {
}
-
-
@Override
public String getConfigureName() {
- return window.getConfigureName()+"_shuffle";
+ return window.getConfigureName() + "_shuffle";
}
@Override
@@ -379,19 +377,17 @@ public class ShuffleChannel extends AbstractSystemChannel {
}
-
-
- public ISplit getSplit(Integer index){
+ public ISplit getSplit(Integer index) {
return queueList.get(index);
}
- public JSONObject createMsg(JSONArray messages,ISplit split) {
+ public JSONObject createMsg(JSONArray messages, ISplit split) {
JSONObject msg = new JSONObject();
msg.put(SHUFFLE_QUEUE_ID, split.getQueueId());//分片id
msg.put(SHUFFLE_MESSAGES, messages);//合并的消息
- msg.put(MSG_OWNER,getDynamicPropertyValue());//消息owner
+ msg.put(MSG_OWNER, getDynamicPropertyValue());//消息owner
StringBuilder traceIds = new StringBuilder();
for (int i = 0; i < messages.size(); i++) {
@@ -406,18 +402,18 @@ public class ShuffleChannel extends AbstractSystemChannel {
return msg;
}
- public JSONArray getMsgs(JSONObject msg){
+ public JSONArray getMsgs(JSONObject msg) {
return msg.getJSONArray(SHUFFLE_MESSAGES);
}
- public ISplit getChannelQueue(String key){
- int index=hash(key);
+ public ISplit getChannelQueue(String key) {
+ int index = hash(key);
ISplit targetQueue = queueList.get(index);
return targetQueue;
}
- public int hash(Object key) {
- int mValue=queueList.size();
+ public int hash(Object key) {
+ int mValue = queueList.size();
int h = 0;
if (key != null) {
h = key.hashCode() ^ (h >>> 16);
@@ -457,23 +453,24 @@ public class ShuffleChannel extends AbstractSystemChannel {
* @return
*/
protected boolean filterNotOwnerMessage(IMessage oriMessage) {
- String owner=oriMessage.getMessageBody().getString(MSG_OWNER);
- if(owner!=null&&owner.equals(getDynamicPropertyValue())){
+ String owner = oriMessage.getMessageBody().getString(MSG_OWNER);
+ if (owner != null && owner.equals(getDynamicPropertyValue())) {
return false;
}
return true;
}
+
@Override
protected String getDynamicPropertyValue() {
- String dynamicPropertyValue= MapKeyUtil.createKey(window.getNameSpace(),window.getConfigureName());
- dynamicPropertyValue = dynamicPropertyValue.replaceAll("\\.", "_").replaceAll(";","_");
+ String dynamicPropertyValue = MapKeyUtil.createKey(window.getNameSpace(), window.getConfigureName());
+ dynamicPropertyValue = dynamicPropertyValue.replaceAll("\\.", "_").replaceAll(";", "_");
return dynamicPropertyValue;
}
@Override
protected int getShuffleSplitCount(AbstractSupportShuffleSink shuffleSink) {
- int splitNum=shuffleSink.getSplitNum();
- return splitNum>0?splitNum:32;
+ int splitNum = shuffleSink.getSplitNum();
+ return splitNum > 0 ? splitNum : 32;
}
public Set<String> getCurrentQueueIds() {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
index 8e1b363..03e4224 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
@@ -32,106 +32,90 @@ import org.apache.rocketmq.streams.db.driver.JDBCDriver;
/**
* cache sql, async and batch commit
*/
+
public class SQLCache extends AbstractMultiSplitMessageCache<ISQLElement> {
protected Boolean isOpenCache=true;//if false,then execute sql when receive sql
protected Set<String> firedWindowInstances=new HashSet<>();//fired window instance ,if the owned sqls have not commit, can cancel the sqls
protected Map<String,Integer> windowInstance2Index=new HashMap<>();//set index to ISQLElement group by window instance
+
protected boolean isLocalOnly;
- public SQLCache(boolean isLocalOnly ){
+
+ public SQLCache(boolean isLocalOnly) {
super(null);
- this.isLocalOnly=isLocalOnly;
+ this.isLocalOnly = isLocalOnly;
this.flushCallBack = new MessageFlushCallBack(new SQLCacheCallback());
this.setBatchSize(1000);
- this.setAutoFlushTimeGap(30*1000);
+ this.setAutoFlushTimeGap(30 * 1000);
this.setAutoFlushSize(100);
this.openAutoFlush();
}
- @Override public int addCache(ISQLElement isqlElement) {
- if(isLocalOnly){
+ @Override
+ public int addCache(ISQLElement isqlElement) {
+ if (isLocalOnly) {
return 0;
}
- if(isOpenCache==false){
+ if (isOpenCache == false) {
DriverBuilder.createDriver().execute(isqlElement.getSQL());
return 1;
}
- if(isqlElement.isFireNotify()){
+ if (isqlElement.isFireNotify()) {
firedWindowInstances.add(isqlElement.getWindowInstanceId());
- }else if(isqlElement.isWindowInstanceSQL()){
- Integer index=windowInstance2Index.get(isqlElement.getWindowInstanceId());
- if(index==null){
- index=0;
+ } else if (isqlElement.isWindowInstanceSQL()) {
+ Integer index = windowInstance2Index.get(isqlElement.getWindowInstanceId());
+ if (index == null) {
+ index = 0;
}
index++;
isqlElement.setIndex(index);
- windowInstance2Index.put(isqlElement.getWindowInstanceId(),index);
+ windowInstance2Index.put(isqlElement.getWindowInstanceId(), index);
}
return super.addCache(isqlElement);
}
- @Override protected String createSplitId(ISQLElement msg) {
+ @Override
+ protected String createSplitId(ISQLElement msg) {
return msg.getQueueId();
}
- protected AtomicInteger executeSQLCount=new AtomicInteger(0);
- protected AtomicInteger cancelQLCount=new AtomicInteger(0);
- protected class SQLCacheCallback implements IMessageFlushCallBack<ISQLElement> {
- Set<String> canCancelWindowIntances=new HashSet<>();
- @Override public boolean flushMessage(List<ISQLElement> messages) {
- List<String> sqls=new ArrayList<>();
+ protected AtomicInteger executeSQLCount = new AtomicInteger(0);
+ protected AtomicInteger cancelQLCount = new AtomicInteger(0);
- for(ISQLElement isqlElement:messages){
- if(isqlElement.isSplitSQL()){
- sqls.add(isqlElement.getSQL());
- }else if(isqlElement.isWindowInstanceSQL()){
-// if(canCancel(isqlElement)){
-// cancelQLCount.incrementAndGet();
-// continue;
-// }else {
- sqls.add(isqlElement.getSQL());
-// }
- }else if(isqlElement.isFireNotify()){
- windowInstance2Index.remove(isqlElement.getWindowInstanceId());
- firedWindowInstances.remove(isqlElement.getWindowInstanceId());
+ protected class SQLCacheCallback implements IMessageFlushCallBack<ISQLElement> {
- }
- }
- if(sqls.size()==0){
- return true;
- }
- JDBCDriver dataSource = DriverBuilder.createDriver();
- try {
- executeSQLCount.addAndGet(sqls.size());
- dataSource.executSqls(sqls);
- System.out.println("execute sql count is "+executeSQLCount.get()+"; cancel sql count is "+cancelQLCount.get());
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- if (dataSource != null) {
- dataSource.destroy();
- }
- }
- return true;
- }
+ @Override
+ public boolean flushMessage(List<ISQLElement> messages) {
+ List<String> sqls = new ArrayList<>();
+ for (ISQLElement isqlElement : messages) {
+ if (isqlElement.isSplitSQL()) {
+ sqls.add(isqlElement.getSQL());
+ } else if (isqlElement.isWindowInstanceSQL()) {
+ sqls.add(isqlElement.getSQL());
+ } else if (isqlElement.isFireNotify()) {
+ windowInstance2Index.remove(isqlElement.getWindowInstanceId());
+ firedWindowInstances.remove(isqlElement.getWindowInstanceId());
- protected boolean canCancel(ISQLElement element) {
- String windowInstanceId=element.getWindowInstanceId();
- if(!firedWindowInstances.contains(windowInstanceId)){
- return false;
+ }
}
- if(canCancelWindowIntances.contains(windowInstanceId)){
+ if (sqls.size() == 0) {
return true;
}
- if(element.getIndex()==1){
- canCancelWindowIntances.add(windowInstanceId);
- return true;
+ JDBCDriver dataSource = DriverBuilder.createDriver();
+ try {
+ executeSQLCount.addAndGet(sqls.size());
+ dataSource.executSqls(sqls);
+ System.out.println("execute sql count is " + executeSQLCount.get() + "; cancel sql count is " + cancelQLCount.get());
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ } finally {
+ if (dataSource != null) {
+ dataSource.destroy();
+ }
}
- return false;
+ return true;
}
}
-
-
}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
index 202f2eb..b4ae904 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
@@ -506,18 +506,7 @@ public class WindowValue extends WindowBaseValue implements Serializable {
StringUtil.createMD5Str(clonedValue.getWindowInstancePartitionId()));
return clonedValue;
}
- //
- //public WindowValue toOriginValue(boolean supportOutDate) {
- // WindowValue clonedValue = clone();
- // String windowInstanceId = WindowInstance.getWindowInstanceId(getNameSpace(), getConfigureName(), getStartTime(),
- // getEndTime(), getFireTime(), supportOutDate);
- // clonedValue.setMsgKey(MapKeyUtil
- // .createKey(getPartition(), windowInstanceId, getGroupBy()));
- // clonedValue.setWindowInstanceId(windowInstanceId);
- // clonedValue.setWindowInstancePartitionId(
- // MapKeyUtil.createKey(windowInstanceId, getPartition()));
- // return clonedValue;
- //}
+
public Long getLastUpdateTime() {
return lastUpdateTime;