You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/11/24 13:20:42 UTC

[skywalking] branch master updated: Add `@BanyanDB.TimestampColumn` to identify `which column in Record` is providing the timestamp(milliseconds) for BanyanDB (#10019)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a75de5ece Add `@BanyanDB.TimestampColumn` to identify `which column in Record` is providing the timestamp(milliseconds) for BanyanDB (#10019)
6a75de5ece is described below

commit 6a75de5ece9cf8f20d4fc2b06bb31306ff991008
Author: Wan Kai <wa...@foxmail.com>
AuthorDate: Thu Nov 24 21:20:36 2022 +0800

    Add `@BanyanDB.TimestampColumn` to identify `which column in Record` is providing the timestamp(milliseconds) for BanyanDB (#10019)
---
 docs/en/changes/changes.md                         |  4 +++
 .../listener/DatabaseSlowStatementBuilder.java     |  4 +++
 .../trace/parser/listener/SampledTraceBuilder.java |  5 ++-
 .../listener/vservice/VirtualCacheProcessor.java   |  1 +
 .../vservice/VirtualDatabaseProcessor.java         |  1 +
 .../analyzer/dsl/spec/extractor/ExtractorSpec.java |  1 +
 .../oap/server/core/alarm/AlarmRecord.java         |  1 +
 .../manual/cache/CacheSlowAccessDispatcher.java    |  2 ++
 .../manual/cache/TopNCacheReadCommand.java         |  4 +++
 .../manual/cache/TopNCacheWriteCommand.java        |  4 +++
 .../database/DatabaseStatementDispatcher.java      |  1 +
 .../manual/database/TopNDatabaseStatement.java     |  4 +++
 .../server/core/analysis/manual/log/LogRecord.java |  2 ++
 .../analysis/manual/segment/SegmentRecord.java     |  1 +
 .../manual/spanattach/SpanAttachedEventRecord.java |  8 +++++
 .../manual/trace/SampledSlowTraceRecord.java       | 10 +++++-
 .../manual/trace/SampledStatus4xxTraceRecord.java  |  8 +++++
 .../manual/trace/SampledStatus5xxTraceRecord.java  |  8 +++++
 .../oap/server/core/analysis/topn/TopN.java        |  5 +++
 .../manual/errorlog/BrowserErrorLogRecord.java     |  1 +
 .../ebpf/storage/EBPFProfilingDataRecord.java      |  3 +-
 .../ebpf/storage/EBPFProfilingTaskRecord.java      |  1 +
 .../core/profiling/trace/ProfileTaskLogRecord.java |  8 +++++
 .../core/profiling/trace/ProfileTaskRecord.java    |  1 +
 .../trace/ProfileThreadSnapshotRecord.java         |  1 +
 .../oap/server/core/source/CacheSlowAccess.java    |  3 ++
 .../server/core/source/DatabaseSlowStatement.java  |  3 ++
 .../server/core/storage/annotation/BanyanDB.java   | 12 +++++++
 .../core/storage/model/BanyanDBModelExtension.java | 40 ++++++++++++++++++++++
 .../oap/server/core/storage/model/Model.java       |  5 ++-
 .../server/core/storage/model/StorageModels.java   | 10 +++++-
 .../oap/server/core/zipkin/ZipkinSpanRecord.java   |  3 +-
 .../handler/ProfileTaskServiceHandler.java         |  5 +--
 .../SpanAttachedEventReportServiceHandler.java     |  9 ++---
 .../storage/plugin/banyandb/BanyanDBConverter.java |  3 ++
 .../plugin/banyandb/BanyanDBNoneStreamDAO.java     | 10 +++---
 .../plugin/banyandb/BanyanDBZipkinQueryDAO.java    | 36 +++++++++----------
 .../storage/plugin/banyandb/MetadataRegistry.java  | 10 ++++++
 .../plugin/banyandb/stream/BanyanDBRecordDAO.java  | 10 +++---
 .../elasticsearch/base/TimeSeriesUtilsTest.java    | 10 ++++--
 40 files changed, 214 insertions(+), 44 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index e865f06135..412a766e73 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -119,6 +119,10 @@
 * Support dynamic config the sampling strategy in network profiling.
 * Zipkin module support BanyanDB storage.
 * Zipkin traces query API, sort the result set by start time by default.
+* [**Breaking Change**] Add `@BanyanDB.TimestampColumn` to identify which column in a `Record` provides the timestamp (in milliseconds) for BanyanDB,
+  since a BanyanDB stream requires a timestamp in milliseconds.
+  For SQL databases: a new column `timestamp` is added to the tables `profile_task_log` and `top_n_database_statement`;
+  when upgrading from a previous release, alter these tables to add this column, or remove the tables, before OAP starts.
 
 #### UI
 
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/DatabaseSlowStatementBuilder.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/DatabaseSlowStatementBuilder.java
index 6a9e1e9d0a..47b59ab721 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/DatabaseSlowStatementBuilder.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/DatabaseSlowStatementBuilder.java
@@ -51,6 +51,9 @@ public class DatabaseSlowStatementBuilder {
     @Getter
     @Setter
     private long timeBucket;
+    @Getter
+    @Setter
+    private long timestamp;
 
     public void prepare() {
         this.serviceName = namingControl.formatServiceName(serviceName);
@@ -64,6 +67,7 @@ public class DatabaseSlowStatementBuilder {
         dbSlowStat.setStatement(statement);
         dbSlowStat.setLatency(latency);
         dbSlowStat.setTimeBucket(timeBucket);
+        dbSlowStat.setTimestamp(timestamp);
         return dbSlowStat;
     }
 }
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SampledTraceBuilder.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SampledTraceBuilder.java
index 799155bba6..8a64d84eb9 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SampledTraceBuilder.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SampledTraceBuilder.java
@@ -107,6 +107,7 @@ public class SampledTraceBuilder {
                 slowTraceRecord.setUri(uri);
                 slowTraceRecord.setLatency(latency);
                 slowTraceRecord.setTimeBucket(TimeBucket.getTimeBucket(timestamp, DownSampling.Second));
+                slowTraceRecord.setTimestamp(timestamp);
                 return slowTraceRecord;
             case STATUS_4XX:
                 final SampledStatus4xxTraceRecord status4xxTraceRecord = new SampledStatus4xxTraceRecord();
@@ -118,6 +119,7 @@ public class SampledTraceBuilder {
                 status4xxTraceRecord.setUri(uri);
                 status4xxTraceRecord.setLatency(latency);
                 status4xxTraceRecord.setTimeBucket(TimeBucket.getTimeBucket(timestamp, DownSampling.Second));
+                status4xxTraceRecord.setTimestamp(timestamp);
                 return status4xxTraceRecord;
             case STATUS_5XX:
                 final SampledStatus5xxTraceRecord status5xxTraceRecord = new SampledStatus5xxTraceRecord();
@@ -129,6 +131,7 @@ public class SampledTraceBuilder {
                 status5xxTraceRecord.setUri(uri);
                 status5xxTraceRecord.setLatency(latency);
                 status5xxTraceRecord.setTimeBucket(TimeBucket.getTimeBucket(timestamp, DownSampling.Second));
+                status5xxTraceRecord.setTimestamp(timestamp);
                 return status5xxTraceRecord;
             default:
                 throw new IllegalArgumentException("unknown reason: " + this.reason);
@@ -156,4 +159,4 @@ public class SampledTraceBuilder {
         STATUS_4XX,
         STATUS_5XX
     }
-}
\ No newline at end of file
+}
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualCacheProcessor.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualCacheProcessor.java
index de67db00c2..ccf387a8fd 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualCacheProcessor.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualCacheProcessor.java
@@ -89,6 +89,7 @@ public class VirtualCacheProcessor implements VirtualServiceProcessor {
             slowAccess.setCommand(tags.get(SpanTags.CACHE_CMD));
             slowAccess.setKey(tags.get(SpanTags.CACHE_KEY));
             slowAccess.setTimeBucket(TimeBucket.getRecordTimeBucket(span.getStartTime()));
+            slowAccess.setTimestamp(span.getStartTime());
             slowAccess.setOperation(op);
             sourceList.add(slowAccess);
         }
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualDatabaseProcessor.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualDatabaseProcessor.java
index 103303c02e..9e8c7df2cd 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualDatabaseProcessor.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualDatabaseProcessor.java
@@ -72,6 +72,7 @@ public class VirtualDatabaseProcessor implements VirtualServiceProcessor {
             dbSlowStat.setStatement(statement);
             dbSlowStat.setLatency(latency);
             dbSlowStat.setTimeBucket(TimeBucket.getRecordTimeBucket(span.getStartTime()));
+            dbSlowStat.setTimestamp(span.getStartTime());
             recordList.add(dbSlowStat);
         });
     }
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
index 96c0d3fefc..d535492ef2 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
@@ -289,6 +289,7 @@ public class ExtractorSpec extends AbstractSpec {
 
         long timeBucketForDB = TimeBucket.getTimeBucket(log.getTimestamp(), DownSampling.Second);
         builder.setTimeBucket(timeBucketForDB);
+        builder.setTimestamp(log.getTimestamp());
 
         String entityId = serviceMeta.getEntityId();
         builder.prepare();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
index 84fe19d8cd..2a666b15ac 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
@@ -43,6 +43,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.AL
 @ScopeDeclaration(id = ALARM, name = "Alarm")
 @Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, builder = AlarmRecord.Builder.class, processor = RecordStreamProcessor.class)
 @SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = AlarmRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
+@BanyanDB.TimestampColumn(AlarmRecord.START_TIME)
 public class AlarmRecord extends Record {
 
     public static final String INDEX_NAME = "alarm_record";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/CacheSlowAccessDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/CacheSlowAccessDispatcher.java
index f72b6da884..9db95006a9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/CacheSlowAccessDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/CacheSlowAccessDispatcher.java
@@ -36,6 +36,7 @@ public class CacheSlowAccessDispatcher implements SourceDispatcher<CacheSlowAcce
             readCommand.setTraceId(source.getTraceId());
             readCommand.setEntityId(source.getCacheServiceId());
             readCommand.setTimeBucket(source.getTimeBucket());
+            readCommand.setTimestamp(source.getTimestamp());
             TopNStreamProcessor.getInstance().in(readCommand);
         } else if (source.getOperation() == VirtualCacheOperation.Write) {
             TopNCacheWriteCommand writeCommand = new TopNCacheWriteCommand();
@@ -45,6 +46,7 @@ public class CacheSlowAccessDispatcher implements SourceDispatcher<CacheSlowAcce
             writeCommand.setTraceId(source.getTraceId());
             writeCommand.setEntityId(source.getCacheServiceId());
             writeCommand.setTimeBucket(source.getTimeBucket());
+            writeCommand.setTimestamp(source.getTimestamp());
             TopNStreamProcessor.getInstance().in(writeCommand);
         }
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheReadCommand.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheReadCommand.java
index 66eeb7efb3..c3e60ec131 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheReadCommand.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheReadCommand.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.Stream;
 import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
@@ -34,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
  * Database TopN statement, including Database SQL statement, mongoDB and Redis commands.
  */
 @Stream(name = TopNCacheReadCommand.INDEX_NAME, scopeId = DefaultScopeDefine.CACHE_SLOW_ACCESS, builder = TopNCacheReadCommand.Builder.class, processor = TopNStreamProcessor.class)
+@BanyanDB.TimestampColumn(TopN.TIMESTAMP)
 public class TopNCacheReadCommand extends TopN {
     public static final String INDEX_NAME = "top_n_cache_read_command";
 
@@ -73,6 +75,7 @@ public class TopNCacheReadCommand extends TopN {
             statement.setLatency(((Number) converter.get(LATENCY)).longValue());
             statement.setEntityId((String) converter.get(ENTITY_ID));
             statement.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+            statement.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
             return statement;
         }
 
@@ -83,6 +86,7 @@ public class TopNCacheReadCommand extends TopN {
             converter.accept(LATENCY, storageData.getLatency());
             converter.accept(ENTITY_ID, storageData.getEntityId());
             converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+            converter.accept(TIMESTAMP, storageData.getTimestamp());
         }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheWriteCommand.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheWriteCommand.java
index 7c061e9a09..1a7b9759c5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheWriteCommand.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheWriteCommand.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.analysis.Stream;
 import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
@@ -35,6 +36,7 @@ import java.util.Objects;
  * Database TopN statement, including Database SQL statement, mongoDB and Redis commands.
  */
 @Stream(name = TopNCacheWriteCommand.INDEX_NAME, scopeId = DefaultScopeDefine.CACHE_SLOW_ACCESS, builder = TopNCacheWriteCommand.Builder.class, processor = TopNStreamProcessor.class)
+@BanyanDB.TimestampColumn(TopN.TIMESTAMP)
 public class TopNCacheWriteCommand extends TopN {
     public static final String INDEX_NAME = "top_n_cache_write_command";
 
@@ -74,6 +76,7 @@ public class TopNCacheWriteCommand extends TopN {
             statement.setLatency(((Number) converter.get(LATENCY)).longValue());
             statement.setEntityId((String) converter.get(ENTITY_ID));
             statement.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+            statement.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
             return statement;
         }
 
@@ -84,6 +87,7 @@ public class TopNCacheWriteCommand extends TopN {
             converter.accept(LATENCY, storageData.getLatency());
             converter.accept(ENTITY_ID, storageData.getEntityId());
             converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+            converter.accept(TIMESTAMP, storageData.getTimestamp());
         }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
index 2af4820334..5eb38a9146 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
@@ -33,6 +33,7 @@ public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlo
         statement.setStatement(source.getStatement());
         statement.setTimeBucket(source.getTimeBucket());
         statement.setTraceId(source.getTraceId());
+        statement.setTimestamp(source.getTimestamp());
 
         TopNStreamProcessor.getInstance().in(statement);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
index d158455242..28cc05b0a7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.Stream;
 import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
@@ -34,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
  * Database TopN statement, including Database SQL statement, mongoDB and Redis commands.
  */
 @Stream(name = TopNDatabaseStatement.INDEX_NAME, scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT, builder = TopNDatabaseStatement.Builder.class, processor = TopNStreamProcessor.class)
+@BanyanDB.TimestampColumn(TopN.TIMESTAMP)
 public class TopNDatabaseStatement extends TopN {
     public static final String INDEX_NAME = "top_n_database_statement";
 
@@ -73,6 +75,7 @@ public class TopNDatabaseStatement extends TopN {
             statement.setLatency(((Number) converter.get(LATENCY)).longValue());
             statement.setEntityId((String) converter.get(ENTITY_ID));
             statement.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+            statement.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
             return statement;
         }
 
@@ -83,6 +86,7 @@ public class TopNDatabaseStatement extends TopN {
             converter.accept(LATENCY, storageData.getLatency());
             converter.accept(ENTITY_ID, storageData.getEntityId());
             converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+            converter.accept(TIMESTAMP, storageData.getTimestamp());
         }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java
index dfdc34143a..352e012cd6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.analysis.Stream;
 import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
 import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
 import org.apache.skywalking.oap.server.core.storage.ShardingAlgorithm;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
 import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
@@ -36,6 +37,7 @@ import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_
 @Stream(name = LogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.LOG, builder = LogRecord.Builder.class, processor = RecordStreamProcessor.class)
 @SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = AbstractLogRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
 @SQLDatabase.Sharding(shardingAlgorithm = ShardingAlgorithm.TIME_SEC_RANGE_SHARDING_ALGORITHM, dataSourceShardingColumn = SERVICE_ID, tableShardingColumn = TIME_BUCKET)
+@BanyanDB.TimestampColumn(AbstractLogRecord.TIMESTAMP)
 public class LogRecord extends AbstractLogRecord {
 
     public static final String INDEX_NAME = "log";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
index 5e173b8ccf..650ee09b28 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
@@ -42,6 +42,7 @@ import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_
 @Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class)
 @SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = SegmentRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
 @SQLDatabase.Sharding(shardingAlgorithm = ShardingAlgorithm.TIME_SEC_RANGE_SHARDING_ALGORITHM, dataSourceShardingColumn = SERVICE_ID, tableShardingColumn = TIME_BUCKET)
+@BanyanDB.TimestampColumn(SegmentRecord.START_TIME)
 public class SegmentRecord extends Record {
 
     public static final String INDEX_NAME = "segment";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventRecord.java
index 9a04a7a674..e5b0598cf8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventRecord.java
@@ -39,6 +39,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SP
 @Getter
 @ScopeDeclaration(id = SPAN_ATTACHED_EVENT, name = "SpanAttachedEvent")
 @Stream(name = SpanAttachedEventRecord.INDEX_NAME, scopeId = SPAN_ATTACHED_EVENT, builder = SpanAttachedEventRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(SpanAttachedEventRecord.TIMESTAMP)
 public class SpanAttachedEventRecord extends Record {
 
     public static final String INDEX_NAME = "span_attached_event_record";
@@ -52,6 +53,7 @@ public class SpanAttachedEventRecord extends Record {
     public static final String TRACE_SEGMENT_ID = "trace_segment_id";
     public static final String TRACE_SPAN_ID = "trace_span_id";
     public static final String DATA_BINARY = "data_binary";
+    public static final String TIMESTAMP = "timestamp";
 
     @Column(columnName = START_TIME_SECOND)
     private long startTimeSecond;
@@ -74,6 +76,10 @@ public class SpanAttachedEventRecord extends Record {
     private String traceSpanId;
     @Column(columnName = DATA_BINARY, storageOnly = true)
     private byte[] dataBinary;
+    @Setter
+    @Getter
+    @Column(columnName = TIMESTAMP)
+    private long timestamp;
 
     @Override
     public String id() {
@@ -94,6 +100,7 @@ public class SpanAttachedEventRecord extends Record {
             record.setTraceSegmentId((String) converter.get(TRACE_SEGMENT_ID));
             record.setTraceSpanId((String) converter.get(TRACE_SPAN_ID));
             record.setDataBinary(converter.getBytes(DATA_BINARY));
+            record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
             return record;
         }
 
@@ -109,6 +116,7 @@ public class SpanAttachedEventRecord extends Record {
             converter.accept(TRACE_SEGMENT_ID, entity.getTraceSegmentId());
             converter.accept(TRACE_SPAN_ID, entity.getTraceSpanId());
             converter.accept(DATA_BINARY, entity.getDataBinary());
+            converter.accept(TIMESTAMP, entity.getTimestamp());
         }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledSlowTraceRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledSlowTraceRecord.java
index abe2d321ac..7018621683 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledSlowTraceRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledSlowTraceRecord.java
@@ -38,6 +38,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SA
 @Getter
 @ScopeDeclaration(id = SAMPLED_SLOW_TRACE, name = "SampledTraceSlowRecord")
 @Stream(name = SampledSlowTraceRecord.INDEX_NAME, scopeId = SAMPLED_SLOW_TRACE, builder = SampledSlowTraceRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(SampledSlowTraceRecord.TIMESTAMP)
 public class SampledSlowTraceRecord extends Record {
 
     public static final String INDEX_NAME = "sampled_slow_trace_record";
@@ -46,6 +47,7 @@ public class SampledSlowTraceRecord extends Record {
     public static final String TRACE_ID = TopN.TRACE_ID;
     public static final String URI = TopN.STATEMENT;
     public static final String LATENCY = "latency";
+    public static final String TIMESTAMP = "timestamp";
 
     @Column(columnName = SCOPE)
     private int scope;
@@ -58,6 +60,10 @@ public class SampledSlowTraceRecord extends Record {
     private String uri;
     @Column(columnName = LATENCY, dataType = Column.ValueDataType.SAMPLED_RECORD)
     private long latency;
+    @Setter
+    @Getter
+    @Column(columnName = TIMESTAMP)
+    private long timestamp;
 
     @Override
     public String id() {
@@ -75,6 +81,7 @@ public class SampledSlowTraceRecord extends Record {
             record.setUri((String) converter.get(URI));
             record.setLatency(((Number) converter.get(LATENCY)).longValue());
             record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+            record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
             return record;
         }
 
@@ -86,6 +93,7 @@ public class SampledSlowTraceRecord extends Record {
             converter.accept(URI, entity.getUri());
             converter.accept(LATENCY, entity.getLatency());
             converter.accept(TIME_BUCKET, entity.getTimeBucket());
+            converter.accept(TIMESTAMP, entity.getTimestamp());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus4xxTraceRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus4xxTraceRecord.java
index a84dd11bf8..9253ae3037 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus4xxTraceRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus4xxTraceRecord.java
@@ -38,6 +38,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SA
 @Getter
 @ScopeDeclaration(id = SAMPLED_STATUS_4XX_TRACE, name = "SampledStatus4xxTraceRecord")
 @Stream(name = SampledStatus4xxTraceRecord.INDEX_NAME, scopeId = SAMPLED_STATUS_4XX_TRACE, builder = SampledStatus4xxTraceRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(SampledStatus4xxTraceRecord.TIMESTAMP)
 public class SampledStatus4xxTraceRecord extends Record {
 
     public static final String INDEX_NAME = "sampled_status_4xx_trace_record";
@@ -47,6 +48,7 @@ public class SampledStatus4xxTraceRecord extends Record {
     public static final String TRACE_ID = TopN.TRACE_ID;
     public static final String URI = TopN.STATEMENT;
     public static final String LATENCY = "latency";
+    public static final String TIMESTAMP = "timestamp";
 
     @Column(columnName = SCOPE)
     private int scope;
@@ -59,6 +61,10 @@ public class SampledStatus4xxTraceRecord extends Record {
     private String uri;
     @Column(columnName = LATENCY, dataType = Column.ValueDataType.SAMPLED_RECORD)
     private long latency;
+    @Setter
+    @Getter
+    @Column(columnName = TIMESTAMP)
+    private long timestamp;
 
     @Override
     public String id() {
@@ -76,6 +82,7 @@ public class SampledStatus4xxTraceRecord extends Record {
             record.setUri((String) converter.get(URI));
             record.setLatency(((Number) converter.get(LATENCY)).longValue());
             record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+            record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
             return record;
         }
 
@@ -87,6 +94,7 @@ public class SampledStatus4xxTraceRecord extends Record {
             converter.accept(URI, entity.getUri());
             converter.accept(LATENCY, entity.getLatency());
             converter.accept(TIME_BUCKET, entity.getTimeBucket());
+            converter.accept(TIMESTAMP, entity.getTimestamp());
         }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus5xxTraceRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus5xxTraceRecord.java
index 9f22153a08..9088bf336b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus5xxTraceRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus5xxTraceRecord.java
@@ -38,6 +38,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SA
 @Getter
 @ScopeDeclaration(id = SAMPLED_STATUS_5XX_TRACE, name = "SampledStatus5xxTraceRecord")
 @Stream(name = SampledStatus5xxTraceRecord.INDEX_NAME, scopeId = SAMPLED_STATUS_5XX_TRACE, builder = SampledStatus5xxTraceRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(SampledStatus5xxTraceRecord.TIMESTAMP)
 public class SampledStatus5xxTraceRecord extends Record {
 
     public static final String INDEX_NAME = "sampled_status_5xx_trace_record";
@@ -47,6 +48,7 @@ public class SampledStatus5xxTraceRecord extends Record {
     public static final String TRACE_ID = TopN.TRACE_ID;
     public static final String URI = TopN.STATEMENT;
     public static final String LATENCY = "latency";
+    public static final String TIMESTAMP = "timestamp";
 
     @Column(columnName = SCOPE)
     private int scope;
@@ -59,6 +61,10 @@ public class SampledStatus5xxTraceRecord extends Record {
     private String uri;
     @Column(columnName = LATENCY, dataType = Column.ValueDataType.SAMPLED_RECORD)
     private long latency;
+    @Setter
+    @Getter
+    @Column(columnName = TIMESTAMP)
+    private long timestamp;
 
     @Override
     public String id() {
@@ -76,6 +82,7 @@ public class SampledStatus5xxTraceRecord extends Record {
             record.setUri((String) converter.get(URI));
             record.setLatency(((Number) converter.get(LATENCY)).longValue());
             record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+            record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
             return record;
         }
 
@@ -87,6 +94,7 @@ public class SampledStatus5xxTraceRecord extends Record {
             converter.accept(URI, entity.getUri());
             converter.accept(LATENCY, entity.getLatency());
             converter.accept(TIME_BUCKET, entity.getTimeBucket());
+            converter.accept(TIMESTAMP, entity.getTimestamp());
         }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
index a10dc44126..88a070ad7c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
@@ -33,6 +33,7 @@ public abstract class TopN extends Record implements ComparableStorageData {
     public static final String LATENCY = "latency";
     public static final String TRACE_ID = "trace_id";
     public static final String ENTITY_ID = "entity_id";
+    public static final String TIMESTAMP = "timestamp";
     
     @Getter
     @Setter
@@ -47,6 +48,10 @@ public abstract class TopN extends Record implements ComparableStorageData {
     @Column(columnName = ENTITY_ID, length = 512)
     @BanyanDB.ShardingKey(index = 0)
     private String entityId;
+    @Getter
+    @Setter
+    @Column(columnName = TIMESTAMP)
+    private long timestamp;
 
     @Override
     public int compareTo(Object o) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/manual/errorlog/BrowserErrorLogRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/manual/errorlog/BrowserErrorLogRecord.java
index 782e8d86ca..ffc73b93b3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/manual/errorlog/BrowserErrorLogRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/manual/errorlog/BrowserErrorLogRecord.java
@@ -38,6 +38,7 @@ import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_
 @SuperDataset
 @Stream(name = BrowserErrorLogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.BROWSER_ERROR_LOG, builder = BrowserErrorLogRecord.Builder.class, processor = RecordStreamProcessor.class)
 @SQLDatabase.Sharding(shardingAlgorithm = ShardingAlgorithm.TIME_SEC_RANGE_SHARDING_ALGORITHM, dataSourceShardingColumn = SERVICE_ID, tableShardingColumn = TIME_BUCKET)
+@BanyanDB.TimestampColumn(BrowserErrorLogRecord.TIMESTAMP)
 public class BrowserErrorLogRecord extends Record {
     public static final String INDEX_NAME = "browser_error_log";
     public static final String UNIQUE_ID = "unique_id";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingDataRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingDataRecord.java
index bb30d99e0a..d57e17daa7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingDataRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingDataRecord.java
@@ -38,6 +38,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EB
 @Data
 @Stream(name = EBPFProfilingDataRecord.INDEX_NAME, scopeId = EBPF_PROFILING_DATA,
     builder = EBPFProfilingDataRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(EBPFProfilingDataRecord.UPLOAD_TIME)
 public class EBPFProfilingDataRecord extends Record {
 
     public static final String INDEX_NAME = "ebpf_profiling_data";
@@ -97,4 +98,4 @@ public class EBPFProfilingDataRecord extends Record {
             converter.accept(TIME_BUCKET, storageData.getTimeBucket());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingTaskRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingTaskRecord.java
index 98c4894143..1be7d826ff 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingTaskRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingTaskRecord.java
@@ -40,6 +40,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EB
 @ScopeDeclaration(id = EBPF_PROFILING_TASK, name = "EBPFProfilingTask")
 @Stream(name = EBPFProfilingTaskRecord.INDEX_NAME, scopeId = EBPF_PROFILING_TASK,
         builder = EBPFProfilingTaskRecord.Builder.class, processor = NoneStreamProcessor.class)
+@BanyanDB.TimestampColumn(EBPFProfilingTaskRecord.CREATE_TIME)
 public class EBPFProfilingTaskRecord extends NoneStream {
     public static final String INDEX_NAME = "ebpf_profiling_task";
     public static final String LOGICAL_ID = "logical_id";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskLogRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskLogRecord.java
index df741b599f..7bb872e185 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskLogRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskLogRecord.java
@@ -40,6 +40,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PR
 @Setter
 @ScopeDeclaration(id = PROFILE_TASK_LOG, name = "ProfileTaskLog")
 @Stream(name = ProfileTaskLogRecord.INDEX_NAME, scopeId = PROFILE_TASK_LOG, builder = ProfileTaskLogRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(ProfileTaskLogRecord.TIMESTAMP)
 public class ProfileTaskLogRecord extends Record {
 
     public static final String INDEX_NAME = "profile_task_log";
@@ -47,6 +48,7 @@ public class ProfileTaskLogRecord extends Record {
     public static final String INSTANCE_ID = "instance_id";
     public static final String OPERATION_TYPE = "operation_type";
     public static final String OPERATION_TIME = "operation_time";
+    public static final String TIMESTAMP = "timestamp";
 
     @Column(columnName = TASK_ID)
     private String taskId;
@@ -57,6 +59,10 @@ public class ProfileTaskLogRecord extends Record {
     private int operationType;
     @Column(columnName = OPERATION_TIME)
     private long operationTime;
+    @Getter
+    @Setter
+    @Column(columnName = TIMESTAMP)
+    private long timestamp;
 
     @Override
     public String id() {
@@ -72,6 +78,7 @@ public class ProfileTaskLogRecord extends Record {
             log.setOperationType(((Number) converter.get(OPERATION_TYPE)).intValue());
             log.setOperationTime(((Number) converter.get(OPERATION_TIME)).longValue());
             log.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+            log.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
             return log;
         }
 
@@ -82,6 +89,7 @@ public class ProfileTaskLogRecord extends Record {
             converter.accept(OPERATION_TYPE, storageData.getOperationType());
             converter.accept(OPERATION_TIME, storageData.getOperationTime());
             converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+            converter.accept(TIMESTAMP, storageData.getTimestamp());
         }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskRecord.java
index cb353cb60b..287f37a226 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskRecord.java
@@ -39,6 +39,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PR
 @Setter
 @ScopeDeclaration(id = PROFILE_TASK, name = "ProfileTask")
 @Stream(name = ProfileTaskRecord.INDEX_NAME, scopeId = PROFILE_TASK, builder = ProfileTaskRecord.Builder.class, processor = NoneStreamProcessor.class)
+@BanyanDB.TimestampColumn(ProfileTaskRecord.START_TIME)
 public class ProfileTaskRecord extends NoneStream {
 
     public static final String INDEX_NAME = "profile_task";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileThreadSnapshotRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileThreadSnapshotRecord.java
index cfe13fd73d..454019a46d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileThreadSnapshotRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileThreadSnapshotRecord.java
@@ -41,6 +41,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PR
 @Setter
 @ScopeDeclaration(id = PROFILE_TASK_SEGMENT_SNAPSHOT, name = "ProfileThreadSnapshot")
 @Stream(name = ProfileThreadSnapshotRecord.INDEX_NAME, scopeId = PROFILE_TASK_SEGMENT_SNAPSHOT, builder = ProfileThreadSnapshotRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(ProfileThreadSnapshotRecord.DUMP_TIME)
 public class ProfileThreadSnapshotRecord extends Record {
 
     public static final String INDEX_NAME = "profile_task_segment_snapshot";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CacheSlowAccess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CacheSlowAccess.java
index 420ad4b7a8..21e3e32a45 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CacheSlowAccess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CacheSlowAccess.java
@@ -53,6 +53,9 @@ public class CacheSlowAccess extends Source {
     @Getter
     @Setter
     private VirtualCacheOperation operation;
+    @Getter
+    @Setter
+    private long timestamp;
 
     @Override
     public int scope() {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
index 40480ca81c..eacc69b6f1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
@@ -42,6 +42,9 @@ public class DatabaseSlowStatement extends Source {
     @Getter
     @Setter
     private String traceId;
+    @Getter
+    @Setter
+    private long timestamp;
 
     @Override
     public int scope() {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
index 80acbdfc42..43f517db02 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
@@ -22,6 +22,7 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
 
 /**
  * BanyanDB annotation is a holder including all annotations for BanyanDB storage
@@ -118,4 +119,15 @@ public @interface BanyanDB {
             TREE;
         }
     }
+
+    /**
+     * TimestampColumn identifies which column in a {@link Record} provides the timestamp (in milliseconds) for BanyanDB.
+     * BanyanDB stream requires a timestamp in milliseconds.
+     * @since 9.3.0
+     */
+    @Target({ElementType.TYPE})
+    @Retention(RetentionPolicy.RUNTIME)
+    @interface TimestampColumn {
+        String value();
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
new file mode 100644
index 0000000000..dc8b60ad9c
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.model;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+
+/**
+ * BanyanDBModelExtension represents extra metadata for models that is specific to BanyanDB usage.
+ *
+ * @since 9.3.0
+ */
+public class BanyanDBModelExtension {
+    /**
+     * timestampColumn identifies which column in a {@link Record} provides the timestamp (in milliseconds) for BanyanDB.
+     * A BanyanDB stream requires a timestamp in milliseconds.
+     * @since 9.3.0
+     */
+    @Getter
+    @Setter
+    private String timestampColumn;
+
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
index 4e37d756f3..23514dda32 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
@@ -39,6 +39,7 @@ public class Model {
     private final Class<?> streamClass;
     private final boolean timeRelativeID;
     private final SQLDatabaseModelExtension sqlDBModelExtension;
+    private final BanyanDBModelExtension banyanDBModelExtension;
 
     public Model(final String name,
                  final List<ModelColumn> columns,
@@ -48,7 +49,8 @@ public class Model {
                  final boolean superDataset,
                  final Class<?> streamClass,
                  boolean timeRelativeID,
-                 final SQLDatabaseModelExtension sqlDBModelExtension) {
+                 final SQLDatabaseModelExtension sqlDBModelExtension,
+                 final BanyanDBModelExtension banyanDBModelExtension) {
         this.name = name;
         this.columns = columns;
         this.scopeId = scopeId;
@@ -59,5 +61,6 @@ public class Model {
         this.streamClass = streamClass;
         this.timeRelativeID = timeRelativeID;
         this.sqlDBModelExtension = sqlDBModelExtension;
+        this.banyanDBModelExtension = banyanDBModelExtension;
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 33969f60f2..f8fc461093 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -59,6 +59,7 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
         List<ModelColumn> modelColumns = new ArrayList<>();
         ShardingKeyChecker checker = new ShardingKeyChecker();
         SQLDatabaseModelExtension sqlDBModelExtension = new SQLDatabaseModelExtension();
+        BanyanDBModelExtension banyanDBModelExtension = new BanyanDBModelExtension();
         retrieval(aClass, storage.getModelName(), modelColumns, scopeId, checker, sqlDBModelExtension, record);
         // Add extra column for additional entities
         if (aClass.isAnnotationPresent(SQLDatabase.ExtraColumn4AdditionalEntity.class)
@@ -86,6 +87,12 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
                 }
             });
         }
+        // Record the timestamp column declared through @BanyanDB.TimestampColumn, if the annotation is present.
+        if (aClass.isAnnotationPresent(BanyanDB.TimestampColumn.class)) {
+            String timestampColumn = aClass.getAnnotation(BanyanDB.TimestampColumn.class).value();
+            banyanDBModelExtension.setTimestampColumn(timestampColumn);
+        }
+
         checker.check(storage.getModelName());
 
         Model model = new Model(
@@ -97,7 +104,8 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
             isSuperDatasetModel(aClass),
             aClass,
             storage.isTimeRelativeID(),
-            sqlDBModelExtension
+            sqlDBModelExtension,
+            banyanDBModelExtension
         );
 
         this.followColumnNameRules(model);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
index 3e44aa1f65..2676942032 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
@@ -50,6 +50,7 @@ import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_
 @Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, builder = ZipkinSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
 @SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = ZipkinSpanRecord.ADDITIONAL_QUERY_TABLE, parentColumn = TIME_BUCKET)
 @SQLDatabase.Sharding(shardingAlgorithm = ShardingAlgorithm.TIME_SEC_RANGE_SHARDING_ALGORITHM, dataSourceShardingColumn = TRACE_ID, tableShardingColumn = TIME_BUCKET)
+@BanyanDB.TimestampColumn(ZipkinSpanRecord.TIMESTAMP_MILLIS)
 public class ZipkinSpanRecord extends Record {
     private static final Gson GSON = new Gson();
     public static final int QUERY_LENGTH = 256;
@@ -167,7 +168,7 @@ public class ZipkinSpanRecord extends Record {
 
     @Override
     public String id() {
-        return spanId + Const.LINE + kind;
+        return traceId + Const.LINE + spanId;
     }
 
     public static class Builder implements StorageBuilder<ZipkinSpanRecord> {
diff --git a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
index 44210a0fa9..1de09417ff 100644
--- a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
@@ -153,9 +153,10 @@ public class ProfileTaskServiceHandler extends ProfileTaskGrpc.ProfileTaskImplBa
         logRecord.setOperationType(operationType.getCode());
         logRecord.setOperationTime(System.currentTimeMillis());
         // same with task time bucket, ensure record will ttl same with profile task
+        long timestamp = task.getStartTime() + TimeUnit.MINUTES.toMillis(task.getDuration());
         logRecord.setTimeBucket(
-            TimeBucket.getRecordTimeBucket(task.getStartTime() + TimeUnit.MINUTES.toMillis(task.getDuration())));
-
+            TimeBucket.getRecordTimeBucket(timestamp));
+        logRecord.setTimestamp(timestamp);
         RecordStreamProcessor.getInstance().in(logRecord);
     }
 
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v8/grpc/SpanAttachedEventReportServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v8/grpc/SpanAttachedEventReportServiceHandler.java
index 513061dca7..d7660f7d24 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v8/grpc/SpanAttachedEventReportServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v8/grpc/SpanAttachedEventReportServiceHandler.java
@@ -57,9 +57,10 @@ public class SpanAttachedEventReportServiceHandler extends SpanAttachedEventRepo
                 record.setTraceSegmentId(event.getTraceContext().getTraceSegmentId());
                 record.setTraceSpanId(event.getTraceContext().getSpanId());
                 record.setDataBinary(event.toByteArray());
-                record.setTimeBucket(TimeBucket.getMinuteTimeBucket(TimeUnit.SECONDS.toMillis(record.getStartTimeSecond())
-                    + TimeUnit.NANOSECONDS.toMillis(record.getStartTimeNanos())));
-
+                long timestamp = TimeUnit.SECONDS.toMillis(record.getStartTimeSecond())
+                    + TimeUnit.NANOSECONDS.toMillis(record.getStartTimeNanos());
+                record.setTimeBucket(TimeBucket.getMinuteTimeBucket(timestamp));
+                record.setTimestamp(timestamp);
                 RecordStreamProcessor.getInstance().in(record);
             }
 
@@ -82,4 +83,4 @@ public class SpanAttachedEventReportServiceHandler extends SpanAttachedEventRepo
             }
         };
     }
-}
\ No newline at end of file
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index e40c6b7015..acab5dcb7e 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -70,6 +70,9 @@ public class BanyanDBConverter {
 
         @Override
         public void accept(String fieldName, Object fieldValue) {
+            if (fieldName.equals(this.schema.getTimestampColumn4Stream())) {
+                streamWrite.setTimestamp((long) fieldValue);
+            }
             MetadataRegistry.ColumnSpec columnSpec = this.schema.getSpec(fieldName);
             if (columnSpec == null) {
                 throw new IllegalArgumentException("fail to find field[" + fieldName + "]");
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
index 12b03a94eb..46edb436ab 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
@@ -45,10 +44,11 @@ public class BanyanDBNoneStreamDAO extends AbstractDAO<BanyanDBStorageClient> im
         if (schema == null) {
             throw new IOException(model.getName() + " is not registered");
         }
-        StreamWrite streamWrite = new StreamWrite(schema.getMetadata().getGroup(), // group name
-                schema.getMetadata().name(), // stream-name
-                noneStream.id(), // identity
-                TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling())); // timestamp
+        StreamWrite streamWrite = new StreamWrite(
+            schema.getMetadata().getGroup(), // group name
+            schema.getMetadata().name(), // stream-name
+            noneStream.id() // identity
+        ); // set timestamp inside `BanyanDBConverter.StreamToStorage`
         Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(schema, streamWrite);
         storageBuilder.entity2Storage(noneStream, convert2Storage);
         getClient().write(streamWrite);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBZipkinQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBZipkinQueryDAO.java
index 96560abb28..3efb6f093b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBZipkinQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBZipkinQueryDAO.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -178,35 +177,33 @@ public class BanyanDBZipkinQueryDAO extends AbstractBanyanDBDAO implements IZipk
     public List<List<Span>> getTraces(final QueryRequest request, Duration duration) throws IOException {
         final int tracesLimit = request.limit();
         int scrollLimit = 1000;
-        int scrollFrom = 0;
+        long scrollEndTime = duration.getEndTimestamp();
         Set<String> traceIds = new HashSet<>();
         while (traceIds.size() < tracesLimit) {
-            Set<String> resp = getTraceIds(request, duration, scrollFrom, scrollLimit);
-            if (resp.size() == 0) {
-                break;
-            }
-            for (String traceId : resp) {
-                traceIds.add(traceId);
+            List<ZipkinSpanRecord> spans = getSpans(request, duration, scrollEndTime, scrollLimit);
+            for (ZipkinSpanRecord span : spans) {
+                traceIds.add(span.getTraceId());
                 if (traceIds.size() >= tracesLimit) {
                     break;
                 }
             }
-            scrollFrom = scrollFrom + scrollLimit;
+            if (spans.size() < scrollLimit) {
+                break;
+            }
+            scrollEndTime = spans.get(spans.size() - 1).getTimestampMillis();
         }
 
         return getTraces(traceIds);
     }
 
-    private Set<String> getTraceIds(final QueryRequest request,
+    private List<ZipkinSpanRecord> getSpans(final QueryRequest request,
                                     Duration duration,
-                                    int from,
+                                    long scrollEndTime,
                                     int limit) throws IOException {
         final long startTimeMillis = duration.getStartTimestamp();
-        final long endTimeMillis = duration.getEndTimestamp();
-
         TimestampRange tsRange = null;
-        if (startTimeMillis > 0 && endTimeMillis > 0) {
-            tsRange = new TimestampRange(startTimeMillis, endTimeMillis);
+        if (startTimeMillis > 0 && scrollEndTime > 0) {
+            tsRange = new TimestampRange(startTimeMillis, scrollEndTime);
         }
         final QueryBuilder<StreamQuery> queryBuilder = new QueryBuilder<StreamQuery>() {
 
@@ -242,15 +239,16 @@ public class BanyanDBZipkinQueryDAO extends AbstractBanyanDBDAO implements IZipk
                 }
                 query.setOrderBy(new StreamQuery.OrderBy(ZipkinSpanRecord.TIMESTAMP_MILLIS, AbstractQuery.Sort.DESC));
                 query.setLimit(limit);
-                query.setOffset(from);
             }
         };
         StreamQueryResponse resp = query(ZipkinSpanRecord.INDEX_NAME, TRACE_TAGS, tsRange, queryBuilder);
-        Set<String> traceIds = new LinkedHashSet<>(); //needs to keep order here
+        List<ZipkinSpanRecord> spans = new ArrayList<>(); //needs to keep order here
         for (final RowEntity rowEntity : resp.getElements()) {
-            traceIds.add(rowEntity.getTagValue(ZipkinSpanRecord.TRACE_ID));
+            ZipkinSpanRecord spanRecord = new ZipkinSpanRecord.Builder().storage2Entity(
+                new BanyanDBConverter.StorageToStream(ZipkinSpanRecord.INDEX_NAME, rowEntity));
+            spans.add(spanRecord);
         }
-        return traceIds;
+        return spans;
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 9f6f2e4cf8..f679deca66 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -61,6 +61,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
 
 @Slf4j
 public enum MetadataRegistry {
@@ -87,6 +88,12 @@ public enum MetadataRegistry {
                 schemaBuilder.tag(tagSpec.getTagName());
             }
         }
+        String timestampColumn4Stream = model.getBanyanDBModelExtension().getTimestampColumn();
+        if (StringUtil.isBlank(timestampColumn4Stream)) {
+            throw new IllegalStateException(
+                "Model[stream." + model.getName() + "] miss defined @BanyanDB.TimestampColumn");
+        }
+        schemaBuilder.timestampColumn4Stream(timestampColumn4Stream);
         List<IndexRule> indexRules = tags.stream()
                 .map(TagMetadata::getIndexRule)
                 .filter(Objects::nonNull)
@@ -517,6 +524,9 @@ public enum MetadataRegistry {
         @Singular
         private final Set<String> fields;
 
+        @Getter
+        private final String timestampColumn4Stream;
+
         public ColumnSpec getSpec(String columnName) {
             return this.specs.get(columnName);
         }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index 11b0d9abec..ed04f7e1bf 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -42,10 +41,11 @@ public class BanyanDBRecordDAO implements IRecordDAO {
         if (schema == null) {
             throw new IOException(model.getName() + " is not registered");
         }
-        StreamWrite streamWrite = new StreamWrite(schema.getMetadata().getGroup(), // group name
-                model.getName(), // index-name
-                record.id(), // identity
-                TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling())); // timestamp
+        StreamWrite streamWrite = new StreamWrite(
+            schema.getMetadata().getGroup(), // group name
+            model.getName(), // index-name
+            record.id() // identity
+        ); // set timestamp inside `BanyanDBConverter.StreamToStorage`
         Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(schema, streamWrite);
         storageBuilder.entity2Storage(record, convert2Storage);
 
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
index a0f45b1599..8114e68494 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.query.enumeration.Step;
+import org.apache.skywalking.oap.server.core.storage.model.BanyanDBModelExtension;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension;
 import org.junit.Assert;
@@ -41,13 +42,16 @@ public class TimeSeriesUtilsTest {
     @Before
     public void prepare() {
         superDatasetModel = new Model("superDatasetModel", Lists.newArrayList(),
-                                      0, DownSampling.Second, true, true, Record.class, true, new SQLDatabaseModelExtension()
+                                      0, DownSampling.Second, true, true, Record.class, true,
+                                      new SQLDatabaseModelExtension(), new BanyanDBModelExtension()
         );
         normalRecordModel = new Model("normalRecordModel", Lists.newArrayList(),
-                                      0, DownSampling.Second, true, false, Record.class, true, new SQLDatabaseModelExtension()
+                                      0, DownSampling.Second, true, false, Record.class, true,
+                                      new SQLDatabaseModelExtension(), new BanyanDBModelExtension()
         );
         normalMetricsModel = new Model("normalMetricsModel", Lists.newArrayList(),
-                                       0, DownSampling.Minute, false, false, Metrics.class, true, new SQLDatabaseModelExtension()
+                                       0, DownSampling.Minute, false, false, Metrics.class, true,
+                                       new SQLDatabaseModelExtension(), new BanyanDBModelExtension()
         );
         TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(1);
         TimeSeriesUtils.setDAY_STEP(3);