You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by se...@apache.org on 2021/12/09 09:30:15 UTC

[rocketmq-streams] branch main updated: fix check failed issue

This is an automated email from the ASF dual-hosted git repository.

seraph pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 8057d72  fix check failed issue
     new cc8b9db  Merge pull request #102 from duhenglucky/main_check
8057d72 is described below

commit 8057d72beaa60ac5c52bf9c1a1293c0b1bf48a82
Author: duhenglucky <du...@apache.org>
AuthorDate: Thu Dec 9 17:25:18 2021 +0800

    fix check failed issue
---
 README-chinese.md                                  | 34 ++++++-------
 .../apache/rocketmq/streams/db/sink/DBSink.java    |  2 -
 .../rocketmq/streams/db/sink/EnhanceDBSink.java    |  9 ++--
 .../sqltemplate/MysqlInsertIntoSqlTemplate.java    |  5 +-
 ...MysqlInsertIntoWithDuplicateKeySqlTemplate.java |  5 +-
 .../db/sink/sqltemplate/SqlTemplateFactory.java    |  3 +-
 .../streams/db/sink/db/ISqlTemplateTest.java       |  5 +-
 .../rocketmq/streams/es/sink/ESSinkBuilder.java    |  3 +-
 .../streams/es/sink/ESSinkOnlyChannel.java         |  9 ++--
 .../rocketmq/streams/db/sink/es/EsChannelTest.java |  4 +-
 .../rocketmq/streams/RocketMQChannelBuilder.java   |  2 -
 .../apache/rocketmq/streams/RocketMQOffset.java    |  2 +-
 .../rocketmq/streams/source/RocketMQSource.java    | 21 ++++-----
 .../rocketmq/streams/syslog/ISyslogRouter.java     |  2 -
 .../rocketmq/streams/syslog/SyslogChannel.java     | 12 ++---
 .../streams/syslog/SyslogChannelManager.java       |  1 -
 .../rocketmq/streams/syslog/SyslogParser.java      |  3 +-
 .../rocketmq/streams/syslog/SyslogServer.java      | 12 ++---
 .../rocketmq/streams/syslog/SyslogClient.java      | 11 ++---
 .../streams/checkpoint/db/DBCheckPointStorage.java |  5 +-
 .../streams/client/ScheduledStreamBuilder.java     |  8 ++--
 .../rocketmq/streams/client/ScheduledTask.java     |  3 +-
 .../streams/client/source/DataStreamSource.java    |  5 +-
 .../streams/client/strategy/WindowStrategy.java    |  1 -
 .../streams/client/transform/OverWindowStream.java |  2 -
 .../apache/rocketmq/streams/client/DBSinkTest.java |  9 ++--
 .../rocketmq/streams/client/DataStreamTest.java    | 14 +++---
 .../apache/rocketmq/streams/client/FileTest.java   | 35 +++++++-------
 .../rocketmq/streams/client/OnewayProducer.java    |  1 -
 .../apache/rocketmq/streams/client/SinkTest.java   |  2 -
 .../apache/rocketmq/streams/client/SourceTest.java |  1 -
 .../apache/rocketmq/streams/client/SplitTest.java  |  2 -
 .../apache/rocketmq/streams/client/WindowTest.java | 55 ----------------------
 .../streams/client/sink/UDFDefinedSQLParser.java   |  3 +-
 .../streams/client/sink/UserDefinedSink.java       | 10 ----
 .../client/sink/UserDefinedSupportShuffleSink.java |  1 -
 .../streams/client/windows/AbstractWindowTest.java | 19 ++++----
 .../streams/client/windows/MultiSplitTest.java     | 24 ----------
 .../client/windows/ShuffleOverWindowTest.java      |  6 ---
 .../streams/client/windows/SingleSplitTest.java    | 20 --------
 .../streams/client/windows/WindowDebugTest.java    | 15 +-----
 .../streams/common/cache/ByteArrayMemoryTable.java |  2 -
 .../streams/common/cache/ListMemoryTable.java      |  7 ---
 .../common/cache/compress/AbstractMemoryTable.java |  1 -
 .../common/cache/compress/impl/AbstractListKV.java |  2 -
 .../common/cache/compress/impl/IntListKV.java      |  2 -
 .../common/cache/compress/impl/LongListKV.java     |  2 -
 .../AbstractSupportShuffleChannelBuilder.java      |  2 -
 .../common/channel/impl/CollectionSink.java        |  5 +-
 .../common/channel/impl/CollectionSinkBuilder.java |  3 +-
 .../common/channel/impl/CollectionSource.java      |  7 ++-
 .../common/channel/impl/OutputPrintChannel.java    |  3 --
 .../common/channel/impl/memory/MemoryChannel.java  |  1 -
 .../channel/impl/mutiltask/MutilTaskSink.java      |  3 --
 .../impl/transit/TransitChannelBuilder.java        |  1 -
 .../streams/common/channel/sink/AbstractSink.java  | 13 +++--
 .../sink/AbstractSupportShuffleUDFSink.java        |  5 --
 .../impl/AbstractMultiSplitMessageCache.java       |  2 -
 .../channel/sinkcache/impl/MessageCache.java       |  1 -
 .../common/channel/source/AbstractSource.java      | 15 +++---
 .../checkpoint/AbstractCheckPointStorage.java      |  8 +++-
 .../streams/common/checkpoint/CheckPoint.java      |  1 -
 .../common/checkpoint/CheckPointManager.java       |  9 ++--
 .../checkpoint/CheckPointStorageFactory.java       |  5 +-
 .../common/checkpoint/ICheckPointStorage.java      |  4 +-
 .../streams/common/checkpoint/SourceSnapShot.java  |  3 +-
 .../streams/common/checkpoint/SourceState.java     |  3 +-
 .../streams/common/component/ComponentCreator.java |  1 -
 .../streams/common/datatype/DateDataType.java      |  7 ++-
 .../interfaces/IBatchMessageFinishNotify.java      |  1 -
 .../streams/common/metadata/MetaDataUtils.java     | 15 ++++--
 .../optimization/fingerprint/FingerprintCache.java |  2 -
 .../optimization/fingerprint/PreFingerprint.java   |  2 -
 .../streams/common/schedule/ScheduleManager.java   |  1 -
 .../streams/common/topology/ChainPipeline.java     |  3 --
 .../common/topology/builder/PipelineBuilder.java   | 14 +++---
 .../streams/common/topology/model/IWindow.java     |  2 -
 .../topology/stages/AbstractWindowStage.java       |  1 -
 .../common/topology/stages/OutputChainStage.java   |  1 -
 .../common/topology/stages/udf/StageBuilder.java   |  3 +-
 .../streams/common/topology/task/StreamsTask.java  |  1 -
 .../streams/common/utils/DataTypeUtil.java         |  9 ++--
 .../rocketmq/streams/common/utils/MapKeyUtil.java  |  7 ++-
 .../configurable/ConfigurableComponent.java        |  1 -
 .../AbstractSupportParentConfigureService.java     |  1 -
 .../service/ConfigurableServiceFactory.java        |  1 -
 .../service/impl/FileConfigureService.java         |  2 -
 .../connectors/balance/AbstractBalance.java        |  1 -
 .../streams/connectors/balance/SplitChanged.java   |  1 -
 .../connectors/balance/impl/LeaseBalanceImpl.java  | 13 +++--
 .../streams/connectors/model/ReaderStatus.java     |  5 +-
 .../streams/connectors/reader/DBScanReader.java    | 10 ++--
 .../streams/connectors/reader/ISplitReader.java    |  2 +-
 .../connectors/reader/SplitCloseFuture.java        |  1 -
 .../connectors/source/AbstractPullSource.java      | 12 ++---
 .../source/CycleDynamicMultipleDBScanSource.java   | 12 +++--
 .../source/DynamicMultipleDBScanSource.java        | 13 +++--
 .../streams/connectors/source/IPullSource.java     |  1 -
 .../source/filter/BoundedPatternFilter.java        |  3 +-
 .../connectors/source/filter/CyclePeriod.java      |  5 +-
 .../connectors/source/filter/CycleSchedule.java    |  3 +-
 .../source/filter/DataFormatPatternFilter.java     |  6 +--
 .../streams/dim/builder/DBDimSQLParser.java        |  2 -
 .../streams/dim/builder/SQLParserFactory.java      |  1 -
 .../rocketmq/streams/dim/index/DimIndex.java       |  1 -
 .../rocketmq/streams/dim/index/IndexExecutor.java  |  3 --
 .../intelligence/AbstractIntelligenceCache.java    |  1 -
 .../rocketmq/streams/dim/model/AbstractDim.java    |  2 -
 .../apache/rocketmq/streams/dim/model/DBDim.java   |  2 -
 .../apache/rocketmq/streams/dim/model/FileDim.java |  2 -
 .../java/com/aliyun/service/TableCompressTest.java |  2 -
 .../mutilconsumer/MutilStreamsClientTest.java      |  7 ++-
 .../streams/examples/mutilconsumer/Producer.java   |  5 +-
 .../examples/rocketmqsource/ProducerFromFile.java  |  9 ++--
 .../rocketmqsource/RocketMQSourceExample2.java     |  3 +-
 .../rocketmqsource/RocketMQSourceExample3.java     |  5 +-
 .../rocketmq/streams/filter/FilterComponent.java   |  1 -
 .../streams/filter/builder/ExpressionBuilder.java  |  1 -
 .../streams/filter/context/RuleContext.java        |  1 -
 .../filter/engine/impl/DefaultRuleEngine.java      | 42 +----------------
 .../function/script/CaseDependentParser.java       |  1 -
 .../filter/function/script/CaseFunction.java       |  3 --
 .../rocketmq/streams/filter/operator/Rule.java     |  1 -
 .../streams/filter/operator/RuleExpression.java    |  1 -
 .../filter/operator/expression/Expression.java     |  1 -
 .../streams/filter/operator/var/InnerVar.java      |  1 -
 .../optimization/casewhen/CaseWhenBuilder.java     |  2 -
 .../optimization/casewhen/GroupByVarCaseWhen.java  |  1 -
 .../filter/optimization/dependency/BlinkRule.java  |  2 -
 .../dependency/BlinkRuleV2Expression.java          |  6 ---
 .../optimization/dependency/CommonExpression.java  |  1 -
 .../optimization/dependency/DependencyTree.java    |  4 --
 .../optimization/dependency/FilterTreeNode.java    |  6 ---
 .../optimization/dependency/PipelineTree.java      |  1 -
 .../optimization/dependency/ScriptTreeNode.java    |  6 ---
 .../homologous/HomologousOptimization.java         |  3 --
 .../optimization/script/ScriptOptimization.java    |  4 --
 .../filter/service/impl/RuleEngineServiceImpl.java |  1 -
 .../lease/service/impl/BasedLesaseImpl.java        |  1 -
 .../lease/service/storages/DBLeaseStorage.java     |  1 -
 .../streams/script/context/FunctionContext.java    |  2 -
 .../aggregation/FirstValueAccumulator.java         |  4 --
 .../function/impl/between/BetweenFunction.java     |  1 -
 .../function/impl/condition/IFScopeFunction.java   |  1 -
 .../function/impl/context/ContextFunction.java     |  2 -
 .../script/function/impl/date/GetDateFunction.java |  1 -
 .../script/function/impl/eval/EvalFunction.java    |  1 -
 .../function/impl/flatmap/SplitArrayFunction.java  |  2 -
 .../script/function/impl/item/ItemFunction.java    |  4 --
 .../script/function/impl/relation/AndFunction.java |  6 ---
 .../operator/expression/GroupScriptExpression.java |  1 -
 .../operator/expression/ICaseDependentParser.java  |  2 +-
 .../operator/expression/ScriptExpression.java      |  4 --
 .../script/operator/impl/FunctionScript.java       |  6 +--
 .../streams/script/utils/ExpressionUtil.java       |  1 -
 .../streams/script/utils/FunctionUtils.java        |  2 -
 .../streams/script/function/UDTFFunctionTest.java  |  5 +-
 .../rocketmq/streams/state/AbstractState.java      |  1 -
 .../streams/state/kv/rocksdb/RocksDBOperator.java  |  1 -
 .../streams/window/debug/DebugAnalysis.java        |  8 ----
 .../rocketmq/streams/window/debug/DebugWriter.java |  2 -
 .../rocketmq/streams/window/debug/WindowDebug.java |  2 -
 .../streams/window/fire/EventTimeManager.java      |  1 -
 .../streams/window/fire/SplitEventTimeManager.java | 10 ++--
 .../streams/window/fire/WindowFireManager.java     |  1 -
 .../rocketmq/streams/window/model/WindowCache.java | 17 +++----
 .../streams/window/model/WindowInstance.java       |  1 -
 .../window/offset/WindowMaxValueManager.java       |  1 -
 .../window/offset/WindowMaxValueProcessor.java     |  2 -
 .../window/operator/AbstractShuffleWindow.java     |  2 -
 .../streams/window/operator/AbstractWindow.java    | 53 +++++++++------------
 .../window/operator/impl/ShuffleOverWindow.java    |  4 --
 .../window/operator/impl/WindowOperator.java       | 23 ++++-----
 .../streams/window/operator/join/DBOperator.java   |  2 -
 .../streams/window/operator/join/JoinWindow.java   |  2 +-
 .../streams/window/shuffle/ShuffleChannel.java     | 35 +++++++-------
 .../streams/window/sqlcache/ISQLElement.java       |  2 -
 .../rocketmq/streams/window/sqlcache/SQLCache.java |  1 -
 .../streams/window/state/impl/WindowValue.java     |  1 -
 .../streams/window/storage/IWindowStorage.java     |  1 -
 .../streams/window/storage/WindowStorage.java      |  1 -
 .../streams/window/storage/db/DBStorage.java       |  1 -
 .../streams/window/storage/file/FileStorage.java   |  2 -
 .../window/storage/rocksdb/RocksdbStorage.java     |  5 --
 184 files changed, 294 insertions(+), 729 deletions(-)

