You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/09/25 00:56:14 UTC
[rocketmq-streams] branch main updated: Add a quick start which
contain some examples. (#68)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 3612f79 Add a quick start which contain some examples. (#68)
3612f79 is described below
commit 3612f79663a372e7a06e4889bbf473865eae469e
Author: Ni Ze <31...@users.noreply.github.com>
AuthorDate: Sat Sep 25 08:56:06 2021 +0800
Add a quick start which contain some examples. (#68)
* a runnable window example reading data from rocketmq.
* add quick start
* modify quick_start
* modify example.
* modify and add another pageclick example.
* remove private path
* fix spelling mistakes
* use mysql as remote checkpoint storage
* modify code style of import class
* modify annotation
Co-authored-by: nize <un...@gmail.com>
---
QUICKSTART.md | 81 -----------
README.md | 2 +-
pom.xml | 12 +-
quick_start.md | 71 +++++++++
.../apache/rocketmq/streams/sink/RocketMQSink.java | 1 +
.../rocketmq/streams/source/RocketMQSource.java | 8 +-
rocketmq-streams-clients/pom.xml | 4 +
.../rocketmq/streams/client/DataStreamAction.java | 9 +-
.../rocketmq/streams/client/DataStreamTest.java | 6 +-
.../streams/client/windows/AbstractWindowTest.java | 19 +--
.../resources/{window_msg_10 => window_msg_10.txt} | 0
.../{window_msg_100 => window_msg_100.txt} | 0
.../{window_msg_1000 => window_msg_1000.txt} | 0
.../{window_msg_10000 => window_msg_10000.txt} | 0
.../{window_msg_88121 => window_msg_88121.txt} | 0
.../common/channel/impl/OutputPrintChannel.java | 7 +-
.../common/channel/impl/file/FileSource.java | 22 ++-
.../streams/common/datatype/DateDataType.java | 13 +-
rocketmq-streams-dbinit/pom.xml | 3 -
.../src/main/resources/tables_mysql_innodb.sql | 19 +--
rocketmq-streams-examples/README.md | 159 +++++++++++++++++++++
rocketmq-streams-examples/pom.xml | 48 +++++--
.../examples/checkpoint/RemoteCheckpointTest.java | 96 +++++++++++++
.../examples/filesource/FileSourceExample.java | 2 +-
.../streams/examples/pageclick/PageDimension.java | 81 +++++++++++
.../streams/examples/pageclick/UsersDimension.java | 67 +++++++++
.../streams/examples/rocketmqsource/Constant.java | 27 ++++
.../examples/rocketmqsource/ProducerFromFile.java | 99 +++++++++++++
.../rocketmqsource/RocketMQSourceExample1.java | 24 +++-
.../rocketmqsource/RocketMQSourceExample2.java | 27 ++--
.../rocketmqsource/RocketMQSourceExample3.java | 43 ++++--
.../rocketmqsource/RocketmqWindowTest.java | 30 ++--
.../src/main/resources/data.txt | 11 ++
.../src/main/resources/pageClickData.txt | 11 ++
.../streams/window/fire/EventTimeManager.java | 8 +-
.../streams/window/fire/SplitEventTimeManager.java | 20 +--
.../window/operator/impl/WindowOperator.java | 79 +++++-----
.../streams/window/shuffle/ShuffleChannel.java | 28 ++--
38 files changed, 883 insertions(+), 254 deletions(-)
diff --git a/QUICKSTART.md b/QUICKSTART.md
deleted file mode 100644
index f265669..0000000
--- a/QUICKSTART.md
+++ /dev/null
@@ -1,81 +0,0 @@
-# Quick Start
-
-本文档详细介绍了如何在rocketmq上执行流计算任务;
-
-## 所需环境
-
-+ 64bit OS, Linux/Unix/Mac is recommended;(Windows user see guide below)
-+ 64bit JDK 1.8+;
-+ Maven 3.2.X
-
-## 使用步骤
-
-### 1. 创建maven项目, 并依赖rocketmq-streams的客户端
-
-```xml
-
-<dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-streams-clients</artifactId>
- <version>1.0.0-SNAPSHOT</version>
-</dependency>
-```
-
-### 2. 在主函数中按照streams的开发规范,编写业务逻辑
-
-```java
-import org.apache.rocketmq.streams.client.transform.DataStream;
-
-public static void main(String[]args){
- DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
-
- source
- .fromFile("~/admin/data/text.txt",false)
- .map(message->message)
- .toPrint(1)
- .start();
- }
-```
-
-### 3. 在pom.xml中加入shade插件, 将依赖的stream与业务代码一并打包, 形成-shaded.jar 包
-
-```xml
-
-<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.2.1</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <minimizeJar>false</minimizeJar>
- <shadedArtifactAttached>true</shadedArtifactAttached>
- <artifactSet>
- <includes>
- <include>org.apache.rocketmq:rocketmq-streams-clients</include>
- </includes>
- </artifactSet>
- </configuration>
- </execution>
- </executions>
-</plugin>
-
-```
-
-### 4. 将jar包拷贝到应用服务器,作为普通的java应用直接运行
-
-```
- java -jar XXXX-shade.jar \
- -Dlog4j.level=ERROR \
- -Dlog4j.home=/logs \
- -Xms1024m \
- -Xmx1024m
-```
-
-
-
-
diff --git a/README.md b/README.md
index 36b7270..d949a7b 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@
[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
## [中文文档](./README-Chinese.md)
-## [Quick Start](./QUICKSTART.md)
+## [Quick Start](./quick_start.md)
## Features
diff --git a/pom.xml b/pom.xml
index 5fbb574..1d2a636 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,7 @@
<commons-logging.version>1.1</commons-logging.version>
<spring.version>3.2.13.RELEASE</spring.version>
<auto-service.version>1.0-rc5</auto-service.version>
- <mysql-connector.version>5.1.40</mysql-connector.version>
+ <mysql-connector.version>8.0.26</mysql-connector.version>
<fastjson.version>1.2.78</fastjson.version>
<quartz.version>2.2.1</quartz.version>
<httpclient.version>4.5.2</httpclient.version>
@@ -112,13 +112,13 @@
<exclude>.asf.yaml</exclude>
<exclude>README.md</exclude>
<exclude>README-Chinese.md</exclude>
- <exclude>QUICKSTART.md</exclude>
+ <exclude>quick_start.md</exclude>
<exclude>.github/**</exclude>
<exclude>*/target/**</exclude>
<exclude>*/*.iml</exclude>
<exclude>**/*.txt</exclude>
<exclude>**/*.cs</exclude>
- <exclude>src/test/resources/window_msg_*</exclude>
+ <exclude>**/*.sql</exclude>
</excludes>
</configuration>
</plugin>
@@ -283,6 +283,12 @@
<artifactId>rocketmq-streams-channel-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-dbinit</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- ================================================= -->
<!-- rocketmq library -->
diff --git a/quick_start.md b/quick_start.md
new file mode 100644
index 0000000..6f6a314
--- /dev/null
+++ b/quick_start.md
@@ -0,0 +1,71 @@
+## rocketmq-streams 快速搭建
+---
+### 前言
+本文档主要介绍如何基于rocketmq-streams快速搭建流处理任务,搭建过程中某些例子会用到rocketmq,可以参考[rocketmq搭建文档](https://rocketmq.apache.org/docs/quick-start/)
+
+
+### 1、源码构建
+
+#### 1.1、构建环境
+ - JDK 1.8 and above
+ - Maven 3.2 and above
+
+#### 1.2、构建Rocketmq-streams
+
+`git clone https://github.com/apache/rocketmq-streams.git`
+`cd rocketmq-streams`
+`mvn clean -DskipTests install -U`
+
+
+### 2、基于rocketmq-streams创建应用
+
+#### 2.1、pom依赖
+```xml
+<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-clients</artifactId>
+</dependency>
+```
+#### 2.2、shade clients依赖包
+```xml
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <minimizeJar>false</minimizeJar>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <artifactSet>
+ <includes>
+ <include>org.apache.rocketmq:rocketmq-streams-clients</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+```
+
+#### 2.3、编写业务代码
+Please see the [rocketmq-streams-examples](rocketmq-streams-examples/README.md)
+#### 2.4、运行
+- 前提:在从rocketmq中读取数据做流处理时,需要运行topic在rocketmq中自动创建,因为做groupBy操作时,需要用到rocketmq作为shuffle数据的读写目的地。
+- 命令:
+```
+ java -jar XXXX-shade.jar \
+ -Dlog4j.level=ERROR \
+ -Dlog4j.home=/logs \
+ -Xms1024m \
+ -Xmx1024m
+```
+
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
index 6e18ba7..eb8134d 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
+
public class RocketMQSink extends AbstractSupportShuffleSink {
private static final Log LOG = LogFactory.getLog(RocketMQSink.class);
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index b45df86..34dc221 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -41,12 +41,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.streams.common.channel.source.AbstractSupportOffsetResetSource;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.context.AbstractContext;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.ReflectUtil;
-import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
import org.apache.rocketmq.streams.debug.DebugWriter;
import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -60,7 +55,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class RocketMQSource extends AbstractSupportOffsetResetSource {
@@ -134,6 +128,8 @@ public class RocketMQSource extends AbstractSupportOffsetResetSource {
for (MessageExt msg : msgs) {
String data = new String(msg.getBody(), CHARSET);
JSONObject jsonObject = create(data);
+
+
String queueId = RocketMQMessageQueue.getQueueId(context.getMessageQueue());
String offset = msg.getQueueOffset() + "";
org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, queueId, offset, false);
diff --git a/rocketmq-streams-clients/pom.xml b/rocketmq-streams-clients/pom.xml
index 5632fd6..dfe0986 100644
--- a/rocketmq-streams-clients/pom.xml
+++ b/rocketmq-streams-clients/pom.xml
@@ -53,6 +53,10 @@
<artifactId>rocketmq-streams-window</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-dbinit</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
index 0a052fa..a8b99fa 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
@@ -18,11 +18,6 @@
package org.apache.rocketmq.streams.client;
import com.google.common.collect.Maps;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
import org.apache.rocketmq.streams.client.strategy.Strategy;
import org.apache.rocketmq.streams.client.transform.DataStream;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
@@ -32,6 +27,10 @@ import org.apache.rocketmq.streams.common.topology.ChainStage;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.configurable.ConfigurableComponent;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
public class DataStreamAction extends DataStream {
private final Map<String, Object> properties = Maps.newHashMap();
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
index 49e8559..2684c18 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
@@ -28,7 +28,11 @@ import org.junit.Before;
import org.junit.Test;
import java.io.Serializable;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
public class DataStreamTest implements Serializable {
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java
index 688b485..3eef4f2 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java
@@ -18,6 +18,16 @@
package org.apache.rocketmq.streams.client.windows;
import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.streams.client.transform.DataStream;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.common.functions.ForEachFunction;
+import org.apache.rocketmq.streams.common.functions.MapFunction;
+import org.apache.rocketmq.streams.common.utils.DateUtil;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
@@ -28,15 +38,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.streams.client.transform.DataStream;
-import org.apache.rocketmq.streams.client.transform.window.Time;
-import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-import org.apache.rocketmq.streams.common.functions.ForEachFunction;
-import org.apache.rocketmq.streams.common.functions.MapFunction;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
import static junit.framework.TestCase.assertTrue;
diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_10 b/rocketmq-streams-clients/src/test/resources/window_msg_10.txt
similarity index 100%
rename from rocketmq-streams-clients/src/test/resources/window_msg_10
rename to rocketmq-streams-clients/src/test/resources/window_msg_10.txt
diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_100 b/rocketmq-streams-clients/src/test/resources/window_msg_100.txt
similarity index 100%
rename from rocketmq-streams-clients/src/test/resources/window_msg_100
rename to rocketmq-streams-clients/src/test/resources/window_msg_100.txt
diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_1000 b/rocketmq-streams-clients/src/test/resources/window_msg_1000.txt
similarity index 100%
rename from rocketmq-streams-clients/src/test/resources/window_msg_1000
rename to rocketmq-streams-clients/src/test/resources/window_msg_1000.txt
diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_10000 b/rocketmq-streams-clients/src/test/resources/window_msg_10000.txt
similarity index 100%
rename from rocketmq-streams-clients/src/test/resources/window_msg_10000
rename to rocketmq-streams-clients/src/test/resources/window_msg_10000.txt
diff --git a/rocketmq-streams-clients/src/test/resources/window_msg_88121 b/rocketmq-streams-clients/src/test/resources/window_msg_88121.txt
similarity index 100%
rename from rocketmq-streams-clients/src/test/resources/window_msg_88121
rename to rocketmq-streams-clients/src/test/resources/window_msg_88121.txt
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
index 8c4f63c..ebb9415 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
@@ -16,11 +16,10 @@
*/
package org.apache.rocketmq.streams.common.channel.impl;
-import java.util.List;
-
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.utils.PrintUtil;
+
+import java.util.List;
/**
* 测试使用,输出就是把消息打印出来
@@ -30,7 +29,7 @@ public class OutputPrintChannel extends AbstractSink {
@Override
protected boolean batchInsert(List<IMessage> messages) {
for (IMessage msg : messages) {
- //System.out.println(msg.getMessageValue());
+ System.out.println(msg.getMessageValue());
}
return false;
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java
index f8fcbcd..3add3df 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
+import java.net.URL;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -55,7 +56,7 @@ public class FileSource extends AbstractBatchSource {
@Override
protected boolean initConfigurable() {
super.initConfigurable();
- File file = new File(filePath);
+ File file = getFile(filePath);
if (file.exists() && file.isDirectory()) {
executorService = new ThreadPoolExecutor(maxThread, maxThread,
0L, TimeUnit.MILLISECONDS,
@@ -64,6 +65,23 @@ public class FileSource extends AbstractBatchSource {
return true;
}
+ private File getFile(String filePath) {
+ File file = new File(filePath);
+ if (!file.exists()) {
+ ClassLoader loader = getClass().getClassLoader();
+ URL url = loader.getResource(filePath);
+
+ if (url != null) {
+ String path = url.getFile();
+ file = new File(path);
+ this.filePath = path;
+ }
+ }
+ return file;
+
+
+ }
+
@Override
protected boolean startSource() {
@@ -97,7 +115,7 @@ public class FileSource extends AbstractBatchSource {
*/
protected LinkedBlockingQueue<FileIterator> createIteratorList() {
LinkedBlockingQueue<FileIterator> iterators = new LinkedBlockingQueue<>(1000);
- File file = new File(filePath);
+ File file = getFile(filePath);
if (file.exists() == false) {
return null;
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java
index 34d5536..41a113c 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java
@@ -17,13 +17,16 @@
package org.apache.rocketmq.streams.common.datatype;
import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.utils.NumberUtils;
+
import java.sql.Timestamp;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.Date;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.utils.NumberUtils;
public class DateDataType extends BaseDataType<Date> {
@@ -85,6 +88,10 @@ public class DateDataType extends BaseDataType<Date> {
if (Timestamp.class.isInstance(object)) {
return new Date(((Timestamp)object).getTime());
}
+ if (object instanceof LocalDateTime) {
+ LocalDateTime tempTime = (LocalDateTime) object;
+ return Date.from(tempTime.atZone(ZoneId.systemDefault()).toInstant());
+ }
return super.convert(object);
}
diff --git a/rocketmq-streams-dbinit/pom.xml b/rocketmq-streams-dbinit/pom.xml
index 1eba812..14f5679 100644
--- a/rocketmq-streams-dbinit/pom.xml
+++ b/rocketmq-streams-dbinit/pom.xml
@@ -31,13 +31,10 @@
<resources>
<resource>
<directory>src/main/resources</directory>
-
<includes>
<include>**/*.sql</include>
<include>**/*.properties</include>
</includes>
-
-
<filtering>true</filtering>
</resource>
</resources>
diff --git a/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql b/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql
index 0c6bc81..d6c23ee 100644
--- a/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql
+++ b/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql
@@ -1,24 +1,9 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
CREATE TABLE IF NOT EXISTS `window_max_value` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`gmt_create` datetime NOT NULL,
`gmt_modified` datetime NOT NULL,
+ `max_offset` varchar(20) NOT NULL,
+ `is_max_offset_long` int(11) DEFAULT NULL,
`max_value` bigint(20) unsigned NOT NULL,
`max_event_time` bigint(20) unsigned NOT NULL,
`msg_key` varchar(256) NOT NULL,
diff --git a/rocketmq-streams-examples/README.md b/rocketmq-streams-examples/README.md
new file mode 100644
index 0000000..53bb503
--- /dev/null
+++ b/rocketmq-streams-examples/README.md
@@ -0,0 +1,159 @@
+## rocketmq-streams-examples
+
+### 1、fileSource example
+逐行读取文件数据,并打印出来。
+```java
+public class FileSourceExample {
+ public static void main(String[] args) {
+ DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+ source.fromFile("data.txt", false)
+ .map(message -> message)
+ .toPrint(1)
+ .start();
+ }
+}
+
+```
+
+
+### 2、分时间段,统计分组中某字段的和
+
+#### 2.1 安装rocketmq
+可以参考[rocketmq搭建文档](https://rocketmq.apache.org/docs/quick-start/)
+
+#### 2.2 源数据
+[源数据](./../rocketmq-streams-examples/src/main/resources/data.txt)
+```xml
+{"InFlow":"1","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"0"}
+{"InFlow":"2","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"1"}
+{"InFlow":"3","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"2"}
+{"InFlow":"4","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"3"}
+{"InFlow":"5","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"4"}
+{"InFlow":"6","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"5"}
+{"InFlow":"7","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"6"}
+{"InFlow":"8","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"7"}
+{"InFlow":"9","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"8"}
+{"InFlow":"10","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"9"}
+```
+
+#### 2.3 代码示例
+
+[代码示例](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java)
+
+
+#### 2.4 结果说明
+这个例子中,使用rocketmq-streams消费rocketmq中的数据,并按照ProjectName和LogStore两个字段联合分组统计,两个字段的值相同,分为一组。
+分别统计每组的InFlow和OutFlow两字段累计和。
+
+data.text数据运行的结果部分如下:
+
+```xml
+"InFlow":22,"total":4,"ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":18
+"InFlow":18,"total":3,"ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":15
+"InFlow":15,"total":3,"ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":12
+```
+可见"ProjectName":"ProjectName-0","LogStore":"LogStore-0"分组公有4条数据,"ProjectName":"ProjectName-2","LogStore":"LogStore-2",3条数据。
+"ProjectName":"ProjectName-1","LogStore":"LogStore-1"分组3条数据,总共10条数据。结果与源数据一致。
+
+### 3、网页点击统计
+#### 3.1、数据说明
+原始数据为resources路径下的[pageClickData.txt](./../rocketmq-streams-examples/src/main/resources/pageClickData.txt)
+
+第一列是用户id,第二列是用户点击时间,最后一列是网页地址
+```xml
+{"userId":"1","eventTime":"1631700000000","method":"GET","url":"page-1"}
+{"userId":"2","eventTime":"1631700030000","method":"POST","url":"page-2"}
+{"userId":"3","eventTime":"1631700040000","method":"GET","url":"page-3"}
+{"userId":"1","eventTime":"1631700050000","method":"DELETE","url":"page-2"}
+{"userId":"1","eventTime":"1631700060000","method":"DELETE","url":"page-2"}
+{"userId":"2","eventTime":"1631700070000","method":"POST","url":"page-3"}
+{"userId":"3","eventTime":"1631700080000","method":"GET","url":"page-1"}
+{"userId":"1","eventTime":"1631700090000","method":"GET","url":"page-2"}
+{"userId":"2","eventTime":"1631700100000","method":"PUT","url":"page-3"}
+{"userId":"4","eventTime":"1631700120000","method":"POST","url":"page-1"}
+```
+
+#### 3.1、统计某段时间窗口内用户点击网页次数
+[代码示例](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java)
+
+结果:
+```xml
+{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"SPVGTV6DaXmxV5mGNzQixQ==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","userId":"2"}
+{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"dzAZ104qjUAwzTE6gbKSPA==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","userId":"3"}
+{"start_time":"2021-09-15 18:00:00","total":2,"windowInstanceId":"wrTTyU5DiDkrAb6669Ig9w==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","userId":"1"}
+{"start_time":"2021-09-15 18:01:00","total":1,"windowInstanceId":"vabkmx14xHsJ7G7w16vwug==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","userId":"3"}
+{"start_time":"2021-09-15 18:01:00","total":2,"windowInstanceId":"YIgEKptN2Wf+Oq2m8sEcYw==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","userId":"2"}
+{"start_time":"2021-09-15 18:01:00","total":2,"windowInstanceId":"iYKnwMYAzXFJYbO1KvDnng==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","userId":"1"}
+{"start_time":"2021-09-15 18:02:00","total":1,"windowInstanceId":"HBojuU6/2F/6llkyefECxw==","offset":53892181100000001,"end_time":"2021-09-15 18:03:00","userId":"4"}
+```
+
+在时间范围 18:00:00- 18:01:00内:
+
+|userId|点击次数|
+|------|---|
+| 1 | 2 |
+| 2 | 1 |
+| 3 | 1 |
+
+在时间范围 18:01:00- 18:02:00内:
+
+|userId|点击次数|
+|------|---|
+| 1 | 2 |
+| 2 | 2 |
+| 3 | 1 |
+
+在时间范围 18:02:00- 18:03:00内:
+
+|userId|点击次数|
+|------|---|
+| 4 | 1 |
+
+可查看原数据文件,eventTime为时间字段,简单检查后上述结果与预期相符合。
+
+#### 3.2、统计某段时间窗口内,被点击次数最多的网页
+[代码示例](./../rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java)
+
+运行结果:
+```xml
+{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"wrTTyU5DiDkrAb6669Ig9w==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","url":"page-1"}
+{"start_time":"2021-09-15 18:00:00","total":2,"windowInstanceId":"seECZRcaQSRsET1rDc6ZAw==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","url":"page-2"}
+{"start_time":"2021-09-15 18:00:00","total":1,"windowInstanceId":"dzAZ104qjUAwzTE6gbKSPA==","offset":53892061100000001,"end_time":"2021-09-15 18:01:00","url":"page-3"}
+{"start_time":"2021-09-15 18:01:00","total":2,"windowInstanceId":"uCqvAeaLTYRnjQm8dCZOvw==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","url":"page-2"}
+{"start_time":"2021-09-15 18:01:00","total":3,"windowInstanceId":"vabkmx14xHsJ7G7w16vwug==","offset":53892121100000001,"end_time":"2021-09-15 18:02:00","url":"page-3"}
+{"start_time":"2021-09-15 18:02:00","total":1,"windowInstanceId":"NdgwYMT8azNMu55NUIvygg==","offset":53892181100000001,"end_time":"2021-09-15 18:03:00","url":"page-1"}
+
+```
+在时间窗口18:00:00 - 18:01:00 内,有4条数据;
+
+在时间窗口18:01:00 - 18:02:00 内,有5条数据;
+
+在时间窗口18:02:00 - 18:03:00 内,有1条数据;
+
+分钟统计窗口内,被点击次数最多的网页.
+得到上述数据后,需要按照窗口进行筛选最大值,需要再次计算。
+代码:
+```java
+ public void findMax() {
+ DataStreamSource source = StreamBuilder.dataStream("ns-1", "pl-1");
+ source.fromFile("/home/result.txt", false)
+ .map(message -> JSONObject.parseObject((String) message))
+ .window(TumblingWindow.of(Time.seconds(5)))
+ .groupBy("start_time","end_time")
+ .max("total")
+ .waterMark(1)
+ .setLocalStorageOnly(true)
+ .toDataSteam()
+ .toPrint(1)
+ .start();
+ }
+
+```
+得到结果:
+```xml
+{"start_time":"2021-09-17 11:09:35","total":"2","windowInstanceId":"kRRpe2hPEQtEuTkfnXUaHg==","offset":54040181100000001,"end_time":"2021-09-17 11:09:40"}
+{"start_time":"2021-09-17 11:09:35","total":"3","windowInstanceId":"kRRpe2hPEQtEuTkfnXUaHg==","offset":54040181100000002,"end_time":"2021-09-17 11:09:40"}
+{"start_time":"2021-09-17 11:09:35","total":"1","windowInstanceId":"kRRpe2hPEQtEuTkfnXUaHg==","offset":54040181100000003,"end_time":"2021-09-17 11:09:40"}
+```
+
+可以得到三个窗口中网页点击次数最多分别是2次,1次,3次。
diff --git a/rocketmq-streams-examples/pom.xml b/rocketmq-streams-examples/pom.xml
index ee6bbf6..8d97fa6 100644
--- a/rocketmq-streams-examples/pom.xml
+++ b/rocketmq-streams-examples/pom.xml
@@ -27,24 +27,48 @@
<artifactId>rocketmq-streams-examples</artifactId>
<name>ROCKETMQ STREAMS :: examples</name>
+
+ <properties>
+ <file_encoding>UTF-8</file_encoding>
+ <project.build.sourceEncoding>${file_encoding}</project.build.sourceEncoding>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-clients</artifactId>
- <exclusions>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
<packaging>jar</packaging>
- <properties>
- <file_encoding>UTF-8</file_encoding>
- <project.build.sourceEncoding>${file_encoding}</project.build.sourceEncoding>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <minimizeJar>false</minimizeJar>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <artifactSet>
+ <includes>
+ <include>org.apache.rocketmq:rocketmq-streams-clients</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointTest.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointTest.java
new file mode 100644
index 0000000..2daa525
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointTest.java
@@ -0,0 +1,96 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.streams.examples.checkpoint;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.streams.client.StreamBuilder;
+import org.apache.rocketmq.streams.client.source.DataStreamSource;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.dbinit.mysql.delegate.DBDelegate;
+import org.apache.rocketmq.streams.dbinit.mysql.delegate.DBDelegateFactory;
+import org.apache.rocketmq.streams.examples.rocketmqsource.ProducerFromFile;
+
+import static org.apache.rocketmq.streams.db.driver.DriverBuilder.DEFALUT_JDBC_DRIVER;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
+
+
+public class RemoteCheckpointTest {
+ //replace with your mysql url, database name can be anyone else.
+ private static final String URL = "jdbc:mysql://localhost:3306/rocketmq_streams";
+ // user name of mysql
+ private static final String USER_NAME = "";
+ //password of mysql
+ private static final String PASSWORD = "";
+
+
+ static {
+ ComponentCreator.getProperties().put(ConfigureFileKey.CONNECT_TYPE, "DB");
+ ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_URL, URL);
+ ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_DRIVER, DEFALUT_JDBC_DRIVER);
+ ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_USERNAME, USER_NAME);
+ ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_PASSWORD, PASSWORD);
+ }
+
+ public static void main(String[] args) {
+ ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
+ DBDelegate delegate = DBDelegateFactory.getDelegate();
+ delegate.init();
+
+ try {
+ Thread.sleep(1000 * 3);
+ } catch (InterruptedException e) {
+ }
+ System.out.println("begin streams code.");
+
+ DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+ source.fromRocketmq(
+ RMQ_TOPIC,
+ RMQ_CONSUMER_GROUP_NAME,
+ false,
+ NAMESRV_ADDRESS)
+ .filter((message) -> {
+ try {
+ JSONObject.parseObject((String) message);
+ } catch (Throwable t) {
+ // if can not convert to json, discard it.because all operator are base on json.
+ return true;
+ }
+ return false;
+ })
+ //must convert message to json.
+ .map(message -> JSONObject.parseObject((String) message))
+ .window(TumblingWindow.of(Time.seconds(10)))
+ .groupBy("ProjectName","LogStore")
+ .sum("OutFlow", "OutFlow")
+ .sum("InFlow", "InFlow")
+ .count("total")
+ .waterMark(5)
+ .setLocalStorageOnly(false)
+ .toDataSteam()
+ .toPrint(1)
+ .start();
+ }
+
+}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
index d568d5f..a7b4361 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
@@ -22,7 +22,7 @@ import org.apache.rocketmq.streams.client.source.DataStreamSource;
public class FileSourceExample {
public static void main(String[] args) {
DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
- source.fromFile("/your/file/path", false)
+ source.fromFile("data.txt", false)
.map(message -> message)
.toPrint(1)
.start();
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java
new file mode 100644
index 0000000..95bfd5c
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/PageDimension.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.streams.examples.pageclick;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.streams.client.StreamBuilder;
+import org.apache.rocketmq.streams.client.source.DataStreamSource;
+import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.examples.rocketmqsource.ProducerFromFile;
+import org.junit.Test;
+
+public class PageDimension {
+ private static final String topic = "pageClick";
+ private static final String namesrv = "127.0.0.1:9876";
+
+ /**
+ * In a certain period of time, how many times did a user click on a certain webpage
+ *
+ * @param args
+ */
+ public static void main(String[] args) {
+ ProducerFromFile.produce("pageClickData.txt", namesrv, topic);
+
+ try {
+ Thread.sleep(1000 * 3);
+ } catch (InterruptedException e) {
+ }
+ System.out.println("begin streams code.");
+
+ DataStreamSource source = StreamBuilder.dataStream("pageClickNS", "pageClickPL");
+ source.fromRocketmq(topic, "pageClickGroup", false, namesrv)
+ .map(message -> JSONObject.parseObject((String) message))
+ .window(TumblingWindow.of(Time.minutes(1)))
+ .groupBy("url")
+ .setTimeField("eventTime")
+ .count("total")
+ .waterMark(1)
+ .setLocalStorageOnly(true)
+ .toDataSteam()
+ .toFile("/home/result.txt")
+ .with(WindowStrategy.highPerformance())
+ .start();
+ }
+
+ @Test
+ public void findMax() {
+
+ DataStreamSource source = StreamBuilder.dataStream("ns-1", "pl-1");
+ source.fromFile("/home/result.txt", false)
+ .map(message -> JSONObject.parseObject((String) message))
+ .window(TumblingWindow.of(Time.seconds(5)))
+ .groupBy("start_time","end_time")
+ .max("total")
+ .waterMark(1)
+ .setLocalStorageOnly(true)
+ .toDataSteam()
+ .toPrint(1)
+ .start();
+
+ }
+
+}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java
new file mode 100644
index 0000000..c7e6c74
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/pageclick/UsersDimension.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.streams.examples.pageclick;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.rocketmq.streams.client.StreamBuilder;
+import org.apache.rocketmq.streams.client.source.DataStreamSource;
+import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.examples.rocketmqsource.ProducerFromFile;
+
+
+public class UsersDimension {
+ private static final String topic = "pageClick";
+ private static final String namesrv = "127.0.0.1:9876";
+
+ /**
+ * Count the number of times a user clicks on a webpage within 5s
+ * @param args
+ */
+ public static void main(String[] args) {
+ ProducerFromFile.produce("pageClickData.txt",namesrv, topic);
+
+ try {
+ Thread.sleep(1000 * 3);
+ } catch (InterruptedException e) {
+ }
+ System.out.println("begin streams code.");
+
+
+ DataStreamSource source = StreamBuilder.dataStream("pageClickNS", "pageClickPL");
+ source.fromRocketmq(topic, "pageClickGroup", false, namesrv)
+ .map(message -> JSONObject.parseObject((String) message))
+ .window(TumblingWindow.of(Time.minutes(1)))
+ .groupBy("userId")
+ .setTimeField("eventTime")
+ .count("total")
+ .waterMark(1)
+ .setLocalStorageOnly(true)
+ .toDataSteam()
+ .toPrint(1)
+ .with(WindowStrategy.highPerformance())
+ .start();
+
+ }
+
+
+
+}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/Constant.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/Constant.java
new file mode 100644
index 0000000..c9287f4
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/Constant.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.streams.examples.rocketmqsource;
+
+public class Constant {
+ public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
+ public static final String RMQ_TOPIC = "NormalTestTopic";
+ public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-02";
+ public static final String TAGS = "*";
+}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java
new file mode 100644
index 0000000..b26b66f
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.rocketmq.streams.examples.rocketmqsource;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProducerFromFile {
+
+ public static void produce(String filePath, String nameServ, String topic) {
+ try {
+ DefaultMQProducer producer = new DefaultMQProducer("test-group");
+ producer.setNamesrvAddr(nameServ);
+ producer.start();
+
+ List<String> result = ProducerFromFile.read(filePath);
+
+ for (String str : result) {
+ Message msg = new Message(topic, "", str.getBytes(RemotingHelper.DEFAULT_CHARSET));
+ SendResult sendResult = producer.send(msg);
+ System.out.printf("%s%n", sendResult);
+ }
+ //Shut down once the producer instance is not longer in use.
+ producer.shutdown();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+
+ }
+
+ private static File getFile(String filePath) {
+ File file = new File(filePath);
+ if (!file.exists()) {
+ ClassLoader loader = ProducerFromFile.class.getClassLoader();
+ URL url = loader.getResource(filePath);
+
+ if (url != null) {
+ String path = url.getFile();
+ file = new File(path);
+ }
+ }
+ return file;
+
+ }
+
+ private static List<String> read(String path) {
+ File file = getFile(path);
+ List<String> result = new ArrayList<>();
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new FileReader(file));
+
+ String line = reader.readLine();
+ while (line != null && !"".equals(line)) {
+ result.add(line);
+ line = reader.readLine();
+ }
+
+ } catch (Throwable t) {
+ t.printStackTrace();
+ } finally {
+ if (reader!= null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return result;
+ }
+}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java
index c2e6bd1..c56c975 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java
@@ -19,14 +19,30 @@ package org.apache.rocketmq.streams.examples.rocketmqsource;
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
+
+
public class RocketMQSourceExample1 {
public static void main(String[] args) {
- DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+ ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
+ try {
+ Thread.sleep(1000 * 3);
+ } catch (InterruptedException e) {
+ }
+
+ System.out.println("begin streams code.");
+
+ DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+ /**
+ * 1、before run this case, make sure some data has already been rocketmq.
+ */
source.fromRocketmq(
- RocketMQSourceExample2.RMQ_TOPIC,
- RocketMQSourceExample2.RMQ_CONSUMER_GROUP_NAME,
- RocketMQSourceExample2.NAMESRV_ADDRESS
+ RMQ_TOPIC,
+ RMQ_CONSUMER_GROUP_NAME,
+ NAMESRV_ADDRESS
)
.map(message -> message)
.toPrint(1)
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
index c9d9bda..69c217b 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
@@ -18,21 +18,28 @@ package org.apache.rocketmq.streams.examples.rocketmqsource;
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.client.transform.window.Time;
-import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-import org.apache.rocketmq.streams.client.transform.window.WindowInfo;
import java.util.Arrays;
-public class RocketMQSourceExample2 {
- public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
- public static final String RMQ_TOPIC = "NormalTestTopic";
- public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-01";
- public static final String TAGS = "*";
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
+public class RocketMQSourceExample2 {
+ /**
+ * 1、before run this case, make sure some data has already been rocketmq.
+ */
public static void main(String[] args) {
- DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+ ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
+ try {
+ Thread.sleep(1000 * 3);
+ } catch (InterruptedException e) {
+ }
+
+ System.out.println("begin streams code.");
+
+ DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
source.fromRocketmq(
RMQ_TOPIC,
RMQ_CONSUMER_GROUP_NAME,
@@ -47,7 +54,7 @@ public class RocketMQSourceExample2 {
.filter((value) -> {
System.out.println("filter: ===========");
String messageValue = (String)value;
- return !messageValue.contains("RocketMQ");
+ return !messageValue.contains("InFlow");
})
.flatMap((message)->{
String value = (String) message;
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
index 1af6206..8411745 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
@@ -17,20 +17,34 @@
package org.apache.rocketmq.streams.examples.rocketmqsource;
+import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
-public class RocketMQSourceExample3 {
- public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
- public static final String RMQ_TOPIC = "NormalTestTopic";
- public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-03";
- public static final String TAGS = "*";
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
+public class RocketMQSourceExample3 {
+ /**
+ * 1、before run this case, make sure some data has already been rocketmq.
+ */
public static void main(String[] args) {
- DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+ ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
+
+ try {
+ Thread.sleep(1000 * 3);
+ } catch (InterruptedException e) {
+ }
+ System.out.println("begin streams code.");
+
+ DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
source.fromRocketmq(
RMQ_TOPIC,
RMQ_CONSUMER_GROUP_NAME,
@@ -42,12 +56,19 @@ public class RocketMQSourceExample3 {
.map(message -> message)
.filter((value) -> {
String messageValue = (String) value;
- return messageValue.contains("RocketMQ");
+ return !messageValue.contains("InFlow");
})
.flatMap((message) -> {
- String value = (String) message;
- String[] result = value.split(" ");
- return Arrays.asList(result);
+ JSONObject jsonObject = JSONObject.parseObject((String) message);
+ Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
+
+ List<String> result = new ArrayList<>();
+
+ for (Map.Entry<String, Object> entry : entries) {
+ String str = entry.getKey() + ":" + entry.getValue();
+ result.add(str);
+ }
+ return result;
})
.toPrint(1)
.start();
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
index a50d6cb..6dfdfb5 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
@@ -24,15 +24,26 @@ import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
import org.apache.rocketmq.streams.client.transform.window.Time;
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
+import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
+
public class RocketmqWindowTest {
- public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
- public static final String RMQ_TOPIC = "NormalTestTopic";
- public static final String RMQ_CONSUMER_GROUP_NAME = "group-03";
- public static final String TAGS = "*";
+ /**
+ * 1、before run this case, make sure some data has already been rocketmq.
+ * 2、rocketmq allow create topic automatically.
+ */
public static void main(String[] args) {
- DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+ ProducerFromFile.produce("data.txt",NAMESRV_ADDRESS, RMQ_TOPIC);
+ try {
+ Thread.sleep(1000 * 3);
+ } catch (InterruptedException e) {
+ }
+ System.out.println("begin streams code.");
+
+ DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
source.fromRocketmq(
RMQ_TOPIC,
RMQ_CONSUMER_GROUP_NAME,
@@ -47,11 +58,14 @@ public class RocketmqWindowTest {
}
return false;
})
+ //must convert message to json.
.map(message -> JSONObject.parseObject((String) message))
- .window(TumblingWindow.of(Time.seconds(1)))
- .groupBy("ProjectName", "LogStore")
+ .window(TumblingWindow.of(Time.seconds(10)))
+ .groupBy("ProjectName","LogStore")
+ .sum("OutFlow", "OutFlow")
+ .sum("InFlow", "InFlow")
.count("total")
- .waterMark(1)
+ .waterMark(5)
.setLocalStorageOnly(true)
.toDataSteam()
.toPrint(1)
diff --git a/rocketmq-streams-examples/src/main/resources/data.txt b/rocketmq-streams-examples/src/main/resources/data.txt
new file mode 100644
index 0000000..d6a9ae9
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/resources/data.txt
@@ -0,0 +1,11 @@
+{"InFlow":"1","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"0"}
+{"InFlow":"2","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"1"}
+{"InFlow":"3","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"2"}
+{"InFlow":"4","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"3"}
+{"InFlow":"5","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"4"}
+{"InFlow":"6","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"5"}
+{"InFlow":"7","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"6"}
+{"InFlow":"8","ProjectName":"ProjectName-1","LogStore":"LogStore-1","OutFlow":"7"}
+{"InFlow":"9","ProjectName":"ProjectName-2","LogStore":"LogStore-2","OutFlow":"8"}
+{"InFlow":"10","ProjectName":"ProjectName-0","LogStore":"LogStore-0","OutFlow":"9"}
+
diff --git a/rocketmq-streams-examples/src/main/resources/pageClickData.txt b/rocketmq-streams-examples/src/main/resources/pageClickData.txt
new file mode 100644
index 0000000..51bd89d
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/resources/pageClickData.txt
@@ -0,0 +1,11 @@
+{"userId":"1","eventTime":"1631700000000","method":"GET","url":"page-1"}
+{"userId":"2","eventTime":"1631700030000","method":"POST","url":"page-2"}
+{"userId":"3","eventTime":"1631700040000","method":"GET","url":"page-3"}
+{"userId":"1","eventTime":"1631700050000","method":"DELETE","url":"page-2"}
+{"userId":"1","eventTime":"1631700060000","method":"DELETE","url":"page-2"}
+{"userId":"2","eventTime":"1631700070000","method":"POST","url":"page-3"}
+{"userId":"3","eventTime":"1631700080000","method":"GET","url":"page-2"}
+{"userId":"1","eventTime":"1631700090000","method":"GET","url":"page-3"}
+{"userId":"2","eventTime":"1631700100000","method":"PUT","url":"page-3"}
+{"userId":"4","eventTime":"1631700120000","method":"POST","url":"page-1"}
+
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
index 8850ada..46eb1dd 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
@@ -16,13 +16,13 @@
*/
package org.apache.rocketmq.streams.window.fire;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
+import java.util.HashMap;
+import java.util.Map;
+
public class EventTimeManager {
private Map<String,SplitEventTimeManager> eventTimeManagerMap=new HashMap<>();
protected ISource source;
@@ -40,6 +40,8 @@ public class EventTimeManager {
}
}
}
+
+
splitEventTimeManager.updateEventTime(message,window);
}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
index 70742f3..42f5408 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
@@ -16,32 +16,26 @@
*/
package org.apache.rocketmq.streams.window.fire;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.MessageOffset;
-import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.db.driver.DriverBuilder;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class SplitEventTimeManager {
protected static final Log LOG = LogFactory.getLog(SplitEventTimeManager.class);
- protected Map<String,Long> messageSplitId2MaxTime=new HashMap<>();
+ protected static Map<String,Long> messageSplitId2MaxTime=new HashMap<>();
private AtomicInteger queueIdCount=new AtomicInteger(0);
protected Long lastUpdateTime;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
index 0f573e2..435aa71 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
@@ -16,18 +16,6 @@
*/
package org.apache.rocketmq.streams.window.operator.impl;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.MessageOffset;
@@ -48,6 +36,18 @@ import org.apache.rocketmq.streams.window.storage.IWindowStorage;
import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
import org.apache.rocketmq.streams.window.storage.WindowStorage.WindowBaseValueIterator;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class WindowOperator extends AbstractShuffleWindow {
private static final String ORDER_BY_SPLIT_NUM="_order_by_split_num_";//key=_order;queueid,windowinstanceid,partitionNum
@@ -80,7 +80,7 @@ public class WindowOperator extends AbstractShuffleWindow {
protected transient AtomicInteger shuffleCount=new AtomicInteger(0);
protected transient AtomicInteger fireCountAccumulator=new AtomicInteger(0);
@Override
- public int fireWindowInstance(WindowInstance instance, String queueId,Map<String,String> queueId2Offset) {
+ public int fireWindowInstance(WindowInstance instance, String queueId, Map<String,String> queueId2Offset) {
List<WindowValue> windowValues=new ArrayList<>();
int fireCount=0;
long startTime=System.currentTimeMillis();
@@ -100,10 +100,9 @@ public class WindowOperator extends AbstractShuffleWindow {
continue;
}
WindowValue windowValue=(WindowValue)windowBaseValue;
- Integer currentValue=(Integer)windowValue.getComputedColumnResultByKey("total");
- if(currentValue==null){
- currentValue=0;
- }
+
+ Integer currentValue = getValue(windowValue, "total");
+
fireCountAccumulator.addAndGet(currentValue);
windowValues.add((WindowValue)windowBaseValue);
if(windowValues.size()>=windowCache.getBatchSize()){
@@ -121,15 +120,9 @@ public class WindowOperator extends AbstractShuffleWindow {
sendCost+=(System.currentTimeMillis()-sendFireCost);
fireCount+=windowValues.size();
}
-// if(fireCountAccumulator.get()>25000){
-// System.out.println("fire count is "+fireCountAccumulator.get());
-// }
-
- //long clearStart=System.currentTimeMillis();
clearFire(instance);
this.sqlCache.addCache(new FiredNotifySQLElement(queueId,instance.createWindowInstanceId()));
- // System.out.println("=============== fire cost is "+(System.currentTimeMillis()-startTime)+"send cost is "+sendCost+" clear cost is "+(System.currentTimeMillis()-clearStart));
- return fireCount;
+ return fireCount;
}
protected transient Map<String,Integer> shuffleWindowInstanceId2MsgCount=new HashMap<>();
@@ -161,39 +154,39 @@ public class WindowOperator extends AbstractShuffleWindow {
}
allWindowValues.put(storeKey,windowValue);
windowValue.incrementUpdateVersion();
- Integer origValue=(Integer)windowValue.getComputedColumnResultByKey("total");
- if(origValue==null){
- origValue=0;
- }
+
+ Integer origValue = getValue(windowValue, "total");
+
if(msgs!=null){
for(IMessage message:msgs){
windowValue.calculate(this,message);
}
}
- Integer currentValue=(Integer)windowValue.getComputedColumnResultByKey("total");
- if(currentValue==null){
- currentValue=0;
- }
+ Integer currentValue = getValue(windowValue, "total");
+
shuffleCount.addAndGet(-origValue);
shuffleCount.addAndGet(currentValue);
- // if(shuffleCount.get()>25000){
- // System.out.println("==========shuffle count is "+shuffleCount.get());
- //}
-
-
}
if(DebugWriter.getDebugWriter(this.getConfigureName()).isOpenDebug()){
DebugWriter.getDebugWriter(this.getConfigureName()).writeWindowCalculate(this,new ArrayList(allWindowValues.values()),queueId);
}
saveStorage(allWindowValues,instance,queueId);
- //Integer count=shuffleWindowInstanceId2MsgCount.get(instance.createWindowInstanceId());
- //if(count==null){
- // count=0;
- //}
- //count+=messages.size();
- //shuffleWindowInstanceId2MsgCount.put(instance.createWindowInstanceId(),count);
+ }
+
+ private Integer getValue(WindowValue windowValue, String fieldName) {
+ Object value = windowValue.getComputedColumnResultByKey(fieldName);
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Integer) {
+ return (Integer) value;
+ } else if (value instanceof String) {
+ String strValue = (String) value;
+ return Integer.valueOf(strValue);
+ }
+ throw new ClassCastException("value:["+value+"] of fieldName:["+fieldName+"] can not change to number.");
}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index ba17a44..523eb0f 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -20,10 +20,6 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
@@ -32,32 +28,36 @@ import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointState;
import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.context.AbstractContext;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.topology.model.Pipeline;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.common.utils.TraceUtil;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
+import org.apache.rocketmq.streams.window.model.WindowCache;
+import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.common.context.AbstractContext;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.operator.impl.WindowOperator.WindowRowOperator;
import org.apache.rocketmq.streams.window.sqlcache.impl.SQLElement;
import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* 负责处理分片