You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:03 UTC
[rocketmq-streams] 04/16: merge from snapshot-1.0.3
This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit a5a6ddde987adf1143fc5a1f4ed6902dff5368aa
Merge: 93d7f775 9d3ae58b
Author: 维章 <un...@gmail.com>
AuthorDate: Mon May 23 15:32:38 2022 +0800
merge from snapshot-1.0.3
NOTICE | 2 +-
README.md | 16 +-
SUMMARY.md | 7 +
docs/README.md | 142 ------
docs/SUMMARY.md | 8 -
...225\264\344\275\223\346\236\266\346\236\204.md" | 33 --
.../2.\346\236\204\345\273\272DataStream.md" | 73 ----
.../3.\345\220\257\345\212\250DataStream.md" | 53 ---
...265\201\350\275\254\350\277\207\347\250\213.md" | 63 ---
...256\227\345\255\220\350\247\243\346\236\220.md" | 55 ---
...256\236\347\216\260\345\256\271\351\224\231.md" | 0
"docs/images/Pipeline\347\261\273\345\233\276.png" | Bin 44207 -> 0 bytes
docs/images/img.png | Bin 0 -> 38684 bytes
docs/images/img_1.png | Bin 0 -> 43711 bytes
docs/images/img_2.png | Bin 0 -> 103151 bytes
docs/images/window.png | Bin 241692 -> 0 bytes
...75\223\346\236\266\346\236\204\345\233\276.png" | Bin 60493 -> 0 bytes
...00\273\344\275\223\350\277\207\347\250\213.png" | Bin 44252 -> 0 bytes
.../\346\211\251\345\256\271\345\211\215.png" | Bin 56733 -> 0 bytes
...12\266\346\200\201\347\256\227\345\255\220.png" | Bin 35766 -> 0 bytes
"docs/images/\347\212\266\346\200\201.png" | Bin 47527 -> 0 bytes
"docs/images/\347\274\251\345\256\271.png" | Bin 51087 -> 0 bytes
docs/quick_start/README.md | 46 --
pom.xml | 118 ++---
quick_start.md | 92 ++--
.../pom.xml | 41 +-
rocketmq-streams-cep/src/test/resources/log4j.xml | 36 ++
rocketmq-streams-channel-db/pom.xml | 4 +-
.../streams/db/sink/AbstractMultiTableSink.java | 12 +-
.../streams/db/sink/DynamicMultipleDBSink.java | 11 +-
.../streams/db/sink/SelfMultiTableSink.java | 2 +-
.../streams/db/sink/SplitBySerialNumber.java | 2 +-
.../streams/db/sink/SplitByTimeMultiTableSink.java | 2 +-
rocketmq-streams-channel-es/pom.xml | 27 +-
.../{ESSinkBuilder.java => ESChannelBuilder.java} | 54 +--
.../rocketmq/streams/es/sink/ESSinkBuilder.java | 1 +
.../streams/es/sink/ESSinkOnlyChannel.java | 43 +-
.../apache/rocketmq/streams/es/sink/EsClient.java | 135 ++++++
rocketmq-streams-channel-http/pom.xml | 4 +-
rocketmq-streams-channel-kafka/pom.xml | 32 ++
.../streams/kafka/KafkaChannelBuilder.java | 52 ++-
.../apache/rocketmq/streams/kafka/KafkaSplit.java | 55 +++
.../rocketmq/streams/kafka/sink/KafkaSink.java | 200 +++++++++
.../rocketmq/streams/kafka/source/KafkaSource.java | 238 ++++++++++
.../rocketmq/streams/kafka/KafkaChannelTest.java | 104 +++++
.../src/test/resources/log4j.xml | 20 +
rocketmq-streams-channel-mqtt/pom.xml | 20 +-
.../rocketmq/streams/mqtt/source/PahoSource.java | 41 +-
rocketmq-streams-channel-rocketmq/pom.xml | 28 +-
.../apache/rocketmq/streams/debug/DebugWriter.java | 92 ++--
.../apache/rocketmq/streams/sink/RocketMQSink.java | 55 ++-
.../rocketmq/streams/RocketMQChannelTest.java | 2 +-
rocketmq-streams-channel-syslog/pom.xml | 19 +-
.../rocketmq/streams/syslog/SyslogChannel.java | 35 +-
.../streams/syslog/SyslogChannelBuilder.java | 4 +
.../streams/syslog/SyslogChannelManager.java | 6 +-
.../rocketmq/streams/syslog/SyslogServer.java | 35 +-
.../rocketmq/streams/syslog/SyslogClient.java | 22 +-
rocketmq-streams-clients/pom.xml | 14 +-
.../streams/client/source/DataStreamSource.java | 19 +
.../streams/client/transform/DataStream.java | 41 +-
.../streams/client/transform/JoinStream.java | 5 +-
.../streams/client/transform/SplitStream.java | 24 +-
.../streams/client/transform/WindowStream.java | 50 ++-
.../rocketmq/streams/client/ApplicationTest.java | 57 +++
.../rocketmq/streams/client/MqttSourceExample.java | 80 ++++
.../{sink/UserDefinedSink.java => ScriptTest.java} | 27 +-
.../apache/rocketmq/streams/client/WindowTest.java | 14 +-
.../rocketmq/streams/client/example/JoinTest.java | 7 +-
.../rocketmq/streams/client/example/SplitTest.java | 31 +-
.../streams/client/sink/UserDefinedSink.java | 2 +-
.../client/sink/UserDefinedSupportShuffleSink.java | 4 +-
rocketmq-streams-commons/pom.xml | 46 +-
.../MappedByteBufferTableWithPrimaryIndex.java | 482 +++++++++++++++++++++
.../streams/common/cache/compress/KVAddress.java | 48 +-
.../streams/common/channel/AbstractChannel.java | 14 +-
.../AbstractSupportShuffleChannelBuilder.java | 2 +-
.../common/channel/builder/IChannelBuilder.java | 25 +-
.../channel/builder/IShuffleChannelBuilder.java | 4 +-
.../common/channel/impl/CollectionSink.java | 39 +-
.../common/channel/impl/CollectionSinkBuilder.java | 34 +-
.../common/channel/impl/PrintChannelBuilder.java | 30 +-
.../channel/impl/file/FileChannelBuilder.java | 45 +-
.../streams/common/channel/impl/file/FileSink.java | 4 +-
.../common/channel/impl/file/FileSource.java | 2 +-
.../channel/impl/memory/MemoryChannelBuilder.java | 64 +++
.../common/channel/impl/memory/MemorySink.java | 4 +-
.../channel/impl/view/ViewChannelBuilder.java | 43 +-
.../streams/common/channel/impl/view/ViewSink.java | 31 +-
.../common/channel/impl/view/ViewSource.java | 66 +++
.../streams/common/channel/sink/AbstractSink.java | 17 +-
.../channel/sink/AbstractSupportShuffleSink.java | 8 +-
.../sink/AbstractSupportShuffleUDFSink.java | 6 +-
.../common/channel/sink/AbstractUDFSink.java | 23 +-
.../streams/common/channel/sink/ISink.java | 9 +-
.../impl/AbstractMultiSplitMessageCache.java | 22 +-
.../common/channel/source/AbstractSource.java | 89 +++-
.../channel/source/AbstractUnreliableSource.java | 2 +-
.../streams/common/channel/split/ISplit.java | 5 +-
.../streams/common/component/ComponentCreator.java | 4 +
.../common/configurable/AbstractConfigurable.java | 17 +-
.../common/configurable/BasedConfigurable.java | 13 +-
.../common/configurable/IConfigurableService.java | 5 +-
.../streams/common/configure/ConfigureFileKey.java | 7 +-
.../streams/common/context/AbstractContext.java | 33 +-
.../streams/common/context/MessageHeader.java | 89 ++--
.../streams/common/datatype/IntDataType.java | 3 +-
.../streams/common/datatype/ShortDataType.java | 2 +-
.../streams/common/interfaces/ISerialize.java | 9 +-
.../rocketmq/streams/common/model/NameCreator.java | 14 +-
.../NameCreatorContext.java} | 36 +-
.../common/monitor/ConsoleMonitorManager.java | 412 ++++++++++++++++++
.../streams/common/monitor/DataSyncConstants.java | 54 +++
.../streams/common/monitor/HttpClient.java | 116 +++++
.../rocketmq/streams/common/monitor/HttpUtil.java | 248 +++++++++++
.../monitor/MonitorDataSyncServiceFactory.java | 61 +++
.../common/monitor/group/MonitorCommander.java | 4 +-
.../streams/common/monitor/impl/DipperMonitor.java | 2 +-
.../streams/common/monitor/model/JobStage.java | 350 +++++++++++++++
.../streams/common/monitor/model/TraceIdsDO.java | 126 ++++++
.../common/monitor/model/TraceMonitorDO.java | 250 +++++++++++
.../service/MonitorDataSyncService.java} | 16 +-
.../service/impl/DBMonitorDataSyncImpl.java | 63 +++
.../service/impl/HttpMonitorDataSyncImpl.java | 151 +++++++
.../service/impl/RocketMQMonitorDataSyncImpl.java | 185 ++++++++
.../optimization/IHomologousOptimization.java | 2 +-
.../common/optimization/MessageGlobleTrace.java | 16 +-
.../streams/common/optimization/Re2Engine.java | 47 +-
.../common/optimization/TaskOptimization.java | 17 +-
.../optimization/fingerprint/FingerprintCache.java | 4 +-
.../optimization/fingerprint/PreFingerprint.java | 54 ++-
.../streams/common/schedule/ScheduleTask.java | 4 +-
.../common/threadpool/ThreadPoolFactory.java | 34 +-
.../AbstractMutilPipelineChainPipline.java | 81 ++--
.../streams/common/topology/ChainPipeline.java | 255 ++++++-----
.../streams/common/topology/ChainStage.java | 32 +-
.../common/topology/builder/PipelineBuilder.java | 139 ++++--
.../common/topology/metric/NotFireReason.java | 176 ++++++++
.../streams/common/topology/metric/StageGroup.java | 248 +++++++++++
.../common/topology/metric/StageMetric.java | 138 ++++++
.../common/topology/model/AbstractStage.java | 177 +++++---
.../streams/common/topology/model/IWindow.java | 3 +-
.../streams/common/topology/model/Pipeline.java | 56 ++-
.../topology/model/PipelineSourceJoiner.java | 48 --
.../topology/shuffle/IShuffleKeyGenerator.java | 11 +-
.../common/topology/shuffle/ShuffleMQCreator.java | 398 ++++++++++-------
.../topology/stages/AbstractWindowStage.java | 5 +-
.../stages/EmptyChainStage.java} | 27 +-
.../common/topology/stages/FilterChainStage.java | 157 ++-----
.../common/topology/stages/JoinChainStage.java | 3 +-
.../common/topology/stages/JoinEndChainStage.java | 11 +-
.../topology/stages/JoinStartChainStage.java | 67 +++
.../common/topology/stages/OutputChainStage.java | 81 ++--
.../topology/stages/ShuffleConsumerChainStage.java | 193 +++++++++
.../topology/stages/ShuffleProducerChainStage.java | 345 +++++++++++++++
.../topology/stages/SubPiplineChainStage.java | 138 ------
.../common/topology/stages/UnionChainStage.java | 3 +-
.../common/topology/stages/UnionEndChainStage.java | 11 +-
...onChainStage.java => UnionStartChainStage.java} | 43 +-
.../ViewChainStage.java} | 479 ++++++++++----------
.../common/topology/stages/udf/UDFChainStage.java | 27 +-
.../streams/common/topology/task/StreamsTask.java | 444 +++----------------
.../streams/common/topology/task/TaskAssigner.java | 20 +-
.../streams/common/utils/ConfigurableUtil.java | 6 +-
.../streams/common/utils/ContantsUtil.java | 33 +-
.../streams/common/utils/DataTypeUtil.java | 16 +-
.../rocketmq/streams/common/utils/FileUtil.java | 89 +++-
.../streams/common/utils/InstantiationUtil.java | 48 +-
.../streams/common/utils/JsonableUtil.java | 5 +
.../rocketmq/streams/common/utils/KryoUtil.java | 214 +++++++++
.../streams/common/utils/NameCreatorUtil.java | 60 ---
.../streams/common/utils/PipelineHTMLUtil.java | 299 +++++++++++++
.../streams/common/utils/PropertiesUtils.java | 2 +-
.../rocketmq/streams/common/utils/ReflectUtil.java | 62 ++-
.../streams/common/utils/SerializeUtil.java | 23 +-
.../streams/common/utils/ServiceLoadUtil.java | 63 +++
.../rocketmq/streams/common/utils/TraceUtil.java | 23 +-
.../rocketmq/streams/common/channel/SinkTest.java | 4 +-
rocketmq-streams-configurable/pom.xml | 9 +-
.../configurable/ConfigurableComponent.java | 17 +-
.../service/AbstractConfigurableService.java | 98 ++---
.../service/impl}/FileConfigureService.java | 2 +-
.../impl}/FileSupportParentConfigureService.java | 2 +-
.../service/impl/HttpConfigureService.java | 377 ++++++++++++++++
.../impl/HttpSupportParentConfigureService.java | 20 +-
.../service/impl}/MemoryConfigureService.java | 2 +-
.../impl}/MemorySupportParentConfigureService.java | 2 +-
rocketmq-streams-connectors/pom.xml | 9 +-
.../{IBounded.java => IScheduleCallback.java} | 9 +-
.../connectors/source/AbstractPullSource.java | 148 ++++---
.../connectors/source/MutilBatchTaskSource.java | 158 +++++++
rocketmq-streams-db-operator/pom.xml | 4 +-
.../streams/db/configuable/DBConfigureService.java | 11 +-
rocketmq-streams-dim/pom.xml | 4 +-
.../intelligence/AbstractIntelligenceCache.java | 5 +-
rocketmq-streams-examples/pom.xml | 20 +-
.../mutilconsumer/MultiStreamsExample.java | 3 +
.../streams/examples/send/ProducerFromFile.java | 8 +-
.../streams/examples/source/FileSourceExample.java | 2 +-
.../src/main/resources/joinData-2.txt | 4 -
rocketmq-streams-filter/pom.xml | 3 +-
.../streams/filter/builder/ExpressionBuilder.java | 49 +--
.../streams/filter/context/RuleContext.java | 31 +-
.../filter/engine/impl/DefaultRuleEngine.java | 29 +-
.../function/expression/CompareFunction.java | 16 +-
.../rocketmq/streams/filter/operator/Rule.java | 58 +++
.../expression/ExpressionRelationParser.java | 9 +-
.../expression/ExpressionRelationPaser.java | 107 -----
.../operator/expression/GroupExpression.java | 3 +-
.../operator/expression/RelationExpression.java | 35 +-
.../PiplineLogFingerprintAnalysis.java | 6 +-
.../dependency/BlinkRuleV2Expression.java | 5 +-
.../optimization/dependency/DependencyTree.java | 22 +-
.../dependency/SimplePipelineTree.java | 3 +-
.../dependency/StateLessDependencyTree.java | 84 ++++
.../optimization/homologous/HomologousCompute.java | 5 +-
.../homologous/HomologousOptimization.java | 13 +-
rocketmq-streams-lease/pom.xml | 4 +-
rocketmq-streams-runner/assembly/distribution.xml | 69 ---
rocketmq-streams-runner/assembly/standalone.xml | 72 ---
rocketmq-streams-runner/bin/start.sh | 58 ---
rocketmq-streams-runner/bin/stop.sh | 33 --
rocketmq-streams-runner/pom.xml | 80 ----
.../src/main/resources/log4j.xml | 51 ---
rocketmq-streams-schedule/pom.xml | 4 +-
.../schedule/job/ConfigurableExecutorJob.java | 30 +-
rocketmq-streams-script/pom.xml | 4 +-
.../function/aggregation/LastValueAccumulator.java | 67 +++
.../function/impl/distinct/DistinctFunction.java | 1 -
.../function/impl/json/JsonCreatorFunction.java | 4 +-
.../function/impl/json/UDTFFieldNameFunction.java | 50 +++
.../script/function/impl/parser/GrokFunction.java | 4 +-
.../function/impl/parser/Paser2JsonFunction.java | 17 +-
.../function/impl/parser/PaserBySplitFunction.java | 44 +-
.../function/impl/parser/RegexParserFunction.java | 24 +-
.../script/function/impl/router/RouteFunction.java | 4 +-
.../script/operator/impl/AggregationScript.java | 19 +-
.../streams/script/service/IAccumulator.java | 4 +-
.../script/service/udf/SimpleUDAFScript.java | 35 +-
.../streams/script/service/udf/UDFScript.java | 26 +-
.../streams/script/function/FunctionTest.java | 19 +-
rocketmq-streams-serviceloader/pom.xml | 4 +-
rocketmq-streams-state/pom.xml | 4 +-
.../streams/state/kv/rocksdb/RocksDBOperator.java | 37 +-
rocketmq-streams-transport-minio/pom.xml | 4 +-
rocketmq-streams-window/pom.xml | 9 +-
.../window/minibatch/MiniBatchMsgCache.java | 58 +++
.../window/minibatch/ShuffleMessageCache.java | 187 ++++++++
.../rocketmq/streams/window/model/WindowCache.java | 182 +++-----
.../streams/window/model/WindowInstance.java | 22 +-
.../window/operator/AbstractShuffleWindow.java | 18 +-
.../streams/window/operator/AbstractWindow.java | 73 +++-
.../window/operator/impl/SessionOperator.java | 2 +-
.../window/operator/impl/WindowOperator.java | 8 +-
.../streams/window/operator/join/JoinWindow.java | 3 +-
.../window/shuffle/AbstractSystemChannel.java | 94 ++--
.../streams/window/shuffle/ShuffleCache.java | 14 +-
.../streams/window/shuffle/ShuffleChannel.java | 54 +--
.../streams/window/state/impl/WindowValue.java | 54 ++-
.../streams/window/trigger/WindowTrigger.java | 1 -
.../rocketmq/streams/window/util/ShuffleUtil.java | 62 +++
.../org/apache/rocketmq/streams/RocksdbTest.java | 2 +-
docs/stream_sink/README.md => stream_sink.md | 19 +-
docs/stream_source/README.md => stream_source.md | 30 +-
.../README.md => stream_transform.md | 0
265 files changed, 9865 insertions(+), 4083 deletions(-)
diff --cc pom.xml
index 70288c26,7caa0cbc..2d139613
--- a/pom.xml
+++ b/pom.xml
@@@ -184,9 -137,9 +136,10 @@@
<exclude>**/*.xml</exclude>
<exclude>**/*.sh</exclude>
<exclude>**/*.out</exclude>
+ <exclude>**/*.sql</exclude>
<exclude>**/*.properties</exclude>
<exclude>docs/**/*</exclude>
+ <exclude>**/*.sql</exclude>
</excludes>
</configuration>
</plugin>
diff --cc rocketmq-streams-channel-es/pom.xml
index 15723b3c,fe50aeef..b4ce4d85
--- a/rocketmq-streams-channel-es/pom.xml
+++ b/rocketmq-streams-channel-es/pom.xml
@@@ -20,8 -7,8 +7,8 @@@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams</artifactId>
- <version>1.0.2-SNAPSHOT</version>
+ <version>1.0.2-preview-SNAPSHOT</version>
- </parent>
+ </parent>
<artifactId>rocketmq-streams-channel-es</artifactId>
<name>ROCKETMQ STREAMS :: channel-es</name>
<packaging>jar</packaging>
diff --cc rocketmq-streams-channel-kafka/pom.xml
index 00000000,89bd645c..76b73909
mode 000000,100644..100644
--- a/rocketmq-streams-channel-kafka/pom.xml
+++ b/rocketmq-streams-channel-kafka/pom.xml
@@@ -1,0 -1,32 +1,32 @@@
+ <?xml version="1.0" encoding="UTF-8"?>
+ <project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-streams</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
- <version>1.0.2-SNAPSHOT</version>
++ <version>1.0.2-preview-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-streams-channel-kafka</artifactId>
+ <name>ROCKETMQ STREAMS :: channel-kafka</name>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.12</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-commons</artifactId>
+ </dependency>
+ </dependencies>
+ </project>
diff --cc rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
index bb6e9454,d825603a..1de92e31
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
@@@ -17,8 -17,10 +17,11 @@@
package org.apache.rocketmq.streams.sink;
+ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@@ -27,8 -29,10 +30,9 @@@ import java.util.Set
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
@@@ -149,10 -154,10 +154,6 @@@ public class RocketMQSink extends Abstr
destroy();
producer = new DefaultMQProducer(groupName + "producer", true, null);
try {
-- //please not use the code,the name srv addr may be empty in jmenv
--// if (this.namesrvAddr == null || "".equals(this.namesrvAddr)) {
--// throw new RuntimeException("namesrvAddr can not be null.");
--// }
if (StringUtil.isNotEmpty(this.namesrvAddr)) {
producer.setNamesrvAddr(this.namesrvAddr);
@@@ -263,7 -275,7 +272,7 @@@
@Override
public int getSplitNum() {
- List<ISplit> splits = getSplitList();
- List<ISplit<?, ?>> splits = getSplitList();
++ List<ISplit<?,?>> splits = getSplitList();
if (splits == null || splits.size() == 0) {
return 0;
}
diff --cc rocketmq-streams-clients/pom.xml
index 8fb31672,94c7f3e7..2d15efce
--- a/rocketmq-streams-clients/pom.xml
+++ b/rocketmq-streams-clients/pom.xml
@@@ -50,6 -57,18 +57,11 @@@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-filter</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-channel-syslog</artifactId>
+ </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-streams-channel-kafka</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-streams-dbinit</artifactId>
- </dependency>
++
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-window</artifactId>
diff --cc rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
index 9422519a,5004f230..1e4b15b6
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
@@@ -178,6 -179,25 +178,25 @@@ public class DataStreamSource
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
}
- public DataStream fromKafka(String endpoint, String topic, String groupName) {
- return fromKafka(endpoint, topic, groupName, true);
- }
-
- public DataStream fromKafka(String endpoint, String topic, String groupName, Boolean isJson) {
- return fromKafka(endpoint, topic, groupName, isJson, 1);
- }
-
- public DataStream fromKafka(String endpoint, String topic, String groupName, Boolean isJson, int maxThread) {
- KafkaSource kafkaChannel = new KafkaSource();
- kafkaChannel.setBootstrapServers(endpoint);
- kafkaChannel.setTopic(topic);
- kafkaChannel.setGroupName(groupName);
- kafkaChannel.setJsonData(isJson);
- kafkaChannel.setMaxThread(maxThread);
- this.mainPipelineBuilder.setSource(kafkaChannel);
- return new DataStream(this.mainPipelineBuilder, null);
- }
++// public DataStream fromKafka(String endpoint, String topic, String groupName) {
++// return fromKafka(endpoint, topic, groupName, true);
++// }
++//
++// public DataStream fromKafka(String endpoint, String topic, String groupName, Boolean isJson) {
++// return fromKafka(endpoint, topic, groupName, isJson, 1);
++// }
++//
++// public DataStream fromKafka(String endpoint, String topic, String groupName, Boolean isJson, int maxThread) {
++// KafkaSource kafkaChannel = new KafkaSource();
++// kafkaChannel.setBootstrapServers(endpoint);
++// kafkaChannel.setTopic(topic);
++// kafkaChannel.setGroupName(groupName);
++// kafkaChannel.setJsonData(isJson);
++// kafkaChannel.setMaxThread(maxThread);
++// this.mainPipelineBuilder.setSource(kafkaChannel);
++// return new DataStream(this.mainPipelineBuilder, null);
++// }
+
public DataStream from(ISource<?> source) {
this.mainPipelineBuilder.setSource(source);
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
diff --cc rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
index 80ae510e,96e24a0a..563af478
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
@@@ -51,8 -51,11 +51,10 @@@ import org.apache.rocketmq.streams.comm
import org.apache.rocketmq.streams.common.topology.ChainStage;
import org.apache.rocketmq.streams.common.topology.builder.IStageBuilder;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
-import org.apache.rocketmq.streams.common.topology.model.AbstractRule;
import org.apache.rocketmq.streams.common.topology.model.Union;
import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
+ import org.apache.rocketmq.streams.common.topology.stages.ShuffleConsumerChainStage;
+ import org.apache.rocketmq.streams.common.topology.stages.ShuffleProducerChainStage;
import org.apache.rocketmq.streams.common.topology.stages.udf.StageBuilder;
import org.apache.rocketmq.streams.common.topology.stages.udf.UDFChainStage;
import org.apache.rocketmq.streams.common.topology.stages.udf.UDFUnionChainStage;
@@@ -646,6 -669,13 +667,13 @@@ public class DataStream implements Seri
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, output);
}
- public DataStream toKafka(String bootstrapServers, String topic) {
- KafkaSink kafkaSink = new KafkaSink(bootstrapServers, topic);
- ChainStage<?> output = this.mainPipelineBuilder.createStage(kafkaSink);
- this.mainPipelineBuilder.setTopologyStages(currentChainStage, output);
- return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, output);
- }
++// public DataStream toKafka(String bootstrapServers, String topic) {
++// KafkaSink kafkaSink = new KafkaSink(bootstrapServers, topic);
++// ChainStage<?> output = this.mainPipelineBuilder.createStage(kafkaSink);
++// this.mainPipelineBuilder.setTopologyStages(currentChainStage, output);
++// return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, output);
++// }
+
public DataStream toEnhanceDBSink(String url, String userName, String password, String tableName) {
EnhanceDBSink sink = new EnhanceDBSink(url, userName, password, tableName);
ChainStage<?> output = this.mainPipelineBuilder.createStage(sink);
diff --cc rocketmq-streams-commons/pom.xml
index 5ea155d4,72af956e..2e73c6f8
--- a/rocketmq-streams-commons/pom.xml
+++ b/rocketmq-streams-commons/pom.xml
@@@ -98,11 -101,36 +101,40 @@@
<artifactId>re2j</artifactId>
<version>1.6</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>de.ruedigermoeller</groupId>
+ <artifactId>fst</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo</artifactId>
+ <version>5.3.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.13</version>
+ </dependency>
</dependencies>
</project>
diff --cc rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
index 939b1d54,320cbd12..2bc7e55f
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
@@@ -18,9 -18,10 +18,11 @@@ package org.apache.rocketmq.streams.com
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
+
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@@ -58,15 -58,22 +60,23 @@@ import org.apache.rocketmq.streams.comm
public abstract class AbstractSource extends BasedConfigurable implements ISource<AbstractSource>, ILifeCycle {
public static String CHARSET = "UTF-8";
+ /**
+ * 输入的消息是否为json
+ */
+ protected Boolean isJsonData = true;
+ /**
+ * 输入的消息是否为json array
+ */
+ protected Boolean msgIsJsonArray = false;
- protected Boolean isJsonData = true;//输入的消息是否为json
- protected Boolean msgIsJsonArray = false;//输入的消息是否为json array
@ENVDependence
- protected String groupName;//group name
+ protected String groupName;
+
protected int maxThread = Runtime.getRuntime().availableProcessors();
+
@ENVDependence
protected String topic = "";
+ protected String namesrvAddr;
/**
* 多长时间做一次checkpoint
*/
@@@ -136,11 -167,9 +170,10 @@@
* @param message
* @return
*/
- public AbstractContext doReceiveMessage(JSONObject message, boolean needSetCheckPoint, String queueId, String offset) {
+ public AbstractContext doReceiveMessage(JSONObject message, boolean needSetCheckPoint, String queueId,
+ String offset) {
Message msg = createMessage(message, queueId, offset, needSetCheckPoint);
- AbstractContext context = executeMessage(msg);
- return context;
+ return executeMessage(msg);
}
/**
diff --cc rocketmq-streams-connectors/pom.xml
index 8c8277e3,d544e3d5..e557a19a
mode 100755,100644..100644
--- a/rocketmq-streams-connectors/pom.xml
+++ b/rocketmq-streams-connectors/pom.xml
diff --cc rocketmq-streams-examples/pom.xml
index 62a4038c,da6a130b..7ced0cb0
--- a/rocketmq-streams-examples/pom.xml
+++ b/rocketmq-streams-examples/pom.xml
@@@ -39,6 -41,6 +41,22 @@@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-clients</artifactId>
</dependency>
++ <dependency>
++ <groupId>com.alibaba</groupId>
++ <artifactId>fastjson</artifactId>
++ </dependency>
++ <dependency>
++ <groupId>junit</groupId>
++ <artifactId>junit</artifactId>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.rocketmq</groupId>
++ <artifactId>rocketmq-client</artifactId>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.rocketmq</groupId>
++ <artifactId>rocketmq-streams-commons</artifactId>
++ </dependency>
</dependencies>
<packaging>jar</packaging>
diff --cc rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java
index f5f3d46c,2b823e44..8835e943
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java
@@@ -24,13 -23,11 +24,15 @@@ import com.alibaba.fastjson.JSONObject
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+
++import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
++import org.apache.rocketmq.streams.client.transform.DataStream;
import org.apache.rocketmq.streams.client.transform.window.Time;
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.examples.send.ProducerFromFile;
import static org.apache.rocketmq.streams.examples.aggregate.Constant.NAMESRV_ADDRESS;
import static org.apache.rocketmq.streams.examples.aggregate.Constant.RMQ_CONSUMER_GROUP_NAME;
@@@ -64,6 -61,6 +66,7 @@@ public class MultiStreamsExample
private static void runOneStreamsClient(int index) {
DataStreamSource source = StreamBuilder.dataStream("namespace" + index, "pipeline" + index);
++
source.fromRocketmq(
RMQ_TOPIC,
RMQ_CONSUMER_GROUP_NAME,
diff --cc rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/send/ProducerFromFile.java
index 163d8116,58d3710c..0508c303
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/send/ProducerFromFile.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/send/ProducerFromFile.java
@@@ -17,8 -17,8 +17,12 @@@
*
*/
-package org.apache.rocketmq.streams.examples.aggregate;
+package org.apache.rocketmq.streams.examples.send;
+
++import org.apache.rocketmq.client.producer.DefaultMQProducer;
++import org.apache.rocketmq.common.message.Message;
++import org.apache.rocketmq.remoting.common.RemotingHelper;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@@ -26,44 -26,18 +30,40 @@@ import java.io.IOException
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
+import java.util.concurrent.atomic.AtomicLong;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
-
public class ProducerFromFile {
+ private static final DefaultMQProducer producer = new DefaultMQProducer("test-group");
+ private static final AtomicLong count = new AtomicLong(0);
+ private static boolean init = false;
- public static void produce(String filePath, String nameServ, String topic) {
- try {
- DefaultMQProducer producer = new DefaultMQProducer("test-group");
+ private static synchronized void initProducer(String nameServ) throws Throwable {
+ if (!init) {
producer.setNamesrvAddr(nameServ);
producer.start();
+ init = true;
+ }
+ }
+
+ public static void produceInLoop(String filePath, String nameServ, String topic, long interval) {
+ while (true) {
+ try {
+ produce(filePath, nameServ, topic, false);
+
+ Thread.sleep(interval);
+
+ if (count.get() % 500 == 0) {
+ System.out.println("send message num: " + count.get());
+ }
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+ }
+
+ public static void produce(String filePath, String nameServ, String topic, boolean shutdown) {
+ try {
+ initProducer(nameServ);
List<String> result = ProducerFromFile.read(filePath);
diff --cc rocketmq-streams-window/pom.xml
index 40fcbf94,7a2be2dd..3ce4dac8
--- a/rocketmq-streams-window/pom.xml
+++ b/rocketmq-streams-window/pom.xml
@@@ -48,10 -50,6 +50,13 @@@
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>
-
-
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ </dependency>
++ <dependency>
++ <groupId>org.apache.rocketmq</groupId>
++ <artifactId>rocketmq-streams-commons</artifactId>
++ </dependency>
</dependencies>
</project>
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java
index 00000000,39629cc1..a36309e2
mode 000000,100644..100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java
@@@ -1,0 -1,76 +1,58 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.rocketmq.streams.window.minibatch;
+
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+ import org.apache.commons.lang3.tuple.Pair;
+ import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
+ import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
+ import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
+ import org.apache.rocketmq.streams.common.channel.split.ISplit;
+ import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.common.context.MessageHeader;
+ import org.apache.rocketmq.streams.common.topology.shuffle.IShuffleKeyGenerator;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.ReflectUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.script.operator.impl.AggregationScript;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
+ import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.window.state.impl.WindowValue;
-import org.apache.rocketmq.streams.window.util.ShuffleUtil;
+
+ public class MiniBatchMsgCache extends AbstractMultiSplitMessageCache<Pair<ISplit,IMessage>> {
+ public static String SHUFFLE_KEY="shuffle_key";
+
+
+
+ protected transient IShuffleKeyGenerator shuffleKeyGenerator;
+ protected transient AbstractShuffleWindow window;
+
+
+
+
+ public MiniBatchMsgCache(
+ IMessageFlushCallBack<Pair<ISplit,IMessage>> flushCallBack, IShuffleKeyGenerator shuffleKeyGenerator,
+ AbstractShuffleWindow window) {
+ super(flushCallBack);
+ this.shuffleKeyGenerator=shuffleKeyGenerator;
+ this.window=window;
+ }
+
+
+ @Override protected String createSplitId(Pair<ISplit, IMessage> msg) {
+ return msg.getLeft().getQueueId();
+ }
+
+ @Override protected MessageCache createMessageCache() {
+ ShuffleMessageCache messageCache=new ShuffleMessageCache(this.flushCallBack);
+ messageCache.setWindow(window);
+ messageCache.setShuffleKeyGenerator(shuffleKeyGenerator);
+ return messageCache;
+ }
+ }
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
index c749bd84,fed18863..4062b1f5
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
@@@ -116,6 -122,19 +116,10 @@@ public class WindowInstance extends Ent
return windowInstance;
}
- /**
- * 创建window instance的唯一ID
- *
- * @return
- */
+ public String createWindowInstanceId() {
+ return MapKeyUtil.createKey(splitId, windowNameSpace, windowName, windowInstanceName, startTime, endTime);
+ }
+
- public String createWindowInstanceIdWithoutSplitid() {
- return MapKeyUtil.createKey(windowNameSpace, windowName, windowInstanceName, startTime, endTime);
- }
-
public String createWindowInstanceTriggerId() {
return MapKeyUtil.createKey(splitId, windowNameSpace, windowName, windowInstanceName, startTime, endTime, fireTime);
}
@@@ -162,7 -254,20 +165,18 @@@
* @return
* @Param isWindowInstance2DB 如果是秒级窗口,可能windowinstacne不必存表,只在内存保存,可以通过这个标志设置
*/
- public static List<WindowInstance> getOrCreateWindowInstance(AbstractWindow window, Long occurTime,
- int timeUnitAdjust, String queueId) {
+ public static List<WindowInstance> getOrCreateWindowInstance(AbstractWindow window, Long occurTime, int timeUnitAdjust, String queueId) {
+ return getOrCreateWindowInstance(window,occurTime,timeUnitAdjust,queueId,false);
+ }
+ /**
+ * 查询或者创建Window的实例,滑动窗口有可能返回多个,滚动窗口返回一个
+ *
+ * @param window
+ * @param occurTime
+ * @return
+ * @Param isWindowInstance2DB 如果是秒级窗口,可能windowinstacne不必存表,只在内存保存,可以通过这个标志设置
+ */
- public static List<WindowInstance> getOrCreateWindowInstance(AbstractWindow window, Long occurTime,
- int timeUnitAdjust, String queueId, boolean isCreateOnly) {
++ public static List<WindowInstance> getOrCreateWindowInstance(AbstractWindow window, Long occurTime, int timeUnitAdjust, String queueId, boolean isCreateOnly) {
int windowSlideInterval = window.getSlideInterval();
int windowSizeInterval = window.getSizeInterval();
if (windowSlideInterval == 0) {
@@@ -246,10 -345,9 +264,10 @@@
}
}
List<WindowInstance> lostInstanceList = null;
+ //todo 这里针对lost的都创建一次
lostInstanceList = WindowInstance.createWindowInstances(window, lostWindowTimeList, lostFireList, queueId);
instanceList.addAll(lostInstanceList);
- if (CollectionUtil.isNotEmpty(lostInstanceList)) {
+ if (CollectionUtil.isNotEmpty(lostInstanceList)&&!isCreateOnly) {
for (WindowInstance windowInstance : instanceList) {
List<WindowInstance> emitInstances = createEmitWindowInstance(window, windowInstance);
if (emitInstances != null && emitInstances.size() > 0) {
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
index 070a40f0,5338734f..7c78f478
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
@@@ -85,8 -74,7 +100,7 @@@ public abstract class AbstractShuffleWi
Set<String> splitIds = new HashSet<>();
splitIds.add(windowInstance.getSplitId());
shuffleChannel.flush(splitIds);
-
- return fireWindowInstance(windowInstance, windowInstance.getSplitId(), queueId2Offset);
+ return doFireWindowInstance(windowInstance);
}
/**
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
index 49d79c5b,8fd07737..eb548aa6
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
@@@ -17,6 -17,17 +17,8 @@@
package org.apache.rocketmq.streams.window.operator;
import com.alibaba.fastjson.JSONObject;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
++
+ import javafx.util.Pair;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@@ -137,8 -164,8 +145,9 @@@ public abstract class AbstractWindow ex
protected boolean isLocalStorageOnly = true;//是否只用本地存储,可以提高性能,但不保证可靠性
protected String reduceSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码
protected transient IReducer reducer;
-
+ protected transient Long maxPartitionNum = 100000000L;
+ protected String mapFunctionSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码
+ protected transient MapFunction<JSONObject, Pair<WindowInstance, JSONObject>> mapFunction;
/**
* the computed column and it's process of computing
*/
@@@ -164,12 -191,23 +173,13 @@@
*/
protected transient String WINDOW_NAME;
- /**
- * 内部使用,定期检查窗口有没有触发
- */
- //protected transient ScheduledExecutorService fireWindowInstanceChecker =new ScheduledThreadPoolExecutor(3);
-
- // protected transient ExecutorService deleteService = Executors.newSingleThreadExecutor();
protected volatile transient WindowCache windowCache;
- protected transient WindowStorage storage;
+ protected transient IStorage storage;
protected transient WindowTrigger windowFireSource;
- protected transient SQLCache sqlCache;
protected transient EventTimeManager eventTimeManager;
+ protected transient ISink contextMsgSink;
- //create and save window instacne max partitionNum and window max eventTime
- protected transient IWindowMaxValueManager windowMaxValueManager;
-
public AbstractWindow() {
setType(IWindow.TYPE);
}
@@@ -199,8 -242,12 +209,12 @@@
byte[] bytes = Base64Utils.decode(this.reduceSerializeValue);
reducer = InstantiationUtil.deserializeObject(bytes);
}
+ if (StringUtil.isNotEmpty(this.mapFunctionSerializeValue)) {
+ byte[] bytes = Base64Utils.decode(this.mapFunctionSerializeValue);
+ this.mapFunction = InstantiationUtil.deserializeObject(bytes);
+ }
eventTimeManager = new EventTimeManager();
- windowMaxValueManager = new WindowMaxValueManager(this, sqlCache);
+
return success;
}
@@@ -306,11 -347,11 +320,12 @@@
* @param message
* @return
*/
- protected String generateShuffleKey(IMessage message) {
+ @Override
+ public String generateShuffleKey(IMessage message) {
if (StringUtil.isEmpty(groupByFieldName)) {
- return null;
+ return "globle_window";
}
+
JSONObject msg = message.getMessageBody();
String[] fieldNames = groupByFieldName.split(";");
String[] values = new String[fieldNames.length];
@@@ -773,13 -845,45 +796,53 @@@
this.maxDelay = maxDelay;
}
+ public Long getMaxPartitionNum() {
+ return maxPartitionNum;
+ }
+
+ public void setMaxPartitionNum(Long maxPartitionNum) {
+ this.maxPartitionNum = maxPartitionNum;
+ }
+
public abstract boolean supportBatchMsgFinish();
+
+ public ISink getContextMsgSink() {
+ return contextMsgSink;
+ }
+
+ public String getContextMsgSinkName() {
+ return contextMsgSinkName;
+ }
+
+ public void setContextMsgSinkName(String contextMsgSinkName) {
+ this.contextMsgSinkName = contextMsgSinkName;
+ }
+
+ public String getMapFunctionSerializeValue() {
+ return mapFunctionSerializeValue;
+ }
+
+ public void setMapFunctionSerializeValue(String mapFunctionSerializeValue) {
+ this.mapFunctionSerializeValue = mapFunctionSerializeValue;
+ }
+
+ public void saveMsgContext(String queueId,WindowInstance windowInstance, List<IMessage> messages) {
+ if(this.mapFunction!=null&&this.contextMsgSink!=null){
+ if(messages!=null){
+ for(IMessage message:messages){
+ JSONObject msg=message.getMessageBody();
+ try {
+ msg=this.mapFunction.map(new Pair(windowInstance,msg));
+ Message copyMsg=new Message(msg);
+ copyMsg.getHeader().setQueueId(queueId);
+ copyMsg.getHeader().setOffset(message.getHeader().getOffset());
+ this.contextMsgSink.batchAdd(copyMsg);
+ } catch (Exception e) {
+ throw new RuntimeException("save window context msg error ",e);
+ }
+ }
+ this.contextMsgSink.flush();
+ }
+ }
+ }
}
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
index 65ac261b,8da03987..0c10a9bf
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
@@@ -515,13 -481,12 +515,13 @@@ public class SessionOperator extends Wi
store(lastValueMap, instance, queueId);
}
+
@Override
public long incrementAndGetSplitNumber(WindowInstance instance, String shuffleId) {
- long number = super.incrementAndGetSplitNumber(instance, shuffleId);
- if (number > 900000000) {
- this.getWindowMaxValueManager().resetSplitNum(instance, shuffleId);
+ long numer = super.incrementAndGetSplitNumber(instance, shuffleId);
+ if (numer > 900000000) {
+ this.storage.putMaxPartitionNum(shuffleId, instance.getWindowInstanceId(), numer);
}
- return number;
+ return numer;
}
- }
+ }
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
index 65f326f6,d2806081..de6dce22
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
@@@ -16,113 -16,155 +16,119 @@@
*/
package org.apache.rocketmq.streams.window.operator.impl;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.streams.common.channel.split.ISplit;
++
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.MessageOffset;
-import org.apache.rocketmq.streams.common.utils.CollectionUtil;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-import org.apache.rocketmq.streams.script.operator.impl.AggregationScript;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
-import org.apache.rocketmq.streams.window.model.FunctionExecutor;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.window.sqlcache.impl.FiredNotifySQLElement;
import org.apache.rocketmq.streams.window.state.WindowBaseValue;
import org.apache.rocketmq.streams.window.state.impl.WindowValue;
-import org.apache.rocketmq.streams.window.storage.IWindowStorage;
-import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
-import org.apache.rocketmq.streams.window.storage.WindowStorage.WindowBaseValueIterator;
-
-public class WindowOperator extends AbstractShuffleWindow {
+import org.apache.rocketmq.streams.window.storage.IteratorWrap;
+import org.apache.rocketmq.streams.window.storage.RocksdbIterator;
+import org.apache.rocketmq.streams.window.storage.WindowType;
- import java.util.*;
- private static final String ORDER_BY_SPLIT_NUM = "_order_by_split_num_";//key=_order;queueid,windowinstanceid,partitionNum
++import java.util.ArrayList;
++import java.util.Comparator;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+public class WindowOperator extends AbstractShuffleWindow {
public WindowOperator() {
super();
}
- protected transient boolean supportQuickStoreModel=false;
- protected transient List<String> schema=new ArrayList<>();
+ @Override
+ public int doFireWindowInstance(WindowInstance instance) {
+ String windowInstanceId = instance.getWindowInstanceId();
+ String queueId = instance.getSplitId();
- @Deprecated
- public WindowOperator(String timeFieldName, int windowPeriodMinute) {
- super();
- super.timeFieldName = timeFieldName;
- super.sizeInterval = windowPeriodMinute;
- }
+ RocksdbIterator<WindowBaseValue> rocksdbIterator = storage.getWindowBaseValue(queueId, windowInstanceId, WindowType.NORMAL_WINDOW, null);
- @Deprecated
- public WindowOperator(String timeFieldName, int windowPeriodMinute, String calFieldName) {
- super();
- super.timeFieldName = timeFieldName;
- super.sizeInterval = windowPeriodMinute;
- }
+ ArrayList<WindowValue> windowValues = new ArrayList<>();
+ while (rocksdbIterator.hasNext()) {
+ IteratorWrap<WindowBaseValue> next = rocksdbIterator.next();
+ WindowValue data = (WindowValue)next.getData();
+ windowValues.add(data);
+ }
+
+ windowValues.sort(Comparator.comparingLong(WindowBaseValue::getPartitionNum));
- public WindowOperator(int sizeInterval, String groupByFieldName, Map<String, String> select) {
- this.sizeInterval = sizeInterval;
- this.slideInterval = sizeInterval;
- this.groupByFieldName = groupByFieldName;
- this.setSelectMap(select);
+ int fireCount = sendBatch(windowValues, queueId, 0);
+
+ clearFire(instance);
+
+ return fireCount;
}
- protected transient AtomicInteger shuffleCount = new AtomicInteger(0);
- protected transient AtomicInteger fireCountAccumulator = new AtomicInteger(0);
- protected transient AtomicLong fireCost=new AtomicLong(0);
- @Override
- public int fireWindowInstance(WindowInstance instance, String queueId, Map<String, String> queueId2Offset) {
- List<WindowValue> windowValues = new ArrayList<>();
- int fireCount = 0;
- //long startTime = System.currentTimeMillis();
- //int sendCost = 0;
- // int currentCount = 0;
- //for(String queueId:currentQueueIds){
- WindowBaseValueIterator<WindowBaseValue> it = storage.loadWindowInstanceSplitData(getOrderBypPrefix(), queueId, instance.createWindowInstanceId(), null, getWindowBaseValueClass());
- if (queueId2Offset != null) {
- String offset = queueId2Offset.get(queueId);
- if (StringUtil.isNotEmpty(offset)) {
- it.setPartitionNum(Long.valueOf(offset));
- }
+ private int sendBatch(List<WindowValue> windowValues, String queueId, int fireCount) {
+ if (windowValues == null || windowValues.size() == 0) {
+ return fireCount;
}
- while (it.hasNext()) {
- WindowBaseValue windowBaseValue = it.next();
- if (windowBaseValue == null) {
- continue;
- }
- WindowValue windowValue = (WindowValue) windowBaseValue;
-
- Integer currentValue = getValue(windowValue, "total");
-
- fireCountAccumulator.addAndGet(currentValue);
- windowValues.add((WindowValue) windowBaseValue);
- if (windowValues.size() >= windowCache.getBatchSize()) {
- long sendFireCost = System.currentTimeMillis();
- sendFireMessage(windowValues, queueId);
- //sendCost += (System.currentTimeMillis() - sendFireCost);
- fireCount += windowValues.size();
- windowValues = new ArrayList<>();
- }
- }
- if (windowValues.size() > 0) {
- long sendFireCost = System.currentTimeMillis();
+ if (windowValues.size() <= windowCache.getBatchSize()) {
sendFireMessage(windowValues, queueId);
- // sendCost += (System.currentTimeMillis() - sendFireCost);
+
fireCount += windowValues.size();
+
+ return fireCount;
+ } else {
+ ArrayList<WindowValue> temp = new ArrayList<>();
+ for (int i = 0; i < windowCache.getBatchSize(); i++) {
+ temp.add(windowValues.remove(i));
+ }
+
+ sendFireMessage(temp, queueId);
+
+ return sendBatch(windowValues, queueId, fireCount + windowCache.getBatchSize());
}
- clearFire(instance);
- this.sqlCache.addCache(new FiredNotifySQLElement(queueId, instance.createWindowInstanceId()));
- //long cost= this.fireCost.addAndGet(System.currentTimeMillis()-startTime);
- // System.out.println("fire cost is "+cost+" "+ DateUtil.getCurrentTimeString());
- return fireCount;
}
- protected transient Map<String, Integer> shuffleWindowInstanceId2MsgCount = new HashMap<>();
- protected transient int windowvaluecount = 0;
- protected transient AtomicLong shuffleCost=new AtomicLong(0);
+
@Override
public void shuffleCalculate(List<IMessage> messages, WindowInstance instance, String queueId) {
+ Long startTime=System.currentTimeMillis();
DebugWriter.getDebugWriter(getConfigureName()).writeShuffleCalcultateReceveMessage(instance, messages, queueId);
+
List<String> sortKeys = new ArrayList<>();
Map<String, List<IMessage>> groupBy = groupByGroupName(messages, sortKeys);
- Set<String> groupByKeys = groupBy.keySet();
- List<String> storeKeys = new ArrayList<>();
- for (String groupByKey : groupByKeys) {
- String storeKey = createStoreKey(queueId, groupByKey, instance);
- storeKeys.add(storeKey);
+
+ RocksdbIterator<WindowBaseValue> windowBaseValue = storage.getWindowBaseValue(queueId, instance.getWindowInstanceId(), WindowType.NORMAL_WINDOW, null);
+
+ ArrayList<WindowBaseValue> windowValues = new ArrayList<>();
+ while (windowBaseValue.hasNext()) {
+ IteratorWrap<WindowBaseValue> next = windowBaseValue.next();
+ windowValues.add(next.getData());
}
- Map<String, WindowBaseValue> allWindowValues = new HashMap<>();
- //从存储中,查找window value对象,value是对象的json格式
- Map<String, WindowBaseValue> existWindowValues = storage.multiGet(getWindowBaseValueClass(), storeKeys, instance.createWindowInstanceId(), queueId);
- // Iterator<Entry<String, List<IMessage>>> it = groupBy.entrySet().iterator();
- for (String groupByKey : sortKeys) {
+ Map<String, List<WindowValue>> temp = windowValues.stream().map((value) -> (WindowValue) value).collect(Collectors.groupingBy(WindowValue::getMsgKey));
+
+ Map<String, List<WindowValue>> groupByMsgKey = new HashMap<>(temp);
+
+ List<WindowValue> allWindowValues = new ArrayList<>();
+
+ //处理不同groupBy的message
+ for (String groupByKey : sortKeys) {
List<IMessage> msgs = groupBy.get(groupByKey);
String storeKey = createStoreKey(queueId, groupByKey, instance);
- WindowValue windowValue = (WindowValue) existWindowValues.get(storeKey);
- ;
- if (windowValue == null) {
- windowvaluecount++;
+
+ //msgKey 为唯一键
+ List<WindowValue> windowValueList = groupByMsgKey.get(storeKey);
+ WindowValue windowValue;
+ if (windowValueList == null || windowValueList.size() == 0) {
windowValue = createWindowValue(queueId, groupByKey, instance);
- // windowValue.setOrigOffset(msgs.get(0).getHeader().getOffset());
+ } else {
+ windowValue = windowValueList.get(0);
}
- allWindowValues.put(storeKey, windowValue);
- windowValue.incrementUpdateVersion();
- Integer origValue = getValue(windowValue, "total");
+ allWindowValues.add(windowValue);
+ windowValue.incrementUpdateVersion();
if (msgs != null) {
for (IMessage message : msgs) {
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index 9c1c0af6,5eb3b784..f356a286
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@@ -60,11 -62,16 +60,10 @@@ public class JoinWindow extends Abstrac
protected String joinType;//join类型,值为INNER,LEFT
protected String expression;//条件表达式。在存在非等值比较时使用
- protected transient DBOperator joinOperator = new DBOperator();
- protected String rightDependentTableName;
- // @Override
- // protected void addPropertyToMessage(IMessage oriMessage, JSONObject oriJson){
- // oriJson.put("AbstractWindow", this);
- //
- // }
-
@Override
- protected int fireWindowInstance(WindowInstance instance, String shuffleId, Map<String, String> queueId2Offsets) {
+ protected int doFireWindowInstance(WindowInstance instance) {
+ //todo 只是清理吗?
clearFire(instance);
return 0;
}
@@@ -359,8 -422,18 +358,8 @@@
}
@Override
- protected String generateShuffleKey(IMessage message) {
+ public String generateShuffleKey(IMessage message) {
- String routeLabel =null;
- String lable = message.getHeader().getMsgRouteFromLable();
- if (lable != null) {
- if (lable.equals(rightDependentTableName)) {
- routeLabel = MessageHeader.JOIN_RIGHT;
- } else {
- routeLabel = MessageHeader.JOIN_LEFT;
- }
- } else {
- throw new RuntimeException("can not dipatch message, need route label " + toJson());
- }
+ String routeLabel = message.getHeader().getMsgRouteFromLable();
String messageKey = generateKey(message.getMessageBody(), routeLabel, leftJoinFieldNames, rightJoinFieldNames);
return messageKey;
}
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
index 6c4659b3,d0e499f2..0a3845fd
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
@@@ -54,11 -55,12 +55,12 @@@ public abstract class AbstractSystemCha
protected static final String CHANNEL_PROPERTY_KEY_PREFIX = "CHANNEL_PROPERTY_KEY_PREFIX";
protected static final String CHANNEL_TYPE = "CHANNEL_TYPE";
- protected ISource consumer;
+ protected ISource<?> consumer;
protected AbstractSupportShuffleSink producer;
protected Map<String, String> channelConfig = new HashMap<>();
- protected volatile boolean hasCreateShuffleChannel = false;
+ protected boolean hasCreateShuffleChannel = false;
+ protected transient AtomicBoolean hasStart = new AtomicBoolean(false);
public void startChannel() {
if (consumer == null) {
return;
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java
index 5139f707,054a418d..7b4d8b4f
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java
@@@ -22,11 -22,11 +22,11 @@@ import java.util.Collections
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.Future;
-
import org.apache.commons.lang3.tuple.Pair;
+ import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.model.WindowInstance;
@@@ -36,9 -37,8 +36,9 @@@ import org.apache.rocketmq.streams.wind
/**
* save receiver messages into cachefilter when checkpoint/autoflush/flush, process cachefilter message
*/
- public class ShuffleCache extends WindowCache {
+ public class ShuffleCache extends AbstractSink {
protected AbstractShuffleWindow window;
+ private HashMap<String, Boolean> hasLoad = new HashMap<>();
public ShuffleCache(AbstractShuffleWindow window) {
this.window = window;
@@@ -64,64 -54,23 +64,65 @@@
for (Pair<String, String> queueIdAndInstanceKey : keys) {
String queueId = queueIdAndInstanceKey.getLeft();
String windowInstanceId = queueIdAndInstanceKey.getRight();
+
List<IMessage> messages = instance2Messages.get(queueIdAndInstanceKey);
+
WindowInstance windowInstance = windowInstanceMap.get(windowInstanceId);
+
DebugWriter.getDebugWriter(window.getConfigureName()).writeShuffleReceive(window, messages, windowInstance);
+
+ stateMustLoad(queueId);
+
window.shuffleCalculate(messages, windowInstance, queueId);
+
+ //保存处理进度
saveSplitProgress(queueId, messages);
+ window.saveMsgContext(queueId,windowInstance,messages);
}
- return true;
+ return true;
}
+ private void stateMustLoad(String queueId) {
+ Boolean load = this.hasLoad.get(queueId);
+ if (load != null && load) {
+ return;
+ }
+
+ //在计算之前需要异步加载状态完成
+ HashMap<String, Future<?>> loadResult = this.window.getShuffleChannel().getLoadResult();
+ Future<?> future = loadResult.get(queueId);
+
+ if (future == null) {
+ return;
+ }
+
+ try {
+ long before = System.currentTimeMillis();
+ future.get();
+ long after = System.currentTimeMillis();
+
+ System.out.println("message wait before state recover:[" + (after - before) + "] ms, queueId=" + queueId);
+
+ for (String loadQueueId : loadResult.keySet()) {
+ hasLoad.put(loadQueueId, true);
+ }
+ } catch (Throwable t) {
+ throw new RuntimeException("check remote with queueId:" + queueId + ",error", t);
+ }
+ }
+
/**
- * save consumer progress(offset)for groupby source queueId
+ * save consumer progress(offset)for groupby source shuffleId
+ * window configName: name_window_10001
+ * shuffleId: shuffle_NormalTestTopic_namespace_name_broker-a_001
+ * oriQueueId: NormalTestTopic2_broker-a_000
*
- * @param queueId
+ * @param shuffleId
* @param messages
*/
- protected void saveSplitProgress(String queueId, List<IMessage> messages) {
+ protected void saveSplitProgress(String shuffleId, List<IMessage> messages) {
+ IStorage delegator = this.window.getStorage();
+
Map<String, String> queueId2OrigOffset = new HashMap<>();
Boolean isLong = false;
for (IMessage message : messages) {
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index f9bd3413,9280a2e4..d834af41
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@@ -18,7 -18,18 +18,8 @@@ package org.apache.rocketmq.streams.win
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
+ import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
@@@ -42,7 -57,10 +46,8 @@@ import org.apache.rocketmq.streams.comm
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.common.utils.TraceUtil;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
+ import org.apache.rocketmq.streams.window.minibatch.MiniBatchMsgCache;
import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
@@@ -77,11 -90,17 +85,13 @@@ public class ShuffleChannel extends Abs
protected ShuffleCache shuffleCache;
- protected Map<String, ISplit> queueMap = new ConcurrentHashMap<>();
- protected List<ISplit> queueList;//所有的分片
+ protected Map<String, ISplit<?, ?>> queueMap = new ConcurrentHashMap<>();
+ /**
+ * 所有的分片
+ */
+ protected List<ISplit<?, ?>> queueList;
- // protected NotifyChannel notfiyChannel;//负责做分片的通知管理
protected AbstractShuffleWindow window;
- /**
- * 当前管理的分片
- */
- private Set<String> currentQueueIds;
protected transient boolean isWindowTest = false;
@@@ -116,28 -126,16 +117,16 @@@
* init shuffle channel
*/
public void init() {
- this.consumer = createSource(window.getNameSpace(), window.getConfigureName());
-
- this.producer = createSink(window.getNameSpace(), window.getConfigureName());
- if (this.consumer == null || this.producer == null) {
- autoCreateShuffleChannel(window.getFireReceiver().getPipeline());
- }
- if (this.consumer == null) {
- return;
- }
- if (this.consumer instanceof AbstractSource) {
- ((AbstractSource) this.consumer).setJsonData(true);
- }
+ init(this.window);
if (producer != null && (queueList == null || queueList.size() == 0)) {
queueList = producer.getSplitList();
- Map<String, ISplit> tmp = new ConcurrentHashMap<>();
- for (ISplit queue : queueList) {
+ Map<String, ISplit<?, ?>> tmp = new ConcurrentHashMap<>();
+ for (ISplit<?, ?> queue : queueList) {
tmp.put(queue.getQueueId(), queue);
}
-
this.queueMap = tmp;
}
- isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
+// isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
}
/**
@@@ -467,8 -505,11 +455,8 @@@
return splitNum > 0 ? splitNum : 32;
}
- public Set<String> getCurrentQueueIds() {
- return currentQueueIds;
- }
- public List<ISplit> getQueueList() {
+ public List<ISplit<?, ?>> getQueueList() {
return queueList;
}
diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
index b012b175,69b6494a..e3bb3f83
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
@@@ -287,7 -287,7 +287,6 @@@ public class WindowTrigger extends Abst
}
});
for (WindowInstance windowInstance : windowInstanceList) {
- System.out.println("fire by finish flag");
- // System.out.println("fire by finish flag");
fireWindowInstance(windowInstance);
}
}