You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/13 07:25:26 UTC
[skywalking-banyandb] branch main updated: Fix duplicate datapoints (#101)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new fd1aa3a Fix duplicate datapoints (#101)
fd1aa3a is described below
commit fd1aa3a9306caa28e90a2c467e4721a715504902
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Apr 13 15:25:22 2022 +0800
Fix duplicate datapoints (#101)
* Update all rpc time unit from nanosecond to millisecond.
Add a random number(0~999999) to the low 6 digits of
a datapoint's timestamp on writing it to kv
---
api/proto/banyandb/measure/v1/query.pb.go | 4 +--
api/proto/banyandb/measure/v1/query.proto | 4 +--
api/proto/banyandb/measure/v1/topn.pb.go | 4 +--
api/proto/banyandb/measure/v1/topn.proto | 4 +--
api/proto/banyandb/measure/v1/write.pb.go | 2 +-
api/proto/banyandb/measure/v1/write.proto | 2 +-
api/proto/banyandb/stream/v1/query.pb.go | 6 ++--
api/proto/banyandb/stream/v1/query.proto | 6 ++--
api/proto/banyandb/stream/v1/write.pb.go | 2 +-
api/proto/banyandb/stream/v1/write.proto | 2 +-
banyand/measure/measure_write.go | 2 +-
banyand/measure/measure_write_test.go | 3 +-
banyand/query/processor_test.go | 3 +-
banyand/stream/stream_query_test.go | 2 +-
banyand/stream/stream_write.go | 2 +-
banyand/stream/stream_write_test.go | 45 ++++++++++++++++++++++++++++-
banyand/tsdb/metric.go | 5 ++++
pkg/query/logical/common_test.go | 5 ++--
pkg/timestamp/nano.go | 48 +++++++++++++++++++++++++++++++
pkg/timestamp/nano_test.go | 33 +++++++++++++++++++++
20 files changed, 158 insertions(+), 26 deletions(-)
diff --git a/api/proto/banyandb/measure/v1/query.pb.go b/api/proto/banyandb/measure/v1/query.pb.go
index fa598f3..b6cf15d 100644
--- a/api/proto/banyandb/measure/v1/query.pb.go
+++ b/api/proto/banyandb/measure/v1/query.pb.go
@@ -48,7 +48,7 @@ type DataPoint struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- // timestamp is in the timeunit of nanoseconds.
+ // timestamp is in the timeunit of milliseconds.
Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// tag_families contains tags selected in the projection
TagFamilies []*v1.TagFamily `protobuf:"bytes,2,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
@@ -166,7 +166,7 @@ type QueryRequest struct {
// metadata is required
Metadata *v11.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
- // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+ // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
TimeRange *v1.TimeRange `protobuf:"bytes,2,opt,name=time_range,json=timeRange,proto3" json:"time_range,omitempty"`
// tag_families are indexed.
Criteria []*v1.Criteria `protobuf:"bytes,4,rep,name=criteria,proto3" json:"criteria,omitempty"`
diff --git a/api/proto/banyandb/measure/v1/query.proto b/api/proto/banyandb/measure/v1/query.proto
index 01f50ec..09797c3 100644
--- a/api/proto/banyandb/measure/v1/query.proto
+++ b/api/proto/banyandb/measure/v1/query.proto
@@ -29,7 +29,7 @@ import "banyandb/model/v1/query.proto";
// DataPoint is stored in Measures
message DataPoint {
- // timestamp is in the timeunit of nanoseconds.
+ // timestamp is in the timeunit of milliseconds.
google.protobuf.Timestamp timestamp = 1;
// tag_families contains tags selected in the projection
repeated model.v1.TagFamily tag_families = 2;
@@ -51,7 +51,7 @@ message QueryResponse {
message QueryRequest {
// metadata is required
common.v1.Metadata metadata = 1;
- // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+ // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
model.v1.TimeRange time_range = 2;
// tag_families are indexed.
repeated model.v1.Criteria criteria = 4;
diff --git a/api/proto/banyandb/measure/v1/topn.pb.go b/api/proto/banyandb/measure/v1/topn.pb.go
index 22749b5..40d7b41 100644
--- a/api/proto/banyandb/measure/v1/topn.pb.go
+++ b/api/proto/banyandb/measure/v1/topn.pb.go
@@ -48,7 +48,7 @@ type TopNList struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- // timestamp is in the timeunit of nanoseconds.
+ // timestamp is in the timeunit of milliseconds.
Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// items contains top-n items in a list
Items []*TopNList_Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"`
@@ -158,7 +158,7 @@ type TopNRequest struct {
// metadata is required
Metadata *v1.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
- // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+ // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
TimeRange *v11.TimeRange `protobuf:"bytes,2,opt,name=time_range,json=timeRange,proto3" json:"time_range,omitempty"`
// top_n set the how many items should be returned in each list.
TopN int32 `protobuf:"varint,3,opt,name=top_n,json=topN,proto3" json:"top_n,omitempty"`
diff --git a/api/proto/banyandb/measure/v1/topn.proto b/api/proto/banyandb/measure/v1/topn.proto
index d4acb6e..9910966 100644
--- a/api/proto/banyandb/measure/v1/topn.proto
+++ b/api/proto/banyandb/measure/v1/topn.proto
@@ -29,7 +29,7 @@ import "banyandb/model/v1/query.proto";
//TopNList contains a series of topN items
message TopNList {
- // timestamp is in the timeunit of nanoseconds.
+ // timestamp is in the timeunit of milliseconds.
google.protobuf.Timestamp timestamp = 1;
message Item {
string name = 1;
@@ -50,7 +50,7 @@ message TopNResponse {
message TopNRequest {
// metadata is required
common.v1.Metadata metadata = 1;
- // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+ // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
model.v1.TimeRange time_range = 2;
// top_n set the how many items should be returned in each list.
int32 top_n = 3;
diff --git a/api/proto/banyandb/measure/v1/write.pb.go b/api/proto/banyandb/measure/v1/write.pb.go
index 7746fe1..8c1ee58 100644
--- a/api/proto/banyandb/measure/v1/write.pb.go
+++ b/api/proto/banyandb/measure/v1/write.pb.go
@@ -48,7 +48,7 @@ type DataPointValue struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- // timestamp is in the timeunit of nanoseconds.
+ // timestamp is in the timeunit of milliseconds.
Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// the order of tag_families' items match the measure schema
TagFamilies []*v1.TagFamilyForWrite `protobuf:"bytes,2,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
diff --git a/api/proto/banyandb/measure/v1/write.proto b/api/proto/banyandb/measure/v1/write.proto
index 4246cd6..4bd2540 100644
--- a/api/proto/banyandb/measure/v1/write.proto
+++ b/api/proto/banyandb/measure/v1/write.proto
@@ -28,7 +28,7 @@ import "banyandb/model/v1/common.proto";
//DataPointValue is the data point for writing. It only contains values.
message DataPointValue {
- // timestamp is in the timeunit of nanoseconds.
+ // timestamp is in the timeunit of milliseconds.
google.protobuf.Timestamp timestamp = 1;
// the order of tag_families' items match the measure schema
repeated model.v1.TagFamilyForWrite tag_families = 2;
diff --git a/api/proto/banyandb/stream/v1/query.pb.go b/api/proto/banyandb/stream/v1/query.pb.go
index eb50566..6a53f46 100644
--- a/api/proto/banyandb/stream/v1/query.pb.go
+++ b/api/proto/banyandb/stream/v1/query.pb.go
@@ -52,7 +52,7 @@ type Element struct {
// element_id could be span_id of a Span or segment_id of a Segment in the context of stream
ElementId string `protobuf:"bytes,1,opt,name=element_id,json=elementId,proto3" json:"element_id,omitempty"`
- // timestamp represents
+ // timestamp represents a millisecond
// 1) either the start time of a Span/Segment,
// 2) or the timestamp of a log
Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
@@ -61,7 +61,7 @@ type Element struct {
// - duration
// - service_name
// - service_instance_id
- // - end_time_nanoseconds
+ // - end_time_milliseconds
TagFamilies []*v1.TagFamily `protobuf:"bytes,3,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
}
@@ -175,7 +175,7 @@ type QueryRequest struct {
// metadata is required
Metadata *v11.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
- // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+ // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
// In the context of stream, it represents the range of the `startTime` for spans/segments,
// while in the context of Log, it means the range of the timestamp(s) for logs.
// it is always recommended to specify time range for performance reason
diff --git a/api/proto/banyandb/stream/v1/query.proto b/api/proto/banyandb/stream/v1/query.proto
index ae11f8c..9430906 100644
--- a/api/proto/banyandb/stream/v1/query.proto
+++ b/api/proto/banyandb/stream/v1/query.proto
@@ -32,7 +32,7 @@ import "banyandb/model/v1/query.proto";
message Element {
// element_id could be span_id of a Span or segment_id of a Segment in the context of stream
string element_id = 1;
- // timestamp represents
+ // timestamp represents a millisecond
// 1) either the start time of a Span/Segment,
// 2) or the timestamp of a log
google.protobuf.Timestamp timestamp = 2;
@@ -41,7 +41,7 @@ message Element {
// - duration
// - service_name
// - service_instance_id
- // - end_time_nanoseconds
+ // - end_time_milliseconds
repeated model.v1.TagFamily tag_families = 3;
}
@@ -55,7 +55,7 @@ message QueryResponse {
message QueryRequest {
// metadata is required
common.v1.Metadata metadata = 1;
- // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+ // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
// In the context of stream, it represents the range of the `startTime` for spans/segments,
// while in the context of Log, it means the range of the timestamp(s) for logs.
// it is always recommended to specify time range for performance reason
diff --git a/api/proto/banyandb/stream/v1/write.pb.go b/api/proto/banyandb/stream/v1/write.pb.go
index 1af1637..06bb73f 100644
--- a/api/proto/banyandb/stream/v1/write.pb.go
+++ b/api/proto/banyandb/stream/v1/write.pb.go
@@ -49,7 +49,7 @@ type ElementValue struct {
// element_id could be span_id of a Span or segment_id of a Segment in the context of stream
ElementId string `protobuf:"bytes,1,opt,name=element_id,json=elementId,proto3" json:"element_id,omitempty"`
- // timestamp_nanoseconds is in the timeunit of nanoseconds. It represents
+ // timestamp is in the timeunit of milliseconds. It represents
// 1) either the start time of a Span/Segment,
// 2) or the timestamp of a log
Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
diff --git a/api/proto/banyandb/stream/v1/write.proto b/api/proto/banyandb/stream/v1/write.proto
index b30961e..cb1b5a5 100644
--- a/api/proto/banyandb/stream/v1/write.proto
+++ b/api/proto/banyandb/stream/v1/write.proto
@@ -29,7 +29,7 @@ import "banyandb/model/v1/common.proto";
message ElementValue {
// element_id could be span_id of a Span or segment_id of a Segment in the context of stream
string element_id = 1;
- // timestamp_nanoseconds is in the timeunit of nanoseconds. It represents
+ // timestamp is in the timeunit of milliseconds. It represents
// 1) either the start time of a Span/Segment,
// 2) or the timestamp of a log
google.protobuf.Timestamp timestamp = 2;
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 5b777e7..a8f60d9 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -78,7 +78,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
if err != nil {
return err
}
- t := value.GetTimestamp().AsTime()
+ t := timestamp.MToN(value.GetTimestamp().AsTime())
wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
if err != nil {
if wp != nil {
diff --git a/banyand/measure/measure_write_test.go b/banyand/measure/measure_write_test.go
index 249a31a..22161df 100644
--- a/banyand/measure/measure_write_test.go
+++ b/banyand/measure/measure_write_test.go
@@ -30,6 +30,7 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var _ = Describe("Write service_cpm_minute", func() {
@@ -70,7 +71,7 @@ var _ = Describe("Write service_cpm_minute", func() {
var dataFS embed.FS
func writeData(dataFile string, measure measure.Measure) (baseTime time.Time) {
- baseTime = time.Now()
+ baseTime = timestamp.NowMilli()
writeDataWithBaseTime(baseTime, dataFile, measure)
return baseTime
}
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index fff833f..48f1bf4 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -46,6 +46,7 @@ import (
pb "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var (
@@ -145,7 +146,7 @@ var dataFS embed.FS
func setUpStreamQueryData(dataFile string, stream stream.Stream) (baseTime time.Time) {
var templates []interface{}
- baseTime = time.Now()
+ baseTime = timestamp.NowMilli()
content, err := dataFS.ReadFile("testdata/" + dataFile)
Expect(err).ShouldNot(HaveOccurred())
Expect(json.Unmarshal(content, &templates)).Should(Succeed())
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index cae08e6..5adb563 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -789,7 +789,7 @@ var dataFS embed.FS
func setupQueryData(dataFile string, stream *stream) (baseTime time.Time) {
var templates []interface{}
- baseTime = time.Now()
+ baseTime = timestamp.NowMilli()
content, err := dataFS.ReadFile("testdata/" + dataFile)
Expect(err).ShouldNot(HaveOccurred())
Expect(json.Unmarshal(content, &templates)).ShouldNot(HaveOccurred())
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 10fcc74..5affbc4 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -69,7 +69,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
if err != nil {
return err
}
- t := value.GetTimestamp().AsTime()
+ t := timestamp.MToN(value.GetTimestamp().AsTime())
wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
if err != nil {
if wp != nil {
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index 1e07194..ab99f2f 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -19,6 +19,7 @@ package stream
import (
"encoding/base64"
+ "time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -27,6 +28,9 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var _ = Describe("Write", func() {
@@ -191,6 +195,45 @@ var _ = Describe("Write", func() {
})
}
})
+ It("Same millisecond data", func() {
+ ele := getEle(
+ "trace_id-xxfff.111323",
+ 0,
+ "webapp_id",
+ "10.0.0.1_id",
+ "/home_id",
+ 300,
+ 1622933202000000000,
+ )
+ err := s.Write(ele)
+ Expect(err).ShouldNot(HaveOccurred())
+ err = s.Write(ele)
+ Expect(err).ShouldNot(HaveOccurred())
+ entity := tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.Entry(convert.Int64ToBytes(0))}
+ Eventually(func() int {
+ ss, err := s.Shards(entity)
+ Expect(err).ShouldNot(HaveOccurred())
+ eleSize := 0
+ for _, shard := range ss {
+ series, errInternal := shard.Series().Get(entity)
+ Expect(errInternal).ShouldNot(HaveOccurred())
+ ts, errInternal := series.Span(timestamp.NewInclusiveTimeRangeDuration(ele.Timestamp.AsTime(), 1*time.Hour))
+ Expect(errInternal).ShouldNot(HaveOccurred())
+ sb, errInternal := ts.SeekerBuilder().Build()
+ Expect(errInternal).ShouldNot(HaveOccurred())
+ iters, errInternal := sb.Seek()
+ Expect(errInternal).ShouldNot(HaveOccurred())
+ for _, iter := range iters {
+ for iter.Next() {
+ eleSize++
+ }
+ iter.Close()
+ }
+ ts.Close()
+ }
+ return eleSize
+ }).Should(Equal(2))
+ })
})
func getEle(tags ...interface{}) *streamv1.ElementValue {
@@ -201,7 +244,7 @@ func getEle(tags ...interface{}) *streamv1.ElementValue {
bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
e := &streamv1.ElementValue{
ElementId: "1231.dfd.123123ssf",
- Timestamp: timestamppb.Now(),
+ Timestamp: timestamppb.New(timestamp.NowMilli()),
TagFamilies: []*modelv1.TagFamilyForWrite{
{
Tags: []*modelv1.TagValue{
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
index 3638bb4..f06b673 100644
--- a/banyand/tsdb/metric.go
+++ b/banyand/tsdb/metric.go
@@ -67,6 +67,11 @@ func (s *shard) runStat() {
}
func (s *shard) stat() {
+ defer func() {
+ if r := recover(); r != nil {
+ s.l.Warn().Interface("r", r).Msg("recovered")
+ }
+ }()
seriesStat := s.seriesDatabase.Stats()
s.curry(mtBytes).WithLabelValues("series").Set(float64(seriesStat.MemBytes))
s.curry(maxMtBytes).WithLabelValues("series").Set(float64(seriesStat.MaxMemBytes))
diff --git a/pkg/query/logical/common_test.go b/pkg/query/logical/common_test.go
index 101d529..8512fb6 100644
--- a/pkg/query/logical/common_test.go
+++ b/pkg/query/logical/common_test.go
@@ -45,6 +45,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test"
testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
//go:embed testdata/*.json
@@ -53,7 +54,7 @@ var dataFS embed.FS
func setupQueryData(testing *testing.T, dataFile string, stream stream.Stream) (baseTime time.Time) {
t := assert.New(testing)
var templates []interface{}
- baseTime = time.Now()
+ baseTime = timestamp.NowMilli()
content, err := dataFS.ReadFile("testdata/" + dataFile)
t.NoError(err)
t.NoError(json.Unmarshal(content, &templates))
@@ -194,7 +195,7 @@ func setupMeasure(t *require.Assertions) (measure.Measure, metadata.Service, fun
func setupMeasureQueryData(testing *testing.T, dataFile string, measure measure.Measure) (baseTime time.Time) {
t := assert.New(testing)
var templates []interface{}
- baseTime = time.Now()
+ baseTime = timestamp.NowMilli()
content, err := dataFS.ReadFile("testdata/" + dataFile)
t.NoError(err)
t.NoError(json.Unmarshal(content, &templates))
diff --git a/pkg/timestamp/nano.go b/pkg/timestamp/nano.go
new file mode 100644
index 0000000..207f924
--- /dev/null
+++ b/pkg/timestamp/nano.go
@@ -0,0 +1,48 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp
+
+import (
+ "time"
+ // link runtime pkg fastrand
+ _ "unsafe"
+)
+
+var (
+ maxNanoSecond = uint32(time.Millisecond - 1)
+ mSecond = int64(time.Millisecond)
+)
+
+// FastRandN is a fast thread local random function.
+//go:linkname fastRandN runtime.fastrandn
+func fastRandN(n uint32) uint32
+
+// MToN convert time unix millisends to nanoseconds
+func MToN(ms time.Time) time.Time {
+ ns := ms.UnixNano()
+ if ms.Nanosecond()%int(mSecond) > 0 {
+ ns = ns / mSecond * mSecond
+ }
+ nns := ns + int64(fastRandN(maxNanoSecond))
+ return time.Unix(ms.Unix(), nns%int64(time.Second))
+}
+
+// NowMilli returns a time based on a unix millisecond
+func NowMilli() time.Time {
+ return time.UnixMilli(time.Now().UnixMilli())
+}
diff --git a/pkg/timestamp/nano_test.go b/pkg/timestamp/nano_test.go
new file mode 100644
index 0000000..f2e41ca
--- /dev/null
+++ b/pkg/timestamp/nano_test.go
@@ -0,0 +1,33 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp_test
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+func TestMToN(t *testing.T) {
+ m := timestamp.NowMilli()
+ n := timestamp.MToN(m)
+ assert.Equal(t, n.UnixMilli(), m.UnixMilli())
+ assert.GreaterOrEqual(t, n.Nanosecond(), m.Nanosecond())
+}