You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by se...@apache.org on 2021/12/09 09:30:15 UTC
[rocketmq-streams] branch main updated: fix check failed issue
This is an automated email from the ASF dual-hosted git repository.
seraph pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 8057d72 fix check failed issue
new cc8b9db Merge pull request #102 from duhenglucky/main_check
8057d72 is described below
commit 8057d72beaa60ac5c52bf9c1a1293c0b1bf48a82
Author: duhenglucky <du...@apache.org>
AuthorDate: Thu Dec 9 17:25:18 2021 +0800
fix check failed issue
---
README-chinese.md | 34 ++++++-------
.../apache/rocketmq/streams/db/sink/DBSink.java | 2 -
.../rocketmq/streams/db/sink/EnhanceDBSink.java | 9 ++--
.../sqltemplate/MysqlInsertIntoSqlTemplate.java | 5 +-
...MysqlInsertIntoWithDuplicateKeySqlTemplate.java | 5 +-
.../db/sink/sqltemplate/SqlTemplateFactory.java | 3 +-
.../streams/db/sink/db/ISqlTemplateTest.java | 5 +-
.../rocketmq/streams/es/sink/ESSinkBuilder.java | 3 +-
.../streams/es/sink/ESSinkOnlyChannel.java | 9 ++--
.../rocketmq/streams/db/sink/es/EsChannelTest.java | 4 +-
.../rocketmq/streams/RocketMQChannelBuilder.java | 2 -
.../apache/rocketmq/streams/RocketMQOffset.java | 2 +-
.../rocketmq/streams/source/RocketMQSource.java | 21 ++++-----
.../rocketmq/streams/syslog/ISyslogRouter.java | 2 -
.../rocketmq/streams/syslog/SyslogChannel.java | 12 ++---
.../streams/syslog/SyslogChannelManager.java | 1 -
.../rocketmq/streams/syslog/SyslogParser.java | 3 +-
.../rocketmq/streams/syslog/SyslogServer.java | 12 ++---
.../rocketmq/streams/syslog/SyslogClient.java | 11 ++---
.../streams/checkpoint/db/DBCheckPointStorage.java | 5 +-
.../streams/client/ScheduledStreamBuilder.java | 8 ++--
.../rocketmq/streams/client/ScheduledTask.java | 3 +-
.../streams/client/source/DataStreamSource.java | 5 +-
.../streams/client/strategy/WindowStrategy.java | 1 -
.../streams/client/transform/OverWindowStream.java | 2 -
.../apache/rocketmq/streams/client/DBSinkTest.java | 9 ++--
.../rocketmq/streams/client/DataStreamTest.java | 14 +++---
.../apache/rocketmq/streams/client/FileTest.java | 35 +++++++-------
.../rocketmq/streams/client/OnewayProducer.java | 1 -
.../apache/rocketmq/streams/client/SinkTest.java | 2 -
.../apache/rocketmq/streams/client/SourceTest.java | 1 -
.../apache/rocketmq/streams/client/SplitTest.java | 2 -
.../apache/rocketmq/streams/client/WindowTest.java | 55 ----------------------
.../streams/client/sink/UDFDefinedSQLParser.java | 3 +-
.../streams/client/sink/UserDefinedSink.java | 10 ----
.../client/sink/UserDefinedSupportShuffleSink.java | 1 -
.../streams/client/windows/AbstractWindowTest.java | 19 ++++----
.../streams/client/windows/MultiSplitTest.java | 24 ----------
.../client/windows/ShuffleOverWindowTest.java | 6 ---
.../streams/client/windows/SingleSplitTest.java | 20 --------
.../streams/client/windows/WindowDebugTest.java | 15 +-----
.../streams/common/cache/ByteArrayMemoryTable.java | 2 -
.../streams/common/cache/ListMemoryTable.java | 7 ---
.../common/cache/compress/AbstractMemoryTable.java | 1 -
.../common/cache/compress/impl/AbstractListKV.java | 2 -
.../common/cache/compress/impl/IntListKV.java | 2 -
.../common/cache/compress/impl/LongListKV.java | 2 -
.../AbstractSupportShuffleChannelBuilder.java | 2 -
.../common/channel/impl/CollectionSink.java | 5 +-
.../common/channel/impl/CollectionSinkBuilder.java | 3 +-
.../common/channel/impl/CollectionSource.java | 7 ++-
.../common/channel/impl/OutputPrintChannel.java | 3 --
.../common/channel/impl/memory/MemoryChannel.java | 1 -
.../channel/impl/mutiltask/MutilTaskSink.java | 3 --
.../impl/transit/TransitChannelBuilder.java | 1 -
.../streams/common/channel/sink/AbstractSink.java | 13 +++--
.../sink/AbstractSupportShuffleUDFSink.java | 5 --
.../impl/AbstractMultiSplitMessageCache.java | 2 -
.../channel/sinkcache/impl/MessageCache.java | 1 -
.../common/channel/source/AbstractSource.java | 15 +++---
.../checkpoint/AbstractCheckPointStorage.java | 8 +++-
.../streams/common/checkpoint/CheckPoint.java | 1 -
.../common/checkpoint/CheckPointManager.java | 9 ++--
.../checkpoint/CheckPointStorageFactory.java | 5 +-
.../common/checkpoint/ICheckPointStorage.java | 4 +-
.../streams/common/checkpoint/SourceSnapShot.java | 3 +-
.../streams/common/checkpoint/SourceState.java | 3 +-
.../streams/common/component/ComponentCreator.java | 1 -
.../streams/common/datatype/DateDataType.java | 7 ++-
.../interfaces/IBatchMessageFinishNotify.java | 1 -
.../streams/common/metadata/MetaDataUtils.java | 15 ++++--
.../optimization/fingerprint/FingerprintCache.java | 2 -
.../optimization/fingerprint/PreFingerprint.java | 2 -
.../streams/common/schedule/ScheduleManager.java | 1 -
.../streams/common/topology/ChainPipeline.java | 3 --
.../common/topology/builder/PipelineBuilder.java | 14 +++---
.../streams/common/topology/model/IWindow.java | 2 -
.../topology/stages/AbstractWindowStage.java | 1 -
.../common/topology/stages/OutputChainStage.java | 1 -
.../common/topology/stages/udf/StageBuilder.java | 3 +-
.../streams/common/topology/task/StreamsTask.java | 1 -
.../streams/common/utils/DataTypeUtil.java | 9 ++--
.../rocketmq/streams/common/utils/MapKeyUtil.java | 7 ++-
.../configurable/ConfigurableComponent.java | 1 -
.../AbstractSupportParentConfigureService.java | 1 -
.../service/ConfigurableServiceFactory.java | 1 -
.../service/impl/FileConfigureService.java | 2 -
.../connectors/balance/AbstractBalance.java | 1 -
.../streams/connectors/balance/SplitChanged.java | 1 -
.../connectors/balance/impl/LeaseBalanceImpl.java | 13 +++--
.../streams/connectors/model/ReaderStatus.java | 5 +-
.../streams/connectors/reader/DBScanReader.java | 10 ++--
.../streams/connectors/reader/ISplitReader.java | 2 +-
.../connectors/reader/SplitCloseFuture.java | 1 -
.../connectors/source/AbstractPullSource.java | 12 ++---
.../source/CycleDynamicMultipleDBScanSource.java | 12 +++--
.../source/DynamicMultipleDBScanSource.java | 13 +++--
.../streams/connectors/source/IPullSource.java | 1 -
.../source/filter/BoundedPatternFilter.java | 3 +-
.../connectors/source/filter/CyclePeriod.java | 5 +-
.../connectors/source/filter/CycleSchedule.java | 3 +-
.../source/filter/DataFormatPatternFilter.java | 6 +--
.../streams/dim/builder/DBDimSQLParser.java | 2 -
.../streams/dim/builder/SQLParserFactory.java | 1 -
.../rocketmq/streams/dim/index/DimIndex.java | 1 -
.../rocketmq/streams/dim/index/IndexExecutor.java | 3 --
.../intelligence/AbstractIntelligenceCache.java | 1 -
.../rocketmq/streams/dim/model/AbstractDim.java | 2 -
.../apache/rocketmq/streams/dim/model/DBDim.java | 2 -
.../apache/rocketmq/streams/dim/model/FileDim.java | 2 -
.../java/com/aliyun/service/TableCompressTest.java | 2 -
.../mutilconsumer/MutilStreamsClientTest.java | 7 ++-
.../streams/examples/mutilconsumer/Producer.java | 5 +-
.../examples/rocketmqsource/ProducerFromFile.java | 9 ++--
.../rocketmqsource/RocketMQSourceExample2.java | 3 +-
.../rocketmqsource/RocketMQSourceExample3.java | 5 +-
.../rocketmq/streams/filter/FilterComponent.java | 1 -
.../streams/filter/builder/ExpressionBuilder.java | 1 -
.../streams/filter/context/RuleContext.java | 1 -
.../filter/engine/impl/DefaultRuleEngine.java | 42 +----------------
.../function/script/CaseDependentParser.java | 1 -
.../filter/function/script/CaseFunction.java | 3 --
.../rocketmq/streams/filter/operator/Rule.java | 1 -
.../streams/filter/operator/RuleExpression.java | 1 -
.../filter/operator/expression/Expression.java | 1 -
.../streams/filter/operator/var/InnerVar.java | 1 -
.../optimization/casewhen/CaseWhenBuilder.java | 2 -
.../optimization/casewhen/GroupByVarCaseWhen.java | 1 -
.../filter/optimization/dependency/BlinkRule.java | 2 -
.../dependency/BlinkRuleV2Expression.java | 6 ---
.../optimization/dependency/CommonExpression.java | 1 -
.../optimization/dependency/DependencyTree.java | 4 --
.../optimization/dependency/FilterTreeNode.java | 6 ---
.../optimization/dependency/PipelineTree.java | 1 -
.../optimization/dependency/ScriptTreeNode.java | 6 ---
.../homologous/HomologousOptimization.java | 3 --
.../optimization/script/ScriptOptimization.java | 4 --
.../filter/service/impl/RuleEngineServiceImpl.java | 1 -
.../lease/service/impl/BasedLesaseImpl.java | 1 -
.../lease/service/storages/DBLeaseStorage.java | 1 -
.../streams/script/context/FunctionContext.java | 2 -
.../aggregation/FirstValueAccumulator.java | 4 --
.../function/impl/between/BetweenFunction.java | 1 -
.../function/impl/condition/IFScopeFunction.java | 1 -
.../function/impl/context/ContextFunction.java | 2 -
.../script/function/impl/date/GetDateFunction.java | 1 -
.../script/function/impl/eval/EvalFunction.java | 1 -
.../function/impl/flatmap/SplitArrayFunction.java | 2 -
.../script/function/impl/item/ItemFunction.java | 4 --
.../script/function/impl/relation/AndFunction.java | 6 ---
.../operator/expression/GroupScriptExpression.java | 1 -
.../operator/expression/ICaseDependentParser.java | 2 +-
.../operator/expression/ScriptExpression.java | 4 --
.../script/operator/impl/FunctionScript.java | 6 +--
.../streams/script/utils/ExpressionUtil.java | 1 -
.../streams/script/utils/FunctionUtils.java | 2 -
.../streams/script/function/UDTFFunctionTest.java | 5 +-
.../rocketmq/streams/state/AbstractState.java | 1 -
.../streams/state/kv/rocksdb/RocksDBOperator.java | 1 -
.../streams/window/debug/DebugAnalysis.java | 8 ----
.../rocketmq/streams/window/debug/DebugWriter.java | 2 -
.../rocketmq/streams/window/debug/WindowDebug.java | 2 -
.../streams/window/fire/EventTimeManager.java | 1 -
.../streams/window/fire/SplitEventTimeManager.java | 10 ++--
.../streams/window/fire/WindowFireManager.java | 1 -
.../rocketmq/streams/window/model/WindowCache.java | 17 +++----
.../streams/window/model/WindowInstance.java | 1 -
.../window/offset/WindowMaxValueManager.java | 1 -
.../window/offset/WindowMaxValueProcessor.java | 2 -
.../window/operator/AbstractShuffleWindow.java | 2 -
.../streams/window/operator/AbstractWindow.java | 53 +++++++++------------
.../window/operator/impl/ShuffleOverWindow.java | 4 --
.../window/operator/impl/WindowOperator.java | 23 ++++-----
.../streams/window/operator/join/DBOperator.java | 2 -
.../streams/window/operator/join/JoinWindow.java | 2 +-
.../streams/window/shuffle/ShuffleChannel.java | 35 +++++++-------
.../streams/window/sqlcache/ISQLElement.java | 2 -
.../rocketmq/streams/window/sqlcache/SQLCache.java | 1 -
.../streams/window/state/impl/WindowValue.java | 1 -
.../streams/window/storage/IWindowStorage.java | 1 -
.../streams/window/storage/WindowStorage.java | 1 -
.../streams/window/storage/db/DBStorage.java | 1 -
.../streams/window/storage/file/FileStorage.java | 2 -
.../window/storage/rocksdb/RocksdbStorage.java | 5 --
184 files changed, 294 insertions(+), 729 deletions(-)
diff --git a/README-chinese.md b/README-chinese.md
index cfa6e99..8daa91a 100644
--- a/README-chinese.md
+++ b/README-chinese.md
@@ -47,15 +47,15 @@ StreamBuilder 用于构建流任务的源; 内部包含```dataStream()```和``
DataStreamSource 是分段式编程的源头类,用于对接各种数据源, 从各大消息队列中获取数据;
+ ```fromFile``` 从文件中读取数据, 该方法包含俩个参数
- + ```filePath``` 文件路径,必填参数
- + ```isJsonData``` 是否json数据, 非必填参数, 默认为```true```
+ + ```filePath``` 文件路径,必填参数
+ + ```isJsonData``` 是否json数据, 非必填参数, 默认为```true```
+ ```fromRocketmq``` 从rocketmq中获取数据,包含四个参数
- + ```topic``` rocketmq消息队列的topic名称,必填参数
- + ```groupName``` 消费者组的名称,必填参数
- + ```isJson``` 是否json格式,非必填参数
- + ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数
+ + ```topic``` rocketmq消息队列的topic名称,必填参数
+ + ```groupName``` 消费者组的名称,必填参数
+ + ```isJson``` 是否json格式,非必填参数
+ + ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数
+ ```from``` 自定义的数据源, 通过实现ISource接口实现自己的数据源
@@ -80,12 +80,12 @@ DataStream实现了一系列常见的流计算算子
+ ```toRocketmq``` 将结果输出到rocketmq
+ ```to``` 将结果经过自定义的ISink接口输出到指定的存储
+ ```window``` 在窗口内进行相关的统计分析,一般会与```groupBy```连用, ```window()```用来定义窗口的大小, ```groupBy()```用来定义统计分析的主key,可以指定多个
- + ```count``` 在窗口内计数
- + ```min``` 获取窗口内统计值的最小值
- + ```max``` 获取窗口内统计值得最大值
- + ```avg``` 获取窗口内统计值的平均值
- + ```sum``` 获取窗口内统计值的加和值
- + ```reduce``` 在窗口内进行自定义的汇总运算
+ + ```count``` 在窗口内计数
+ + ```min``` 获取窗口内统计值的最小值
+ + ```max``` 获取窗口内统计值得最大值
+ + ```avg``` 获取窗口内统计值的平均值
+ + ```sum``` 获取窗口内统计值的加和值
+ + ```reduce``` 在窗口内进行自定义的汇总运算
+ ```join``` 根据条件将将俩个流进行关联, 合并为一个大流进行相关的运算
+ ```union``` 将俩个流进行合并
+ ```split``` 将一个数据流按照标签进行拆分,分为不同的数据流供下游进行分析计算
@@ -115,11 +115,11 @@ Rocketmq-Streams 核心就是一个独立的jar包, 用户可以在自己的
1. 通过```mvn clean install``` 构建工程
2. 从```rocketmq-streams-runner/target/rocket-streams-1.0.0-SNAPSHOT-distribution.tar.gz``` 中获取tar.gz包, 并解压
3. ```rocketmq-streams```目录架构如下:
- + ```bin``` 指令目录,包括启动和停止指令
- + ```conf``` 配置目录,包括日志配置以及应用的相关配置文件
- + ```jobs``` 任务目录, 独立打包后的rocketmq-streams jar包
- + ```lib``` 依赖包
- + ```log``` 日志目录
+ + ```bin``` 指令目录,包括启动和停止指令
+ + ```conf``` 配置目录,包括日志配置以及应用的相关配置文件
+ + ```jobs``` 任务目录, 独立打包后的rocketmq-streams jar包
+ + ```lib``` 依赖包
+ + ```log``` 日志目录
### 发布应用
用户依赖rocketmq-streams,开发流处理程序,独立打包后, 将jar包拷贝到jobs目录, 通过指令即可完成任务的启动和运行;可以通过启动多个独立的应用程序;
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
index b60bb68..485987d 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
@@ -23,9 +23,7 @@ import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.List;
-import java.util.Locale;
import java.util.Set;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.IChannel;
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/EnhanceDBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/EnhanceDBSink.java
index 7680279..d789703 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/EnhanceDBSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/EnhanceDBSink.java
@@ -17,6 +17,10 @@
package org.apache.rocketmq.streams.db.sink;
import com.alibaba.fastjson.JSONObject;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.IChannel;
@@ -38,11 +42,6 @@ import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.db.sink.sqltemplate.ISqlTemplate;
import org.apache.rocketmq.streams.db.sink.sqltemplate.SqlTemplateFactory;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
/**
* @description enhance db sink, support atomic sink and multiple sink
*/
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoSqlTemplate.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoSqlTemplate.java
index 4e61bba..d352fda 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoSqlTemplate.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoSqlTemplate.java
@@ -16,11 +16,10 @@
*/
package org.apache.rocketmq.streams.db.sink.sqltemplate;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.utils.SQLUtil;
-
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
/**
* @description create insert into sql
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoWithDuplicateKeySqlTemplate.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoWithDuplicateKeySqlTemplate.java
index c091c53..c0150af 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoWithDuplicateKeySqlTemplate.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoWithDuplicateKeySqlTemplate.java
@@ -16,11 +16,10 @@
*/
package org.apache.rocketmq.streams.db.sink.sqltemplate;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.utils.SQLUtil;
-
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
/**
* @author zengyu.cw
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/SqlTemplateFactory.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/SqlTemplateFactory.java
index 9c4435f..8837c9c 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/SqlTemplateFactory.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/SqlTemplateFactory.java
@@ -16,9 +16,8 @@
*/
package org.apache.rocketmq.streams.db.sink.sqltemplate;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-
import java.util.Arrays;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
/**
* @description
diff --git a/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/ISqlTemplateTest.java b/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/ISqlTemplateTest.java
index 45d849c..4109149 100644
--- a/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/ISqlTemplateTest.java
+++ b/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/ISqlTemplateTest.java
@@ -17,15 +17,14 @@
package org.apache.rocketmq.streams.db.sink.db;
import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.db.sink.sqltemplate.MysqlInsertIgnoreIntoSqlTemplate;
import org.apache.rocketmq.streams.db.sink.sqltemplate.MysqlInsertIntoSqlTemplate;
import org.apache.rocketmq.streams.db.sink.sqltemplate.MysqlInsertIntoWithDuplicateKeySqlTemplate;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* @description
*/
diff --git a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkBuilder.java b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkBuilder.java
index dd3be8e..dec19f9 100644
--- a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkBuilder.java
+++ b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkBuilder.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.streams.es.sink;
import com.alibaba.fastjson.JSONObject;
import com.google.auto.service.AutoService;
+import java.util.Properties;
import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
@@ -26,8 +27,6 @@ import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.model.ServiceName;
import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
-import java.util.Properties;
-
@AutoService(IChannelBuilder.class)
@ServiceName(value = ESSinkBuilder.TYPE, aliasName = "elasticsearch")
public class ESSinkBuilder extends AbstractSupportShuffleChannelBuilder {
diff --git a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkOnlyChannel.java b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkOnlyChannel.java
index 488dcf5..f4346e4 100644
--- a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkOnlyChannel.java
+++ b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkOnlyChannel.java
@@ -16,6 +16,10 @@
*/
package org.apache.rocketmq.streams.es.sink;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
@@ -34,11 +38,6 @@ import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
public class ESSinkOnlyChannel extends AbstractSink {
private static final Log LOG = LogFactory.getLog(ESSinkOnlyChannel.class);
diff --git a/rocketmq-streams-channel-es/src/test/java/org/apache/rocketmq/streams/db/sink/es/EsChannelTest.java b/rocketmq-streams-channel-es/src/test/java/org/apache/rocketmq/streams/db/sink/es/EsChannelTest.java
index 6e59a19..8f09c4a 100644
--- a/rocketmq-streams-channel-es/src/test/java/org/apache/rocketmq/streams/db/sink/es/EsChannelTest.java
+++ b/rocketmq-streams-channel-es/src/test/java/org/apache/rocketmq/streams/db/sink/es/EsChannelTest.java
@@ -16,11 +16,9 @@
*/
package org.apache.rocketmq.streams.db.sink.es;
+import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;
-
-import com.alibaba.fastjson.JSONObject;
-
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
index cd91f64..71fe8b1 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
@@ -19,9 +19,7 @@ package org.apache.rocketmq.streams;
import com.alibaba.fastjson.JSONObject;
import com.google.auto.service.AutoService;
-
import java.util.Properties;
-
import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java
index b82af06..eabd543 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java
@@ -20,8 +20,8 @@ package org.apache.rocketmq.streams;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 662f215..395161c 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -18,6 +18,16 @@
package org.apache.rocketmq.streams.source;
import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
@@ -49,17 +59,6 @@ import org.apache.rocketmq.streams.debug.DebugWriter;
import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
public class RocketMQSource extends AbstractSupportShuffleSource {
protected static final Log LOG = LogFactory.getLog(RocketMQSource.class);
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/ISyslogRouter.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/ISyslogRouter.java
index 3160416..7ac8660 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/ISyslogRouter.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/ISyslogRouter.java
@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.streams.syslog;
-import org.apache.rocketmq.streams.common.channel.IChannel;
-
public interface ISyslogRouter {
/**
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannel.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannel.java
index 84efe2a..e4e176c 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannel.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannel.java
@@ -16,14 +16,16 @@
*/
package org.apache.rocketmq.streams.syslog;
+import com.alibaba.fastjson.JSONObject;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-
-import com.alibaba.fastjson.JSONObject;
-
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.AbstractChannel;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
@@ -36,10 +38,6 @@ import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.utils.IPUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.graylog2.syslog4j.Syslog;
import org.graylog2.syslog4j.SyslogConfigIF;
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelManager.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelManager.java
index e0fe2bf..13a52ef 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelManager.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelManager.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.syslog;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.graylog2.syslog4j.SyslogConstants;
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogParser.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogParser.java
index 183087b..47968e8 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogParser.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogParser.java
@@ -16,11 +16,10 @@
*/
package org.apache.rocketmq.streams.syslog;
+import java.util.Date;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.IPUtil;
-import java.util.Date;
-
public class SyslogParser {
/**
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogServer.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogServer.java
index 7536edf..adf291c 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogServer.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogServer.java
@@ -16,26 +16,24 @@
*/
package org.apache.rocketmq.streams.syslog;
+import com.alibaba.fastjson.JSONObject;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-
-import com.alibaba.fastjson.JSONObject;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.cache.softreference.impl.SoftReferenceCache;
import org.apache.rocketmq.streams.common.channel.source.AbstractUnreliableSource;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
-import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
+import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.IPUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.graylog2.syslog4j.server.SyslogServerConfigIF;
import org.graylog2.syslog4j.server.SyslogServerEventIF;
import org.graylog2.syslog4j.server.SyslogServerIF;
diff --git a/rocketmq-streams-channel-syslog/src/test/java/org/apache/rocketmq/streams/syslog/SyslogClient.java b/rocketmq-streams-channel-syslog/src/test/java/org/apache/rocketmq/streams/syslog/SyslogClient.java
index 2aab534..31b6e5c 100644
--- a/rocketmq-streams-channel-syslog/src/test/java/org/apache/rocketmq/streams/syslog/SyslogClient.java
+++ b/rocketmq-streams-channel-syslog/src/test/java/org/apache/rocketmq/streams/syslog/SyslogClient.java
@@ -16,16 +16,13 @@
*/
package org.apache.rocketmq.streams.syslog;
-import java.util.Date;
-
import com.alibaba.fastjson.JSONObject;
-
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.syslog.SyslogChannel;
-import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
+import java.util.Date;
+import org.apache.rocketmq.streams.common.channel.IChannel;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.channel.IChannel;
+import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.IPUtil;
import org.junit.Test;
diff --git a/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java b/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
index acf538f..793dc13 100644
--- a/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
+++ b/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
@@ -16,19 +16,16 @@
*/
package org.apache.rocketmq.streams.checkpoint.db;
-import com.google.auto.service.AutoService;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.checkpoint.AbstractCheckPointStorage;
import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
-import org.apache.rocketmq.streams.common.checkpoint.ICheckPointStorage;
import org.apache.rocketmq.streams.common.checkpoint.SourceSnapShot;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-import java.util.List;
-
/**
* @description
*/
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledStreamBuilder.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledStreamBuilder.java
index e1b4f0c..ffa4f62 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledStreamBuilder.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledStreamBuilder.java
@@ -16,15 +16,15 @@
*/
package org.apache.rocketmq.streams.client;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.utils.ThreadUtil;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.utils.ThreadUtil;
/**
* @description
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledTask.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledTask.java
index 87ce9fc..4956fe2 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledTask.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledTask.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.streams.client;
+import java.util.Date;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
import org.apache.rocketmq.streams.client.transform.DataStream;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
@@ -23,8 +24,6 @@ import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.connectors.source.filter.CycleSchedule;
import org.apache.rocketmq.streams.db.sink.EnhanceDBSink;
-import java.util.Date;
-
/**
* @description
*/
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
index ab25d68..b5859da 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
@@ -34,8 +34,8 @@
package org.apache.rocketmq.streams.client.source;
import com.alibaba.fastjson.JSONObject;
-
import com.google.common.collect.Sets;
+import java.util.Set;
import org.apache.rocketmq.streams.client.transform.DataStream;
import org.apache.rocketmq.streams.common.channel.impl.CollectionSource;
import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
@@ -44,14 +44,11 @@ import org.apache.rocketmq.streams.common.channel.impl.memory.MemorySource;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.topology.ChainStage;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
-
import org.apache.rocketmq.streams.connectors.source.CycleDynamicMultipleDBScanSource;
import org.apache.rocketmq.streams.connectors.source.DynamicMultipleDBScanSource;
import org.apache.rocketmq.streams.connectors.source.filter.CycleSchedule;
import org.apache.rocketmq.streams.source.RocketMQSource;
-import java.util.Set;
-
public class DataStreamSource {
protected PipelineBuilder mainPipelineBuilder;
protected Set<PipelineBuilder> otherPipelineBuilders;
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
index ce42385..c111f0b 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.client.strategy;
import java.util.Properties;
import org.apache.rocketmq.streams.common.component.AbstractComponent;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
public class WindowStrategy implements Strategy {
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/OverWindowStream.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/OverWindowStream.java
index 48d5ab1..87832c1 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/OverWindowStream.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/OverWindowStream.java
@@ -21,8 +21,6 @@ import java.util.Set;
import org.apache.rocketmq.streams.common.topology.ChainStage;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.window.operator.impl.OverWindow;
import org.apache.rocketmq.streams.window.operator.impl.ShuffleOverWindow;
public class OverWindowStream {
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBSinkTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBSinkTest.java
index 4a319d5..775427a 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBSinkTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBSinkTest.java
@@ -17,17 +17,16 @@
package org.apache.rocketmq.streams.client;
import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
-
/**
/**
* @description
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
index b63efdf..2484af5 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
@@ -18,7 +18,12 @@
package org.apache.rocketmq.streams.client;
import com.alibaba.fastjson.JSONObject;
-
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
import org.apache.rocketmq.streams.client.transform.window.Time;
@@ -28,13 +33,6 @@ import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
import org.junit.Before;
import org.junit.Test;
-import java.io.Serializable;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
public class DataStreamTest implements Serializable {
DataStreamSource dataStream;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/FileTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/FileTest.java
index 6f77ae4..22d1aa9 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/FileTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/FileTest.java
@@ -22,41 +22,40 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.common.functions.FilterFunction;
import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.junit.Test;
public class FileTest {
@Test
- public void testFilter(){
- DataStreamSource dataStream =DataStreamSource.create("namespace","name");
- dataStream.fromFile("/tmp/file.txt",false)
- .filter((message)->{
- JSONObject jsonObject= JSON.parseObject((String)message);
- if(Objects.nonNull(jsonObject)){
- int inFlow=jsonObject.getIntValue("InFlow");
- if(inFlow>2){
+ public void testFilter() {
+ DataStreamSource dataStream = DataStreamSource.create("namespace", "name");
+ dataStream.fromFile("/tmp/file.txt", false)
+ .filter((message) -> {
+ JSONObject jsonObject = JSON.parseObject((String) message);
+ if (Objects.nonNull(jsonObject)) {
+ int inFlow = jsonObject.getIntValue("InFlow");
+ if (inFlow > 2) {
return false;
- }else {
+ } else {
return true;
}
}
return true;
- }).toPrint(1)
+ }).toPrint(1)
.start();
-
+
}
@Test
- public void testWriteFile(){
- List<String> lines=new ArrayList<>();
- for(int i=0;i<4;i++){
- JSONObject jsonObject=new JSONObject();
- jsonObject.put("InFlow",i);
+ public void testWriteFile() {
+ List<String> lines = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("InFlow", i);
lines.add(jsonObject.toJSONString());
}
lines.add("");
- FileUtil.write("/tmp/file.txt",lines);
+ FileUtil.write("/tmp/file.txt", lines);
}
}
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/OnewayProducer.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/OnewayProducer.java
index f34c405..c9be0cc 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/OnewayProducer.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/OnewayProducer.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.client;
import java.io.BufferedReader;
import java.io.FileReader;
-
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SinkTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SinkTest.java
index f16bb8c..7c8e181 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SinkTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SinkTest.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.streams.client;
import com.alibaba.fastjson.JSONObject;
-
-import com.google.gson.JsonObject;
import org.apache.rocketmq.streams.common.channel.impl.file.FileSink;
import org.apache.rocketmq.streams.common.context.Message;
import org.junit.Test;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SourceTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SourceTest.java
index ccc8589..97d0342 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SourceTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SourceTest.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.client;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.client.transform.DataStream;
import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SplitTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SplitTest.java
index 301043d..76597b5 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SplitTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SplitTest.java
@@ -27,8 +27,6 @@ import org.apache.rocketmq.streams.client.transform.window.Time;
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
import org.apache.rocketmq.streams.common.functions.FilterFunction;
import org.apache.rocketmq.streams.common.functions.FlatMapFunction;
-import org.apache.rocketmq.streams.common.functions.MapFunction;
-import org.apache.rocketmq.streams.common.functions.ReduceFunction;
import org.apache.rocketmq.streams.common.functions.SplitFunction;
import org.junit.Test;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java
index 164fa8e..c8baece 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java
@@ -29,7 +29,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.streams.client.transform.window.SessionWindow;
import org.apache.rocketmq.streams.client.transform.window.Time;
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-import org.apache.rocketmq.streams.common.functions.ForEachFunction;
import org.apache.rocketmq.streams.common.functions.MapFunction;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.junit.Assert;
@@ -38,60 +37,6 @@ import org.junit.Test;
public class WindowTest implements Serializable {
@Test
- public void testWindow() {
- StreamBuilder.dataStream("namespace", "name")
- .fromFile("/Users/duheng/project/opensource/sls_100.txt", false)
- .map((MapFunction<JSONObject, String>) message -> JSONObject.parseObject(message))
- .window(TumblingWindow.of(Time.seconds(5)))
- .groupBy("ProjectName", "LogStore")
- .setLocalStorageOnly(true)
- .count("total")
- .sum("OutFlow", "OutFlow")
- .sum("InFlow", "InFlow")
- .toDataSteam()
- .forEach(new ForEachFunction<JSONObject>() {
- protected int sum = 0;
-
- @Override
- public void foreach(JSONObject o) {
- int total = o.getInteger("total");
- sum = sum + total;
- o.put("sum(total)", sum);
- }
- }).toPrint().start();
-
- }
-
- // @Test
- // public void testWindowFromMetaq() throws InterruptedException {
- // String topic = "TOPIC_DIPPER_SYSTEM_MSG_4";
- // StreamBuilder.dataStream("namespace", "name")
- // .fromFile("/Users/yuanxiaodong/chris/sls_100.txt", true)
- // .toRocketmq(topic)
- // .asyncStart();
- //
- // StreamBuilder.dataStream("namespace", "name1")
- // .fromRocketmq(topic, "chris", true)
- // .window(TumblingWindow.of(Time.seconds(5)))
- // .groupby("ProjectName", "LogStore")
- // .setLocalStorageOnly(true)
- // .count("total")
- // .sum("OutFlow", "OutFlow")
- // .sum("InFlow", "inflow")
- // .toDataSteam()
- // .forEach(new ForEachFunction<JSONObject>() {
- // protected int sum = 0;
- //
- // @Override
- // public void foreach(JSONObject o) {
- // int total = o.getInteger("total");
- // sum = sum + total;
- // o.put("sum(total)", sum);
- // }
- // }).toPrint().start();
- // }
-
- @Test
public void testSession() {
//dataset
List<String> behaviorList = new ArrayList<>();
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UDFDefinedSQLParser.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UDFDefinedSQLParser.java
index fd631c3..ee074e6 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UDFDefinedSQLParser.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UDFDefinedSQLParser.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.client.sink;
import com.google.auto.service.AutoService;
import org.apache.rocketmq.streams.common.channel.builder.AbstractChannelSQLSupportShuffleSQLParser;
-import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.ISource;
@@ -27,7 +26,7 @@ import org.apache.rocketmq.streams.common.model.ServiceName;
@AutoService(IChannelBuilder.class)
@ServiceName(value = UDFDefinedSQLParser.TYPE, aliasName = "source_alias_name")
public class UDFDefinedSQLParser extends AbstractChannelSQLSupportShuffleSQLParser {
- public static final String TYPE="source_type";
+ public static final String TYPE = "source_type";
@Override protected String getSourceClass() {
return null;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSink.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSink.java
index 5573a65..4a5ea8b 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSink.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSink.java
@@ -16,20 +16,10 @@
*/
package org.apache.rocketmq.streams.client.sink;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sink.AbstractUDFSink;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
public class UserDefinedSink extends AbstractUDFSink {
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSupportShuffleSink.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSupportShuffleSink.java
index 8f7d83a..6c64511 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSupportShuffleSink.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSupportShuffleSink.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.client.sink;
import java.util.List;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleUDFSink;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java
index 3eef4f2..688b485 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java
@@ -18,16 +18,6 @@
package org.apache.rocketmq.streams.client.windows;
import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.streams.client.transform.DataStream;
-import org.apache.rocketmq.streams.client.transform.window.Time;
-import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-import org.apache.rocketmq.streams.common.functions.ForEachFunction;
-import org.apache.rocketmq.streams.common.functions.MapFunction;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
@@ -38,6 +28,15 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.streams.client.transform.DataStream;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.common.functions.ForEachFunction;
+import org.apache.rocketmq.streams.common.functions.MapFunction;
+import org.apache.rocketmq.streams.common.utils.DateUtil;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
import static junit.framework.TestCase.assertTrue;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/MultiSplitTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/MultiSplitTest.java
index 95b3c21..dd5f78a 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/MultiSplitTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/MultiSplitTest.java
@@ -17,35 +17,11 @@
package org.apache.rocketmq.streams.client.windows;
import com.alibaba.fastjson.JSONObject;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.Reader;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.transform.DataStream;
import org.apache.rocketmq.streams.common.functions.MapFunction;
import org.apache.rocketmq.streams.common.topology.model.IWindow;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.window.operator.impl.WindowOperator;
-import org.apache.rocketmq.streams.window.state.impl.WindowValue;
-import org.apache.rocketmq.streams.window.storage.WindowStorage;
import org.junit.Test;
public class MultiSplitTest extends SingleSplitTest {
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/ShuffleOverWindowTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/ShuffleOverWindowTest.java
index d5116de..fdc77e3 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/ShuffleOverWindowTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/ShuffleOverWindowTest.java
@@ -19,13 +19,7 @@ package org.apache.rocketmq.streams.client.windows;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.streams.client.StreamBuilder;
-import org.apache.rocketmq.streams.client.transform.window.Time;
-import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-import org.apache.rocketmq.streams.common.functions.ForEachFunction;
-import org.apache.rocketmq.streams.common.functions.MapFunction;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.junit.Test;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/SingleSplitTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/SingleSplitTest.java
index 30006e7..2de857a 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/SingleSplitTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/SingleSplitTest.java
@@ -16,29 +16,9 @@
*/
package org.apache.rocketmq.streams.client.windows;
-import com.alibaba.fastjson.JSONObject;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.transform.DataStream;
-import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.functions.MapFunction;
import org.apache.rocketmq.streams.common.topology.model.IWindow;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.PrintUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.window.operator.impl.WindowOperator;
-import org.apache.rocketmq.streams.window.state.impl.WindowValue;
-import org.apache.rocketmq.streams.window.storage.WindowStorage;
import org.junit.Test;
public class SingleSplitTest extends AbstractWindowTest {
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowDebugTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowDebugTest.java
index 7188c3f..ba4e14d 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowDebugTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowDebugTest.java
@@ -17,24 +17,13 @@
package org.apache.rocketmq.streams.client.windows;
-import com.alibaba.fastjson.JSONObject;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.window.debug.WindowDebug;
import org.junit.Test;
public class WindowDebugTest {
@Test
- public void windowDebug(){
- WindowDebug windowDebug=new WindowDebug("name1_window_10001","logTime","/tmp/rockstmq-streams","sum(total)",88121);
+ public void windowDebug() {
+ WindowDebug windowDebug = new WindowDebug("name1_window_10001", "logTime", "/tmp/rockstmq-streams", "sum(total)", 88121);
windowDebug.startAnalysis();
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ByteArrayMemoryTable.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ByteArrayMemoryTable.java
index 9081468..4048f05 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ByteArrayMemoryTable.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ByteArrayMemoryTable.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.streams.common.cache;
-import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -24,7 +23,6 @@ import java.util.Map;
import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
import org.apache.rocketmq.streams.common.cache.compress.AdditionStore;
import org.apache.rocketmq.streams.common.cache.compress.ByteArray;
-import org.apache.rocketmq.streams.common.cache.compress.CacheKV;
import org.apache.rocketmq.streams.common.cache.compress.MapAddress;
import org.apache.rocketmq.streams.common.utils.NumberUtils;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ListMemoryTable.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ListMemoryTable.java
index 2f3a729..ab8795b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ListMemoryTable.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ListMemoryTable.java
@@ -17,17 +17,10 @@
package org.apache.rocketmq.streams.common.cache;
import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
-import org.apache.rocketmq.streams.common.datatype.DataType;
-import org.apache.rocketmq.streams.common.datatype.NotSupportDataType;
-import org.apache.rocketmq.streams.common.datatype.StringDataType;
-import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
/**
* 压缩表,行数据以byte[][]存放
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AbstractMemoryTable.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AbstractMemoryTable.java
index 00574bc..eec4e18 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AbstractMemoryTable.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AbstractMemoryTable.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.common.cache.compress;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/AbstractListKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/AbstractListKV.java
index d95d212..c1a8b36 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/AbstractListKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/AbstractListKV.java
@@ -17,9 +17,7 @@
package org.apache.rocketmq.streams.common.cache.compress.impl;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import org.apache.rocketmq.streams.common.cache.compress.AdditionStore;
import org.apache.rocketmq.streams.common.cache.compress.ByteArray;
import org.apache.rocketmq.streams.common.cache.compress.CacheKV;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntListKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntListKV.java
index 23cdb07..2246c7b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntListKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntListKV.java
@@ -16,11 +16,9 @@
*/
package org.apache.rocketmq.streams.common.cache.compress.impl;
-import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.streams.common.cache.compress.AdditionStore;
import org.apache.rocketmq.streams.common.cache.compress.ByteArray;
-import org.apache.rocketmq.streams.common.cache.compress.CacheKV;
import org.apache.rocketmq.streams.common.cache.compress.MapAddress;
import org.apache.rocketmq.streams.common.utils.NumberUtils;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongListKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongListKV.java
index 16382ed..7292185 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongListKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongListKV.java
@@ -16,11 +16,9 @@
*/
package org.apache.rocketmq.streams.common.cache.compress.impl;
-import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.streams.common.cache.compress.AdditionStore;
import org.apache.rocketmq.streams.common.cache.compress.ByteArray;
-import org.apache.rocketmq.streams.common.cache.compress.CacheKV;
import org.apache.rocketmq.streams.common.cache.compress.MapAddress;
import org.apache.rocketmq.streams.common.utils.NumberUtils;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
index 6ca55b5..4742f8b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
@@ -18,9 +18,7 @@ package org.apache.rocketmq.streams.common.channel.builder;
import com.alibaba.fastjson.JSONObject;
import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
public abstract class AbstractSupportShuffleChannelBuilder implements IChannelBuilder, IShuffleChannelBuilder {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java
index c02ec31..15fc4a8 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java
@@ -16,11 +16,10 @@
*/
package org.apache.rocketmq.streams.common.channel.impl;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
-import org.apache.rocketmq.streams.common.context.IMessage;
-
import java.util.ArrayList;
import java.util.List;
+import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
+import org.apache.rocketmq.streams.common.context.IMessage;
/**
* @description just support json object
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java
index 32458c7..24a24ee 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java
@@ -17,14 +17,13 @@
package org.apache.rocketmq.streams.common.channel.impl;
import com.google.auto.service.AutoService;
+import java.util.Properties;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.model.ServiceName;
-import java.util.Properties;
-
/**
* @description
*/
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSource.java
index 0c2bdb0..4557b92 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSource.java
@@ -17,15 +17,14 @@
package org.apache.rocketmq.streams.common.channel.impl;
import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
-
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
/**
*
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
index 48e32e4..f7c3585 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
@@ -17,12 +17,9 @@
package org.apache.rocketmq.streams.common.channel.impl;
import java.util.List;
-
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.common.utils.PrintUtil;
/**
* 测试使用,输出就是把消息打印出来
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java
index 7d1389e..ce69674 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java
@@ -24,7 +24,6 @@ import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.AbstractUnreliableSource;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
/**
* 消息产生的source数据,就是通过sink写入的消息
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java
index 487153e..2afb806 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java
@@ -16,16 +16,13 @@
*/
package org.apache.rocketmq.streams.common.channel.impl.mutiltask;
-import com.google.auto.service.AutoService;
import java.util.List;
import java.util.Set;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.context.Context;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.model.ServiceName;
import org.apache.rocketmq.streams.common.topology.task.StreamsTask;
public class MutilTaskSink extends AbstractSink implements IAfterConfigurableRefreshListener {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java
index e9f943f..f48c511 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java
@@ -21,7 +21,6 @@ import com.alibaba.fastjson.JSONObject;
import com.google.auto.service.AutoService;
import java.util.Properties;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.impl.memory.MemorySource;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.metadata.MetaData;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
index 68c11bd..f1bf7d7 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
@@ -17,6 +17,12 @@
package org.apache.rocketmq.streams.common.channel.sink;
import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
@@ -36,13 +42,6 @@ import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.common.utils.StringUtil;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
/**
* 输出的接口抽象,针对json消息的场景
*/
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleUDFSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleUDFSink.java
index 98fdc23..14b93bb 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleUDFSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleUDFSink.java
@@ -16,12 +16,7 @@
*/
package org.apache.rocketmq.streams.common.channel.sink;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.java
index 0c28b3a..e1018cf 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.java
@@ -22,12 +22,10 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
index 86d31ee..f628ad4 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
@@ -28,7 +28,6 @@ import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.schedule.ScheduleManager;
import org.apache.rocketmq.streams.common.schedule.ScheduleTask;
-import org.apache.rocketmq.streams.common.utils.ThreadUtil;
/**
* 消息缓存的实现,通过消息队列做本地缓存。目前多是用了这个实现
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
index 6958970..949e411 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
@@ -16,14 +16,18 @@
*/
package org.apache.rocketmq.streams.common.channel.source;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
import java.io.UnsupportedEncodingException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
@@ -43,7 +47,6 @@ import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
-import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java
index 9ad5e69..5d611cd 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java
@@ -16,6 +16,12 @@
*/
package org.apache.rocketmq.streams.common.checkpoint;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
@@ -23,8 +29,6 @@ import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBac
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
import org.apache.rocketmq.streams.common.context.MessageOffset;
-import java.util.*;
-
/**
* @description
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java
index 8bcedbd..a035591 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.common.checkpoint;
import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.model.Entity;
/**
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java
index 1f022ac..06a2f86 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java
@@ -16,7 +16,12 @@
*/
package org.apache.rocketmq.streams.common.checkpoint;
-
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
@@ -27,8 +32,6 @@ import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.ReflectUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
-import java.util.*;
-
public class CheckPointManager extends BasedConfigurable{
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java
index fb9845b..1f8f371 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java
@@ -17,13 +17,12 @@
package org.apache.rocketmq.streams.common.checkpoint;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Iterator;
import java.util.ServiceLoader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @description
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java
index 97bfe89..2be9a5f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java
@@ -16,10 +16,8 @@
*/
package org.apache.rocketmq.streams.common.checkpoint;
-
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-
import java.util.List;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
/**
* @description 负责checkpoint的保存、恢复
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java
index 1128af1..f036b3a 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java
@@ -17,11 +17,10 @@
package org.apache.rocketmq.streams.common.checkpoint;
import com.alibaba.fastjson.JSONObject;
+import java.io.Serializable;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.model.Entity;
-import java.io.Serializable;
-
/**
* @create 2021-08-06 16:21:30
* @description
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java
index a239fdc..4e0739f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java
@@ -16,10 +16,9 @@
*/
package org.apache.rocketmq.streams.common.checkpoint;
-import org.apache.rocketmq.streams.common.context.MessageOffset;
-
import java.util.HashMap;
import java.util.Map;
+import org.apache.rocketmq.streams.common.context.MessageOffset;
/**
* @create 2021-08-11 15:51:50
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
index ff01034..889af89 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java
index 41a113c..3a67682 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java
@@ -17,16 +17,15 @@
package org.apache.rocketmq.streams.common.datatype;
import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.utils.NumberUtils;
-
import java.sql.Timestamp;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.utils.NumberUtils;
public class DateDataType extends BaseDataType<Date> {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/interfaces/IBatchMessageFinishNotify.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/interfaces/IBatchMessageFinishNotify.java
index 81106d3..8007d46 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/interfaces/IBatchMessageFinishNotify.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/interfaces/IBatchMessageFinishNotify.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.common.interfaces;
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
-import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataUtils.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataUtils.java
index 3c54775..bc0c3e7 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataUtils.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataUtils.java
@@ -16,12 +16,21 @@
*/
package org.apache.rocketmq.streams.common.metadata;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.rocketmq.streams.common.component.AbstractComponent;
import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
-import java.sql.*;
-import java.util.*;
-
/**
* @description metaDataUtils
*/
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/FingerprintCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/FingerprintCache.java
index 680711c..7dff0c0 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/FingerprintCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/FingerprintCache.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.streams.common.optimization.fingerprint;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/PreFingerprint.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/PreFingerprint.java
index 81fdbf6..9800a7b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/PreFingerprint.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/PreFingerprint.java
@@ -17,14 +17,12 @@
package org.apache.rocketmq.streams.common.optimization.fingerprint;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.streams.common.cache.compress.BitSetCache;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.topology.model.AbstractRule;
import org.apache.rocketmq.streams.common.topology.model.AbstractStage;
import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/schedule/ScheduleManager.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/schedule/ScheduleManager.java
index 1920f80..b29a6cc 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/schedule/ScheduleManager.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/schedule/ScheduleManager.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.common.schedule;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
index 5c7fe41..1269dc7 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
@@ -17,9 +17,7 @@
package org.apache.rocketmq.streams.common.topology;
import com.alibaba.fastjson.JSONObject;
-
import com.google.common.collect.Lists;
-
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
@@ -28,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.rocketmq.streams.common.cache.compress.impl.LongValueKV;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
index a7d60e6..5d47ba1 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
@@ -16,10 +16,15 @@
*/
package org.apache.rocketmq.streams.common.topology.builder;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
-import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
import org.apache.rocketmq.streams.common.configurable.IConfigurable;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.metadata.MetaData;
@@ -30,13 +35,6 @@ import org.apache.rocketmq.streams.common.topology.stages.OutputChainStage;
import org.apache.rocketmq.streams.common.utils.NameCreatorUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class PipelineBuilder implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/IWindow.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/IWindow.java
index 833d451..f16993b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/IWindow.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/IWindow.java
@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.streams.common.topology.model;
-import java.util.Set;
-
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.configurable.IConfigurable;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
index 8896583..2badd50 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.common.topology.stages;
import java.util.HashSet;
import java.util.Set;
-
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
index 154bab5..8a514e2 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
@@ -34,7 +34,6 @@ import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.topology.ChainStage;
import org.apache.rocketmq.streams.common.topology.model.IStageHandle;
-import org.apache.rocketmq.streams.common.utils.JsonableUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
public class OutputChainStage<T extends IMessage> extends ChainStage<T> implements IAfterConfigurableRefreshListener {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
index c152945..aa996b9 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.streams.common.topology.stages.udf;
+import java.io.Serializable;
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.context.AbstractContext;
@@ -26,8 +27,6 @@ import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.common.topology.model.IStageHandle;
import org.apache.rocketmq.streams.common.topology.stages.AbstractStatelessChainStage;
-import java.io.Serializable;
-
/**
* 给用户提供自定义的抽象类
*/
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/task/StreamsTask.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/task/StreamsTask.java
index e24ee9e..bc5fd7f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/task/StreamsTask.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/task/StreamsTask.java
@@ -25,7 +25,6 @@ import java.util.ServiceLoader;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.cache.compress.BitSetCache;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/DataTypeUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/DataTypeUtil.java
index 726a4f0..d0d76ea 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/DataTypeUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/DataTypeUtil.java
@@ -18,11 +18,6 @@ package org.apache.rocketmq.streams.common.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.configurable.IConfigurable;
-import org.apache.rocketmq.streams.common.datatype.*;
-
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.math.BigDecimal;
@@ -33,6 +28,9 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
import org.apache.rocketmq.streams.common.datatype.ArrayDataType;
import org.apache.rocketmq.streams.common.datatype.BaseDataType;
import org.apache.rocketmq.streams.common.datatype.BooleanDataType;
@@ -40,6 +38,7 @@ import org.apache.rocketmq.streams.common.datatype.ByteDataType;
import org.apache.rocketmq.streams.common.datatype.ConfigurableDataType;
import org.apache.rocketmq.streams.common.datatype.DataType;
import org.apache.rocketmq.streams.common.datatype.DateDataType;
+import org.apache.rocketmq.streams.common.datatype.DateTimeDataType;
import org.apache.rocketmq.streams.common.datatype.DoubleDataType;
import org.apache.rocketmq.streams.common.datatype.FloatDataType;
import org.apache.rocketmq.streams.common.datatype.GenericParameterDataType;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/MapKeyUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/MapKeyUtil.java
index 94a9e69..5b31787 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/MapKeyUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/MapKeyUtil.java
@@ -17,8 +17,11 @@
package org.apache.rocketmq.streams.common.utils;
import com.alibaba.fastjson.JSONObject;
-
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
public class MapKeyUtil {
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
index c97b408..1f9ffd9 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
@@ -20,7 +20,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.component.AbstractComponent;
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java
index 7b0e484..58c62ae 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.configurable.IConfigurable;
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
index 27dadf6..1497b43 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.configurable.service;
import java.util.Properties;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
index b597f25..79c25dc 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
@@ -20,8 +20,6 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-
-import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java
index ea289df..c00d957 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.connectors.source.SourceInstance;
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java
index 5ed4819..c01c151 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.connectors.balance;
import java.util.List;
-
import org.apache.rocketmq.streams.common.channel.split.ISplit;
public class SplitChanged {
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java
index a4b91d3..2ffc7a8 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java
@@ -16,20 +16,23 @@
*/
package org.apache.rocketmq.streams.connectors.balance.impl;
-import java.util.*;
-
import com.google.auto.service.AutoService;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.connectors.balance.AbstractBalance;
-import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.model.ServiceName;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
+import org.apache.rocketmq.streams.connectors.balance.AbstractBalance;
+import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
+import org.apache.rocketmq.streams.connectors.source.SourceInstance;
import org.apache.rocketmq.streams.lease.LeaseComponent;
import org.apache.rocketmq.streams.lease.model.LeaseInfo;
-import org.apache.rocketmq.streams.connectors.source.SourceInstance;
import org.apache.rocketmq.streams.lease.service.ILeaseService;
@AutoService(ISourceBalance.class)
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java
index 3807585..cc926b1 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java
@@ -16,11 +16,10 @@
*/
package org.apache.rocketmq.streams.connectors.model;
-import org.apache.rocketmq.streams.common.model.Entity;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-
import java.util.Date;
import java.util.List;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
/**
* @description
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
index a3298c5..d4abb03 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
@@ -15,8 +15,13 @@
* limitations under the License.
*/
package org.apache.rocketmq.streams.connectors.reader;
+
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.ISource;
@@ -33,11 +38,6 @@ import org.apache.rocketmq.streams.db.driver.DriverBuilder;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
/**
* @description
*/
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java
index da12274..293d72d 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
package org.apache.rocketmq.streams.connectors.reader;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
-
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.connectors.model.PullMessage;
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java
index a2fe513..a3ec8b2 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java
@@ -20,7 +20,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
import org.apache.rocketmq.streams.common.channel.split.ISplit;
public class SplitCloseFuture implements Future<Boolean> {
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java
index 74161af..0403dd1 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.streams.connectors.source;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -26,20 +28,18 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
import org.apache.rocketmq.streams.connectors.balance.SplitChanged;
import org.apache.rocketmq.streams.connectors.balance.impl.LeaseBalanceImpl;
-import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
-import org.apache.rocketmq.streams.common.channel.split.ISplit;
-import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.connectors.model.PullMessage;
import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
import org.apache.rocketmq.streams.connectors.reader.SplitCloseFuture;
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
index 27598f6..1f036c6 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
@@ -17,6 +17,13 @@
package org.apache.rocketmq.streams.connectors.source;
import com.alibaba.fastjson.JSONObject;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
@@ -34,11 +41,6 @@ import org.apache.rocketmq.streams.connectors.source.filter.CycleSchedule;
import org.apache.rocketmq.streams.connectors.source.filter.CycleScheduleFilter;
import org.apache.rocketmq.streams.db.CycleSplit;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* @description
*/
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java
index f44fae4..90e86d2 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java
@@ -15,9 +15,14 @@
* limitations under the License.
*/
package org.apache.rocketmq.streams.connectors.source;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.metadata.MetaDataUtils;
import org.apache.rocketmq.streams.connectors.reader.DBScanReader;
@@ -26,12 +31,6 @@ import org.apache.rocketmq.streams.connectors.source.filter.DataFormatPatternFil
import org.apache.rocketmq.streams.connectors.source.filter.PatternFilter;
import org.apache.rocketmq.streams.db.DynamicMultipleDBSplit;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* @description DynamicMultipleDBScanSource
*/
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
index 81f488c..645069a 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.connectors.source;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
index b07371b..c9d983f 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
@@ -16,12 +16,11 @@
*/
package org.apache.rocketmq.streams.connectors.source.filter;
+import java.io.Serializable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
-import java.io.Serializable;
-
/**
* @description 过滤掉已经完成的reader
*/
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
index acc75bb..9526c67 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
@@ -16,12 +16,11 @@
*/
package org.apache.rocketmq.streams.connectors.source.filter;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @Description
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
index 824cfd7..0ce8b66 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.streams.connectors.source.filter;
-import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
-
import java.io.Serializable;
import java.text.ParseException;
import java.util.ArrayList;
@@ -25,6 +23,7 @@ import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
/**
* @description 用来做分区选取
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java
index 557ef0e..b5b6666 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java
@@ -16,13 +16,11 @@
*/
package org.apache.rocketmq.streams.connectors.source.filter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @description
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/DBDimSQLParser.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/DBDimSQLParser.java
index 681477d..8ed99da 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/DBDimSQLParser.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/DBDimSQLParser.java
@@ -22,10 +22,8 @@ import java.util.Properties;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.dim.model.AbstractDim;
import org.apache.rocketmq.streams.dim.model.DBDim;
-import org.apache.rocketmq.streams.dim.model.FileDim;
@AutoService(IDimSQLParser.class)
@ServiceName(value = DBDimSQLParser.TYPE, aliasName = "rds")
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/SQLParserFactory.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/SQLParserFactory.java
index 6d0f2a0..8fe1286 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/SQLParserFactory.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/SQLParserFactory.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.streams.dim.builder;
-import org.apache.rocketmq.streams.dim.model.AbstractDim;
import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
public class SQLParserFactory {
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/DimIndex.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/DimIndex.java
index 0989393..cfca308 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/DimIndex.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/DimIndex.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.cache.ListMemoryTable;
import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
import org.apache.rocketmq.streams.common.cache.compress.impl.IntListKV;
import org.apache.rocketmq.streams.common.datatype.IntDataType;
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/IndexExecutor.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/IndexExecutor.java
index c0e2c71..369d81c 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/IndexExecutor.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/IndexExecutor.java
@@ -25,16 +25,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.streams.common.datatype.IntDataType;
-import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.dim.model.AbstractDim;
import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
import org.apache.rocketmq.streams.filter.function.expression.Equals;
import org.apache.rocketmq.streams.filter.operator.Rule;
import org.apache.rocketmq.streams.filter.operator.expression.Expression;
import org.apache.rocketmq.streams.filter.operator.expression.RelationExpression;
-import org.apache.rocketmq.streams.script.ScriptComponent;
/**
* 执行索引的查询和构建。主要是完成表达式的解析,对于等值的表达式字段,如果有索引,根据索引查询,然后执行非等值部分的判断
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java
index a79aa81..0b55d6e 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java
@@ -49,7 +49,6 @@ import org.apache.rocketmq.streams.common.dboperator.IDBDriver;
import org.apache.rocketmq.streams.common.utils.NumberUtils;
import org.apache.rocketmq.streams.common.utils.SQLUtil;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;
-import org.apache.rocketmq.streams.http.source.util.HttpUtil;
public abstract class AbstractIntelligenceCache extends BasedConfigurable implements
IAfterConfigurableRefreshListener {
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/AbstractDim.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/AbstractDim.java
index 5d0d7d1..c428dcf 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/AbstractDim.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/AbstractDim.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.dim.model;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -32,7 +31,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.cache.ByteArrayMemoryTable;
-import org.apache.rocketmq.streams.common.cache.ListMemoryTable;
import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
import org.apache.rocketmq.streams.common.cache.softreference.ICache;
import org.apache.rocketmq.streams.common.cache.softreference.impl.SoftReferenceCache;
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/DBDim.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/DBDim.java
index a643b18..b8e9ee5 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/DBDim.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/DBDim.java
@@ -21,8 +21,6 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.cache.ByteArrayMemoryTable;
-import org.apache.rocketmq.streams.common.cache.ListMemoryTable;
import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
import org.apache.rocketmq.streams.common.utils.IPUtil;
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/FileDim.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/FileDim.java
index 50c0933..d004d60 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/FileDim.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/FileDim.java
@@ -18,11 +18,9 @@ package org.apache.rocketmq.streams.dim.model;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
-import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.rocketmq.streams.common.cache.ByteArrayMemoryTable;
import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
import org.apache.rocketmq.streams.common.utils.FileUtil;
diff --git a/rocketmq-streams-dim/src/test/java/com/aliyun/service/TableCompressTest.java b/rocketmq-streams-dim/src/test/java/com/aliyun/service/TableCompressTest.java
index 4e41f45..f31d060 100644
--- a/rocketmq-streams-dim/src/test/java/com/aliyun/service/TableCompressTest.java
+++ b/rocketmq-streams-dim/src/test/java/com/aliyun/service/TableCompressTest.java
@@ -23,10 +23,8 @@ import java.util.Map;
import org.apache.rocketmq.streams.common.cache.ByteArrayMemoryTable;
import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
import org.apache.rocketmq.streams.common.cache.compress.impl.IntListKV;
-import org.apache.rocketmq.streams.common.cache.compress.impl.LongListKV;
import org.junit.Test;
-import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertFalse;
public class TableCompressTest {
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MutilStreamsClientTest.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MutilStreamsClientTest.java
index 6eea30f..69fc569 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MutilStreamsClientTest.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MutilStreamsClientTest.java
@@ -20,16 +20,15 @@
package org.apache.rocketmq.streams.examples.mutilconsumer;
import com.alibaba.fastjson.JSONObject;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
import org.apache.rocketmq.streams.client.transform.window.Time;
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/Producer.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/Producer.java
index 731b226..7f88c1c 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/Producer.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/Producer.java
@@ -19,14 +19,13 @@
package org.apache.rocketmq.streams.examples.mutilconsumer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.streams.examples.rocketmqsource.ProducerFromFile;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java
index 6c7606f..bebfd79 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java
@@ -19,11 +19,6 @@
package org.apache.rocketmq.streams.examples.rocketmqsource;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -31,6 +26,10 @@ import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerFromFile {
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
index 5610b1f..2da9f23 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
@@ -16,11 +16,10 @@
*/
package org.apache.rocketmq.streams.examples.rocketmqsource;
+import java.util.Arrays;
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import java.util.Arrays;
-
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
index 45a9216..f4a7c49 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
@@ -18,13 +18,12 @@
package org.apache.rocketmq.streams.examples.rocketmqsource;
import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.streams.client.StreamBuilder;
-import org.apache.rocketmq.streams.client.source.DataStreamSource;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.rocketmq.streams.client.StreamBuilder;
+import org.apache.rocketmq.streams.client.source.DataStreamSource;
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/FilterComponent.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/FilterComponent.java
index 08be597..08d08d4 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/FilterComponent.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/FilterComponent.java
@@ -28,7 +28,6 @@ import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.filter.builder.RuleBuilder;
import org.apache.rocketmq.streams.filter.context.ContextConfigure;
-import org.apache.rocketmq.streams.filter.context.RuleMessage;
import org.apache.rocketmq.streams.filter.operator.Rule;
import org.apache.rocketmq.streams.filter.service.IRuleEngineService;
import org.apache.rocketmq.streams.filter.service.impl.RuleEngineServiceImpl;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/builder/ExpressionBuilder.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/builder/ExpressionBuilder.java
index ad65e80..fbbf40f 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/builder/ExpressionBuilder.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/builder/ExpressionBuilder.java
@@ -37,7 +37,6 @@ import org.apache.rocketmq.streams.filter.operator.expression.Expression;
import org.apache.rocketmq.streams.filter.operator.expression.ExpressionRelationParser;
import org.apache.rocketmq.streams.filter.operator.expression.RelationExpression;
import org.apache.rocketmq.streams.filter.operator.expression.SimpleExpression;
-import org.apache.rocketmq.streams.filter.operator.var.ConstantVar;
import org.apache.rocketmq.streams.filter.operator.var.ContextVar;
import org.apache.rocketmq.streams.filter.operator.var.Var;
import org.apache.rocketmq.streams.filter.optimization.ExpressionOptimization;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
index e51e94f..ce08793 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
@@ -24,7 +24,6 @@ import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/engine/impl/DefaultRuleEngine.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/engine/impl/DefaultRuleEngine.java
index 0a09a1d..8f353c6 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/engine/impl/DefaultRuleEngine.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/engine/impl/DefaultRuleEngine.java
@@ -121,17 +121,6 @@ public class DefaultRuleEngine implements IRuleEngine {
* 判断rule的status 如果是观察者模式,则写入到观察者库中的观察表里
*/
if (rule.getRuleStatus().intValue() == 3) {
-// if (!context.getContextConfigure().isAction2Online()) {
-// // LOG.warn("DefaultRuleEngine fireAction ignore : configure action2Online false!");
-// if (context.getContextConfigure().isActionOnline2Observer()) {
-// Action action = getAction(RuleContext.OBSERVER_NAME,rule);
-// if (action == null) {
-// return;
-// }
-// doAction(message,context, action, rule);
-// }
-// return;
-// }
try {
if (rule.getActionNames() == null || rule.getActionNames().size() == 0) {
return;
@@ -145,12 +134,9 @@ public class DefaultRuleEngine implements IRuleEngine {
}
} catch (Exception e) {
LOG.error("DefaultRuleEngine fire atciton error: ruleName is" + rule.getConfigureName(), e);
- // context.addErrorMessage(rule, "DefaultRuleEngine fire atciton error: " + e.getMessage());
}
} else {
-// if (context.getContextConfigure() != null && !context.getContextConfigure().isAction2Observer()) {
-// return;
-// }
+
Action action = getAction(RuleContext.OBSERVER_NAME,rule);
if (action == null) {
return;
@@ -173,32 +159,6 @@ public class DefaultRuleEngine implements IRuleEngine {
@SuppressWarnings("rawtypes")
protected void doAction(final IMessage message, AbstractContext context, final Action action, final Rule rule) {
action.doMessage(message,context);
-// context.getActionExecutor().execute(new Runnable() {
-//
-// @Override
-// public void run() {
-//// IMonitor monitor = context.getRuleMonitor();
-//// IMonitor actionMonitor = monitor.createChildren(action);
-// try {
-//
-//
-// if (monitor != null) {
-// actionMonitor.endMonitor();
-// if (actionMonitor.isSlow()) {
-// actionMonitor.setSampleData(context).put("action_info", action.toJsonObject());
-// }
-// }
-// } catch (Exception e) {
-// String errorMsg = "DefaultRuleEngine doAction error,rule: " + rule.getRuleCode() + " ,action: "
-// + action.getConfigureName();
-// // RULEENGINE_MESSAGE_LOG.warn(errorMsg
-// // , e);
-// actionMonitor.occureError(e, errorMsg, e.getMessage());
-// actionMonitor.setSampleData(context).put("action_info", action.toJsonObject());
-// }
-//
-// }
-// });
}
public Action getAction(String name, Rule rule) {
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseDependentParser.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseDependentParser.java
index a9da48b..4f9f61e 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseDependentParser.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseDependentParser.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.filter.function.script;
import com.google.auto.service.AutoService;
-import java.util.ArrayList;
import java.util.Set;
import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
import org.apache.rocketmq.streams.filter.operator.Rule;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseFunction.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseFunction.java
index 59ee5a1..b491dd9 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseFunction.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseFunction.java
@@ -16,11 +16,8 @@
*/
package org.apache.rocketmq.streams.filter.function.script;
-import com.alibaba.fastjson.JSONObject;
-import java.util.List;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
-import org.apache.rocketmq.streams.script.ScriptComponent;
import org.apache.rocketmq.streams.script.annotation.Function;
import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
import org.apache.rocketmq.streams.script.context.FunctionContext;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/Rule.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/Rule.java
index e5527c3..33b4360 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/Rule.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/Rule.java
@@ -46,7 +46,6 @@ import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
import org.apache.rocketmq.streams.common.utils.TraceUtil;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;
import org.apache.rocketmq.streams.filter.FilterComponent;
-import org.apache.rocketmq.streams.filter.context.RuleContext;
import org.apache.rocketmq.streams.filter.operator.action.Action;
import org.apache.rocketmq.streams.filter.operator.action.impl.SinkAction;
import org.apache.rocketmq.streams.filter.operator.expression.Expression;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/RuleExpression.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/RuleExpression.java
index e45383d..8ec38d9 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/RuleExpression.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/RuleExpression.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.filter.operator.Rule;
import org.apache.rocketmq.streams.script.context.FunctionContext;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.script.service.IScriptParamter;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/Expression.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/Expression.java
index 507386e..ac3540b 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/Expression.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/Expression.java
@@ -56,7 +56,6 @@ import org.apache.rocketmq.streams.filter.operator.var.ContextVar;
import org.apache.rocketmq.streams.filter.operator.var.Var;
import org.apache.rocketmq.streams.script.ScriptComponent;
import org.apache.rocketmq.streams.script.function.model.FunctionConfigure;
-import org.apache.rocketmq.streams.script.optimization.performance.IScriptOptimization;
import org.apache.rocketmq.streams.script.utils.FunctionUtils;
public class Expression<T> extends BasedConfigurable
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/var/InnerVar.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/var/InnerVar.java
index 3aa0c10..1333abc 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/var/InnerVar.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/var/InnerVar.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.streams.filter.operator.var;
-import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/CaseWhenBuilder.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/CaseWhenBuilder.java
index 6e46285..87b5493 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/CaseWhenBuilder.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/CaseWhenBuilder.java
@@ -26,10 +26,8 @@ import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
import org.apache.rocketmq.streams.filter.operator.Rule;
-import org.apache.rocketmq.streams.filter.operator.expression.Expression;
import org.apache.rocketmq.streams.script.function.impl.field.RemoveFieldFunction;
import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
-import org.apache.rocketmq.streams.script.operator.expression.ScriptExpression;
import org.apache.rocketmq.streams.script.optimization.performance.IScriptOptimization;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.script.service.IScriptParamter;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/GroupByVarCaseWhen.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/GroupByVarCaseWhen.java
index 02ba02b..a23f996 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/GroupByVarCaseWhen.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/GroupByVarCaseWhen.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRule.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRule.java
index 8040895..284516d 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRule.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRule.java
@@ -23,9 +23,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.filter.operator.expression.Expression;
-import org.apache.rocketmq.streams.filter.operator.expression.RelationExpression;
public class BlinkRule {
/**
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRuleV2Expression.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRuleV2Expression.java
index 3ae033c..175417a 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRuleV2Expression.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRuleV2Expression.java
@@ -39,16 +39,10 @@ import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.ReflectUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.configurable.ConfigurableComponent;
import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
-import org.apache.rocketmq.streams.filter.context.RuleContext;
-import org.apache.rocketmq.streams.filter.operator.RuleExpression;
import org.apache.rocketmq.streams.filter.operator.Rule;
import org.apache.rocketmq.streams.filter.operator.expression.Expression;
import org.apache.rocketmq.streams.filter.operator.expression.RelationExpression;
-import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
-import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.script.service.udf.UDFScript;
import org.apache.rocketmq.streams.script.utils.FunctionUtils;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/CommonExpression.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/CommonExpression.java
index eb7852b..ba1fd02 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/CommonExpression.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/CommonExpression.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.streams.filter.optimization.dependency;
-import com.sun.org.apache.bcel.internal.generic.FADD;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/DependencyTree.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/DependencyTree.java
index 72d15dc..4ac4b7f 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/DependencyTree.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/DependencyTree.java
@@ -18,8 +18,6 @@ package org.apache.rocketmq.streams.filter.optimization.dependency;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -32,8 +30,6 @@ import org.apache.rocketmq.streams.common.topology.model.AbstractStage;
import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
import org.apache.rocketmq.streams.common.topology.stages.ScriptChainStage;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
-import org.apache.rocketmq.streams.script.service.IScriptExpression;
-import org.python.icu.impl.coll.BOCSU;
/**
* raverse the pipeline to create a prefix filter fingerprint
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/FilterTreeNode.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/FilterTreeNode.java
index fc9aae5..12ffe2e 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/FilterTreeNode.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/FilterTreeNode.java
@@ -19,11 +19,8 @@ package org.apache.rocketmq.streams.filter.optimization.dependency;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintCache;
@@ -31,14 +28,11 @@ import org.apache.rocketmq.streams.common.optimization.fingerprint.PreFingerprin
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.topology.model.AbstractStage;
import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
-import org.apache.rocketmq.streams.common.topology.stages.ScriptChainStage;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.filter.operator.Rule;
import org.apache.rocketmq.streams.filter.operator.expression.Expression;
import org.apache.rocketmq.streams.filter.operator.expression.RelationExpression;
-import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
-import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
public class FilterTreeNode extends TreeNode<FilterChainStage> {
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/PipelineTree.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/PipelineTree.java
index 85e2f49..8cbe51b 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/PipelineTree.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/PipelineTree.java
@@ -20,7 +20,6 @@ import java.util.List;
import java.util.Map;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.topology.model.AbstractStage;
-import org.apache.rocketmq.streams.common.topology.model.Pipeline;
import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
import org.apache.rocketmq.streams.common.topology.stages.ScriptChainStage;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/ScriptTreeNode.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/ScriptTreeNode.java
index a1599af..9327c67 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/ScriptTreeNode.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/ScriptTreeNode.java
@@ -17,12 +17,7 @@
package org.apache.rocketmq.streams.filter.optimization.dependency;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
@@ -33,7 +28,6 @@ import org.apache.rocketmq.streams.filter.operator.Rule;
import org.apache.rocketmq.streams.filter.operator.RuleExpression;
import org.apache.rocketmq.streams.filter.operator.expression.Expression;
import org.apache.rocketmq.streams.filter.optimization.casewhen.AbstractWhenExpression;
-import org.apache.rocketmq.streams.filter.optimization.casewhen.SingleCaseWhenExpression;
import org.apache.rocketmq.streams.filter.optimization.script.ScriptOptimization;
import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/homologous/HomologousOptimization.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/homologous/HomologousOptimization.java
index fa87eec..e705758 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/homologous/HomologousOptimization.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/homologous/HomologousOptimization.java
@@ -17,16 +17,13 @@
package org.apache.rocketmq.streams.filter.optimization.homologous;
import com.google.auto.service.AutoService;
-
import java.util.ArrayList;
import java.util.List;
-
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.optimization.IHomologousOptimization;
import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintCache;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
-import org.apache.rocketmq.streams.common.topology.stages.SubPiplineChainStage;
import org.apache.rocketmq.streams.filter.optimization.dependency.CommonExpression;
import org.apache.rocketmq.streams.filter.optimization.dependency.DependencyTree;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/script/ScriptOptimization.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/script/ScriptOptimization.java
index cda0eca..e320d52 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/script/ScriptOptimization.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/script/ScriptOptimization.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.filter.optimization.script;
import com.google.auto.service.AutoService;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -27,13 +26,10 @@ import org.apache.rocketmq.streams.common.configurable.IConfigurableIdentificati
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintCache;
-import org.apache.rocketmq.streams.filter.context.RuleContext;
import org.apache.rocketmq.streams.filter.operator.expression.Expression;
-import org.apache.rocketmq.streams.filter.optimization.casewhen.CaseWhenBuilder;
import org.apache.rocketmq.streams.filter.optimization.dependency.BlinkRuleV2Expression;
import org.apache.rocketmq.streams.filter.optimization.executor.GroupByVarExecutor;
import org.apache.rocketmq.streams.script.context.FunctionContext;
-import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
import org.apache.rocketmq.streams.script.optimization.performance.IScriptOptimization;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/service/impl/RuleEngineServiceImpl.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/service/impl/RuleEngineServiceImpl.java
index 37462b1..bc7d886 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/service/impl/RuleEngineServiceImpl.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/service/impl/RuleEngineServiceImpl.java
@@ -29,7 +29,6 @@ import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.filter.builder.RuleBuilder;
import org.apache.rocketmq.streams.filter.context.ContextConfigure;
import org.apache.rocketmq.streams.filter.context.RuleContext;
-import org.apache.rocketmq.streams.filter.context.RuleMessage;
import org.apache.rocketmq.streams.filter.engine.IRuleEngine;
import org.apache.rocketmq.streams.filter.engine.impl.DefaultRuleEngine;
import org.apache.rocketmq.streams.filter.operator.Rule;
diff --git a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/BasedLesaseImpl.java b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/BasedLesaseImpl.java
index 671f577..9584a46 100644
--- a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/BasedLesaseImpl.java
+++ b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/BasedLesaseImpl.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
diff --git a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
index c5a0c27..bd7ef82 100644
--- a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
+++ b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.utils.DateUtil;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
index 5f158b8..00e3c95 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
@@ -21,9 +21,7 @@ import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.model.ThreadContext;
import org.apache.rocketmq.streams.script.function.service.IFunctionService;
import org.apache.rocketmq.streams.script.function.service.impl.ScanFunctionService;
-import org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.script.utils.FunctionUtils;
-import org.python.icu.impl.coll.BOCSU;
/**
* 脚本执行的上下文
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/FirstValueAccumulator.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/FirstValueAccumulator.java
index b5c8193..211180b 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/FirstValueAccumulator.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/FirstValueAccumulator.java
@@ -17,14 +17,10 @@
package org.apache.rocketmq.streams.script.function.aggregation;
import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.script.annotation.Function;
import org.apache.rocketmq.streams.script.annotation.UDAFFunction;
import org.apache.rocketmq.streams.script.service.IAccumulator;
-import org.apache.rocketmq.streams.state.kv.rocksdb.RocksdbState;
@Function
@UDAFFunction("FIRST_VALUE")
public class FirstValueAccumulator<T> implements IAccumulator<T, FirstValueAccumulator.FirstValue> {
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/between/BetweenFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/between/BetweenFunction.java
index 9934ea6..f552001 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/between/BetweenFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/between/BetweenFunction.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.script.function.impl.between;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.script.annotation.Function;
import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
import org.apache.rocketmq.streams.script.annotation.FunctionParamter;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/IFScopeFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/IFScopeFunction.java
index 4b988bd..846ef9f 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/IFScopeFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/IFScopeFunction.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.script.function.impl.condition;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.script.annotation.Function;
import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
-import org.apache.rocketmq.streams.script.annotation.FunctionParamter;
import org.apache.rocketmq.streams.script.context.FunctionContext;
@Function
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/context/ContextFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/context/ContextFunction.java
index d4fe4b8..72cf090 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/context/ContextFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/context/ContextFunction.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.streams.script.function.impl.context;
import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.script.annotation.Function;
import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
index f6ec724..ee367be 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.streams.script.function.impl.date;
-import java.util.Date;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.script.annotation.Function;
import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/eval/EvalFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/eval/EvalFunction.java
index b6d9f76..a77431a 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/eval/EvalFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/eval/EvalFunction.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.script.function.impl.eval;
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.streams.common.cache.softreference.ICache;
import org.apache.rocketmq.streams.common.cache.softreference.impl.SoftReferenceCache;
-import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.script.ScriptComponent;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
index 8678103..25f40d9 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
@@ -18,10 +18,8 @@ package org.apache.rocketmq.streams.script.function.impl.flatmap;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
-
import java.util.List;
import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/item/ItemFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/item/ItemFunction.java
index 77a9bcf..584157b 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/item/ItemFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/item/ItemFunction.java
@@ -16,13 +16,9 @@
*/
package org.apache.rocketmq.streams.script.function.impl.item;
-import com.alibaba.fastjson.JSONObject;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import jnr.ffi.annotations.In;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.script.annotation.Function;
import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
import org.apache.rocketmq.streams.script.annotation.FunctionParamter;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/relation/AndFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/relation/AndFunction.java
index c227d5f..012dd1c 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/relation/AndFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/relation/AndFunction.java
@@ -17,15 +17,9 @@
package org.apache.rocketmq.streams.script.function.impl.relation;
import com.alibaba.fastjson.JSONObject;
-import java.lang.reflect.Method;
-import java.util.List;
-import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.script.ScriptComponent;
import org.apache.rocketmq.streams.script.annotation.Function;
import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
-import org.apache.rocketmq.streams.script.annotation.FunctionParamter;
-import org.apache.rocketmq.streams.script.context.FunctionContext;
-import org.apache.rocketmq.streams.script.function.model.FunctionType;
@Function
public class AndFunction {
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/GroupScriptExpression.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/GroupScriptExpression.java
index 8d478f7..555d646 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/GroupScriptExpression.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/GroupScriptExpression.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.script.operator.expression;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ICaseDependentParser.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ICaseDependentParser.java
index 0c9ffe1..b4c9e86 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ICaseDependentParser.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ICaseDependentParser.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
package org.apache.rocketmq.streams.script.operator.expression;
+
import java.util.Set;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
-import org.apache.rocketmq.streams.script.service.IScriptParamter;
public interface ICaseDependentParser {
/**
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ScriptExpression.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ScriptExpression.java
index 349bc12..a54978f 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ScriptExpression.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ScriptExpression.java
@@ -17,13 +17,11 @@
package org.apache.rocketmq.streams.script.operator.expression;
import com.alibaba.fastjson.JSONObject;
-
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.cache.softreference.ICache;
@@ -31,9 +29,7 @@ import org.apache.rocketmq.streams.common.cache.softreference.impl.SoftReference
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.common.context.AbstractContext;
-import org.apache.rocketmq.streams.common.context.IExpressionResultCache;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.monitor.IMonitor;
import org.apache.rocketmq.streams.common.optimization.HomologousVar;
import org.apache.rocketmq.streams.common.utils.PrintUtil;
import org.apache.rocketmq.streams.common.utils.ReflectUtil;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/FunctionScript.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/FunctionScript.java
index 039aba6..4363ee4 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/FunctionScript.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/FunctionScript.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.script.operator.impl;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -33,7 +32,6 @@ import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.interfaces.IBaseStreamOperator;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
-import org.apache.rocketmq.streams.common.optimization.FilterResultCache;
import org.apache.rocketmq.streams.common.topology.ChainStage;
import org.apache.rocketmq.streams.common.topology.builder.IStageBuilder;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
@@ -41,10 +39,8 @@ import org.apache.rocketmq.streams.common.topology.model.AbstractScript;
import org.apache.rocketmq.streams.common.topology.stages.ScriptChainStage;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.script.context.FunctionContext;
-import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
-import org.apache.rocketmq.streams.script.operator.expression.ICaseDependentParser;
-import org.apache.rocketmq.streams.script.optimization.performance.IScriptOptimization;
import org.apache.rocketmq.streams.script.operator.expression.ScriptExpression;
+import org.apache.rocketmq.streams.script.optimization.performance.IScriptOptimization;
import org.apache.rocketmq.streams.script.parser.imp.FunctionParser;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.script.service.IScriptParamter;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/ExpressionUtil.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/ExpressionUtil.java
index 7b4e03e..7d976dc 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/ExpressionUtil.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/ExpressionUtil.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.script.utils;
import java.util.List;
-import org.apache.rocketmq.streams.script.annotation.Function;
import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.script.service.IScriptParamter;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/FunctionUtils.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/FunctionUtils.java
index 12a29d5..fa7ecef 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/FunctionUtils.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/FunctionUtils.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.streams.script.utils;
-import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -24,7 +23,6 @@ import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.IgnoreMessage;
import org.apache.rocketmq.streams.common.datatype.DateDataType;
-import org.apache.rocketmq.streams.common.utils.ReflectUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
public class FunctionUtils {
diff --git a/rocketmq-streams-script/src/test/java/org/apache/rocketmq/streams/script/function/UDTFFunctionTest.java b/rocketmq-streams-script/src/test/java/org/apache/rocketmq/streams/script/function/UDTFFunctionTest.java
index d91496e..7efeda7 100644
--- a/rocketmq-streams-script/src/test/java/org/apache/rocketmq/streams/script/function/UDTFFunctionTest.java
+++ b/rocketmq-streams-script/src/test/java/org/apache/rocketmq/streams/script/function/UDTFFunctionTest.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.streams.script.function;
import com.alibaba.fastjson.JSONObject;
+import java.lang.reflect.Method;
+import java.util.List;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.script.ScriptComponent;
import org.apache.rocketmq.streams.script.function.function.JavaObjectUDFFunction;
@@ -24,9 +26,6 @@ import org.apache.rocketmq.streams.script.function.function.Person;
import org.apache.rocketmq.streams.script.function.model.FunctionType;
import org.junit.Test;
-import java.lang.reflect.Method;
-import java.util.List;
-
public class UDTFFunctionTest {
@Test
diff --git a/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/AbstractState.java b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/AbstractState.java
index 368f6fc..4d46754 100644
--- a/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/AbstractState.java
+++ b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/AbstractState.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
import org.apache.rocketmq.streams.state.backend.IStateBackend;
diff --git a/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksDBOperator.java b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksDBOperator.java
index 87453f0..43cbbdd 100644
--- a/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksDBOperator.java
+++ b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksDBOperator.java
@@ -20,7 +20,6 @@ import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugAnalysis.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugAnalysis.java
index 1639cb9..8b0a247 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugAnalysis.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugAnalysis.java
@@ -28,19 +28,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.apache.rocketmq.streams.common.utils.JsonableUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.PrintUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.window.operator.impl.WindowOperator;
-import org.apache.rocketmq.streams.window.state.impl.WindowValue;
-import org.apache.rocketmq.streams.window.storage.WindowStorage;
-import org.apache.rocketmq.streams.window.storage.rocksdb.RocksdbStorage;
import org.junit.Assert;
import static junit.framework.TestCase.assertTrue;
-import static junit.framework.TestCase.format;
public class DebugAnalysis {
private String dir;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugWriter.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugWriter.java
index d9e20ad..6558bd8 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugWriter.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugWriter.java
@@ -26,14 +26,12 @@ import java.util.List;
import java.util.Map;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.window.state.WindowBaseValue;
import org.apache.rocketmq.streams.window.state.impl.WindowValue;
public class DebugWriter {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/WindowDebug.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/WindowDebug.java
index 1b57989..2d642c0 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/WindowDebug.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/WindowDebug.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.streams.window.debug;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-
public class WindowDebug {
protected String sumFieldName;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
index 3720293..df20dad 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.window.fire;
import java.util.HashMap;
import java.util.Map;
-
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.streams.common.channel.source.ISource;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
index 89ad616..4df71a5 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
@@ -16,8 +16,12 @@
*/
package org.apache.rocketmq.streams.window.fire;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
@@ -29,12 +33,6 @@ import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
public class SplitEventTimeManager {
protected static final Log LOG = LogFactory.getLog(SplitEventTimeManager.class);
protected static Map<String,Long> messageSplitId2MaxTime=new HashMap<>();
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/WindowFireManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/WindowFireManager.java
index f0c0620..a16779e 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/WindowFireManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/WindowFireManager.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.window.fire;
import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
public class WindowFireManager {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
index 3d67f0e..2803aba 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
@@ -16,35 +16,32 @@
*/
package org.apache.rocketmq.streams.window.model;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.topology.model.IWindow;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
import org.apache.rocketmq.streams.window.shuffle.ShuffleChannel;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
/**
* 缓存数据,flush时,刷新完成数据落盘
*/
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
index 6346a61..57b39e1 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.streams.window.model;
-import java.awt.Window;
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueManager.java
index aa2cccb..fdce661 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueManager.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
import org.apache.rocketmq.streams.window.sqlcache.SQLCache;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
index f6560f1..bfb7176 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
@@ -24,12 +24,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.SQLUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.window.model.WindowInstance;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
index 9e51184..05d0fe8 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
@@ -21,8 +21,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.window.model.WindowInstance;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
index c45b412..141df8c 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.streams.window.operator;
+import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -26,58 +27,48 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-
-import com.alibaba.fastjson.JSONObject;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
-import org.apache.rocketmq.streams.common.configure.StreamsConfigure;
-import org.apache.rocketmq.streams.common.context.Context;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.common.topology.ChainStage.PiplineRecieverAfterCurrentNode;
-import org.apache.rocketmq.streams.common.topology.stages.udf.IReducer;
-import org.apache.rocketmq.streams.common.utils.Base64Utils;
-import org.apache.rocketmq.streams.common.utils.InstantiationUtil;
-import org.apache.rocketmq.streams.common.utils.TraceUtil;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
-import org.apache.rocketmq.streams.script.ScriptComponent;
-import org.apache.rocketmq.streams.window.debug.DebugWriter;
-import org.apache.rocketmq.streams.window.fire.EventTimeManager;
-import org.apache.rocketmq.streams.window.model.FunctionExecutor;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.model.WindowCache;
-import org.apache.rocketmq.streams.window.offset.IWindowMaxValueManager;
-import org.apache.rocketmq.streams.window.offset.WindowMaxValueManager;
-import org.apache.rocketmq.streams.window.source.WindowFireSource;
-import org.apache.rocketmq.streams.window.state.impl.WindowValue;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.context.MessageHeader;
import org.apache.rocketmq.streams.common.topology.ChainStage;
+import org.apache.rocketmq.streams.common.topology.ChainStage.PiplineRecieverAfterCurrentNode;
import org.apache.rocketmq.streams.common.topology.builder.IStageBuilder;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
-import org.apache.rocketmq.streams.common.topology.stages.WindowChainStage;
import org.apache.rocketmq.streams.common.topology.model.IWindow;
+import org.apache.rocketmq.streams.common.topology.stages.WindowChainStage;
+import org.apache.rocketmq.streams.common.topology.stages.udf.IReducer;
+import org.apache.rocketmq.streams.common.utils.Base64Utils;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.DateUtil;
+import org.apache.rocketmq.streams.common.utils.InstantiationUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.script.parser.imp.FunctionParser;
+import org.apache.rocketmq.streams.common.utils.TraceUtil;
+import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
+import org.apache.rocketmq.streams.script.operator.expression.ScriptExpression;
import org.apache.rocketmq.streams.script.operator.impl.AggregationScript;
import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
+import org.apache.rocketmq.streams.script.parser.imp.FunctionParser;
+import org.apache.rocketmq.streams.script.service.IAccumulator;
import org.apache.rocketmq.streams.script.service.IScriptExpression;
import org.apache.rocketmq.streams.script.service.IScriptParamter;
-import org.apache.rocketmq.streams.script.operator.expression.ScriptExpression;
-import org.apache.rocketmq.streams.script.service.IAccumulator;
+import org.apache.rocketmq.streams.window.debug.DebugWriter;
+import org.apache.rocketmq.streams.window.fire.EventTimeManager;
+import org.apache.rocketmq.streams.window.model.FunctionExecutor;
+import org.apache.rocketmq.streams.window.model.WindowCache;
+import org.apache.rocketmq.streams.window.model.WindowInstance;
+import org.apache.rocketmq.streams.window.offset.IWindowMaxValueManager;
+import org.apache.rocketmq.streams.window.offset.WindowMaxValueManager;
+import org.apache.rocketmq.streams.window.source.WindowFireSource;
import org.apache.rocketmq.streams.window.sqlcache.SQLCache;
+import org.apache.rocketmq.streams.window.state.impl.WindowValue;
import org.apache.rocketmq.streams.window.storage.WindowStorage;
-import org.python.antlr.ast.Str;
-
-import static java.util.concurrent.CompletableFuture.supplyAsync;
/**
* window definition in the pipeline, created by user's configure in WindowChainStage
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/ShuffleOverWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/ShuffleOverWindow.java
index 1bfa639..90ee4c1 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/ShuffleOverWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/ShuffleOverWindow.java
@@ -16,17 +16,13 @@
*/
package org.apache.rocketmq.streams.window.operator.impl;
-import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
import org.apache.rocketmq.streams.window.state.impl.WindowValue;
public class ShuffleOverWindow extends WindowOperator implements IAfterConfigurableRefreshListener {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
index 0387f57..391a6d1 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
@@ -16,11 +16,20 @@
*/
package org.apache.rocketmq.streams.window.operator.impl;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator;
@@ -36,18 +45,6 @@ import org.apache.rocketmq.streams.window.storage.IWindowStorage;
import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
import org.apache.rocketmq.streams.window.storage.WindowStorage.WindowBaseValueIterator;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
public class WindowOperator extends AbstractShuffleWindow {
private static final String ORDER_BY_SPLIT_NUM="_order_by_split_num_";//key=_order;queueid,windowinstanceid,partitionNum
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/DBOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/DBOperator.java
index b054583..5bb1ead 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/DBOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/DBOperator.java
@@ -18,8 +18,6 @@ package org.apache.rocketmq.streams.window.operator.join;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.parser.ParserConfig;
-import com.alibaba.fastjson.util.TypeUtils;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Date;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index f75fcea..89aee20 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.utils.TraceUtil;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.Context;
import org.apache.rocketmq.streams.common.context.IMessage;
@@ -35,6 +34,7 @@ import org.apache.rocketmq.streams.common.context.MessageHeader;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.rocketmq.streams.common.utils.TraceUtil;
import org.apache.rocketmq.streams.dim.model.AbstractDim;
import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.model.WindowInstance;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index afc4920..cad2c3b 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -18,14 +18,19 @@ package org.apache.rocketmq.streams.window.shuffle;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
@@ -36,34 +41,28 @@ import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointState;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.context.AbstractContext;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.topology.model.Pipeline;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.common.utils.TraceUtil;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
+import org.apache.rocketmq.streams.window.model.WindowCache;
+import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.common.context.AbstractContext;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.model.WindowCache;
-import org.apache.rocketmq.streams.window.operator.impl.WindowOperator;
import org.apache.rocketmq.streams.window.operator.impl.WindowOperator.WindowRowOperator;
import org.apache.rocketmq.streams.window.sqlcache.impl.SQLElement;
import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
import static org.apache.rocketmq.streams.window.model.WindowCache.ORIGIN_MESSAGE_TRACE_ID;
/**
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/ISQLElement.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/ISQLElement.java
index 2f335b9..7117e2e 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/ISQLElement.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/ISQLElement.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.streams.window.sqlcache;
-import java.util.List;
-
public interface ISQLElement {
boolean isWindowInstanceSQL();
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
index ef60fe3..9f5cf70 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
index 3ba1876..a738c48 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.streams.window.state.impl;
-import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/IWindowStorage.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/IWindowStorage.java
index 216ed71..13d061e 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/IWindowStorage.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/IWindowStorage.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.window.storage;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator;
import org.apache.rocketmq.streams.window.model.WindowInstance;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/WindowStorage.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/WindowStorage.java
index 712c028..874d2bf 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/WindowStorage.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/WindowStorage.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/db/DBStorage.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/db/DBStorage.java
index dfdf261..f47d309 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/db/DBStorage.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/db/DBStorage.java
@@ -32,7 +32,6 @@ import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.state.WindowBaseValue;
-import org.apache.rocketmq.streams.window.state.impl.WindowValue;
import org.apache.rocketmq.streams.window.storage.AbstractWindowStorage;
import org.apache.rocketmq.streams.window.storage.IRemoteStorage;
import org.apache.rocketmq.streams.window.storage.WindowStorage.WindowBaseValueIterator;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/file/FileStorage.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/file/FileStorage.java
index bba64a8..ffec393 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/file/FileStorage.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/file/FileStorage.java
@@ -20,12 +20,10 @@ import com.alibaba.fastjson.JSONArray;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.java
index 9739f93..528894c 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.window.storage.rocksdb;
import com.alibaba.fastjson.JSONArray;
-import java.io.File;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
@@ -31,22 +30,18 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.ReflectUtil;
-import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.state.kv.rocksdb.RocksDBOperator;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.state.WindowBaseValue;
import org.apache.rocketmq.streams.window.storage.AbstractWindowStorage;
import org.apache.rocketmq.streams.window.storage.WindowStorage.WindowBaseValueIterator;
-import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
-import org.rocksdb.TtlDB;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;