You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/11/24 13:20:42 UTC
[skywalking] branch master updated: Add `@BanyanDB.TimestampColumn` to identify `which column in Record` is providing the timestamp(milliseconds) for BanyanDB (#10019)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 6a75de5ece Add `@BanyanDB.TimestampColumn` to identify `which column in Record` is providing the timestamp(milliseconds) for BanyanDB (#10019)
6a75de5ece is described below
commit 6a75de5ece9cf8f20d4fc2b06bb31306ff991008
Author: Wan Kai <wa...@foxmail.com>
AuthorDate: Thu Nov 24 21:20:36 2022 +0800
Add `@BanyanDB.TimestampColumn` to identify `which column in Record` is providing the timestamp(milliseconds) for BanyanDB (#10019)
---
docs/en/changes/changes.md | 4 +++
.../listener/DatabaseSlowStatementBuilder.java | 4 +++
.../trace/parser/listener/SampledTraceBuilder.java | 5 ++-
.../listener/vservice/VirtualCacheProcessor.java | 1 +
.../vservice/VirtualDatabaseProcessor.java | 1 +
.../analyzer/dsl/spec/extractor/ExtractorSpec.java | 1 +
.../oap/server/core/alarm/AlarmRecord.java | 1 +
.../manual/cache/CacheSlowAccessDispatcher.java | 2 ++
.../manual/cache/TopNCacheReadCommand.java | 4 +++
.../manual/cache/TopNCacheWriteCommand.java | 4 +++
.../database/DatabaseStatementDispatcher.java | 1 +
.../manual/database/TopNDatabaseStatement.java | 4 +++
.../server/core/analysis/manual/log/LogRecord.java | 2 ++
.../analysis/manual/segment/SegmentRecord.java | 1 +
.../manual/spanattach/SpanAttachedEventRecord.java | 8 +++++
.../manual/trace/SampledSlowTraceRecord.java | 10 +++++-
.../manual/trace/SampledStatus4xxTraceRecord.java | 8 +++++
.../manual/trace/SampledStatus5xxTraceRecord.java | 8 +++++
.../oap/server/core/analysis/topn/TopN.java | 5 +++
.../manual/errorlog/BrowserErrorLogRecord.java | 1 +
.../ebpf/storage/EBPFProfilingDataRecord.java | 3 +-
.../ebpf/storage/EBPFProfilingTaskRecord.java | 1 +
.../core/profiling/trace/ProfileTaskLogRecord.java | 8 +++++
.../core/profiling/trace/ProfileTaskRecord.java | 1 +
.../trace/ProfileThreadSnapshotRecord.java | 1 +
.../oap/server/core/source/CacheSlowAccess.java | 3 ++
.../server/core/source/DatabaseSlowStatement.java | 3 ++
.../server/core/storage/annotation/BanyanDB.java | 12 +++++++
.../core/storage/model/BanyanDBModelExtension.java | 40 ++++++++++++++++++++++
.../oap/server/core/storage/model/Model.java | 5 ++-
.../server/core/storage/model/StorageModels.java | 10 +++++-
.../oap/server/core/zipkin/ZipkinSpanRecord.java | 3 +-
.../handler/ProfileTaskServiceHandler.java | 5 +--
.../SpanAttachedEventReportServiceHandler.java | 9 ++---
.../storage/plugin/banyandb/BanyanDBConverter.java | 3 ++
.../plugin/banyandb/BanyanDBNoneStreamDAO.java | 10 +++---
.../plugin/banyandb/BanyanDBZipkinQueryDAO.java | 36 +++++++++----------
.../storage/plugin/banyandb/MetadataRegistry.java | 10 ++++++
.../plugin/banyandb/stream/BanyanDBRecordDAO.java | 10 +++---
.../elasticsearch/base/TimeSeriesUtilsTest.java | 10 ++++--
40 files changed, 214 insertions(+), 44 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index e865f06135..412a766e73 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -119,6 +119,10 @@
* Support dynamic config the sampling strategy in network profiling.
* Zipkin module support BanyanDB storage.
* Zipkin traces query API, sort the result set by start time by default.
+* [**Breaking Change**] Add `@BanyanDB.TimestampColumn` to identify `which column in Record` is providing the timestamp(milliseconds) for BanyanDB,
+ since BanyanDB stream requires a timestamp in milliseconds.
+ For SQL-Database: add new column `timestamp` for tables `profile_task_log/top_n_database_statement`,
+ requires altering this column or removing these tables before OAP starts, if bump up from previous releases.
#### UI
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/DatabaseSlowStatementBuilder.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/DatabaseSlowStatementBuilder.java
index 6a9e1e9d0a..47b59ab721 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/DatabaseSlowStatementBuilder.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/DatabaseSlowStatementBuilder.java
@@ -51,6 +51,9 @@ public class DatabaseSlowStatementBuilder {
@Getter
@Setter
private long timeBucket;
+ @Getter
+ @Setter
+ private long timestamp;
public void prepare() {
this.serviceName = namingControl.formatServiceName(serviceName);
@@ -64,6 +67,7 @@ public class DatabaseSlowStatementBuilder {
dbSlowStat.setStatement(statement);
dbSlowStat.setLatency(latency);
dbSlowStat.setTimeBucket(timeBucket);
+ dbSlowStat.setTimestamp(timestamp);
return dbSlowStat;
}
}
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SampledTraceBuilder.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SampledTraceBuilder.java
index 799155bba6..8a64d84eb9 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SampledTraceBuilder.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SampledTraceBuilder.java
@@ -107,6 +107,7 @@ public class SampledTraceBuilder {
slowTraceRecord.setUri(uri);
slowTraceRecord.setLatency(latency);
slowTraceRecord.setTimeBucket(TimeBucket.getTimeBucket(timestamp, DownSampling.Second));
+ slowTraceRecord.setTimestamp(timestamp);
return slowTraceRecord;
case STATUS_4XX:
final SampledStatus4xxTraceRecord status4xxTraceRecord = new SampledStatus4xxTraceRecord();
@@ -118,6 +119,7 @@ public class SampledTraceBuilder {
status4xxTraceRecord.setUri(uri);
status4xxTraceRecord.setLatency(latency);
status4xxTraceRecord.setTimeBucket(TimeBucket.getTimeBucket(timestamp, DownSampling.Second));
+ status4xxTraceRecord.setTimestamp(timestamp);
return status4xxTraceRecord;
case STATUS_5XX:
final SampledStatus5xxTraceRecord status5xxTraceRecord = new SampledStatus5xxTraceRecord();
@@ -129,6 +131,7 @@ public class SampledTraceBuilder {
status5xxTraceRecord.setUri(uri);
status5xxTraceRecord.setLatency(latency);
status5xxTraceRecord.setTimeBucket(TimeBucket.getTimeBucket(timestamp, DownSampling.Second));
+ status5xxTraceRecord.setTimestamp(timestamp);
return status5xxTraceRecord;
default:
throw new IllegalArgumentException("unknown reason: " + this.reason);
@@ -156,4 +159,4 @@ public class SampledTraceBuilder {
STATUS_4XX,
STATUS_5XX
}
-}
\ No newline at end of file
+}
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualCacheProcessor.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualCacheProcessor.java
index de67db00c2..ccf387a8fd 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualCacheProcessor.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualCacheProcessor.java
@@ -89,6 +89,7 @@ public class VirtualCacheProcessor implements VirtualServiceProcessor {
slowAccess.setCommand(tags.get(SpanTags.CACHE_CMD));
slowAccess.setKey(tags.get(SpanTags.CACHE_KEY));
slowAccess.setTimeBucket(TimeBucket.getRecordTimeBucket(span.getStartTime()));
+ slowAccess.setTimestamp(span.getStartTime());
slowAccess.setOperation(op);
sourceList.add(slowAccess);
}
diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualDatabaseProcessor.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualDatabaseProcessor.java
index 103303c02e..9e8c7df2cd 100644
--- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualDatabaseProcessor.java
+++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/vservice/VirtualDatabaseProcessor.java
@@ -72,6 +72,7 @@ public class VirtualDatabaseProcessor implements VirtualServiceProcessor {
dbSlowStat.setStatement(statement);
dbSlowStat.setLatency(latency);
dbSlowStat.setTimeBucket(TimeBucket.getRecordTimeBucket(span.getStartTime()));
+ dbSlowStat.setTimestamp(span.getStartTime());
recordList.add(dbSlowStat);
});
}
diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
index 96c0d3fefc..d535492ef2 100644
--- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
+++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java
@@ -289,6 +289,7 @@ public class ExtractorSpec extends AbstractSpec {
long timeBucketForDB = TimeBucket.getTimeBucket(log.getTimestamp(), DownSampling.Second);
builder.setTimeBucket(timeBucketForDB);
+ builder.setTimestamp(log.getTimestamp());
String entityId = serviceMeta.getEntityId();
builder.prepare();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
index 84fe19d8cd..2a666b15ac 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
@@ -43,6 +43,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.AL
@ScopeDeclaration(id = ALARM, name = "Alarm")
@Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, builder = AlarmRecord.Builder.class, processor = RecordStreamProcessor.class)
@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = AlarmRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
+@BanyanDB.TimestampColumn(AlarmRecord.START_TIME)
public class AlarmRecord extends Record {
public static final String INDEX_NAME = "alarm_record";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/CacheSlowAccessDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/CacheSlowAccessDispatcher.java
index f72b6da884..9db95006a9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/CacheSlowAccessDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/CacheSlowAccessDispatcher.java
@@ -36,6 +36,7 @@ public class CacheSlowAccessDispatcher implements SourceDispatcher<CacheSlowAcce
readCommand.setTraceId(source.getTraceId());
readCommand.setEntityId(source.getCacheServiceId());
readCommand.setTimeBucket(source.getTimeBucket());
+ readCommand.setTimestamp(source.getTimestamp());
TopNStreamProcessor.getInstance().in(readCommand);
} else if (source.getOperation() == VirtualCacheOperation.Write) {
TopNCacheWriteCommand writeCommand = new TopNCacheWriteCommand();
@@ -45,6 +46,7 @@ public class CacheSlowAccessDispatcher implements SourceDispatcher<CacheSlowAcce
writeCommand.setTraceId(source.getTraceId());
writeCommand.setEntityId(source.getCacheServiceId());
writeCommand.setTimeBucket(source.getTimeBucket());
+ writeCommand.setTimestamp(source.getTimestamp());
TopNStreamProcessor.getInstance().in(writeCommand);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheReadCommand.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheReadCommand.java
index 66eeb7efb3..c3e60ec131 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheReadCommand.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheReadCommand.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
@@ -34,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
* Database TopN statement, including Database SQL statement, mongoDB and Redis commands.
*/
@Stream(name = TopNCacheReadCommand.INDEX_NAME, scopeId = DefaultScopeDefine.CACHE_SLOW_ACCESS, builder = TopNCacheReadCommand.Builder.class, processor = TopNStreamProcessor.class)
+@BanyanDB.TimestampColumn(TopN.TIMESTAMP)
public class TopNCacheReadCommand extends TopN {
public static final String INDEX_NAME = "top_n_cache_read_command";
@@ -73,6 +75,7 @@ public class TopNCacheReadCommand extends TopN {
statement.setLatency(((Number) converter.get(LATENCY)).longValue());
statement.setEntityId((String) converter.get(ENTITY_ID));
statement.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+ statement.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
return statement;
}
@@ -83,6 +86,7 @@ public class TopNCacheReadCommand extends TopN {
converter.accept(LATENCY, storageData.getLatency());
converter.accept(ENTITY_ID, storageData.getEntityId());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+ converter.accept(TIMESTAMP, storageData.getTimestamp());
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheWriteCommand.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheWriteCommand.java
index 7c061e9a09..1a7b9759c5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheWriteCommand.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/cache/TopNCacheWriteCommand.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
@@ -35,6 +36,7 @@ import java.util.Objects;
* Database TopN statement, including Database SQL statement, mongoDB and Redis commands.
*/
@Stream(name = TopNCacheWriteCommand.INDEX_NAME, scopeId = DefaultScopeDefine.CACHE_SLOW_ACCESS, builder = TopNCacheWriteCommand.Builder.class, processor = TopNStreamProcessor.class)
+@BanyanDB.TimestampColumn(TopN.TIMESTAMP)
public class TopNCacheWriteCommand extends TopN {
public static final String INDEX_NAME = "top_n_cache_write_command";
@@ -74,6 +76,7 @@ public class TopNCacheWriteCommand extends TopN {
statement.setLatency(((Number) converter.get(LATENCY)).longValue());
statement.setEntityId((String) converter.get(ENTITY_ID));
statement.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+ statement.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
return statement;
}
@@ -84,6 +87,7 @@ public class TopNCacheWriteCommand extends TopN {
converter.accept(LATENCY, storageData.getLatency());
converter.accept(ENTITY_ID, storageData.getEntityId());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+ converter.accept(TIMESTAMP, storageData.getTimestamp());
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
index 2af4820334..5eb38a9146 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
@@ -33,6 +33,7 @@ public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlo
statement.setStatement(source.getStatement());
statement.setTimeBucket(source.getTimeBucket());
statement.setTraceId(source.getTraceId());
+ statement.setTimestamp(source.getTimestamp());
TopNStreamProcessor.getInstance().in(statement);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
index d158455242..28cc05b0a7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
@@ -34,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
* Database TopN statement, including Database SQL statement, mongoDB and Redis commands.
*/
@Stream(name = TopNDatabaseStatement.INDEX_NAME, scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT, builder = TopNDatabaseStatement.Builder.class, processor = TopNStreamProcessor.class)
+@BanyanDB.TimestampColumn(TopN.TIMESTAMP)
public class TopNDatabaseStatement extends TopN {
public static final String INDEX_NAME = "top_n_database_statement";
@@ -73,6 +75,7 @@ public class TopNDatabaseStatement extends TopN {
statement.setLatency(((Number) converter.get(LATENCY)).longValue());
statement.setEntityId((String) converter.get(ENTITY_ID));
statement.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+ statement.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
return statement;
}
@@ -83,6 +86,7 @@ public class TopNDatabaseStatement extends TopN {
converter.accept(LATENCY, storageData.getLatency());
converter.accept(ENTITY_ID, storageData.getEntityId());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+ converter.accept(TIMESTAMP, storageData.getTimestamp());
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java
index dfdc34143a..352e012cd6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/LogRecord.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.ShardingAlgorithm;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase;
import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
@@ -36,6 +37,7 @@ import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_
@Stream(name = LogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.LOG, builder = LogRecord.Builder.class, processor = RecordStreamProcessor.class)
@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = AbstractLogRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
@SQLDatabase.Sharding(shardingAlgorithm = ShardingAlgorithm.TIME_SEC_RANGE_SHARDING_ALGORITHM, dataSourceShardingColumn = SERVICE_ID, tableShardingColumn = TIME_BUCKET)
+@BanyanDB.TimestampColumn(AbstractLogRecord.TIMESTAMP)
public class LogRecord extends AbstractLogRecord {
public static final String INDEX_NAME = "log";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
index 5e173b8ccf..650ee09b28 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
@@ -42,6 +42,7 @@ import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_
@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class)
@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = SegmentRecord.ADDITIONAL_TAG_TABLE, parentColumn = TIME_BUCKET)
@SQLDatabase.Sharding(shardingAlgorithm = ShardingAlgorithm.TIME_SEC_RANGE_SHARDING_ALGORITHM, dataSourceShardingColumn = SERVICE_ID, tableShardingColumn = TIME_BUCKET)
+@BanyanDB.TimestampColumn(SegmentRecord.START_TIME)
public class SegmentRecord extends Record {
public static final String INDEX_NAME = "segment";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventRecord.java
index 9a04a7a674..e5b0598cf8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventRecord.java
@@ -39,6 +39,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SP
@Getter
@ScopeDeclaration(id = SPAN_ATTACHED_EVENT, name = "SpanAttachedEvent")
@Stream(name = SpanAttachedEventRecord.INDEX_NAME, scopeId = SPAN_ATTACHED_EVENT, builder = SpanAttachedEventRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(SpanAttachedEventRecord.TIMESTAMP)
public class SpanAttachedEventRecord extends Record {
public static final String INDEX_NAME = "span_attached_event_record";
@@ -52,6 +53,7 @@ public class SpanAttachedEventRecord extends Record {
public static final String TRACE_SEGMENT_ID = "trace_segment_id";
public static final String TRACE_SPAN_ID = "trace_span_id";
public static final String DATA_BINARY = "data_binary";
+ public static final String TIMESTAMP = "timestamp";
@Column(columnName = START_TIME_SECOND)
private long startTimeSecond;
@@ -74,6 +76,10 @@ public class SpanAttachedEventRecord extends Record {
private String traceSpanId;
@Column(columnName = DATA_BINARY, storageOnly = true)
private byte[] dataBinary;
+ @Setter
+ @Getter
+ @Column(columnName = TIMESTAMP)
+ private long timestamp;
@Override
public String id() {
@@ -94,6 +100,7 @@ public class SpanAttachedEventRecord extends Record {
record.setTraceSegmentId((String) converter.get(TRACE_SEGMENT_ID));
record.setTraceSpanId((String) converter.get(TRACE_SPAN_ID));
record.setDataBinary(converter.getBytes(DATA_BINARY));
+ record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
return record;
}
@@ -109,6 +116,7 @@ public class SpanAttachedEventRecord extends Record {
converter.accept(TRACE_SEGMENT_ID, entity.getTraceSegmentId());
converter.accept(TRACE_SPAN_ID, entity.getTraceSpanId());
converter.accept(DATA_BINARY, entity.getDataBinary());
+ converter.accept(TIMESTAMP, entity.getTimestamp());
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledSlowTraceRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledSlowTraceRecord.java
index abe2d321ac..7018621683 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledSlowTraceRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledSlowTraceRecord.java
@@ -38,6 +38,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SA
@Getter
@ScopeDeclaration(id = SAMPLED_SLOW_TRACE, name = "SampledTraceSlowRecord")
@Stream(name = SampledSlowTraceRecord.INDEX_NAME, scopeId = SAMPLED_SLOW_TRACE, builder = SampledSlowTraceRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(SampledSlowTraceRecord.TIMESTAMP)
public class SampledSlowTraceRecord extends Record {
public static final String INDEX_NAME = "sampled_slow_trace_record";
@@ -46,6 +47,7 @@ public class SampledSlowTraceRecord extends Record {
public static final String TRACE_ID = TopN.TRACE_ID;
public static final String URI = TopN.STATEMENT;
public static final String LATENCY = "latency";
+ public static final String TIMESTAMP = "timestamp";
@Column(columnName = SCOPE)
private int scope;
@@ -58,6 +60,10 @@ public class SampledSlowTraceRecord extends Record {
private String uri;
@Column(columnName = LATENCY, dataType = Column.ValueDataType.SAMPLED_RECORD)
private long latency;
+ @Setter
+ @Getter
+ @Column(columnName = TIMESTAMP)
+ private long timestamp;
@Override
public String id() {
@@ -75,6 +81,7 @@ public class SampledSlowTraceRecord extends Record {
record.setUri((String) converter.get(URI));
record.setLatency(((Number) converter.get(LATENCY)).longValue());
record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+ record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
return record;
}
@@ -86,6 +93,7 @@ public class SampledSlowTraceRecord extends Record {
converter.accept(URI, entity.getUri());
converter.accept(LATENCY, entity.getLatency());
converter.accept(TIME_BUCKET, entity.getTimeBucket());
+ converter.accept(TIMESTAMP, entity.getTimestamp());
}
}
-}
\ No newline at end of file
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus4xxTraceRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus4xxTraceRecord.java
index a84dd11bf8..9253ae3037 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus4xxTraceRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus4xxTraceRecord.java
@@ -38,6 +38,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SA
@Getter
@ScopeDeclaration(id = SAMPLED_STATUS_4XX_TRACE, name = "SampledStatus4xxTraceRecord")
@Stream(name = SampledStatus4xxTraceRecord.INDEX_NAME, scopeId = SAMPLED_STATUS_4XX_TRACE, builder = SampledStatus4xxTraceRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(SampledStatus4xxTraceRecord.TIMESTAMP)
public class SampledStatus4xxTraceRecord extends Record {
public static final String INDEX_NAME = "sampled_status_4xx_trace_record";
@@ -47,6 +48,7 @@ public class SampledStatus4xxTraceRecord extends Record {
public static final String TRACE_ID = TopN.TRACE_ID;
public static final String URI = TopN.STATEMENT;
public static final String LATENCY = "latency";
+ public static final String TIMESTAMP = "timestamp";
@Column(columnName = SCOPE)
private int scope;
@@ -59,6 +61,10 @@ public class SampledStatus4xxTraceRecord extends Record {
private String uri;
@Column(columnName = LATENCY, dataType = Column.ValueDataType.SAMPLED_RECORD)
private long latency;
+ @Setter
+ @Getter
+ @Column(columnName = TIMESTAMP)
+ private long timestamp;
@Override
public String id() {
@@ -76,6 +82,7 @@ public class SampledStatus4xxTraceRecord extends Record {
record.setUri((String) converter.get(URI));
record.setLatency(((Number) converter.get(LATENCY)).longValue());
record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+ record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
return record;
}
@@ -87,6 +94,7 @@ public class SampledStatus4xxTraceRecord extends Record {
converter.accept(URI, entity.getUri());
converter.accept(LATENCY, entity.getLatency());
converter.accept(TIME_BUCKET, entity.getTimeBucket());
+ converter.accept(TIMESTAMP, entity.getTimestamp());
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus5xxTraceRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus5xxTraceRecord.java
index 9f22153a08..9088bf336b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus5xxTraceRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledStatus5xxTraceRecord.java
@@ -38,6 +38,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SA
@Getter
@ScopeDeclaration(id = SAMPLED_STATUS_5XX_TRACE, name = "SampledStatus5xxTraceRecord")
@Stream(name = SampledStatus5xxTraceRecord.INDEX_NAME, scopeId = SAMPLED_STATUS_5XX_TRACE, builder = SampledStatus5xxTraceRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(SampledStatus5xxTraceRecord.TIMESTAMP)
public class SampledStatus5xxTraceRecord extends Record {
public static final String INDEX_NAME = "sampled_status_5xx_trace_record";
@@ -47,6 +48,7 @@ public class SampledStatus5xxTraceRecord extends Record {
public static final String TRACE_ID = TopN.TRACE_ID;
public static final String URI = TopN.STATEMENT;
public static final String LATENCY = "latency";
+ public static final String TIMESTAMP = "timestamp";
@Column(columnName = SCOPE)
private int scope;
@@ -59,6 +61,10 @@ public class SampledStatus5xxTraceRecord extends Record {
private String uri;
@Column(columnName = LATENCY, dataType = Column.ValueDataType.SAMPLED_RECORD)
private long latency;
+ @Setter
+ @Getter
+ @Column(columnName = TIMESTAMP)
+ private long timestamp;
@Override
public String id() {
@@ -76,6 +82,7 @@ public class SampledStatus5xxTraceRecord extends Record {
record.setUri((String) converter.get(URI));
record.setLatency(((Number) converter.get(LATENCY)).longValue());
record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+ record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
return record;
}
@@ -87,6 +94,7 @@ public class SampledStatus5xxTraceRecord extends Record {
converter.accept(URI, entity.getUri());
converter.accept(LATENCY, entity.getLatency());
converter.accept(TIME_BUCKET, entity.getTimeBucket());
+ converter.accept(TIMESTAMP, entity.getTimestamp());
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
index a10dc44126..88a070ad7c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
@@ -33,6 +33,7 @@ public abstract class TopN extends Record implements ComparableStorageData {
public static final String LATENCY = "latency";
public static final String TRACE_ID = "trace_id";
public static final String ENTITY_ID = "entity_id";
+ public static final String TIMESTAMP = "timestamp";
@Getter
@Setter
@@ -47,6 +48,10 @@ public abstract class TopN extends Record implements ComparableStorageData {
@Column(columnName = ENTITY_ID, length = 512)
@BanyanDB.ShardingKey(index = 0)
private String entityId;
+ @Getter
+ @Setter
+ @Column(columnName = TIMESTAMP)
+ private long timestamp;
@Override
public int compareTo(Object o) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/manual/errorlog/BrowserErrorLogRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/manual/errorlog/BrowserErrorLogRecord.java
index 782e8d86ca..ffc73b93b3 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/manual/errorlog/BrowserErrorLogRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/manual/errorlog/BrowserErrorLogRecord.java
@@ -38,6 +38,7 @@ import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_
@SuperDataset
@Stream(name = BrowserErrorLogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.BROWSER_ERROR_LOG, builder = BrowserErrorLogRecord.Builder.class, processor = RecordStreamProcessor.class)
@SQLDatabase.Sharding(shardingAlgorithm = ShardingAlgorithm.TIME_SEC_RANGE_SHARDING_ALGORITHM, dataSourceShardingColumn = SERVICE_ID, tableShardingColumn = TIME_BUCKET)
+@BanyanDB.TimestampColumn(BrowserErrorLogRecord.TIMESTAMP)
public class BrowserErrorLogRecord extends Record {
public static final String INDEX_NAME = "browser_error_log";
public static final String UNIQUE_ID = "unique_id";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingDataRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingDataRecord.java
index bb30d99e0a..d57e17daa7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingDataRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingDataRecord.java
@@ -38,6 +38,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EB
@Data
@Stream(name = EBPFProfilingDataRecord.INDEX_NAME, scopeId = EBPF_PROFILING_DATA,
builder = EBPFProfilingDataRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(EBPFProfilingDataRecord.UPLOAD_TIME)
public class EBPFProfilingDataRecord extends Record {
public static final String INDEX_NAME = "ebpf_profiling_data";
@@ -97,4 +98,4 @@ public class EBPFProfilingDataRecord extends Record {
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
}
}
-}
\ No newline at end of file
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingTaskRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingTaskRecord.java
index 98c4894143..1be7d826ff 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingTaskRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/storage/EBPFProfilingTaskRecord.java
@@ -40,6 +40,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EB
@ScopeDeclaration(id = EBPF_PROFILING_TASK, name = "EBPFProfilingTask")
@Stream(name = EBPFProfilingTaskRecord.INDEX_NAME, scopeId = EBPF_PROFILING_TASK,
builder = EBPFProfilingTaskRecord.Builder.class, processor = NoneStreamProcessor.class)
+@BanyanDB.TimestampColumn(EBPFProfilingTaskRecord.CREATE_TIME)
public class EBPFProfilingTaskRecord extends NoneStream {
public static final String INDEX_NAME = "ebpf_profiling_task";
public static final String LOGICAL_ID = "logical_id";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskLogRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskLogRecord.java
index df741b599f..7bb872e185 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskLogRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskLogRecord.java
@@ -40,6 +40,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PR
@Setter
@ScopeDeclaration(id = PROFILE_TASK_LOG, name = "ProfileTaskLog")
@Stream(name = ProfileTaskLogRecord.INDEX_NAME, scopeId = PROFILE_TASK_LOG, builder = ProfileTaskLogRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(ProfileTaskLogRecord.TIMESTAMP)
public class ProfileTaskLogRecord extends Record {
public static final String INDEX_NAME = "profile_task_log";
@@ -47,6 +48,7 @@ public class ProfileTaskLogRecord extends Record {
public static final String INSTANCE_ID = "instance_id";
public static final String OPERATION_TYPE = "operation_type";
public static final String OPERATION_TIME = "operation_time";
+ public static final String TIMESTAMP = "timestamp";
@Column(columnName = TASK_ID)
private String taskId;
@@ -57,6 +59,10 @@ public class ProfileTaskLogRecord extends Record {
private int operationType;
@Column(columnName = OPERATION_TIME)
private long operationTime;
+ @Getter
+ @Setter
+ @Column(columnName = TIMESTAMP)
+ private long timestamp;
@Override
public String id() {
@@ -72,6 +78,7 @@ public class ProfileTaskLogRecord extends Record {
log.setOperationType(((Number) converter.get(OPERATION_TYPE)).intValue());
log.setOperationTime(((Number) converter.get(OPERATION_TIME)).longValue());
log.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
+ log.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue());
return log;
}
@@ -82,6 +89,7 @@ public class ProfileTaskLogRecord extends Record {
converter.accept(OPERATION_TYPE, storageData.getOperationType());
converter.accept(OPERATION_TIME, storageData.getOperationTime());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+ converter.accept(TIMESTAMP, storageData.getTimestamp());
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskRecord.java
index cb353cb60b..287f37a226 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileTaskRecord.java
@@ -39,6 +39,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PR
@Setter
@ScopeDeclaration(id = PROFILE_TASK, name = "ProfileTask")
@Stream(name = ProfileTaskRecord.INDEX_NAME, scopeId = PROFILE_TASK, builder = ProfileTaskRecord.Builder.class, processor = NoneStreamProcessor.class)
+@BanyanDB.TimestampColumn(ProfileTaskRecord.START_TIME)
public class ProfileTaskRecord extends NoneStream {
public static final String INDEX_NAME = "profile_task";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileThreadSnapshotRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileThreadSnapshotRecord.java
index cfe13fd73d..454019a46d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileThreadSnapshotRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/trace/ProfileThreadSnapshotRecord.java
@@ -41,6 +41,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PR
@Setter
@ScopeDeclaration(id = PROFILE_TASK_SEGMENT_SNAPSHOT, name = "ProfileThreadSnapshot")
@Stream(name = ProfileThreadSnapshotRecord.INDEX_NAME, scopeId = PROFILE_TASK_SEGMENT_SNAPSHOT, builder = ProfileThreadSnapshotRecord.Builder.class, processor = RecordStreamProcessor.class)
+@BanyanDB.TimestampColumn(ProfileThreadSnapshotRecord.DUMP_TIME)
public class ProfileThreadSnapshotRecord extends Record {
public static final String INDEX_NAME = "profile_task_segment_snapshot";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CacheSlowAccess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CacheSlowAccess.java
index 420ad4b7a8..21e3e32a45 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CacheSlowAccess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/CacheSlowAccess.java
@@ -53,6 +53,9 @@ public class CacheSlowAccess extends Source {
@Getter
@Setter
private VirtualCacheOperation operation;
+ @Getter
+ @Setter
+ private long timestamp;
@Override
public int scope() {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
index 40480ca81c..eacc69b6f1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
@@ -42,6 +42,9 @@ public class DatabaseSlowStatement extends Source {
@Getter
@Setter
private String traceId;
+ @Getter
+ @Setter
+ private long timestamp;
@Override
public int scope() {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
index 80acbdfc42..43f517db02 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
@@ -22,6 +22,7 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
/**
* BanyanDB annotation is a holder including all annotations for BanyanDB storage
@@ -118,4 +119,15 @@ public @interface BanyanDB {
TREE;
}
}
+
+ /**
+ * timestampColumn is to identify which column in {@link Record} is providing the timestamp(millisecond) for BanyanDB.
+ * BanyanDB stream requires a timestamp in milliseconds.
+ * @since 9.3.0
+ */
+ @Target({ElementType.TYPE})
+ @Retention(RetentionPolicy.RUNTIME)
+ @interface TimestampColumn {
+ String value();
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
new file mode 100644
index 0000000000..dc8b60ad9c
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.model;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+
+/**
+ * BanyanDBExtension represents extra metadata for models, but specific for BanyanDB usages.
+ *
+ * @since 9.3.0
+ */
+public class BanyanDBModelExtension {
+ /**
+ * timestampColumn is to identify which column in {@link Record} is providing the timestamp(millisecond) for BanyanDB.
+ * BanyanDB stream requires a timestamp in milliseconds
+ * @since 9.3.0
+ */
+ @Getter
+ @Setter
+ private String timestampColumn;
+
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
index 4e37d756f3..23514dda32 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
@@ -39,6 +39,7 @@ public class Model {
private final Class<?> streamClass;
private final boolean timeRelativeID;
private final SQLDatabaseModelExtension sqlDBModelExtension;
+ private final BanyanDBModelExtension banyanDBModelExtension;
public Model(final String name,
final List<ModelColumn> columns,
@@ -48,7 +49,8 @@ public class Model {
final boolean superDataset,
final Class<?> streamClass,
boolean timeRelativeID,
- final SQLDatabaseModelExtension sqlDBModelExtension) {
+ final SQLDatabaseModelExtension sqlDBModelExtension,
+ final BanyanDBModelExtension banyanDBModelExtension) {
this.name = name;
this.columns = columns;
this.scopeId = scopeId;
@@ -59,5 +61,6 @@ public class Model {
this.streamClass = streamClass;
this.timeRelativeID = timeRelativeID;
this.sqlDBModelExtension = sqlDBModelExtension;
+ this.banyanDBModelExtension = banyanDBModelExtension;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 33969f60f2..f8fc461093 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -59,6 +59,7 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
List<ModelColumn> modelColumns = new ArrayList<>();
ShardingKeyChecker checker = new ShardingKeyChecker();
SQLDatabaseModelExtension sqlDBModelExtension = new SQLDatabaseModelExtension();
+ BanyanDBModelExtension banyanDBModelExtension = new BanyanDBModelExtension();
retrieval(aClass, storage.getModelName(), modelColumns, scopeId, checker, sqlDBModelExtension, record);
// Add extra column for additional entities
if (aClass.isAnnotationPresent(SQLDatabase.ExtraColumn4AdditionalEntity.class)
@@ -86,6 +87,12 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
}
});
}
+ //Add timestampColumn for BanyanDB
+ if (aClass.isAnnotationPresent(BanyanDB.TimestampColumn.class)) {
+ String timestampColumn = aClass.getAnnotation(BanyanDB.TimestampColumn.class).value();
+ banyanDBModelExtension.setTimestampColumn(timestampColumn);
+ }
+
checker.check(storage.getModelName());
Model model = new Model(
@@ -97,7 +104,8 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
isSuperDatasetModel(aClass),
aClass,
storage.isTimeRelativeID(),
- sqlDBModelExtension
+ sqlDBModelExtension,
+ banyanDBModelExtension
);
this.followColumnNameRules(model);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
index 3e44aa1f65..2676942032 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/zipkin/ZipkinSpanRecord.java
@@ -50,6 +50,7 @@ import static org.apache.skywalking.oap.server.core.analysis.record.Record.TIME_
@Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, builder = ZipkinSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
@SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = ZipkinSpanRecord.ADDITIONAL_QUERY_TABLE, parentColumn = TIME_BUCKET)
@SQLDatabase.Sharding(shardingAlgorithm = ShardingAlgorithm.TIME_SEC_RANGE_SHARDING_ALGORITHM, dataSourceShardingColumn = TRACE_ID, tableShardingColumn = TIME_BUCKET)
+@BanyanDB.TimestampColumn(ZipkinSpanRecord.TIMESTAMP_MILLIS)
public class ZipkinSpanRecord extends Record {
private static final Gson GSON = new Gson();
public static final int QUERY_LENGTH = 256;
@@ -167,7 +168,7 @@ public class ZipkinSpanRecord extends Record {
@Override
public String id() {
- return spanId + Const.LINE + kind;
+ return traceId + Const.LINE + spanId;
}
public static class Builder implements StorageBuilder<ZipkinSpanRecord> {
diff --git a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
index 44210a0fa9..1de09417ff 100644
--- a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
@@ -153,9 +153,10 @@ public class ProfileTaskServiceHandler extends ProfileTaskGrpc.ProfileTaskImplBa
logRecord.setOperationType(operationType.getCode());
logRecord.setOperationTime(System.currentTimeMillis());
// same with task time bucket, ensure record will ttl same with profile task
+ long timestamp = task.getStartTime() + TimeUnit.MINUTES.toMillis(task.getDuration());
logRecord.setTimeBucket(
- TimeBucket.getRecordTimeBucket(task.getStartTime() + TimeUnit.MINUTES.toMillis(task.getDuration())));
-
+ TimeBucket.getRecordTimeBucket(timestamp));
+ logRecord.setTimestamp(timestamp);
RecordStreamProcessor.getInstance().in(logRecord);
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v8/grpc/SpanAttachedEventReportServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v8/grpc/SpanAttachedEventReportServiceHandler.java
index 513061dca7..d7660f7d24 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v8/grpc/SpanAttachedEventReportServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v8/grpc/SpanAttachedEventReportServiceHandler.java
@@ -57,9 +57,10 @@ public class SpanAttachedEventReportServiceHandler extends SpanAttachedEventRepo
record.setTraceSegmentId(event.getTraceContext().getTraceSegmentId());
record.setTraceSpanId(event.getTraceContext().getSpanId());
record.setDataBinary(event.toByteArray());
- record.setTimeBucket(TimeBucket.getMinuteTimeBucket(TimeUnit.SECONDS.toMillis(record.getStartTimeSecond())
- + TimeUnit.NANOSECONDS.toMillis(record.getStartTimeNanos())));
-
+ long timestamp = TimeUnit.SECONDS.toMillis(record.getStartTimeSecond())
+ + TimeUnit.NANOSECONDS.toMillis(record.getStartTimeNanos());
+ record.setTimeBucket(TimeBucket.getMinuteTimeBucket(timestamp));
+ record.setTimestamp(timestamp);
RecordStreamProcessor.getInstance().in(record);
}
@@ -82,4 +83,4 @@ public class SpanAttachedEventReportServiceHandler extends SpanAttachedEventRepo
}
};
}
-}
\ No newline at end of file
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index e40c6b7015..acab5dcb7e 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -70,6 +70,9 @@ public class BanyanDBConverter {
@Override
public void accept(String fieldName, Object fieldValue) {
+ if (fieldName.equals(this.schema.getTimestampColumn4Stream())) {
+ streamWrite.setTimestamp((long) fieldValue);
+ }
MetadataRegistry.ColumnSpec columnSpec = this.schema.getSpec(fieldName);
if (columnSpec == null) {
throw new IllegalArgumentException("fail to find field[" + fieldName + "]");
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
index 12b03a94eb..46edb436ab 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
@@ -45,10 +44,11 @@ public class BanyanDBNoneStreamDAO extends AbstractDAO<BanyanDBStorageClient> im
if (schema == null) {
throw new IOException(model.getName() + " is not registered");
}
- StreamWrite streamWrite = new StreamWrite(schema.getMetadata().getGroup(), // group name
- schema.getMetadata().name(), // stream-name
- noneStream.id(), // identity
- TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling())); // timestamp
+ StreamWrite streamWrite = new StreamWrite(
+ schema.getMetadata().getGroup(), // group name
+ schema.getMetadata().name(), // stream-name
+ noneStream.id() // identity
+ ); // set timestamp inside `BanyanDBConverter.StreamToStorage`
Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(schema, streamWrite);
storageBuilder.entity2Storage(noneStream, convert2Storage);
getClient().write(streamWrite);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBZipkinQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBZipkinQueryDAO.java
index 96560abb28..3efb6f093b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBZipkinQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBZipkinQueryDAO.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -178,35 +177,33 @@ public class BanyanDBZipkinQueryDAO extends AbstractBanyanDBDAO implements IZipk
public List<List<Span>> getTraces(final QueryRequest request, Duration duration) throws IOException {
final int tracesLimit = request.limit();
int scrollLimit = 1000;
- int scrollFrom = 0;
+ long scrollEndTime = duration.getEndTimestamp();
Set<String> traceIds = new HashSet<>();
while (traceIds.size() < tracesLimit) {
- Set<String> resp = getTraceIds(request, duration, scrollFrom, scrollLimit);
- if (resp.size() == 0) {
- break;
- }
- for (String traceId : resp) {
- traceIds.add(traceId);
+ List<ZipkinSpanRecord> spans = getSpans(request, duration, scrollEndTime, scrollLimit);
+ for (ZipkinSpanRecord span : spans) {
+ traceIds.add(span.getTraceId());
if (traceIds.size() >= tracesLimit) {
break;
}
}
- scrollFrom = scrollFrom + scrollLimit;
+ if (spans.size() < scrollLimit) {
+ break;
+ }
+ scrollEndTime = spans.get(spans.size() - 1).getTimestampMillis();
}
return getTraces(traceIds);
}
- private Set<String> getTraceIds(final QueryRequest request,
+ private List<ZipkinSpanRecord> getSpans(final QueryRequest request,
Duration duration,
- int from,
+ long scrollEndTime,
int limit) throws IOException {
final long startTimeMillis = duration.getStartTimestamp();
- final long endTimeMillis = duration.getEndTimestamp();
-
TimestampRange tsRange = null;
- if (startTimeMillis > 0 && endTimeMillis > 0) {
- tsRange = new TimestampRange(startTimeMillis, endTimeMillis);
+ if (startTimeMillis > 0 && scrollEndTime > 0) {
+ tsRange = new TimestampRange(startTimeMillis, scrollEndTime);
}
final QueryBuilder<StreamQuery> queryBuilder = new QueryBuilder<StreamQuery>() {
@@ -242,15 +239,16 @@ public class BanyanDBZipkinQueryDAO extends AbstractBanyanDBDAO implements IZipk
}
query.setOrderBy(new StreamQuery.OrderBy(ZipkinSpanRecord.TIMESTAMP_MILLIS, AbstractQuery.Sort.DESC));
query.setLimit(limit);
- query.setOffset(from);
}
};
StreamQueryResponse resp = query(ZipkinSpanRecord.INDEX_NAME, TRACE_TAGS, tsRange, queryBuilder);
- Set<String> traceIds = new LinkedHashSet<>(); //needs to keep order here
+ List<ZipkinSpanRecord> spans = new ArrayList<>(); //needs to keep order here
for (final RowEntity rowEntity : resp.getElements()) {
- traceIds.add(rowEntity.getTagValue(ZipkinSpanRecord.TRACE_ID));
+ ZipkinSpanRecord spanRecord = new ZipkinSpanRecord.Builder().storage2Entity(
+ new BanyanDBConverter.StorageToStream(ZipkinSpanRecord.INDEX_NAME, rowEntity));
+ spans.add(spanRecord);
}
- return traceIds;
+ return spans;
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 9f6f2e4cf8..f679deca66 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -61,6 +61,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
@Slf4j
public enum MetadataRegistry {
@@ -87,6 +88,12 @@ public enum MetadataRegistry {
schemaBuilder.tag(tagSpec.getTagName());
}
}
+ String timestampColumn4Stream = model.getBanyanDBModelExtension().getTimestampColumn();
+ if (StringUtil.isBlank(timestampColumn4Stream)) {
+ throw new IllegalStateException(
+ "Model[stream." + model.getName() + "] miss defined @BanyanDB.TimestampColumn");
+ }
+ schemaBuilder.timestampColumn4Stream(timestampColumn4Stream);
List<IndexRule> indexRules = tags.stream()
.map(TagMetadata::getIndexRule)
.filter(Objects::nonNull)
@@ -517,6 +524,9 @@ public enum MetadataRegistry {
@Singular
private final Set<String> fields;
+ @Getter
+ private final String timestampColumn4Stream;
+
public ColumnSpec getSpec(String columnName) {
return this.specs.get(columnName);
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index 11b0d9abec..ed04f7e1bf 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -42,10 +41,11 @@ public class BanyanDBRecordDAO implements IRecordDAO {
if (schema == null) {
throw new IOException(model.getName() + " is not registered");
}
- StreamWrite streamWrite = new StreamWrite(schema.getMetadata().getGroup(), // group name
- model.getName(), // index-name
- record.id(), // identity
- TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling())); // timestamp
+ StreamWrite streamWrite = new StreamWrite(
+ schema.getMetadata().getGroup(), // group name
+ model.getName(), // index-name
+ record.id() // identity
+ ); // set timestamp inside `BanyanDBConverter.StreamToStorage`
Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(schema, streamWrite);
storageBuilder.entity2Storage(record, convert2Storage);
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
index a0f45b1599..8114e68494 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.query.enumeration.Step;
+import org.apache.skywalking.oap.server.core.storage.model.BanyanDBModelExtension;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension;
import org.junit.Assert;
@@ -41,13 +42,16 @@ public class TimeSeriesUtilsTest {
@Before
public void prepare() {
superDatasetModel = new Model("superDatasetModel", Lists.newArrayList(),
- 0, DownSampling.Second, true, true, Record.class, true, new SQLDatabaseModelExtension()
+ 0, DownSampling.Second, true, true, Record.class, true,
+ new SQLDatabaseModelExtension(), new BanyanDBModelExtension()
);
normalRecordModel = new Model("normalRecordModel", Lists.newArrayList(),
- 0, DownSampling.Second, true, false, Record.class, true, new SQLDatabaseModelExtension()
+ 0, DownSampling.Second, true, false, Record.class, true,
+ new SQLDatabaseModelExtension(), new BanyanDBModelExtension()
);
normalMetricsModel = new Model("normalMetricsModel", Lists.newArrayList(),
- 0, DownSampling.Minute, false, false, Metrics.class, true, new SQLDatabaseModelExtension()
+ 0, DownSampling.Minute, false, false, Metrics.class, true,
+ new SQLDatabaseModelExtension(), new BanyanDBModelExtension()
);
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(1);
TimeSeriesUtils.setDAY_STEP(3);