diff --git a/README-chinese.md b/README-chinese.md
index cfa6e99..8daa91a 100644
--- a/README-chinese.md
+++ b/README-chinese.md
@@ -47,15 +47,15 @@ StreamBuilder 用于构建流任务的源; 内部包含```dataStream()```和``
 DataStreamSource 是分段式编程的源头类,用于对接各种数据源, 从各大消息队列中获取数据;
 
 + ```fromFile```  从文件中读取数据, 该方法包含俩个参数
-  + ```filePath``` 文件路径,必填参数
-  + ```isJsonData```  是否json数据, 非必填参数, 默认为```true```
+    + ```filePath``` 文件路径,必填参数
+    + ```isJsonData```  是否json数据, 非必填参数, 默认为```true```
 
 
 + ```fromRocketmq``` 从rocketmq中获取数据,包含四个参数
-  + ```topic``` rocketmq消息队列的topic名称,必填参数
-  + ```groupName``` 消费者组的名称,必填参数
-  + ```isJson``` 是否json格式,非必填参数
-  + ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数
+    + ```topic``` rocketmq消息队列的topic名称,必填参数
+    + ```groupName``` 消费者组的名称,必填参数
+    + ```isJson``` 是否json格式,非必填参数
+    + ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数
 
 + ```from``` 自定义的数据源, 通过实现ISource接口实现自己的数据源
 
@@ -80,12 +80,12 @@ DataStream实现了一系列常见的流计算算子
 + ```toRocketmq``` 将结果输出到rocketmq
 + ```to``` 将结果经过自定义的ISink接口输出到指定的存储
 + ```window``` 在窗口内进行相关的统计分析,一般会与```groupBy```连用, ```window()```用来定义窗口的大小, ```groupBy()```用来定义统计分析的主key,可以指定多个
-  + ```count``` 在窗口内计数
-  + ```min``` 获取窗口内统计值的最小值
-  + ```max``` 获取窗口内统计值得最大值
-  + ```avg``` 获取窗口内统计值的平均值
-  + ```sum``` 获取窗口内统计值的加和值
-  + ```reduce``` 在窗口内进行自定义的汇总运算
+    + ```count``` 在窗口内计数
+    + ```min``` 获取窗口内统计值的最小值
+    + ```max``` 获取窗口内统计值得最大值
+    + ```avg``` 获取窗口内统计值的平均值
+    + ```sum``` 获取窗口内统计值的加和值
+    + ```reduce``` 在窗口内进行自定义的汇总运算
 + ```join``` 根据条件将将俩个流进行关联, 合并为一个大流进行相关的运算
 + ```union``` 将俩个流进行合并
 + ```split``` 将一个数据流按照标签进行拆分,分为不同的数据流供下游进行分析计算
@@ -115,11 +115,11 @@ Rocketmq-Streams 核心就是一个独立的jar包, 用户可以在自己的
 1. 通过```mvn clean install``` 构建工程
 2. 从```rocketmq-streams-runner/target/rocket-streams-1.0.0-SNAPSHOT-distribution.tar.gz``` 中获取tar.gz包, 并解压
 3. ```rocketmq-streams```目录架构如下:
-   + ```bin```  指令目录,包括启动和停止指令
-   + ```conf```  配置目录,包括日志配置以及应用的相关配置文件
-   + ```jobs```  任务目录, 独立打包后的rocketmq-streams jar包
-   + ```lib```  依赖包
-   + ```log```  日志目录
+    + ```bin```  指令目录,包括启动和停止指令
+    + ```conf```  配置目录,包括日志配置以及应用的相关配置文件
+    + ```jobs```  任务目录, 独立打包后的rocketmq-streams jar包
+    + ```lib```  依赖包
+    + ```log```  日志目录
 
 ### 发布应用
 用户依赖rocketmq-streams,开发流处理程序,独立打包后, 将jar包拷贝到jobs目录, 通过指令即可完成任务的启动和运行;可以通过启动多个独立的应用程序;
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
index b60bb68..485987d 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
@@ -23,9 +23,7 @@ import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Set;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.channel.IChannel;
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/EnhanceDBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/EnhanceDBSink.java
index 7680279..d789703 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/EnhanceDBSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/EnhanceDBSink.java
@@ -17,6 +17,10 @@
 package org.apache.rocketmq.streams.db.sink;
 
 import com.alibaba.fastjson.JSONObject;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.channel.IChannel;
@@ -38,11 +42,6 @@ import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
 import org.apache.rocketmq.streams.db.sink.sqltemplate.ISqlTemplate;
 import org.apache.rocketmq.streams.db.sink.sqltemplate.SqlTemplateFactory;
 
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
 /**
  * @description enhance db sink, support atomic sink and multiple sink
  */
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoSqlTemplate.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoSqlTemplate.java
index 4e61bba..d352fda 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoSqlTemplate.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoSqlTemplate.java
@@ -16,11 +16,10 @@
  */
 package org.apache.rocketmq.streams.db.sink.sqltemplate;
 
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.utils.SQLUtil;
-
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
 
 /**
  * @description create insert into sql
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoWithDuplicateKeySqlTemplate.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoWithDuplicateKeySqlTemplate.java
index c091c53..c0150af 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoWithDuplicateKeySqlTemplate.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/MysqlInsertIntoWithDuplicateKeySqlTemplate.java
@@ -16,11 +16,10 @@
  */
 package org.apache.rocketmq.streams.db.sink.sqltemplate;
 
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.utils.SQLUtil;
-
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.utils.SQLUtil;
 
 /**
  * @author zengyu.cw
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/SqlTemplateFactory.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/SqlTemplateFactory.java
index 9c4435f..8837c9c 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/SqlTemplateFactory.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/sqltemplate/SqlTemplateFactory.java
@@ -16,9 +16,8 @@
  */
 package org.apache.rocketmq.streams.db.sink.sqltemplate;
 
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-
 import java.util.Arrays;
+import org.apache.rocketmq.streams.common.metadata.MetaData;
 
 /**
  * @description
diff --git a/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/ISqlTemplateTest.java b/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/ISqlTemplateTest.java
index 45d849c..4109149 100644
--- a/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/ISqlTemplateTest.java
+++ b/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/ISqlTemplateTest.java
@@ -17,15 +17,14 @@
 package org.apache.rocketmq.streams.db.sink.db;
 
 import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.rocketmq.streams.common.metadata.MetaData;
 import org.apache.rocketmq.streams.db.sink.sqltemplate.MysqlInsertIgnoreIntoSqlTemplate;
 import org.apache.rocketmq.streams.db.sink.sqltemplate.MysqlInsertIntoSqlTemplate;
 import org.apache.rocketmq.streams.db.sink.sqltemplate.MysqlInsertIntoWithDuplicateKeySqlTemplate;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * @description
  */
diff --git a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkBuilder.java b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkBuilder.java
index dd3be8e..dec19f9 100644
--- a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkBuilder.java
+++ b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkBuilder.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.streams.es.sink;
 
 import com.alibaba.fastjson.JSONObject;
 import com.google.auto.service.AutoService;
+import java.util.Properties;
 import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
 import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
@@ -26,8 +27,6 @@ import org.apache.rocketmq.streams.common.metadata.MetaData;
 import org.apache.rocketmq.streams.common.model.ServiceName;
 import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
 
