You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/13 07:25:26 UTC

[skywalking-banyandb] branch main updated: Fix duplicate datapoints (#101)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new fd1aa3a  Fix duplicate datapoints (#101)
fd1aa3a is described below

commit fd1aa3a9306caa28e90a2c467e4721a715504902
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Apr 13 15:25:22 2022 +0800

    Fix duplicate datapoints (#101)
    
    * Update all rpc time unit from nanosecond to millisecond.
    Add a random number(0~999999) to the low 6 digits of
    a datapoint's timestamp on writing it to kv
---
 api/proto/banyandb/measure/v1/query.pb.go |  4 +--
 api/proto/banyandb/measure/v1/query.proto |  4 +--
 api/proto/banyandb/measure/v1/topn.pb.go  |  4 +--
 api/proto/banyandb/measure/v1/topn.proto  |  4 +--
 api/proto/banyandb/measure/v1/write.pb.go |  2 +-
 api/proto/banyandb/measure/v1/write.proto |  2 +-
 api/proto/banyandb/stream/v1/query.pb.go  |  6 ++--
 api/proto/banyandb/stream/v1/query.proto  |  6 ++--
 api/proto/banyandb/stream/v1/write.pb.go  |  2 +-
 api/proto/banyandb/stream/v1/write.proto  |  2 +-
 banyand/measure/measure_write.go          |  2 +-
 banyand/measure/measure_write_test.go     |  3 +-
 banyand/query/processor_test.go           |  3 +-
 banyand/stream/stream_query_test.go       |  2 +-
 banyand/stream/stream_write.go            |  2 +-
 banyand/stream/stream_write_test.go       | 45 ++++++++++++++++++++++++++++-
 banyand/tsdb/metric.go                    |  5 ++++
 pkg/query/logical/common_test.go          |  5 ++--
 pkg/timestamp/nano.go                     | 48 +++++++++++++++++++++++++++++++
 pkg/timestamp/nano_test.go                | 33 +++++++++++++++++++++
 20 files changed, 158 insertions(+), 26 deletions(-)

diff --git a/api/proto/banyandb/measure/v1/query.pb.go b/api/proto/banyandb/measure/v1/query.pb.go
index fa598f3..b6cf15d 100644
--- a/api/proto/banyandb/measure/v1/query.pb.go
+++ b/api/proto/banyandb/measure/v1/query.pb.go
@@ -48,7 +48,7 @@ type DataPoint struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	// timestamp is in the timeunit of nanoseconds.
+	// timestamp is in the timeunit of milliseconds.
 	Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
 	// tag_families contains tags selected in the projection
 	TagFamilies []*v1.TagFamily `protobuf:"bytes,2,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
@@ -166,7 +166,7 @@ type QueryRequest struct {
 
 	// metadata is required
 	Metadata *v11.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
-	// time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+	// time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
 	TimeRange *v1.TimeRange `protobuf:"bytes,2,opt,name=time_range,json=timeRange,proto3" json:"time_range,omitempty"`
 	// tag_families are indexed.
 	Criteria []*v1.Criteria `protobuf:"bytes,4,rep,name=criteria,proto3" json:"criteria,omitempty"`
diff --git a/api/proto/banyandb/measure/v1/query.proto b/api/proto/banyandb/measure/v1/query.proto
index 01f50ec..09797c3 100644
--- a/api/proto/banyandb/measure/v1/query.proto
+++ b/api/proto/banyandb/measure/v1/query.proto
@@ -29,7 +29,7 @@ import "banyandb/model/v1/query.proto";
 
 // DataPoint is stored in Measures
 message DataPoint {
-  // timestamp is in the timeunit of nanoseconds.
+  // timestamp is in the timeunit of milliseconds.
   google.protobuf.Timestamp timestamp = 1;
   // tag_families contains tags selected in the projection
   repeated model.v1.TagFamily tag_families = 2;
@@ -51,7 +51,7 @@ message QueryResponse {
 message QueryRequest {
   // metadata is required
   common.v1.Metadata metadata = 1;
-  // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+  // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
   model.v1.TimeRange time_range = 2;
   // tag_families are indexed.
   repeated model.v1.Criteria criteria = 4;
diff --git a/api/proto/banyandb/measure/v1/topn.pb.go b/api/proto/banyandb/measure/v1/topn.pb.go
index 22749b5..40d7b41 100644
--- a/api/proto/banyandb/measure/v1/topn.pb.go
+++ b/api/proto/banyandb/measure/v1/topn.pb.go
@@ -48,7 +48,7 @@ type TopNList struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	// timestamp is in the timeunit of nanoseconds.
+	// timestamp is in the timeunit of milliseconds.
 	Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
 	// items contains top-n items in a list
 	Items []*TopNList_Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"`
@@ -158,7 +158,7 @@ type TopNRequest struct {
 
 	// metadata is required
 	Metadata *v1.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
-	// time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+	// time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
 	TimeRange *v11.TimeRange `protobuf:"bytes,2,opt,name=time_range,json=timeRange,proto3" json:"time_range,omitempty"`
 	// top_n set the how many items should be returned in each list.
 	TopN int32 `protobuf:"varint,3,opt,name=top_n,json=topN,proto3" json:"top_n,omitempty"`
diff --git a/api/proto/banyandb/measure/v1/topn.proto b/api/proto/banyandb/measure/v1/topn.proto
index d4acb6e..9910966 100644
--- a/api/proto/banyandb/measure/v1/topn.proto
+++ b/api/proto/banyandb/measure/v1/topn.proto
@@ -29,7 +29,7 @@ import "banyandb/model/v1/query.proto";
 
 //TopNList contains a series of topN items
 message TopNList {
-    // timestamp is in the timeunit of nanoseconds.
+    // timestamp is in the timeunit of milliseconds.
     google.protobuf.Timestamp timestamp = 1;
     message Item {
         string name = 1;
@@ -50,7 +50,7 @@ message TopNResponse {
 message TopNRequest {
     // metadata is required
     common.v1.Metadata metadata = 1;
-    // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+    // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
     model.v1.TimeRange time_range = 2;
     // top_n set the how many items should be returned in each list.
     int32 top_n = 3;
diff --git a/api/proto/banyandb/measure/v1/write.pb.go b/api/proto/banyandb/measure/v1/write.pb.go
index 7746fe1..8c1ee58 100644
--- a/api/proto/banyandb/measure/v1/write.pb.go
+++ b/api/proto/banyandb/measure/v1/write.pb.go
@@ -48,7 +48,7 @@ type DataPointValue struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	// timestamp is in the timeunit of nanoseconds.
+	// timestamp is in the timeunit of milliseconds.
 	Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
 	// the order of tag_families' items match the measure schema
 	TagFamilies []*v1.TagFamilyForWrite `protobuf:"bytes,2,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
diff --git a/api/proto/banyandb/measure/v1/write.proto b/api/proto/banyandb/measure/v1/write.proto
index 4246cd6..4bd2540 100644
--- a/api/proto/banyandb/measure/v1/write.proto
+++ b/api/proto/banyandb/measure/v1/write.proto
@@ -28,7 +28,7 @@ import "banyandb/model/v1/common.proto";
 
 //DataPointValue is the data point for writing. It only contains values.
 message DataPointValue {
-  // timestamp is in the timeunit of nanoseconds.
+  // timestamp is in the timeunit of milliseconds.
   google.protobuf.Timestamp timestamp = 1;
   // the order of tag_families' items match the measure schema
   repeated model.v1.TagFamilyForWrite tag_families = 2;
diff --git a/api/proto/banyandb/stream/v1/query.pb.go b/api/proto/banyandb/stream/v1/query.pb.go
index eb50566..6a53f46 100644
--- a/api/proto/banyandb/stream/v1/query.pb.go
+++ b/api/proto/banyandb/stream/v1/query.pb.go
@@ -52,7 +52,7 @@ type Element struct {
 
 	// element_id could be span_id of a Span or segment_id of a Segment in the context of stream
 	ElementId string `protobuf:"bytes,1,opt,name=element_id,json=elementId,proto3" json:"element_id,omitempty"`
-	// timestamp represents
+	// timestamp is in the timeunit of milliseconds. It represents
 	// 1) either the start time of a Span/Segment,
 	// 2) or the timestamp of a log
 	Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
@@ -61,7 +61,7 @@ type Element struct {
 	// - duration
 	// - service_name
 	// - service_instance_id
-	// - end_time_nanoseconds
+	// - end_time_milliseconds
 	TagFamilies []*v1.TagFamily `protobuf:"bytes,3,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
 }
 
@@ -175,7 +175,7 @@ type QueryRequest struct {
 
 	// metadata is required
 	Metadata *v11.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
-	// time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+	// time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
 	// In the context of stream, it represents the range of the `startTime` for spans/segments,
 	// while in the context of Log, it means the range of the timestamp(s) for logs.
 	// it is always recommended to specify time range for performance reason
diff --git a/api/proto/banyandb/stream/v1/query.proto b/api/proto/banyandb/stream/v1/query.proto
index ae11f8c..9430906 100644
--- a/api/proto/banyandb/stream/v1/query.proto
+++ b/api/proto/banyandb/stream/v1/query.proto
@@ -32,7 +32,7 @@ import "banyandb/model/v1/query.proto";
 message Element {
   // element_id could be span_id of a Span or segment_id of a Segment in the context of stream
   string element_id = 1;
-  // timestamp represents
+  // timestamp is in the timeunit of milliseconds. It represents
   // 1) either the start time of a Span/Segment,
   // 2) or the timestamp of a log
   google.protobuf.Timestamp timestamp = 2;
@@ -41,7 +41,7 @@ message Element {
   // - duration
   // - service_name
   // - service_instance_id
-  // - end_time_nanoseconds
+  // - end_time_milliseconds
   repeated model.v1.TagFamily tag_families = 3;
 }
 
@@ -55,7 +55,7 @@ message QueryResponse {
 message QueryRequest {
   // metadata is required
   common.v1.Metadata metadata = 1;
-  // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+  // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
   // In the context of stream, it represents the range of the `startTime` for spans/segments,
   // while in the context of Log, it means the range of the timestamp(s) for logs.
   // it is always recommended to specify time range for performance reason
diff --git a/api/proto/banyandb/stream/v1/write.pb.go b/api/proto/banyandb/stream/v1/write.pb.go
index 1af1637..06bb73f 100644
--- a/api/proto/banyandb/stream/v1/write.pb.go
+++ b/api/proto/banyandb/stream/v1/write.pb.go
@@ -49,7 +49,7 @@ type ElementValue struct {
 
 	// element_id could be span_id of a Span or segment_id of a Segment in the context of stream
 	ElementId string `protobuf:"bytes,1,opt,name=element_id,json=elementId,proto3" json:"element_id,omitempty"`
-	// timestamp_nanoseconds is in the timeunit of nanoseconds. It represents
+	// timestamp is in the timeunit of milliseconds. It represents
 	// 1) either the start time of a Span/Segment,
 	// 2) or the timestamp of a log
 	Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
diff --git a/api/proto/banyandb/stream/v1/write.proto b/api/proto/banyandb/stream/v1/write.proto
index b30961e..cb1b5a5 100644
--- a/api/proto/banyandb/stream/v1/write.proto
+++ b/api/proto/banyandb/stream/v1/write.proto
@@ -29,7 +29,7 @@ import "banyandb/model/v1/common.proto";
 message ElementValue {
   // element_id could be span_id of a Span or segment_id of a Segment in the context of stream
   string element_id = 1;
-  // timestamp_nanoseconds is in the timeunit of nanoseconds. It represents
+  // timestamp is in the timeunit of milliseconds. It represents
   // 1) either the start time of a Span/Segment,
   // 2) or the timestamp of a log
   google.protobuf.Timestamp timestamp = 2;
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 5b777e7..a8f60d9 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -78,7 +78,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
 	if err != nil {
 		return err
 	}
-	t := value.GetTimestamp().AsTime()
+	t := timestamp.MToN(value.GetTimestamp().AsTime())
 	wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
 	if err != nil {
 		if wp != nil {
diff --git a/banyand/measure/measure_write_test.go b/banyand/measure/measure_write_test.go
index 249a31a..22161df 100644
--- a/banyand/measure/measure_write_test.go
+++ b/banyand/measure/measure_write_test.go
@@ -30,6 +30,7 @@ import (
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	"github.com/apache/skywalking-banyandb/banyand/measure"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var _ = Describe("Write service_cpm_minute", func() {
@@ -70,7 +71,7 @@ var _ = Describe("Write service_cpm_minute", func() {
 var dataFS embed.FS
 
 func writeData(dataFile string, measure measure.Measure) (baseTime time.Time) {
-	baseTime = time.Now()
+	baseTime = timestamp.NowMilli()
 	writeDataWithBaseTime(baseTime, dataFile, measure)
 	return baseTime
 }
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index fff833f..48f1bf4 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -46,6 +46,7 @@ import (
 	pb "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/query/logical"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var (
@@ -145,7 +146,7 @@ var dataFS embed.FS
 
 func setUpStreamQueryData(dataFile string, stream stream.Stream) (baseTime time.Time) {
 	var templates []interface{}
-	baseTime = time.Now()
+	baseTime = timestamp.NowMilli()
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
 	Expect(err).ShouldNot(HaveOccurred())
 	Expect(json.Unmarshal(content, &templates)).Should(Succeed())
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index cae08e6..5adb563 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -789,7 +789,7 @@ var dataFS embed.FS
 
 func setupQueryData(dataFile string, stream *stream) (baseTime time.Time) {
 	var templates []interface{}
-	baseTime = time.Now()
+	baseTime = timestamp.NowMilli()
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
 	Expect(err).ShouldNot(HaveOccurred())
 	Expect(json.Unmarshal(content, &templates)).ShouldNot(HaveOccurred())
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 10fcc74..5affbc4 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -69,7 +69,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
 	if err != nil {
 		return err
 	}
-	t := value.GetTimestamp().AsTime()
+	t := timestamp.MToN(value.GetTimestamp().AsTime())
 	wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
 	if err != nil {
 		if wp != nil {
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index 1e07194..ab99f2f 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -19,6 +19,7 @@ package stream
 
 import (
 	"encoding/base64"
+	"time"
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
@@ -27,6 +28,9 @@ import (
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var _ = Describe("Write", func() {
@@ -191,6 +195,45 @@ var _ = Describe("Write", func() {
 			})
 		}
 	})
+	It("Same millisecond data", func() {
+		ele := getEle(
+			"trace_id-xxfff.111323",
+			0,
+			"webapp_id",
+			"10.0.0.1_id",
+			"/home_id",
+			300,
+			1622933202000000000,
+		)
+		err := s.Write(ele)
+		Expect(err).ShouldNot(HaveOccurred())
+		err = s.Write(ele)
+		Expect(err).ShouldNot(HaveOccurred())
+		entity := tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.Entry(convert.Int64ToBytes(0))}
+		Eventually(func() int {
+			ss, err := s.Shards(entity)
+			Expect(err).ShouldNot(HaveOccurred())
+			eleSize := 0
+			for _, shard := range ss {
+				series, errInternal := shard.Series().Get(entity)
+				Expect(errInternal).ShouldNot(HaveOccurred())
+				ts, errInternal := series.Span(timestamp.NewInclusiveTimeRangeDuration(ele.Timestamp.AsTime(), 1*time.Hour))
+				Expect(errInternal).ShouldNot(HaveOccurred())
+				sb, errInternal := ts.SeekerBuilder().Build()
+				Expect(errInternal).ShouldNot(HaveOccurred())
+				iters, errInternal := sb.Seek()
+				Expect(errInternal).ShouldNot(HaveOccurred())
+				for _, iter := range iters {
+					for iter.Next() {
+						eleSize++
+					}
+					iter.Close()
+				}
+				ts.Close()
+			}
+			return eleSize
+		}).Should(Equal(2))
+	})
 })
 
 func getEle(tags ...interface{}) *streamv1.ElementValue {
@@ -201,7 +244,7 @@ func getEle(tags ...interface{}) *streamv1.ElementValue {
 	bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
 	e := &streamv1.ElementValue{
 		ElementId: "1231.dfd.123123ssf",
-		Timestamp: timestamppb.Now(),
+		Timestamp: timestamppb.New(timestamp.NowMilli()),
 		TagFamilies: []*modelv1.TagFamilyForWrite{
 			{
 				Tags: []*modelv1.TagValue{
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
index 3638bb4..f06b673 100644
--- a/banyand/tsdb/metric.go
+++ b/banyand/tsdb/metric.go
@@ -67,6 +67,11 @@ func (s *shard) runStat() {
 }
 
 func (s *shard) stat() {
+	defer func() {
+		if r := recover(); r != nil {
+			s.l.Warn().Interface("r", r).Msg("recovered")
+		}
+	}()
 	seriesStat := s.seriesDatabase.Stats()
 	s.curry(mtBytes).WithLabelValues("series").Set(float64(seriesStat.MemBytes))
 	s.curry(maxMtBytes).WithLabelValues("series").Set(float64(seriesStat.MaxMemBytes))
diff --git a/pkg/query/logical/common_test.go b/pkg/query/logical/common_test.go
index 101d529..8512fb6 100644
--- a/pkg/query/logical/common_test.go
+++ b/pkg/query/logical/common_test.go
@@ -45,6 +45,7 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/test"
 	testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
 	teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 //go:embed testdata/*.json
@@ -53,7 +54,7 @@ var dataFS embed.FS
 func setupQueryData(testing *testing.T, dataFile string, stream stream.Stream) (baseTime time.Time) {
 	t := assert.New(testing)
 	var templates []interface{}
-	baseTime = time.Now()
+	baseTime = timestamp.NowMilli()
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
 	t.NoError(err)
 	t.NoError(json.Unmarshal(content, &templates))
@@ -194,7 +195,7 @@ func setupMeasure(t *require.Assertions) (measure.Measure, metadata.Service, fun
 func setupMeasureQueryData(testing *testing.T, dataFile string, measure measure.Measure) (baseTime time.Time) {
 	t := assert.New(testing)
 	var templates []interface{}
-	baseTime = time.Now()
+	baseTime = timestamp.NowMilli()
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
 	t.NoError(err)
 	t.NoError(json.Unmarshal(content, &templates))
diff --git a/pkg/timestamp/nano.go b/pkg/timestamp/nano.go
new file mode 100644
index 0000000..207f924
--- /dev/null
+++ b/pkg/timestamp/nano.go
@@ -0,0 +1,48 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp
+
+import (
+	"time"
+	// link runtime pkg fastrand
+	_ "unsafe"
+)
+
+var (
+	maxNanoSecond = uint32(time.Millisecond - 1)
+	mSecond       = int64(time.Millisecond)
+)
+
+// fastRandN is a fast thread local random function linked to runtime.fastrandn.
+//go:linkname fastRandN runtime.fastrandn
+func fastRandN(n uint32) uint32
+
+// MToN converts a millisecond-precision time to nanoseconds, filling the sub-millisecond digits with a random value.
+func MToN(ms time.Time) time.Time {
+	ns := ms.UnixNano()
+	if ms.Nanosecond()%int(mSecond) > 0 {
+		ns = ns / mSecond * mSecond
+	}
+	nns := ns + int64(fastRandN(maxNanoSecond))
+	return time.Unix(ms.Unix(), nns%int64(time.Second))
+}
+
+// NowMilli returns the current time truncated to millisecond precision.
+func NowMilli() time.Time {
+	return time.UnixMilli(time.Now().UnixMilli())
+}
diff --git a/pkg/timestamp/nano_test.go b/pkg/timestamp/nano_test.go
new file mode 100644
index 0000000..f2e41ca
--- /dev/null
+++ b/pkg/timestamp/nano_test.go
@@ -0,0 +1,33 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp_test
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+func TestMToN(t *testing.T) {
+	m := timestamp.NowMilli()
+	n := timestamp.MToN(m)
+	assert.Equal(t, n.UnixMilli(), m.UnixMilli())
+	assert.GreaterOrEqual(t, n.Nanosecond(), m.Nanosecond())
+}