-import java.util.Properties;
-
 @AutoService(IChannelBuilder.class)
 @ServiceName(value = ESSinkBuilder.TYPE, aliasName = "elasticsearch")
 public class ESSinkBuilder extends AbstractSupportShuffleChannelBuilder {
diff --git a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkOnlyChannel.java b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkOnlyChannel.java
index 488dcf5..f4346e4 100644
--- a/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkOnlyChannel.java
+++ b/rocketmq-streams-channel-es/src/main/java/org/apache/rocketmq/streams/es/sink/ESSinkOnlyChannel.java
@@ -16,6 +16,10 @@
  */
 package org.apache.rocketmq.streams.es.sink;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.http.HttpHost;
@@ -34,11 +38,6 @@ import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 public class ESSinkOnlyChannel extends AbstractSink {
     private static final Log LOG = LogFactory.getLog(ESSinkOnlyChannel.class);
 
diff --git a/rocketmq-streams-channel-es/src/test/java/org/apache/rocketmq/streams/db/sink/es/EsChannelTest.java b/rocketmq-streams-channel-es/src/test/java/org/apache/rocketmq/streams/db/sink/es/EsChannelTest.java
index 6e59a19..8f09c4a 100644
--- a/rocketmq-streams-channel-es/src/test/java/org/apache/rocketmq/streams/db/sink/es/EsChannelTest.java
+++ b/rocketmq-streams-channel-es/src/test/java/org/apache/rocketmq/streams/db/sink/es/EsChannelTest.java
@@ -16,11 +16,9 @@
  */
 package org.apache.rocketmq.streams.db.sink.es;
 
+import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
 import java.util.List;
-
-import com.alibaba.fastjson.JSONObject;
-
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.context.Message;
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
index cd91f64..71fe8b1 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
@@ -19,9 +19,7 @@ package org.apache.rocketmq.streams;
 
 import com.alibaba.fastjson.JSONObject;
 import com.google.auto.service.AutoService;
-
 import java.util.Properties;
-
 import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
 import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java
index b82af06..eabd543 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java
@@ -20,8 +20,8 @@ package org.apache.rocketmq.streams;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 662f215..395161c 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -18,6 +18,16 @@
 package org.apache.rocketmq.streams.source;
 
 import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
@@ -49,17 +59,6 @@ import org.apache.rocketmq.streams.debug.DebugWriter;
 import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class RocketMQSource extends AbstractSupportShuffleSource {
 
     protected static final Log LOG = LogFactory.getLog(RocketMQSource.class);
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/ISyslogRouter.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/ISyslogRouter.java
index 3160416..7ac8660 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/ISyslogRouter.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/ISyslogRouter.java
@@ -16,8 +16,6 @@
  */
 package org.apache.rocketmq.streams.syslog;
 
-import org.apache.rocketmq.streams.common.channel.IChannel;
-
 public interface ISyslogRouter {
 
     /**
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannel.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannel.java
index 84efe2a..e4e176c 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannel.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannel.java
@@ -16,14 +16,16 @@
  */
 package org.apache.rocketmq.streams.syslog;
 
+import com.alibaba.fastjson.JSONObject;
 import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-
-import com.alibaba.fastjson.JSONObject;
-
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.channel.AbstractChannel;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
@@ -36,10 +38,6 @@ import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.context.MessageOffset;
 import org.apache.rocketmq.streams.common.utils.IPUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.graylog2.syslog4j.Syslog;
 import org.graylog2.syslog4j.SyslogConfigIF;
 
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelManager.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelManager.java
index e0fe2bf..13a52ef 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelManager.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogChannelManager.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.syslog;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.graylog2.syslog4j.SyslogConstants;
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogParser.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogParser.java
index 183087b..47968e8 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogParser.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogParser.java
@@ -16,11 +16,10 @@
  */
 package org.apache.rocketmq.streams.syslog;
 
+import java.util.Date;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
 import org.apache.rocketmq.streams.common.utils.IPUtil;
 
-import java.util.Date;
-
 public class SyslogParser {
 
     /**
diff --git a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogServer.java b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogServer.java
index 7536edf..adf291c 100644
--- a/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogServer.java
+++ b/rocketmq-streams-channel-syslog/src/main/java/org/apache/rocketmq/streams/syslog/SyslogServer.java
@@ -16,26 +16,24 @@
  */
 package org.apache.rocketmq.streams.syslog;
 
+import com.alibaba.fastjson.JSONObject;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-
-import com.alibaba.fastjson.JSONObject;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.cache.softreference.impl.SoftReferenceCache;
 import org.apache.rocketmq.streams.common.channel.source.AbstractUnreliableSource;
 import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
-import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
+import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
 import org.apache.rocketmq.streams.common.utils.IPUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.graylog2.syslog4j.server.SyslogServerConfigIF;
 import org.graylog2.syslog4j.server.SyslogServerEventIF;
 import org.graylog2.syslog4j.server.SyslogServerIF;
diff --git a/rocketmq-streams-channel-syslog/src/test/java/org/apache/rocketmq/streams/syslog/SyslogClient.java b/rocketmq-streams-channel-syslog/src/test/java/org/apache/rocketmq/streams/syslog/SyslogClient.java
index 2aab534..31b6e5c 100644
--- a/rocketmq-streams-channel-syslog/src/test/java/org/apache/rocketmq/streams/syslog/SyslogClient.java
+++ b/rocketmq-streams-channel-syslog/src/test/java/org/apache/rocketmq/streams/syslog/SyslogClient.java
@@ -16,16 +16,13 @@
  */
 package org.apache.rocketmq.streams.syslog;
 
-import java.util.Date;
-
 import com.alibaba.fastjson.JSONObject;
-
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.syslog.SyslogChannel;
-import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
+import java.util.Date;
+import org.apache.rocketmq.streams.common.channel.IChannel;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.channel.IChannel;
+import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
 import org.apache.rocketmq.streams.common.utils.IPUtil;
 import org.junit.Test;
diff --git a/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java b/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
index acf538f..793dc13 100644
--- a/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
+++ b/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
@@ -16,19 +16,16 @@
  */
 package org.apache.rocketmq.streams.checkpoint.db;
 
-import com.google.auto.service.AutoService;
+import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.checkpoint.AbstractCheckPointStorage;
 import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
 import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
-import org.apache.rocketmq.streams.common.checkpoint.ICheckPointStorage;
 import org.apache.rocketmq.streams.common.checkpoint.SourceSnapShot;
 import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
 
-import java.util.List;
-
 /**
  * @description
  */
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledStreamBuilder.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledStreamBuilder.java
index e1b4f0c..ffa4f62 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledStreamBuilder.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledStreamBuilder.java
@@ -16,15 +16,15 @@
  */
 package org.apache.rocketmq.streams.client;
 
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.utils.ThreadUtil;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.utils.ThreadUtil;
 
 /**
  * @description
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledTask.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledTask.java
index 87ce9fc..4956fe2 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledTask.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/ScheduledTask.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.streams.client;
 
+import java.util.Date;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
 import org.apache.rocketmq.streams.client.transform.DataStream;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
@@ -23,8 +24,6 @@ import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
 import org.apache.rocketmq.streams.connectors.source.filter.CycleSchedule;
 import org.apache.rocketmq.streams.db.sink.EnhanceDBSink;
 
-import java.util.Date;
-
 /**
  * @description
  */
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
index ab25d68..b5859da 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
@@ -34,8 +34,8 @@
 package org.apache.rocketmq.streams.client.source;
 
 import com.alibaba.fastjson.JSONObject;
-
 import com.google.common.collect.Sets;
+import java.util.Set;
 import org.apache.rocketmq.streams.client.transform.DataStream;
 import org.apache.rocketmq.streams.common.channel.impl.CollectionSource;
 import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
@@ -44,14 +44,11 @@ import org.apache.rocketmq.streams.common.channel.impl.memory.MemorySource;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.topology.ChainStage;
 import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
-
 import org.apache.rocketmq.streams.connectors.source.CycleDynamicMultipleDBScanSource;
 import org.apache.rocketmq.streams.connectors.source.DynamicMultipleDBScanSource;
 import org.apache.rocketmq.streams.connectors.source.filter.CycleSchedule;
 import org.apache.rocketmq.streams.source.RocketMQSource;
 
-import java.util.Set;
-
 public class DataStreamSource {
     protected PipelineBuilder mainPipelineBuilder;
     protected Set<PipelineBuilder> otherPipelineBuilders;
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
index ce42385..c111f0b 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.client.strategy;
 import java.util.Properties;
 import org.apache.rocketmq.streams.common.component.AbstractComponent;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
 import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
 
 public class WindowStrategy implements Strategy {
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/OverWindowStream.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/OverWindowStream.java
index 48d5ab1..87832c1 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/OverWindowStream.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/OverWindowStream.java
@@ -21,8 +21,6 @@ import java.util.Set;
 import org.apache.rocketmq.streams.common.topology.ChainStage;
 import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.window.operator.impl.OverWindow;
 import org.apache.rocketmq.streams.window.operator.impl.ShuffleOverWindow;
 
 public class OverWindowStream {
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBSinkTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBSinkTest.java
index 4a319d5..775427a 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBSinkTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DBSinkTest.java
@@ -17,17 +17,16 @@
 package org.apache.rocketmq.streams.client;
 
 import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
 import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
-
 /**
 /**
  * @description
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
index b63efdf..2484af5 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
@@ -18,7 +18,12 @@
 package org.apache.rocketmq.streams.client;
 
 import com.alibaba.fastjson.JSONObject;
-
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
 import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
 import org.apache.rocketmq.streams.client.transform.window.Time;
@@ -28,13 +33,6 @@ import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.Serializable;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
 public class DataStreamTest implements Serializable {
 
     DataStreamSource dataStream;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/FileTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/FileTest.java
index 6f77ae4..22d1aa9 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/FileTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/FileTest.java
@@ -22,41 +22,40 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.common.functions.FilterFunction;
 import org.apache.rocketmq.streams.common.utils.FileUtil;
 import org.junit.Test;
 
 public class FileTest {
     @Test
-    public void testFilter(){
-        DataStreamSource dataStream =DataStreamSource.create("namespace","name");
-        dataStream.fromFile("/tmp/file.txt",false)
-            .filter((message)->{
-                JSONObject jsonObject= JSON.parseObject((String)message);
-                if(Objects.nonNull(jsonObject)){
-                    int inFlow=jsonObject.getIntValue("InFlow");
-                    if(inFlow>2){
+    public void testFilter() {
+        DataStreamSource dataStream = DataStreamSource.create("namespace", "name");
+        dataStream.fromFile("/tmp/file.txt", false)
+            .filter((message) -> {
+                JSONObject jsonObject = JSON.parseObject((String) message);
+                if (Objects.nonNull(jsonObject)) {
+                    int inFlow = jsonObject.getIntValue("InFlow");
+                    if (inFlow > 2) {
                         return false;
-                    }else {
+                    } else {
                         return true;
                     }
                 }
                 return true;
 
-        }).toPrint(1)
+            }).toPrint(1)
             .start();
-        
+
     }
 
     @Test
-    public void testWriteFile(){
-        List<String> lines=new ArrayList<>();
-        for(int i=0;i<4;i++){
-            JSONObject jsonObject=new JSONObject();
-            jsonObject.put("InFlow",i);
+    public void testWriteFile() {
+        List<String> lines = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            JSONObject jsonObject = new JSONObject();
+            jsonObject.put("InFlow", i);
             lines.add(jsonObject.toJSONString());
         }
         lines.add("");
-        FileUtil.write("/tmp/file.txt",lines);
+        FileUtil.write("/tmp/file.txt", lines);
     }
 }
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/OnewayProducer.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/OnewayProducer.java
index f34c405..c9be0cc 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/OnewayProducer.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/OnewayProducer.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.client;
 
 import java.io.BufferedReader;
 import java.io.FileReader;
-
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SinkTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SinkTest.java
index f16bb8c..7c8e181 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SinkTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SinkTest.java
@@ -17,8 +17,6 @@
 package org.apache.rocketmq.streams.client;
 
 import com.alibaba.fastjson.JSONObject;
-
-import com.google.gson.JsonObject;
 import org.apache.rocketmq.streams.common.channel.impl.file.FileSink;
 import org.apache.rocketmq.streams.common.context.Message;
 import org.junit.Test;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SourceTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SourceTest.java
index ccc8589..97d0342 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SourceTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SourceTest.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.client;
 
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.client.transform.DataStream;
 import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SplitTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SplitTest.java
index 301043d..76597b5 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SplitTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SplitTest.java
@@ -27,8 +27,6 @@ import org.apache.rocketmq.streams.client.transform.window.Time;
 import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
 import org.apache.rocketmq.streams.common.functions.FilterFunction;
 import org.apache.rocketmq.streams.common.functions.FlatMapFunction;
-import org.apache.rocketmq.streams.common.functions.MapFunction;
-import org.apache.rocketmq.streams.common.functions.ReduceFunction;
 import org.apache.rocketmq.streams.common.functions.SplitFunction;
 import org.junit.Test;
 
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java
index 164fa8e..c8baece 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java
@@ -29,7 +29,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.streams.client.transform.window.SessionWindow;
 import org.apache.rocketmq.streams.client.transform.window.Time;
 import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-import org.apache.rocketmq.streams.common.functions.ForEachFunction;
 import org.apache.rocketmq.streams.common.functions.MapFunction;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
 import org.junit.Assert;
@@ -38,60 +37,6 @@ import org.junit.Test;
 public class WindowTest implements Serializable {
 
     @Test
-    public void testWindow() {
-        StreamBuilder.dataStream("namespace", "name")
-            .fromFile("/Users/duheng/project/opensource/sls_100.txt", false)
-            .map((MapFunction<JSONObject, String>) message -> JSONObject.parseObject(message))
-            .window(TumblingWindow.of(Time.seconds(5)))
-            .groupBy("ProjectName", "LogStore")
-            .setLocalStorageOnly(true)
-            .count("total")
-            .sum("OutFlow", "OutFlow")
-            .sum("InFlow", "InFlow")
-            .toDataSteam()
-            .forEach(new ForEachFunction<JSONObject>() {
-                protected int sum = 0;
-
-                @Override
-                public void foreach(JSONObject o) {
-                    int total = o.getInteger("total");
-                    sum = sum + total;
-                    o.put("sum(total)", sum);
-                }
-            }).toPrint().start();
-
-    }
-
-    //    @Test
-    //    public void testWindowFromMetaq() throws InterruptedException {
-    //        String topic = "TOPIC_DIPPER_SYSTEM_MSG_4";
-    //        StreamBuilder.dataStream("namespace", "name")
-    //            .fromFile("/Users/yuanxiaodong/chris/sls_100.txt", true)
-    //            .toRocketmq(topic)
-    //            .asyncStart();
-    //
-    //        StreamBuilder.dataStream("namespace", "name1")
-    //            .fromRocketmq(topic, "chris", true)
-    //            .window(TumblingWindow.of(Time.seconds(5)))
-    //            .groupby("ProjectName", "LogStore")
-    //            .setLocalStorageOnly(true)
-    //            .count("total")
-    //            .sum("OutFlow", "OutFlow")
-    //            .sum("InFlow", "inflow")
-    //            .toDataSteam()
-    //            .forEach(new ForEachFunction<JSONObject>() {
-    //                protected int sum = 0;
-    //
-    //                @Override
-    //                public void foreach(JSONObject o) {
-    //                    int total = o.getInteger("total");
-    //                    sum = sum + total;
-    //                    o.put("sum(total)", sum);
-    //                }
-    //            }).toPrint().start();
-    //    }
-
-    @Test
     public void testSession() {
         //dataset
         List<String> behaviorList = new ArrayList<>();
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UDFDefinedSQLParser.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UDFDefinedSQLParser.java
index fd631c3..ee074e6 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UDFDefinedSQLParser.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UDFDefinedSQLParser.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.client.sink;
 
 import com.google.auto.service.AutoService;
 import org.apache.rocketmq.streams.common.channel.builder.AbstractChannelSQLSupportShuffleSQLParser;
-import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
 import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
@@ -27,7 +26,7 @@ import org.apache.rocketmq.streams.common.model.ServiceName;
 @AutoService(IChannelBuilder.class)
 @ServiceName(value = UDFDefinedSQLParser.TYPE, aliasName = "source_alias_name")
 public class UDFDefinedSQLParser extends AbstractChannelSQLSupportShuffleSQLParser {
-    public static final String TYPE="source_type";
+    public static final String TYPE = "source_type";
 
     @Override protected String getSourceClass() {
         return null;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSink.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSink.java
index 5573a65..4a5ea8b 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSink.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSink.java
@@ -16,20 +16,10 @@
  */
 package org.apache.rocketmq.streams.client.sink;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractUDFSink;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
 
 public class UserDefinedSink extends AbstractUDFSink {
 
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSupportShuffleSink.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSupportShuffleSink.java
index 8f7d83a..6c64511 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSupportShuffleSink.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/sink/UserDefinedSupportShuffleSink.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.client.sink;
 
 import java.util.List;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleUDFSink;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java
index 3eef4f2..688b485 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java
@@ -18,16 +18,6 @@
 package org.apache.rocketmq.streams.client.windows;
 
 import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.streams.client.transform.DataStream;
-import org.apache.rocketmq.streams.client.transform.window.Time;
-import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-import org.apache.rocketmq.streams.common.functions.ForEachFunction;
-import org.apache.rocketmq.streams.common.functions.MapFunction;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,6 +28,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.streams.client.transform.DataStream;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.common.functions.ForEachFunction;
+import org.apache.rocketmq.streams.common.functions.MapFunction;
+import org.apache.rocketmq.streams.common.utils.DateUtil;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
 
 import static junit.framework.TestCase.assertTrue;
 
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/MultiSplitTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/MultiSplitTest.java
index 95b3c21..dd5f78a 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/MultiSplitTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/MultiSplitTest.java
@@ -17,35 +17,11 @@
 package org.apache.rocketmq.streams.client.windows;
 
 import com.alibaba.fastjson.JSONObject;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.Reader;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.rocketmq.streams.client.StreamBuilder;
 import org.apache.rocketmq.streams.client.transform.DataStream;
 import org.apache.rocketmq.streams.common.functions.MapFunction;
 import org.apache.rocketmq.streams.common.topology.model.IWindow;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.window.operator.impl.WindowOperator;
-import org.apache.rocketmq.streams.window.state.impl.WindowValue;
-import org.apache.rocketmq.streams.window.storage.WindowStorage;
 import org.junit.Test;
 
 public class MultiSplitTest extends SingleSplitTest {
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/ShuffleOverWindowTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/ShuffleOverWindowTest.java
index d5116de..fdc77e3 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/ShuffleOverWindowTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/ShuffleOverWindowTest.java
@@ -19,13 +19,7 @@ package org.apache.rocketmq.streams.client.windows;
 import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.streams.client.StreamBuilder;
-import org.apache.rocketmq.streams.client.transform.window.Time;
-import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-import org.apache.rocketmq.streams.common.functions.ForEachFunction;
-import org.apache.rocketmq.streams.common.functions.MapFunction;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
 import org.apache.rocketmq.streams.common.utils.FileUtil;
 import org.junit.Test;
 
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/SingleSplitTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/SingleSplitTest.java
index 30006e7..2de857a 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/SingleSplitTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/SingleSplitTest.java
@@ -16,29 +16,9 @@
  */
 package org.apache.rocketmq.streams.client.windows;
 
-import com.alibaba.fastjson.JSONObject;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.rocketmq.streams.client.StreamBuilder;
 import org.apache.rocketmq.streams.client.transform.DataStream;
-import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.functions.MapFunction;
 import org.apache.rocketmq.streams.common.topology.model.IWindow;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.PrintUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.window.operator.impl.WindowOperator;
-import org.apache.rocketmq.streams.window.state.impl.WindowValue;
-import org.apache.rocketmq.streams.window.storage.WindowStorage;
 import org.junit.Test;
 
 public class SingleSplitTest extends AbstractWindowTest {
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowDebugTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowDebugTest.java
index 7188c3f..ba4e14d 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowDebugTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowDebugTest.java
@@ -17,24 +17,13 @@
 
 package org.apache.rocketmq.streams.client.windows;
 
-import com.alibaba.fastjson.JSONObject;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.window.debug.WindowDebug;
 import org.junit.Test;
 
 public class WindowDebugTest {
     @Test
-    public void windowDebug(){
-        WindowDebug windowDebug=new WindowDebug("name1_window_10001","logTime","/tmp/rockstmq-streams","sum(total)",88121);
+    public void windowDebug() {
+        WindowDebug windowDebug = new WindowDebug("name1_window_10001", "logTime", "/tmp/rockstmq-streams", "sum(total)", 88121);
         windowDebug.startAnalysis();
     }
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ByteArrayMemoryTable.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ByteArrayMemoryTable.java
index 9081468..4048f05 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ByteArrayMemoryTable.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ByteArrayMemoryTable.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.streams.common.cache;
 
-import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -24,7 +23,6 @@ import java.util.Map;
 import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
 import org.apache.rocketmq.streams.common.cache.compress.AdditionStore;
 import org.apache.rocketmq.streams.common.cache.compress.ByteArray;
-import org.apache.rocketmq.streams.common.cache.compress.CacheKV;
 import org.apache.rocketmq.streams.common.cache.compress.MapAddress;
 import org.apache.rocketmq.streams.common.utils.NumberUtils;
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ListMemoryTable.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ListMemoryTable.java
index 2f3a729..ab8795b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ListMemoryTable.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/ListMemoryTable.java
@@ -17,17 +17,10 @@
 package org.apache.rocketmq.streams.common.cache;
 
 import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
-import org.apache.rocketmq.streams.common.datatype.DataType;
-import org.apache.rocketmq.streams.common.datatype.NotSupportDataType;
-import org.apache.rocketmq.streams.common.datatype.StringDataType;
-import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
 
 /**
  * 压缩表,行数据以byte[][]存放
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AbstractMemoryTable.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AbstractMemoryTable.java
index 00574bc..eec4e18 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AbstractMemoryTable.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AbstractMemoryTable.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.common.cache.compress;
 
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/AbstractListKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/AbstractListKV.java
index d95d212..c1a8b36 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/AbstractListKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/AbstractListKV.java
@@ -17,9 +17,7 @@
 package org.apache.rocketmq.streams.common.cache.compress.impl;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import org.apache.rocketmq.streams.common.cache.compress.AdditionStore;
 import org.apache.rocketmq.streams.common.cache.compress.ByteArray;
 import org.apache.rocketmq.streams.common.cache.compress.CacheKV;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntListKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntListKV.java
index 23cdb07..2246c7b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntListKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntListKV.java
@@ -16,11 +16,9 @@
  */
 package org.apache.rocketmq.streams.common.cache.compress.impl;
 
-import java.util.ArrayList;
 import java.util.List;
 import org.apache.rocketmq.streams.common.cache.compress.AdditionStore;
 import org.apache.rocketmq.streams.common.cache.compress.ByteArray;
-import org.apache.rocketmq.streams.common.cache.compress.CacheKV;
 import org.apache.rocketmq.streams.common.cache.compress.MapAddress;
 import org.apache.rocketmq.streams.common.utils.NumberUtils;
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongListKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongListKV.java
index 16382ed..7292185 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongListKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongListKV.java
@@ -16,11 +16,9 @@
  */
 package org.apache.rocketmq.streams.common.cache.compress.impl;
 
-import java.util.ArrayList;
 import java.util.List;
 import org.apache.rocketmq.streams.common.cache.compress.AdditionStore;
 import org.apache.rocketmq.streams.common.cache.compress.ByteArray;
-import org.apache.rocketmq.streams.common.cache.compress.CacheKV;
 import org.apache.rocketmq.streams.common.cache.compress.MapAddress;
 import org.apache.rocketmq.streams.common.utils.NumberUtils;
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
index 6ca55b5..4742f8b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
@@ -18,9 +18,7 @@ package org.apache.rocketmq.streams.common.channel.builder;
 
 import com.alibaba.fastjson.JSONObject;
 import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
 import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
 
 public abstract class AbstractSupportShuffleChannelBuilder implements IChannelBuilder, IShuffleChannelBuilder {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java
index c02ec31..15fc4a8 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java
@@ -16,11 +16,10 @@
  */
 package org.apache.rocketmq.streams.common.channel.impl;
 
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
-import org.apache.rocketmq.streams.common.context.IMessage;
-
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
+import org.apache.rocketmq.streams.common.context.IMessage;
 
 /**
  * @description just support json object
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java
index 32458c7..24a24ee 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java
@@ -17,14 +17,13 @@
 package org.apache.rocketmq.streams.common.channel.impl;
 
 import com.google.auto.service.AutoService;
+import java.util.Properties;
 import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.metadata.MetaData;
 import org.apache.rocketmq.streams.common.model.ServiceName;
 
-import java.util.Properties;
-
 /**
  * @description
  */
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSource.java
index 0c2bdb0..4557b92 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSource.java
@@ -17,15 +17,14 @@
 package org.apache.rocketmq.streams.common.channel.impl;
 
 import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
 
 /**
  *
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
index 48e32e4..f7c3585 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
@@ -17,12 +17,9 @@
 package org.apache.rocketmq.streams.common.channel.impl;
 
 import java.util.List;
-
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.common.utils.PrintUtil;
 
 /**
  * 测试使用,输出就是把消息打印出来
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java
index 7d1389e..ce69674 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java
@@ -24,7 +24,6 @@ import org.apache.rocketmq.streams.common.channel.sink.ISink;
 import org.apache.rocketmq.streams.common.channel.source.AbstractUnreliableSource;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
 
 /**
  * 消息产生的source数据,就是通过sink写入的消息
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java
index 487153e..2afb806 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java
@@ -16,16 +16,13 @@
  */
 package org.apache.rocketmq.streams.common.channel.impl.mutiltask;
 
-import com.google.auto.service.AutoService;
 import java.util.List;
 import java.util.Set;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
 import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
 import org.apache.rocketmq.streams.common.context.Context;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.model.ServiceName;
 import org.apache.rocketmq.streams.common.topology.task.StreamsTask;
 
 public class MutilTaskSink extends AbstractSink implements IAfterConfigurableRefreshListener {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java
index e9f943f..f48c511 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java
@@ -21,7 +21,6 @@ import com.alibaba.fastjson.JSONObject;
 import com.google.auto.service.AutoService;
 import java.util.Properties;
 import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.impl.memory.MemorySource;
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.metadata.MetaData;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
index 68c11bd..f1bf7d7 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
@@ -17,6 +17,12 @@
 package org.apache.rocketmq.streams.common.channel.sink;
 
 import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
@@ -36,13 +42,6 @@ import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
 import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * 输出的接口抽象,针对json消息的场景
  */
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleUDFSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleUDFSink.java
index 98fdc23..14b93bb 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleUDFSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleUDFSink.java
@@ -16,12 +16,7 @@
  */
 package org.apache.rocketmq.streams.common.channel.sink;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.context.IMessage;
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.java
index 0c28b3a..e1018cf 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.java
@@ -22,12 +22,10 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
 import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
index 86d31ee..f628ad4 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
@@ -28,7 +28,6 @@ import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
 import org.apache.rocketmq.streams.common.schedule.ScheduleManager;
 import org.apache.rocketmq.streams.common.schedule.ScheduleTask;
-import org.apache.rocketmq.streams.common.utils.ThreadUtil;
 
 /**
  * 消息缓存的实现,通过消息队列做本地缓存。目前多是用了这个实现
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
index 6958970..949e411 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
@@ -16,14 +16,18 @@
  */
 package org.apache.rocketmq.streams.common.channel.source;
 
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import java.io.UnsupportedEncodingException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-
 import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
 import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
 import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
@@ -43,7 +47,6 @@ import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
 import org.apache.rocketmq.streams.common.metadata.MetaData;
 import org.apache.rocketmq.streams.common.metadata.MetaDataField;
 import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
-import org.apache.rocketmq.streams.common.utils.CollectionUtil;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java
index 9ad5e69..5d611cd 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/AbstractCheckPointStorage.java
@@ -16,6 +16,12 @@
  */
 package org.apache.rocketmq.streams.common.checkpoint;
 
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
@@ -23,8 +29,6 @@ import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBac
 import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
 import org.apache.rocketmq.streams.common.context.MessageOffset;
 
-import java.util.*;
-
 
 /**
  * @description
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java
index 8bcedbd..a035591 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPoint.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.common.checkpoint;
 
 import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.model.Entity;
 
 /**
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java
index 1f022ac..06a2f86 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointManager.java
@@ -16,7 +16,12 @@
  */
 package org.apache.rocketmq.streams.common.checkpoint;
 
-
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
@@ -27,8 +32,6 @@ import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.ReflectUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 
-import java.util.*;
-
 
 public class CheckPointManager extends BasedConfigurable{
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java
index fb9845b..1f8f371 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/CheckPointStorageFactory.java
@@ -17,13 +17,12 @@
 
 package org.apache.rocketmq.streams.common.checkpoint;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Iterator;
 import java.util.ServiceLoader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @description
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java
index 97bfe89..2be9a5f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/ICheckPointStorage.java
@@ -16,10 +16,8 @@
  */
 package org.apache.rocketmq.streams.common.checkpoint;
 
-
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-
 import java.util.List;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
 
 /**
  * @description 负责checkpoint的保存、恢复
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java
index 1128af1..f036b3a 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceSnapShot.java
@@ -17,11 +17,10 @@
 package org.apache.rocketmq.streams.common.checkpoint;
 
 import com.alibaba.fastjson.JSONObject;
+import java.io.Serializable;
 import org.apache.rocketmq.streams.common.metadata.MetaData;
 import org.apache.rocketmq.streams.common.model.Entity;
 
-import java.io.Serializable;
-
 /**
  * @create 2021-08-06 16:21:30
  * @description
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java
index a239fdc..4e0739f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/checkpoint/SourceState.java
@@ -16,10 +16,9 @@
  */
 package org.apache.rocketmq.streams.common.checkpoint;
 
-import org.apache.rocketmq.streams.common.context.MessageOffset;
-
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.rocketmq.streams.common.context.MessageOffset;
 
 /**
  * @create 2021-08-11 15:51:50
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
index ff01034..889af89 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/component/ComponentCreator.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java
index 41a113c..3a67682 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/datatype/DateDataType.java
@@ -17,16 +17,15 @@
 package org.apache.rocketmq.streams.common.datatype;
 
 import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.utils.NumberUtils;
-
 import java.sql.Timestamp;
 import java.text.ParsePosition;
 import java.text.SimpleDateFormat;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.Date;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.utils.NumberUtils;
 
 public class DateDataType extends BaseDataType<Date> {
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/interfaces/IBatchMessageFinishNotify.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/interfaces/IBatchMessageFinishNotify.java
index 81106d3..8007d46 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/interfaces/IBatchMessageFinishNotify.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/interfaces/IBatchMessageFinishNotify.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.common.interfaces;
 
 import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
-import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataUtils.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataUtils.java
index 3c54775..bc0c3e7 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataUtils.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataUtils.java
@@ -16,12 +16,21 @@
  */
 package org.apache.rocketmq.streams.common.metadata;
 
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.rocketmq.streams.common.component.AbstractComponent;
 import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
 
-import java.sql.*;
-import java.util.*;
-
 /**
  * @description metaDataUtils
  */
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/FingerprintCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/FingerprintCache.java
index 680711c..7dff0c0 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/FingerprintCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/FingerprintCache.java
@@ -17,8 +17,6 @@
 package org.apache.rocketmq.streams.common.optimization.fingerprint;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/PreFingerprint.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/PreFingerprint.java
index 81fdbf6..9800a7b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/PreFingerprint.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/fingerprint/PreFingerprint.java
@@ -17,14 +17,12 @@
 package org.apache.rocketmq.streams.common.optimization.fingerprint;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.streams.common.cache.compress.BitSetCache;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.topology.model.AbstractRule;
 import org.apache.rocketmq.streams.common.topology.model.AbstractStage;
 import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/schedule/ScheduleManager.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/schedule/ScheduleManager.java
index 1920f80..b29a6cc 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/schedule/ScheduleManager.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/schedule/ScheduleManager.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.common.schedule;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
index 5c7fe41..1269dc7 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
@@ -17,9 +17,7 @@
 package org.apache.rocketmq.streams.common.topology;
 
 import com.alibaba.fastjson.JSONObject;
-
 import com.google.common.collect.Lists;
-
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -28,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.rocketmq.streams.common.cache.compress.impl.LongValueKV;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
index a7d60e6..5d47ba1 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
@@ -16,10 +16,15 @@
  */
 package org.apache.rocketmq.streams.common.topology.builder;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
-import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
 import org.apache.rocketmq.streams.common.configurable.IConfigurable;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
 import org.apache.rocketmq.streams.common.metadata.MetaData;
@@ -30,13 +35,6 @@ import org.apache.rocketmq.streams.common.topology.stages.OutputChainStage;
 import org.apache.rocketmq.streams.common.utils.NameCreatorUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class PipelineBuilder implements Serializable {
     private static final long serialVersionUID = 1L;
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/IWindow.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/IWindow.java
index 833d451..f16993b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/IWindow.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/IWindow.java
@@ -16,8 +16,6 @@
  */
 package org.apache.rocketmq.streams.common.topology.model;
 
-import java.util.Set;
-
 import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
 import org.apache.rocketmq.streams.common.configurable.IConfigurable;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
index 8896583..2badd50 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.common.topology.stages;
 
 import java.util.HashSet;
 import java.util.Set;
-
 import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
 import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
 import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
index 154bab5..8a514e2 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
@@ -34,7 +34,6 @@ import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.metadata.MetaData;
 import org.apache.rocketmq.streams.common.topology.ChainStage;
 import org.apache.rocketmq.streams.common.topology.model.IStageHandle;
-import org.apache.rocketmq.streams.common.utils.JsonableUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 
 public class OutputChainStage<T extends IMessage> extends ChainStage<T> implements IAfterConfigurableRefreshListener {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
index c152945..aa996b9 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.streams.common.topology.stages.udf;
 
+import java.io.Serializable;
 import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
@@ -26,8 +27,6 @@ import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
 import org.apache.rocketmq.streams.common.topology.model.IStageHandle;
 import org.apache.rocketmq.streams.common.topology.stages.AbstractStatelessChainStage;
 
-import java.io.Serializable;
-
 /**
  * 给用户提供自定义的抽象类
  */
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/task/StreamsTask.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/task/StreamsTask.java
index e24ee9e..bc5fd7f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/task/StreamsTask.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/task/StreamsTask.java
@@ -25,7 +25,6 @@ import java.util.ServiceLoader;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.cache.compress.BitSetCache;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/DataTypeUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/DataTypeUtil.java
index 726a4f0..d0d76ea 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/DataTypeUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/DataTypeUtil.java
@@ -18,11 +18,6 @@ package org.apache.rocketmq.streams.common.utils;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.configurable.IConfigurable;
-import org.apache.rocketmq.streams.common.datatype.*;
-
 import java.lang.reflect.Method;
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
@@ -33,6 +28,9 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
 import org.apache.rocketmq.streams.common.datatype.ArrayDataType;
 import org.apache.rocketmq.streams.common.datatype.BaseDataType;
 import org.apache.rocketmq.streams.common.datatype.BooleanDataType;
@@ -40,6 +38,7 @@ import org.apache.rocketmq.streams.common.datatype.ByteDataType;
 import org.apache.rocketmq.streams.common.datatype.ConfigurableDataType;
 import org.apache.rocketmq.streams.common.datatype.DataType;
 import org.apache.rocketmq.streams.common.datatype.DateDataType;
+import org.apache.rocketmq.streams.common.datatype.DateTimeDataType;
 import org.apache.rocketmq.streams.common.datatype.DoubleDataType;
 import org.apache.rocketmq.streams.common.datatype.FloatDataType;
 import org.apache.rocketmq.streams.common.datatype.GenericParameterDataType;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/MapKeyUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/MapKeyUtil.java
index 94a9e69..5b31787 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/MapKeyUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/MapKeyUtil.java
@@ -17,8 +17,11 @@
 package org.apache.rocketmq.streams.common.utils;
 
 import com.alibaba.fastjson.JSONObject;
-
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 public class MapKeyUtil {
 
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
index c97b408..1f9ffd9 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
@@ -20,7 +20,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.component.AbstractComponent;
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java
index 7b0e484..58c62ae 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.configurable.IConfigurable;
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
index 27dadf6..1497b43 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.configurable.service;
 
 import java.util.Properties;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
index b597f25..79c25dc 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
@@ -20,8 +20,6 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-
-import java.util.UUID;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java
index ea289df..c00d957 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.connectors.source.SourceInstance;
 
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java
index 5ed4819..c01c151 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.connectors.balance;
 
 import java.util.List;
-
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 
 public class SplitChanged {
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java
index a4b91d3..2ffc7a8 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java
@@ -16,20 +16,23 @@
  */
 package org.apache.rocketmq.streams.connectors.balance.impl;
 
-import java.util.*;
-
 import com.google.auto.service.AutoService;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.connectors.balance.AbstractBalance;
-import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.model.ServiceName;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
+import org.apache.rocketmq.streams.connectors.balance.AbstractBalance;
+import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
+import org.apache.rocketmq.streams.connectors.source.SourceInstance;
 import org.apache.rocketmq.streams.lease.LeaseComponent;
 import org.apache.rocketmq.streams.lease.model.LeaseInfo;
-import org.apache.rocketmq.streams.connectors.source.SourceInstance;
 import org.apache.rocketmq.streams.lease.service.ILeaseService;
 
 @AutoService(ISourceBalance.class)
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java
index 3807585..cc926b1 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java
@@ -16,11 +16,10 @@
  */
 package org.apache.rocketmq.streams.connectors.model;
 
-import org.apache.rocketmq.streams.common.model.Entity;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-
 import java.util.Date;
 import java.util.List;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
 
 /**
  * @description
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
index a3298c5..d4abb03 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
@@ -15,8 +15,13 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.streams.connectors.reader;
+
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
@@ -33,11 +38,6 @@ import org.apache.rocketmq.streams.db.driver.DriverBuilder;
 import org.apache.rocketmq.streams.db.driver.JDBCDriver;
 import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * @description
  */
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java
index da12274..293d72d 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.streams.connectors.reader;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
-
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.connectors.model.PullMessage;
 
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java
index a2fe513..a3ec8b2 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java
@@ -20,7 +20,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 
 public class SplitCloseFuture implements Future<Boolean> {
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java
index 74161af..0403dd1 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.streams.connectors.source;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -26,20 +28,18 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
 import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
 import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
 import org.apache.rocketmq.streams.connectors.balance.SplitChanged;
 import org.apache.rocketmq.streams.connectors.balance.impl.LeaseBalanceImpl;
-import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
-import org.apache.rocketmq.streams.common.channel.split.ISplit;
-import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.connectors.model.PullMessage;
 import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
 import org.apache.rocketmq.streams.connectors.reader.SplitCloseFuture;
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
index 27598f6..1f036c6 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
@@ -17,6 +17,13 @@
 package org.apache.rocketmq.streams.connectors.source;
 
 import com.alibaba.fastjson.JSONObject;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
@@ -34,11 +41,6 @@ import org.apache.rocketmq.streams.connectors.source.filter.CycleSchedule;
 import org.apache.rocketmq.streams.connectors.source.filter.CycleScheduleFilter;
 import org.apache.rocketmq.streams.db.CycleSplit;
 
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * @description
  */
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java
index f44fae4..90e86d2 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java
@@ -15,9 +15,14 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.streams.connectors.source;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.metadata.MetaDataUtils;
 import org.apache.rocketmq.streams.connectors.reader.DBScanReader;
@@ -26,12 +31,6 @@ import org.apache.rocketmq.streams.connectors.source.filter.DataFormatPatternFil
 import org.apache.rocketmq.streams.connectors.source.filter.PatternFilter;
 import org.apache.rocketmq.streams.db.DynamicMultipleDBSplit;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * @description DynamicMultipleDBScanSource
  */
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
index 81f488c..645069a 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.connectors.source;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
index b07371b..c9d983f 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
@@ -16,12 +16,11 @@
  */
 package org.apache.rocketmq.streams.connectors.source.filter;
 
+import java.io.Serializable;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
 
-import java.io.Serializable;
-
 /**
  * @description 过滤掉已经完成的reader
  */
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
index acc75bb..9526c67 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
@@ -16,12 +16,11 @@
  */
 package org.apache.rocketmq.streams.connectors.source.filter;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @Description
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
index 824cfd7..0ce8b66 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
@@ -16,8 +16,6 @@
  */
 package org.apache.rocketmq.streams.connectors.source.filter;
 
-import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
-
 import java.io.Serializable;
 import java.text.ParseException;
 import java.util.ArrayList;
@@ -25,6 +23,7 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
 
 /**
  * @description 用来做分区选取
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java
index 557ef0e..b5b6666 100644
--- a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java
@@ -16,13 +16,11 @@
  */
 package org.apache.rocketmq.streams.connectors.source.filter;
 
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import java.io.Serializable;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @description
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/DBDimSQLParser.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/DBDimSQLParser.java
index 681477d..8ed99da 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/DBDimSQLParser.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/DBDimSQLParser.java
@@ -22,10 +22,8 @@ import java.util.Properties;
 import org.apache.rocketmq.streams.common.metadata.MetaData;
 import org.apache.rocketmq.streams.common.metadata.MetaDataField;
 import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.dim.model.AbstractDim;
 import org.apache.rocketmq.streams.dim.model.DBDim;
-import org.apache.rocketmq.streams.dim.model.FileDim;
 
 @AutoService(IDimSQLParser.class)
 @ServiceName(value = DBDimSQLParser.TYPE, aliasName = "rds")
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/SQLParserFactory.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/SQLParserFactory.java
index 6d0f2a0..8fe1286 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/SQLParserFactory.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/builder/SQLParserFactory.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.streams.dim.builder;
 
-import org.apache.rocketmq.streams.dim.model.AbstractDim;
 import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
 
 public class SQLParserFactory {
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/DimIndex.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/DimIndex.java
index 0989393..cfca308 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/DimIndex.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/DimIndex.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.cache.ListMemoryTable;
 import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
 import org.apache.rocketmq.streams.common.cache.compress.impl.IntListKV;
 import org.apache.rocketmq.streams.common.datatype.IntDataType;
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/IndexExecutor.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/IndexExecutor.java
index c0e2c71..369d81c 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/IndexExecutor.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/index/IndexExecutor.java
@@ -25,16 +25,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.rocketmq.streams.common.datatype.IntDataType;
-import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.dim.model.AbstractDim;
 import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
 import org.apache.rocketmq.streams.filter.function.expression.Equals;
 import org.apache.rocketmq.streams.filter.operator.Rule;
 import org.apache.rocketmq.streams.filter.operator.expression.Expression;
 import org.apache.rocketmq.streams.filter.operator.expression.RelationExpression;
-import org.apache.rocketmq.streams.script.ScriptComponent;
 
 /**
  * 执行索引的查询和构建。主要是完成表达式的解析,对于等值的表达式字段,如果有索引,根据索引查询,然后执行非等值部分的判断
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java
index a79aa81..0b55d6e 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/intelligence/AbstractIntelligenceCache.java
@@ -49,7 +49,6 @@ import org.apache.rocketmq.streams.common.dboperator.IDBDriver;
 import org.apache.rocketmq.streams.common.utils.NumberUtils;
 import org.apache.rocketmq.streams.common.utils.SQLUtil;
 import org.apache.rocketmq.streams.db.driver.JDBCDriver;
-import org.apache.rocketmq.streams.http.source.util.HttpUtil;
 
 public abstract class AbstractIntelligenceCache extends BasedConfigurable implements
     IAfterConfigurableRefreshListener {
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/AbstractDim.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/AbstractDim.java
index 5d0d7d1..c428dcf 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/AbstractDim.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/AbstractDim.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.dim.model;
 import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -32,7 +31,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.cache.ByteArrayMemoryTable;
-import org.apache.rocketmq.streams.common.cache.ListMemoryTable;
 import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
 import org.apache.rocketmq.streams.common.cache.softreference.ICache;
 import org.apache.rocketmq.streams.common.cache.softreference.impl.SoftReferenceCache;
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/DBDim.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/DBDim.java
index a643b18..b8e9ee5 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/DBDim.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/DBDim.java
@@ -21,8 +21,6 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.cache.ByteArrayMemoryTable;
-import org.apache.rocketmq.streams.common.cache.ListMemoryTable;
 import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
 import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
 import org.apache.rocketmq.streams.common.utils.IPUtil;
diff --git a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/FileDim.java b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/FileDim.java
index 50c0933..d004d60 100644
--- a/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/FileDim.java
+++ b/rocketmq-streams-dim/src/main/java/org/apache/rocketmq/streams/dim/model/FileDim.java
@@ -18,11 +18,9 @@ package org.apache.rocketmq.streams.dim.model;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
-import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.rocketmq.streams.common.cache.ByteArrayMemoryTable;
 import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
 import org.apache.rocketmq.streams.common.utils.FileUtil;
 
diff --git a/rocketmq-streams-dim/src/test/java/com/aliyun/service/TableCompressTest.java b/rocketmq-streams-dim/src/test/java/com/aliyun/service/TableCompressTest.java
index 4e41f45..f31d060 100644
--- a/rocketmq-streams-dim/src/test/java/com/aliyun/service/TableCompressTest.java
+++ b/rocketmq-streams-dim/src/test/java/com/aliyun/service/TableCompressTest.java
@@ -23,10 +23,8 @@ import java.util.Map;
 import org.apache.rocketmq.streams.common.cache.ByteArrayMemoryTable;
 import org.apache.rocketmq.streams.common.cache.compress.AbstractMemoryTable;
 import org.apache.rocketmq.streams.common.cache.compress.impl.IntListKV;
-import org.apache.rocketmq.streams.common.cache.compress.impl.LongListKV;
 import org.junit.Test;
 
-import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertFalse;
 
 public class TableCompressTest {
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MutilStreamsClientTest.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MutilStreamsClientTest.java
index 6eea30f..69fc569 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MutilStreamsClientTest.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MutilStreamsClientTest.java
@@ -20,16 +20,15 @@
 package org.apache.rocketmq.streams.examples.mutilconsumer;
 
 import com.alibaba.fastjson.JSONObject;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.rocketmq.streams.client.StreamBuilder;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
 import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
 import org.apache.rocketmq.streams.client.transform.window.Time;
 import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
 
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
 import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
 import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/Producer.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/Producer.java
index 731b226..7f88c1c 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/Producer.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/Producer.java
@@ -19,14 +19,13 @@
 
 package org.apache.rocketmq.streams.examples.mutilconsumer;
 
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.streams.examples.rocketmqsource.ProducerFromFile;
 
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
 import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
 
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java
index 6c7606f..bebfd79 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/ProducerFromFile.java
@@ -19,11 +19,6 @@
 
 package org.apache.rocketmq.streams.examples.rocketmqsource;
 
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -31,6 +26,10 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
 
 public class ProducerFromFile {
 
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
index 5610b1f..2da9f23 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
@@ -16,11 +16,10 @@
  */
 package org.apache.rocketmq.streams.examples.rocketmqsource;
 
+import java.util.Arrays;
 import org.apache.rocketmq.streams.client.StreamBuilder;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
 
-import java.util.Arrays;
-
 import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
 import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
 import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
index 45a9216..f4a7c49 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
@@ -18,13 +18,12 @@
 package org.apache.rocketmq.streams.examples.rocketmqsource;
 
 import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.streams.client.StreamBuilder;
-import org.apache.rocketmq.streams.client.source.DataStreamSource;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.rocketmq.streams.client.StreamBuilder;
+import org.apache.rocketmq.streams.client.source.DataStreamSource;
 
 import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
 import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/FilterComponent.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/FilterComponent.java
index 08be597..08d08d4 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/FilterComponent.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/FilterComponent.java
@@ -28,7 +28,6 @@ import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.filter.builder.RuleBuilder;
 import org.apache.rocketmq.streams.filter.context.ContextConfigure;
-import org.apache.rocketmq.streams.filter.context.RuleMessage;
 import org.apache.rocketmq.streams.filter.operator.Rule;
 import org.apache.rocketmq.streams.filter.service.IRuleEngineService;
 import org.apache.rocketmq.streams.filter.service.impl.RuleEngineServiceImpl;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/builder/ExpressionBuilder.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/builder/ExpressionBuilder.java
index ad65e80..fbbf40f 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/builder/ExpressionBuilder.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/builder/ExpressionBuilder.java
@@ -37,7 +37,6 @@ import org.apache.rocketmq.streams.filter.operator.expression.Expression;
 import org.apache.rocketmq.streams.filter.operator.expression.ExpressionRelationParser;
 import org.apache.rocketmq.streams.filter.operator.expression.RelationExpression;
 import org.apache.rocketmq.streams.filter.operator.expression.SimpleExpression;
-import org.apache.rocketmq.streams.filter.operator.var.ConstantVar;
 import org.apache.rocketmq.streams.filter.operator.var.ContextVar;
 import org.apache.rocketmq.streams.filter.operator.var.Var;
 import org.apache.rocketmq.streams.filter.optimization.ExpressionOptimization;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
index e51e94f..ce08793 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
@@ -24,7 +24,6 @@ import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/engine/impl/DefaultRuleEngine.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/engine/impl/DefaultRuleEngine.java
index 0a09a1d..8f353c6 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/engine/impl/DefaultRuleEngine.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/engine/impl/DefaultRuleEngine.java
@@ -121,17 +121,6 @@ public class DefaultRuleEngine implements IRuleEngine {
              * 判断rule的status 如果是观察者模式,则写入到观察者库中的观察表里
              */
             if (rule.getRuleStatus().intValue() == 3) {
-//                if (!context.getContextConfigure().isAction2Online()) {
-//                    // LOG.warn("DefaultRuleEngine fireAction ignore : configure action2Online false!");
-//                    if (context.getContextConfigure().isActionOnline2Observer()) {
-//                        Action action = getAction(RuleContext.OBSERVER_NAME,rule);
-//                        if (action == null) {
-//                            return;
-//                        }
-//                        doAction(message,context, action, rule);
-//                    }
-//                    return;
-//                }
                 try {
                     if (rule.getActionNames() == null || rule.getActionNames().size() == 0) {
                         return;
@@ -145,12 +134,9 @@ public class DefaultRuleEngine implements IRuleEngine {
                     }
                 } catch (Exception e) {
                     LOG.error("DefaultRuleEngine fire atciton error: ruleName is" + rule.getConfigureName(), e);
-                   // context.addErrorMessage(rule, "DefaultRuleEngine fire atciton error: " + e.getMessage());
                 }
             } else {
-//                if (context.getContextConfigure() != null && !context.getContextConfigure().isAction2Observer()) {
-//                    return;
-//                }
+
                 Action action = getAction(RuleContext.OBSERVER_NAME,rule);
                 if (action == null) {
                     return;
@@ -173,32 +159,6 @@ public class DefaultRuleEngine implements IRuleEngine {
     @SuppressWarnings("rawtypes")
     protected void doAction(final IMessage message, AbstractContext context, final Action action, final Rule rule) {
         action.doMessage(message,context);
-//        context.getActionExecutor().execute(new Runnable() {
-//
-//            @Override
-//            public void run() {
-////                IMonitor monitor = context.getRuleMonitor();
-////                IMonitor actionMonitor = monitor.createChildren(action);
-//                try {
-//
-//
-//                    if (monitor != null) {
-//                        actionMonitor.endMonitor();
-//                        if (actionMonitor.isSlow()) {
-//                            actionMonitor.setSampleData(context).put("action_info", action.toJsonObject());
-//                        }
-//                    }
-//                } catch (Exception e) {
-//                    String errorMsg = "DefaultRuleEngine doAction error,rule: " + rule.getRuleCode() + " ,action: "
-//                        + action.getConfigureName();
-//                    //                    RULEENGINE_MESSAGE_LOG.warn(errorMsg
-//                    //                        , e);
-//                    actionMonitor.occureError(e, errorMsg, e.getMessage());
-//                    actionMonitor.setSampleData(context).put("action_info", action.toJsonObject());
-//                }
-//
-//            }
-//        });
     }
 
     public Action getAction(String name, Rule rule) {
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseDependentParser.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseDependentParser.java
index a9da48b..4f9f61e 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseDependentParser.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseDependentParser.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.filter.function.script;
 
 import com.google.auto.service.AutoService;
-import java.util.ArrayList;
 import java.util.Set;
 import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
 import org.apache.rocketmq.streams.filter.operator.Rule;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseFunction.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseFunction.java
index 59ee5a1..b491dd9 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseFunction.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/script/CaseFunction.java
@@ -16,11 +16,8 @@
  */
 package org.apache.rocketmq.streams.filter.function.script;
 
-import com.alibaba.fastjson.JSONObject;
-import java.util.List;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
-import org.apache.rocketmq.streams.script.ScriptComponent;
 import org.apache.rocketmq.streams.script.annotation.Function;
 import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
 import org.apache.rocketmq.streams.script.context.FunctionContext;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/Rule.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/Rule.java
index e5527c3..33b4360 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/Rule.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/Rule.java
@@ -46,7 +46,6 @@ import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
 import org.apache.rocketmq.streams.common.utils.TraceUtil;
 import org.apache.rocketmq.streams.db.driver.JDBCDriver;
 import org.apache.rocketmq.streams.filter.FilterComponent;
-import org.apache.rocketmq.streams.filter.context.RuleContext;
 import org.apache.rocketmq.streams.filter.operator.action.Action;
 import org.apache.rocketmq.streams.filter.operator.action.impl.SinkAction;
 import org.apache.rocketmq.streams.filter.operator.expression.Expression;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/RuleExpression.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/RuleExpression.java
index e45383d..8ec38d9 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/RuleExpression.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/RuleExpression.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.filter.operator.Rule;
 import org.apache.rocketmq.streams.script.context.FunctionContext;
 import org.apache.rocketmq.streams.script.service.IScriptExpression;
 import org.apache.rocketmq.streams.script.service.IScriptParamter;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/Expression.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/Expression.java
index 507386e..ac3540b 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/Expression.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/Expression.java
@@ -56,7 +56,6 @@ import org.apache.rocketmq.streams.filter.operator.var.ContextVar;
 import org.apache.rocketmq.streams.filter.operator.var.Var;
 import org.apache.rocketmq.streams.script.ScriptComponent;
 import org.apache.rocketmq.streams.script.function.model.FunctionConfigure;
-import org.apache.rocketmq.streams.script.optimization.performance.IScriptOptimization;
 import org.apache.rocketmq.streams.script.utils.FunctionUtils;
 
 public class Expression<T> extends BasedConfigurable
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/var/InnerVar.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/var/InnerVar.java
index 3aa0c10..1333abc 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/var/InnerVar.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/var/InnerVar.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.streams.filter.operator.var;
 
-import com.alibaba.fastjson.JSONObject;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/CaseWhenBuilder.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/CaseWhenBuilder.java
index 6e46285..87b5493 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/CaseWhenBuilder.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/CaseWhenBuilder.java
@@ -26,10 +26,8 @@ import org.apache.rocketmq.streams.common.utils.CollectionUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
 import org.apache.rocketmq.streams.filter.operator.Rule;
-import org.apache.rocketmq.streams.filter.operator.expression.Expression;
 import org.apache.rocketmq.streams.script.function.impl.field.RemoveFieldFunction;
 import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
-import org.apache.rocketmq.streams.script.operator.expression.ScriptExpression;
 import org.apache.rocketmq.streams.script.optimization.performance.IScriptOptimization;
 import org.apache.rocketmq.streams.script.service.IScriptExpression;
 import org.apache.rocketmq.streams.script.service.IScriptParamter;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/GroupByVarCaseWhen.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/GroupByVarCaseWhen.java
index 02ba02b..a23f996 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/GroupByVarCaseWhen.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/casewhen/GroupByVarCaseWhen.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRule.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRule.java
index 8040895..284516d 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRule.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRule.java
@@ -23,9 +23,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.filter.operator.expression.Expression;
-import org.apache.rocketmq.streams.filter.operator.expression.RelationExpression;
 
 public class BlinkRule {
     /**
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRuleV2Expression.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRuleV2Expression.java
index 3ae033c..175417a 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRuleV2Expression.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/BlinkRuleV2Expression.java
@@ -39,16 +39,10 @@ import org.apache.rocketmq.streams.common.utils.CollectionUtil;
 import org.apache.rocketmq.streams.common.utils.FileUtil;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.ReflectUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.configurable.ConfigurableComponent;
 import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
-import org.apache.rocketmq.streams.filter.context.RuleContext;
-import org.apache.rocketmq.streams.filter.operator.RuleExpression;
 import org.apache.rocketmq.streams.filter.operator.Rule;
 import org.apache.rocketmq.streams.filter.operator.expression.Expression;
 import org.apache.rocketmq.streams.filter.operator.expression.RelationExpression;
-import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
-import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
 import org.apache.rocketmq.streams.script.service.IScriptExpression;
 import org.apache.rocketmq.streams.script.service.udf.UDFScript;
 import org.apache.rocketmq.streams.script.utils.FunctionUtils;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/CommonExpression.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/CommonExpression.java
index eb7852b..ba1fd02 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/CommonExpression.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/CommonExpression.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.streams.filter.optimization.dependency;
 
-import com.sun.org.apache.bcel.internal.generic.FADD;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/DependencyTree.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/DependencyTree.java
index 72d15dc..4ac4b7f 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/DependencyTree.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/DependencyTree.java
@@ -18,8 +18,6 @@ package org.apache.rocketmq.streams.filter.optimization.dependency;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -32,8 +30,6 @@ import org.apache.rocketmq.streams.common.topology.model.AbstractStage;
 import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
 import org.apache.rocketmq.streams.common.topology.stages.ScriptChainStage;
 import org.apache.rocketmq.streams.common.utils.CollectionUtil;
-import org.apache.rocketmq.streams.script.service.IScriptExpression;
-import org.python.icu.impl.coll.BOCSU;
 
 /**
  * raverse the pipeline to create a prefix filter fingerprint
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/FilterTreeNode.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/FilterTreeNode.java
index fc9aae5..12ffe2e 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/FilterTreeNode.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/FilterTreeNode.java
@@ -19,11 +19,8 @@ package org.apache.rocketmq.streams.filter.optimization.dependency;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintCache;
@@ -31,14 +28,11 @@ import org.apache.rocketmq.streams.common.optimization.fingerprint.PreFingerprin
 import org.apache.rocketmq.streams.common.topology.ChainPipeline;
 import org.apache.rocketmq.streams.common.topology.model.AbstractStage;
 import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
-import org.apache.rocketmq.streams.common.topology.stages.ScriptChainStage;
 import org.apache.rocketmq.streams.common.utils.CollectionUtil;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.filter.operator.Rule;
 import org.apache.rocketmq.streams.filter.operator.expression.Expression;
 import org.apache.rocketmq.streams.filter.operator.expression.RelationExpression;
-import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
-import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
 import org.apache.rocketmq.streams.script.service.IScriptExpression;
 
 public class FilterTreeNode extends TreeNode<FilterChainStage> {
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/PipelineTree.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/PipelineTree.java
index 85e2f49..8cbe51b 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/PipelineTree.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/PipelineTree.java
@@ -20,7 +20,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.rocketmq.streams.common.topology.ChainPipeline;
 import org.apache.rocketmq.streams.common.topology.model.AbstractStage;
-import org.apache.rocketmq.streams.common.topology.model.Pipeline;
 import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage;
 import org.apache.rocketmq.streams.common.topology.stages.ScriptChainStage;
 
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/ScriptTreeNode.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/ScriptTreeNode.java
index a1599af..9327c67 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/ScriptTreeNode.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/dependency/ScriptTreeNode.java
@@ -17,12 +17,7 @@
 package org.apache.rocketmq.streams.filter.optimization.dependency;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.streams.common.topology.ChainPipeline;
@@ -33,7 +28,6 @@ import org.apache.rocketmq.streams.filter.operator.Rule;
 import org.apache.rocketmq.streams.filter.operator.RuleExpression;
 import org.apache.rocketmq.streams.filter.operator.expression.Expression;
 import org.apache.rocketmq.streams.filter.optimization.casewhen.AbstractWhenExpression;
-import org.apache.rocketmq.streams.filter.optimization.casewhen.SingleCaseWhenExpression;
 import org.apache.rocketmq.streams.filter.optimization.script.ScriptOptimization;
 import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
 import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/homologous/HomologousOptimization.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/homologous/HomologousOptimization.java
index fa87eec..e705758 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/homologous/HomologousOptimization.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/homologous/HomologousOptimization.java
@@ -17,16 +17,13 @@
 package org.apache.rocketmq.streams.filter.optimization.homologous;
 
 import com.google.auto.service.AutoService;
-
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.optimization.IHomologousOptimization;
 import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintCache;
 import org.apache.rocketmq.streams.common.topology.ChainPipeline;
-import org.apache.rocketmq.streams.common.topology.stages.SubPiplineChainStage;
 import org.apache.rocketmq.streams.filter.optimization.dependency.CommonExpression;
 import org.apache.rocketmq.streams.filter.optimization.dependency.DependencyTree;
 
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/script/ScriptOptimization.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/script/ScriptOptimization.java
index cda0eca..e320d52 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/script/ScriptOptimization.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/optimization/script/ScriptOptimization.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.filter.optimization.script;
 
 import com.google.auto.service.AutoService;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -27,13 +26,10 @@ import org.apache.rocketmq.streams.common.configurable.IConfigurableIdentificati
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintCache;
-import org.apache.rocketmq.streams.filter.context.RuleContext;
 import org.apache.rocketmq.streams.filter.operator.expression.Expression;
-import org.apache.rocketmq.streams.filter.optimization.casewhen.CaseWhenBuilder;
 import org.apache.rocketmq.streams.filter.optimization.dependency.BlinkRuleV2Expression;
 import org.apache.rocketmq.streams.filter.optimization.executor.GroupByVarExecutor;
 import org.apache.rocketmq.streams.script.context.FunctionContext;
-import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
 import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
 import org.apache.rocketmq.streams.script.optimization.performance.IScriptOptimization;
 import org.apache.rocketmq.streams.script.service.IScriptExpression;
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/service/impl/RuleEngineServiceImpl.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/service/impl/RuleEngineServiceImpl.java
index 37462b1..bc7d886 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/service/impl/RuleEngineServiceImpl.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/service/impl/RuleEngineServiceImpl.java
@@ -29,7 +29,6 @@ import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.filter.builder.RuleBuilder;
 import org.apache.rocketmq.streams.filter.context.ContextConfigure;
 import org.apache.rocketmq.streams.filter.context.RuleContext;
-import org.apache.rocketmq.streams.filter.context.RuleMessage;
 import org.apache.rocketmq.streams.filter.engine.IRuleEngine;
 import org.apache.rocketmq.streams.filter.engine.impl.DefaultRuleEngine;
 import org.apache.rocketmq.streams.filter.operator.Rule;
diff --git a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/BasedLesaseImpl.java b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/BasedLesaseImpl.java
index 671f577..9584a46 100644
--- a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/BasedLesaseImpl.java
+++ b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/BasedLesaseImpl.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
diff --git a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
index c5a0c27..bd7ef82 100644
--- a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
+++ b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
index 5f158b8..00e3c95 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
@@ -21,9 +21,7 @@ import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.model.ThreadContext;
 import org.apache.rocketmq.streams.script.function.service.IFunctionService;
 import org.apache.rocketmq.streams.script.function.service.impl.ScanFunctionService;
-import org.apache.rocketmq.streams.script.service.IScriptExpression;
 import org.apache.rocketmq.streams.script.utils.FunctionUtils;
-import org.python.icu.impl.coll.BOCSU;
 
 /**
  * 脚本执行的上下文
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/FirstValueAccumulator.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/FirstValueAccumulator.java
index b5c8193..211180b 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/FirstValueAccumulator.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/aggregation/FirstValueAccumulator.java
@@ -17,14 +17,10 @@
 package org.apache.rocketmq.streams.script.function.aggregation;
 
 import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
 import org.apache.rocketmq.streams.common.utils.CollectionUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.script.annotation.Function;
 import org.apache.rocketmq.streams.script.annotation.UDAFFunction;
 import org.apache.rocketmq.streams.script.service.IAccumulator;
-import org.apache.rocketmq.streams.state.kv.rocksdb.RocksdbState;
 @Function
 @UDAFFunction("FIRST_VALUE")
 public class FirstValueAccumulator<T> implements IAccumulator<T, FirstValueAccumulator.FirstValue> {
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/between/BetweenFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/between/BetweenFunction.java
index 9934ea6..f552001 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/between/BetweenFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/between/BetweenFunction.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.script.function.impl.between;
 
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
 import org.apache.rocketmq.streams.script.annotation.Function;
 import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
 import org.apache.rocketmq.streams.script.annotation.FunctionParamter;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/IFScopeFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/IFScopeFunction.java
index 4b988bd..846ef9f 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/IFScopeFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/IFScopeFunction.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.script.function.impl.condition;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.script.annotation.Function;
 import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
-import org.apache.rocketmq.streams.script.annotation.FunctionParamter;
 import org.apache.rocketmq.streams.script.context.FunctionContext;
 
 @Function
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/context/ContextFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/context/ContextFunction.java
index d4fe4b8..72cf090 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/context/ContextFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/context/ContextFunction.java
@@ -17,8 +17,6 @@
 package org.apache.rocketmq.streams.script.function.impl.context;
 
 import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.script.annotation.Function;
 import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
index f6ec724..ee367be 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.streams.script.function.impl.date;
 
-import java.util.Date;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.script.annotation.Function;
 import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/eval/EvalFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/eval/EvalFunction.java
index b6d9f76..a77431a 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/eval/EvalFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/eval/EvalFunction.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.script.function.impl.eval;
 import com.alibaba.fastjson.JSONObject;
 import org.apache.rocketmq.streams.common.cache.softreference.ICache;
 import org.apache.rocketmq.streams.common.cache.softreference.impl.SoftReferenceCache;
-import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.script.ScriptComponent;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
index 8678103..25f40d9 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
@@ -18,10 +18,8 @@ package org.apache.rocketmq.streams.script.function.impl.flatmap;
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
-
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.context.IMessage;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/item/ItemFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/item/ItemFunction.java
index 77a9bcf..584157b 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/item/ItemFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/item/ItemFunction.java
@@ -16,13 +16,9 @@
  */
 package org.apache.rocketmq.streams.script.function.impl.item;
 
-import com.alibaba.fastjson.JSONObject;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import jnr.ffi.annotations.In;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.script.annotation.Function;
 import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
 import org.apache.rocketmq.streams.script.annotation.FunctionParamter;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/relation/AndFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/relation/AndFunction.java
index c227d5f..012dd1c 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/relation/AndFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/relation/AndFunction.java
@@ -17,15 +17,9 @@
 package org.apache.rocketmq.streams.script.function.impl.relation;
 
 import com.alibaba.fastjson.JSONObject;
-import java.lang.reflect.Method;
-import java.util.List;
-import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.script.ScriptComponent;
 import org.apache.rocketmq.streams.script.annotation.Function;
 import org.apache.rocketmq.streams.script.annotation.FunctionMethod;
-import org.apache.rocketmq.streams.script.annotation.FunctionParamter;
-import org.apache.rocketmq.streams.script.context.FunctionContext;
-import org.apache.rocketmq.streams.script.function.model.FunctionType;
 
 @Function
 public class AndFunction {
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/GroupScriptExpression.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/GroupScriptExpression.java
index 8d478f7..555d646 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/GroupScriptExpression.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/GroupScriptExpression.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.script.operator.expression;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ICaseDependentParser.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ICaseDependentParser.java
index 0c9ffe1..b4c9e86 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ICaseDependentParser.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ICaseDependentParser.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.streams.script.operator.expression;
+
 import java.util.Set;
 import org.apache.rocketmq.streams.script.service.IScriptExpression;
-import org.apache.rocketmq.streams.script.service.IScriptParamter;
 
 public interface ICaseDependentParser {
     /**
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ScriptExpression.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ScriptExpression.java
index 349bc12..a54978f 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ScriptExpression.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ScriptExpression.java
@@ -17,13 +17,11 @@
 package org.apache.rocketmq.streams.script.operator.expression;
 
 import com.alibaba.fastjson.JSONObject;
-
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.cache.softreference.ICache;
@@ -31,9 +29,7 @@ import org.apache.rocketmq.streams.common.cache.softreference.impl.SoftReference
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
 import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
-import org.apache.rocketmq.streams.common.context.IExpressionResultCache;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.monitor.IMonitor;
 import org.apache.rocketmq.streams.common.optimization.HomologousVar;
 import org.apache.rocketmq.streams.common.utils.PrintUtil;
 import org.apache.rocketmq.streams.common.utils.ReflectUtil;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/FunctionScript.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/FunctionScript.java
index 039aba6..4363ee4 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/FunctionScript.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/impl/FunctionScript.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.script.operator.impl;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -33,7 +32,6 @@ import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.interfaces.IBaseStreamOperator;
 import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
-import org.apache.rocketmq.streams.common.optimization.FilterResultCache;
 import org.apache.rocketmq.streams.common.topology.ChainStage;
 import org.apache.rocketmq.streams.common.topology.builder.IStageBuilder;
 import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
@@ -41,10 +39,8 @@ import org.apache.rocketmq.streams.common.topology.model.AbstractScript;
 import org.apache.rocketmq.streams.common.topology.stages.ScriptChainStage;
 import org.apache.rocketmq.streams.common.utils.CollectionUtil;
 import org.apache.rocketmq.streams.script.context.FunctionContext;
-import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
-import org.apache.rocketmq.streams.script.operator.expression.ICaseDependentParser;
-import org.apache.rocketmq.streams.script.optimization.performance.IScriptOptimization;
 import org.apache.rocketmq.streams.script.operator.expression.ScriptExpression;
+import org.apache.rocketmq.streams.script.optimization.performance.IScriptOptimization;
 import org.apache.rocketmq.streams.script.parser.imp.FunctionParser;
 import org.apache.rocketmq.streams.script.service.IScriptExpression;
 import org.apache.rocketmq.streams.script.service.IScriptParamter;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/ExpressionUtil.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/ExpressionUtil.java
index 7b4e03e..7d976dc 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/ExpressionUtil.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/ExpressionUtil.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.script.utils;
 
 import java.util.List;
-import org.apache.rocketmq.streams.script.annotation.Function;
 import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
 import org.apache.rocketmq.streams.script.service.IScriptExpression;
 import org.apache.rocketmq.streams.script.service.IScriptParamter;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/FunctionUtils.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/FunctionUtils.java
index 12a29d5..fa7ecef 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/FunctionUtils.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/utils/FunctionUtils.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.streams.script.utils;
 
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -24,7 +23,6 @@ import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.context.IgnoreMessage;
 import org.apache.rocketmq.streams.common.datatype.DateDataType;
-import org.apache.rocketmq.streams.common.utils.ReflectUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 
 public class FunctionUtils {
diff --git a/rocketmq-streams-script/src/test/java/org/apache/rocketmq/streams/script/function/UDTFFunctionTest.java b/rocketmq-streams-script/src/test/java/org/apache/rocketmq/streams/script/function/UDTFFunctionTest.java
index d91496e..7efeda7 100644
--- a/rocketmq-streams-script/src/test/java/org/apache/rocketmq/streams/script/function/UDTFFunctionTest.java
+++ b/rocketmq-streams-script/src/test/java/org/apache/rocketmq/streams/script/function/UDTFFunctionTest.java
@@ -17,6 +17,8 @@
 package org.apache.rocketmq.streams.script.function;
 
 import com.alibaba.fastjson.JSONObject;
+import java.lang.reflect.Method;
+import java.util.List;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.script.ScriptComponent;
 import org.apache.rocketmq.streams.script.function.function.JavaObjectUDFFunction;
@@ -24,9 +26,6 @@ import org.apache.rocketmq.streams.script.function.function.Person;
 import org.apache.rocketmq.streams.script.function.model.FunctionType;
 import org.junit.Test;
 
-import java.lang.reflect.Method;
-import java.util.List;
-
 public class UDTFFunctionTest {
 
     @Test
diff --git a/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/AbstractState.java b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/AbstractState.java
index 368f6fc..4d46754 100644
--- a/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/AbstractState.java
+++ b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/AbstractState.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.ServiceLoader;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
 import org.apache.rocketmq.streams.state.backend.IStateBackend;
diff --git a/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksDBOperator.java b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksDBOperator.java
index 87453f0..43cbbdd 100644
--- a/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksDBOperator.java
+++ b/rocketmq-streams-state/src/main/java/org/apache/rocketmq/streams/state/kv/rocksdb/RocksDBOperator.java
@@ -20,7 +20,6 @@ import java.io.File;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.streams.common.utils.FileUtil;
 import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugAnalysis.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugAnalysis.java
index 1639cb9..8b0a247 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugAnalysis.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugAnalysis.java
@@ -28,19 +28,11 @@ import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
 import org.apache.rocketmq.streams.common.utils.FileUtil;
 import org.apache.rocketmq.streams.common.utils.JsonableUtil;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.PrintUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.window.operator.impl.WindowOperator;
-import org.apache.rocketmq.streams.window.state.impl.WindowValue;
-import org.apache.rocketmq.streams.window.storage.WindowStorage;
-import org.apache.rocketmq.streams.window.storage.rocksdb.RocksdbStorage;
 import org.junit.Assert;
 
 import static junit.framework.TestCase.assertTrue;
-import static junit.framework.TestCase.format;
 
 public class DebugAnalysis {
     private String dir;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugWriter.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugWriter.java
index d9e20ad..6558bd8 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugWriter.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugWriter.java
@@ -26,14 +26,12 @@ import java.util.List;
 import java.util.Map;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
 import org.apache.rocketmq.streams.common.utils.FileUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.window.model.WindowCache;
 import org.apache.rocketmq.streams.window.model.WindowInstance;
 import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.window.state.WindowBaseValue;
 import org.apache.rocketmq.streams.window.state.impl.WindowValue;
 
 public class DebugWriter {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/WindowDebug.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/WindowDebug.java
index 1b57989..2d642c0 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/WindowDebug.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/WindowDebug.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.streams.window.debug;
 
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-
 public class WindowDebug {
 
     protected String sumFieldName;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
index 3720293..df20dad 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.window.fire;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
index 89ad616..4df71a5 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
@@ -16,8 +16,12 @@
  */
 package org.apache.rocketmq.streams.window.fire;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
@@ -29,12 +33,6 @@ import org.apache.rocketmq.streams.window.model.WindowCache;
 import org.apache.rocketmq.streams.window.model.WindowInstance;
 import org.apache.rocketmq.streams.window.operator.AbstractWindow;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
 public class SplitEventTimeManager {
     protected static final Log LOG = LogFactory.getLog(SplitEventTimeManager.class);
     protected static Map<String,Long> messageSplitId2MaxTime=new HashMap<>();
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/WindowFireManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/WindowFireManager.java
index f0c0620..a16779e 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/WindowFireManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/WindowFireManager.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.window.fire;
 
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.rocketmq.streams.window.operator.AbstractWindow;
 
 public class WindowFireManager {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
index 3d67f0e..2803aba 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java
@@ -16,35 +16,32 @@
  */
 package org.apache.rocketmq.streams.window.model;
 
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
 import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.common.topology.model.IWindow;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.window.debug.DebugWriter;
 import org.apache.rocketmq.streams.window.shuffle.ShuffleChannel;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
 /**
  * 缓存数据,flush时,刷新完成数据落盘
  */
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
index 6346a61..57b39e1 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.streams.window.model;
 
-import java.awt.Window;
 import java.io.Serializable;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueManager.java
index aa2cccb..fdce661 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueManager.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.rocketmq.streams.window.model.WindowInstance;
 import org.apache.rocketmq.streams.window.operator.AbstractWindow;
 import org.apache.rocketmq.streams.window.sqlcache.SQLCache;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
index f6560f1..bfb7176 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
@@ -24,12 +24,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.streams.common.context.MessageOffset;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.SQLUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.db.driver.DriverBuilder;
 import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
 import org.apache.rocketmq.streams.window.model.WindowInstance;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
index 9e51184..05d0fe8 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java
@@ -21,8 +21,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.window.model.WindowInstance;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
index c45b412..141df8c 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.streams.window.operator;
 
+import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -26,58 +27,48 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
-
-import com.alibaba.fastjson.JSONObject;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
-import org.apache.rocketmq.streams.common.configure.StreamsConfigure;
-import org.apache.rocketmq.streams.common.context.Context;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.common.topology.ChainStage.PiplineRecieverAfterCurrentNode;
-import org.apache.rocketmq.streams.common.topology.stages.udf.IReducer;
-import org.apache.rocketmq.streams.common.utils.Base64Utils;
-import org.apache.rocketmq.streams.common.utils.InstantiationUtil;
-import org.apache.rocketmq.streams.common.utils.TraceUtil;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
-import org.apache.rocketmq.streams.script.ScriptComponent;
-import org.apache.rocketmq.streams.window.debug.DebugWriter;
-import org.apache.rocketmq.streams.window.fire.EventTimeManager;
-import org.apache.rocketmq.streams.window.model.FunctionExecutor;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.model.WindowCache;
-import org.apache.rocketmq.streams.window.offset.IWindowMaxValueManager;
-import org.apache.rocketmq.streams.window.offset.WindowMaxValueManager;
-import org.apache.rocketmq.streams.window.source.WindowFireSource;
-import org.apache.rocketmq.streams.window.state.impl.WindowValue;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.common.context.MessageHeader;
 import org.apache.rocketmq.streams.common.topology.ChainStage;
+import org.apache.rocketmq.streams.common.topology.ChainStage.PiplineRecieverAfterCurrentNode;
 import org.apache.rocketmq.streams.common.topology.builder.IStageBuilder;
 import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
-import org.apache.rocketmq.streams.common.topology.stages.WindowChainStage;
 import org.apache.rocketmq.streams.common.topology.model.IWindow;
+import org.apache.rocketmq.streams.common.topology.stages.WindowChainStage;
+import org.apache.rocketmq.streams.common.topology.stages.udf.IReducer;
+import org.apache.rocketmq.streams.common.utils.Base64Utils;
 import org.apache.rocketmq.streams.common.utils.CollectionUtil;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
+import org.apache.rocketmq.streams.common.utils.InstantiationUtil;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.script.parser.imp.FunctionParser;
+import org.apache.rocketmq.streams.common.utils.TraceUtil;
+import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+import org.apache.rocketmq.streams.filter.builder.ExpressionBuilder;
+import org.apache.rocketmq.streams.script.operator.expression.ScriptExpression;
 import org.apache.rocketmq.streams.script.operator.impl.AggregationScript;
 import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
+import org.apache.rocketmq.streams.script.parser.imp.FunctionParser;
+import org.apache.rocketmq.streams.script.service.IAccumulator;
 import org.apache.rocketmq.streams.script.service.IScriptExpression;
 import org.apache.rocketmq.streams.script.service.IScriptParamter;
-import org.apache.rocketmq.streams.script.operator.expression.ScriptExpression;
-import org.apache.rocketmq.streams.script.service.IAccumulator;
+import org.apache.rocketmq.streams.window.debug.DebugWriter;
+import org.apache.rocketmq.streams.window.fire.EventTimeManager;
+import org.apache.rocketmq.streams.window.model.FunctionExecutor;
+import org.apache.rocketmq.streams.window.model.WindowCache;
+import org.apache.rocketmq.streams.window.model.WindowInstance;
+import org.apache.rocketmq.streams.window.offset.IWindowMaxValueManager;
+import org.apache.rocketmq.streams.window.offset.WindowMaxValueManager;
+import org.apache.rocketmq.streams.window.source.WindowFireSource;
 import org.apache.rocketmq.streams.window.sqlcache.SQLCache;
+import org.apache.rocketmq.streams.window.state.impl.WindowValue;
 import org.apache.rocketmq.streams.window.storage.WindowStorage;
-import org.python.antlr.ast.Str;
-
-import static java.util.concurrent.CompletableFuture.supplyAsync;
 
 /**
  * window definition in the pipeline, created by user's configure in WindowChainStage
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/ShuffleOverWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/ShuffleOverWindow.java
index 1bfa639..90ee4c1 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/ShuffleOverWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/ShuffleOverWindow.java
@@ -16,17 +16,13 @@
  */
 package org.apache.rocketmq.streams.window.operator.impl;
 
-import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
 import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.operator.AbstractWindow;
 import org.apache.rocketmq.streams.window.state.impl.WindowValue;
 
 public class ShuffleOverWindow extends WindowOperator implements IAfterConfigurableRefreshListener {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
index 0387f57..391a6d1 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java
@@ -16,11 +16,20 @@
  */
 package org.apache.rocketmq.streams.window.operator.impl;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.context.MessageOffset;
 import org.apache.rocketmq.streams.common.utils.CollectionUtil;
-import org.apache.rocketmq.streams.common.utils.DateUtil;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator;
@@ -36,18 +45,6 @@ import org.apache.rocketmq.streams.window.storage.IWindowStorage;
 import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
 import org.apache.rocketmq.streams.window.storage.WindowStorage.WindowBaseValueIterator;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
 public class WindowOperator extends AbstractShuffleWindow {
 
     private static final String ORDER_BY_SPLIT_NUM="_order_by_split_num_";//key=_order;queueid,windowinstanceid,partitionNum
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/DBOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/DBOperator.java
index b054583..5bb1ead 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/DBOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/DBOperator.java
@@ -18,8 +18,6 @@ package org.apache.rocketmq.streams.window.operator.join;
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.parser.ParserConfig;
-import com.alibaba.fastjson.util.TypeUtils;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Date;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index f75fcea..89aee20 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
-import org.apache.rocketmq.streams.common.utils.TraceUtil;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.Context;
 import org.apache.rocketmq.streams.common.context.IMessage;
@@ -35,6 +34,7 @@ import org.apache.rocketmq.streams.common.context.MessageHeader;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.rocketmq.streams.common.utils.TraceUtil;
 import org.apache.rocketmq.streams.dim.model.AbstractDim;
 import org.apache.rocketmq.streams.window.model.WindowCache;
 import org.apache.rocketmq.streams.window.model.WindowInstance;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index afc4920..cad2c3b 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -18,14 +18,19 @@ package org.apache.rocketmq.streams.window.shuffle;
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
 import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
@@ -36,34 +41,28 @@ import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
 import org.apache.rocketmq.streams.common.checkpoint.CheckPointState;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
 import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.context.AbstractContext;
+import org.apache.rocketmq.streams.common.context.IMessage;
+import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.common.context.MessageOffset;
 import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
 import org.apache.rocketmq.streams.common.topology.ChainPipeline;
 import org.apache.rocketmq.streams.common.topology.model.Pipeline;
 import org.apache.rocketmq.streams.common.utils.CollectionUtil;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.common.utils.TraceUtil;
 import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
 import org.apache.rocketmq.streams.window.debug.DebugWriter;
+import org.apache.rocketmq.streams.window.model.WindowCache;
+import org.apache.rocketmq.streams.window.model.WindowInstance;
 import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
 import org.apache.rocketmq.streams.window.operator.AbstractWindow;
-import org.apache.rocketmq.streams.common.context.AbstractContext;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.context.Message;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-import org.apache.rocketmq.streams.common.utils.StringUtil;
-import org.apache.rocketmq.streams.window.model.WindowInstance;
-import org.apache.rocketmq.streams.window.model.WindowCache;
-import org.apache.rocketmq.streams.window.operator.impl.WindowOperator;
 import org.apache.rocketmq.streams.window.operator.impl.WindowOperator.WindowRowOperator;
 import org.apache.rocketmq.streams.window.sqlcache.impl.SQLElement;
 import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
 
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.rocketmq.streams.window.model.WindowCache.ORIGIN_MESSAGE_TRACE_ID;
 
 /**
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/ISQLElement.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/ISQLElement.java
index 2f335b9..7117e2e 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/ISQLElement.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/ISQLElement.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.streams.window.sqlcache;
 
-import java.util.List;
-
 public interface ISQLElement {
 
     boolean isWindowInstanceSQL();
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
index ef60fe3..9f5cf70 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
 import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
 import org.apache.rocketmq.streams.db.driver.DriverBuilder;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
index 3ba1876..a738c48 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.streams.window.state.impl;
 
-import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import java.io.Serializable;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/IWindowStorage.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/IWindowStorage.java
index 216ed71..13d061e 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/IWindowStorage.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/IWindowStorage.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.window.storage;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator;
 import org.apache.rocketmq.streams.window.model.WindowInstance;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/WindowStorage.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/WindowStorage.java
index 712c028..874d2bf 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/WindowStorage.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/WindowStorage.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/db/DBStorage.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/db/DBStorage.java
index dfdf261..f47d309 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/db/DBStorage.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/db/DBStorage.java
@@ -32,7 +32,6 @@ import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
 import org.apache.rocketmq.streams.window.model.WindowInstance;
 import org.apache.rocketmq.streams.window.state.WindowBaseValue;
-import org.apache.rocketmq.streams.window.state.impl.WindowValue;
 import org.apache.rocketmq.streams.window.storage.AbstractWindowStorage;
 import org.apache.rocketmq.streams.window.storage.IRemoteStorage;
 import org.apache.rocketmq.streams.window.storage.WindowStorage.WindowBaseValueIterator;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/file/FileStorage.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/file/FileStorage.java
index bba64a8..ffec393 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/file/FileStorage.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/file/FileStorage.java
@@ -20,12 +20,10 @@ import com.alibaba.fastjson.JSONArray;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.utils.FileUtil;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.java
index 9739f93..528894c 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/storage/rocksdb/RocksdbStorage.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.window.storage.rocksdb;
 
 import com.alibaba.fastjson.JSONArray;
-import java.io.File;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -31,22 +30,18 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.utils.CollectionUtil;
-import org.apache.rocketmq.streams.common.utils.FileUtil;
 import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.ReflectUtil;
-import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.state.kv.rocksdb.RocksDBOperator;
 import org.apache.rocketmq.streams.window.model.WindowInstance;
 import org.apache.rocketmq.streams.window.state.WindowBaseValue;
 import org.apache.rocketmq.streams.window.storage.AbstractWindowStorage;
 import org.apache.rocketmq.streams.window.storage.WindowStorage.WindowBaseValueIterator;
-import org.rocksdb.Options;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
-import org.rocksdb.TtlDB;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;