You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/13 03:15:45 UTC

[skywalking-banyandb] branch bug-duplicate-dp created (now 964acbf)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch bug-duplicate-dp
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at 964acbf  Update all rpc time unit from nanosecond to millisecond. Add a random number(0~999999) to the low 6 digits of a datapoint's timestamp on writing it to kv

This branch includes the following new commits:

     new 964acbf  Update all rpc time unit from nanosecond to millisecond. Add a random number(0~999999) to the low 6 digits of a datapoint's timestamp on writing it to kv

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/01: Update all rpc time unit from nanosecond to millisecond. Add a random number(0~999999) to the low 6 digits of a datapoint's timestamp on writing it to kv

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug-duplicate-dp
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 964acbf54c3c4e490216e0b998e6cf75e13be9a3
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Apr 13 03:08:00 2022 +0000

    Update all rpc time unit from nanosecond to millisecond.
    Add a random number(0~999999) to the low 6 digits of
    a datapoint's timestamp on writing it to kv
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/proto/banyandb/measure/v1/query.pb.go |  4 +--
 api/proto/banyandb/measure/v1/query.proto |  4 +--
 api/proto/banyandb/measure/v1/topn.pb.go  |  4 +--
 api/proto/banyandb/measure/v1/topn.proto  |  4 +--
 api/proto/banyandb/measure/v1/write.pb.go |  2 +-
 api/proto/banyandb/measure/v1/write.proto |  2 +-
 api/proto/banyandb/stream/v1/query.pb.go  |  6 ++--
 api/proto/banyandb/stream/v1/query.proto  |  6 ++--
 api/proto/banyandb/stream/v1/write.pb.go  |  2 +-
 api/proto/banyandb/stream/v1/write.proto  |  2 +-
 banyand/measure/measure_write.go          |  2 +-
 banyand/measure/measure_write_test.go     |  3 +-
 banyand/query/processor_test.go           |  3 +-
 banyand/stream/stream_query_test.go       |  2 +-
 banyand/stream/stream_write.go            |  2 +-
 banyand/stream/stream_write_test.go       | 45 ++++++++++++++++++++++++++++-
 pkg/query/logical/common_test.go          |  5 ++--
 pkg/timestamp/nano.go                     | 48 +++++++++++++++++++++++++++++++
 pkg/timestamp/nano_test.go                | 34 ++++++++++++++++++++++
 19 files changed, 154 insertions(+), 26 deletions(-)

diff --git a/api/proto/banyandb/measure/v1/query.pb.go b/api/proto/banyandb/measure/v1/query.pb.go
index fa598f3..b6cf15d 100644
--- a/api/proto/banyandb/measure/v1/query.pb.go
+++ b/api/proto/banyandb/measure/v1/query.pb.go
@@ -48,7 +48,7 @@ type DataPoint struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	// timestamp is in the timeunit of nanoseconds.
+	// timestamp is in the timeunit of milliseconds.
 	Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
 	// tag_families contains tags selected in the projection
 	TagFamilies []*v1.TagFamily `protobuf:"bytes,2,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
@@ -166,7 +166,7 @@ type QueryRequest struct {
 
 	// metadata is required
 	Metadata *v11.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
-	// time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+	// time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
 	TimeRange *v1.TimeRange `protobuf:"bytes,2,opt,name=time_range,json=timeRange,proto3" json:"time_range,omitempty"`
 	// tag_families are indexed.
 	Criteria []*v1.Criteria `protobuf:"bytes,4,rep,name=criteria,proto3" json:"criteria,omitempty"`
diff --git a/api/proto/banyandb/measure/v1/query.proto b/api/proto/banyandb/measure/v1/query.proto
index 01f50ec..09797c3 100644
--- a/api/proto/banyandb/measure/v1/query.proto
+++ b/api/proto/banyandb/measure/v1/query.proto
@@ -29,7 +29,7 @@ import "banyandb/model/v1/query.proto";
 
 // DataPoint is stored in Measures
 message DataPoint {
-  // timestamp is in the timeunit of nanoseconds.
+  // timestamp is in the timeunit of milliseconds.
   google.protobuf.Timestamp timestamp = 1;
   // tag_families contains tags selected in the projection
   repeated model.v1.TagFamily tag_families = 2;
@@ -51,7 +51,7 @@ message QueryResponse {
 message QueryRequest {
   // metadata is required
   common.v1.Metadata metadata = 1;
-  // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+  // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
   model.v1.TimeRange time_range = 2;
   // tag_families are indexed.
   repeated model.v1.Criteria criteria = 4;
diff --git a/api/proto/banyandb/measure/v1/topn.pb.go b/api/proto/banyandb/measure/v1/topn.pb.go
index 22749b5..40d7b41 100644
--- a/api/proto/banyandb/measure/v1/topn.pb.go
+++ b/api/proto/banyandb/measure/v1/topn.pb.go
@@ -48,7 +48,7 @@ type TopNList struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	// timestamp is in the timeunit of nanoseconds.
+	// timestamp is in the timeunit of milliseconds.
 	Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
 	// items contains top-n items in a list
 	Items []*TopNList_Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"`
@@ -158,7 +158,7 @@ type TopNRequest struct {
 
 	// metadata is required
 	Metadata *v1.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
-	// time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+	// time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
 	TimeRange *v11.TimeRange `protobuf:"bytes,2,opt,name=time_range,json=timeRange,proto3" json:"time_range,omitempty"`
 	// top_n set the how many items should be returned in each list.
 	TopN int32 `protobuf:"varint,3,opt,name=top_n,json=topN,proto3" json:"top_n,omitempty"`
diff --git a/api/proto/banyandb/measure/v1/topn.proto b/api/proto/banyandb/measure/v1/topn.proto
index d4acb6e..9910966 100644
--- a/api/proto/banyandb/measure/v1/topn.proto
+++ b/api/proto/banyandb/measure/v1/topn.proto
@@ -29,7 +29,7 @@ import "banyandb/model/v1/query.proto";
 
 //TopNList contains a series of topN items
 message TopNList {
-    // timestamp is in the timeunit of nanoseconds.
+    // timestamp is in the timeunit of milliseconds.
     google.protobuf.Timestamp timestamp = 1;
     message Item {
         string name = 1;
@@ -50,7 +50,7 @@ message TopNResponse {
 message TopNRequest {
     // metadata is required
     common.v1.Metadata metadata = 1;
-    // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+    // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
     model.v1.TimeRange time_range = 2;
     // top_n set the how many items should be returned in each list.
     int32 top_n = 3;
diff --git a/api/proto/banyandb/measure/v1/write.pb.go b/api/proto/banyandb/measure/v1/write.pb.go
index 7746fe1..8c1ee58 100644
--- a/api/proto/banyandb/measure/v1/write.pb.go
+++ b/api/proto/banyandb/measure/v1/write.pb.go
@@ -48,7 +48,7 @@ type DataPointValue struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	// timestamp is in the timeunit of nanoseconds.
+	// timestamp is in the timeunit of milliseconds.
 	Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
 	// the order of tag_families' items match the measure schema
 	TagFamilies []*v1.TagFamilyForWrite `protobuf:"bytes,2,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
diff --git a/api/proto/banyandb/measure/v1/write.proto b/api/proto/banyandb/measure/v1/write.proto
index 4246cd6..4bd2540 100644
--- a/api/proto/banyandb/measure/v1/write.proto
+++ b/api/proto/banyandb/measure/v1/write.proto
@@ -28,7 +28,7 @@ import "banyandb/model/v1/common.proto";
 
 //DataPointValue is the data point for writing. It only contains values.
 message DataPointValue {
-  // timestamp is in the timeunit of nanoseconds.
+  // timestamp is in the timeunit of milliseconds.
   google.protobuf.Timestamp timestamp = 1;
   // the order of tag_families' items match the measure schema
   repeated model.v1.TagFamilyForWrite tag_families = 2;
diff --git a/api/proto/banyandb/stream/v1/query.pb.go b/api/proto/banyandb/stream/v1/query.pb.go
index eb50566..6a53f46 100644
--- a/api/proto/banyandb/stream/v1/query.pb.go
+++ b/api/proto/banyandb/stream/v1/query.pb.go
@@ -52,7 +52,7 @@ type Element struct {
 
 	// element_id could be span_id of a Span or segment_id of a Segment in the context of stream
 	ElementId string `protobuf:"bytes,1,opt,name=element_id,json=elementId,proto3" json:"element_id,omitempty"`
-	// timestamp represents
+	// timestamp represents a millisecond
 	// 1) either the start time of a Span/Segment,
 	// 2) or the timestamp of a log
 	Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
@@ -61,7 +61,7 @@ type Element struct {
 	// - duration
 	// - service_name
 	// - service_instance_id
-	// - end_time_nanoseconds
+	// - end_time_milliseconds
 	TagFamilies []*v1.TagFamily `protobuf:"bytes,3,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
 }
 
@@ -175,7 +175,7 @@ type QueryRequest struct {
 
 	// metadata is required
 	Metadata *v11.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
-	// time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+	// time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
 	// In the context of stream, it represents the range of the `startTime` for spans/segments,
 	// while in the context of Log, it means the range of the timestamp(s) for logs.
 	// it is always recommended to specify time range for performance reason
diff --git a/api/proto/banyandb/stream/v1/query.proto b/api/proto/banyandb/stream/v1/query.proto
index ae11f8c..9430906 100644
--- a/api/proto/banyandb/stream/v1/query.proto
+++ b/api/proto/banyandb/stream/v1/query.proto
@@ -32,7 +32,7 @@ import "banyandb/model/v1/query.proto";
 message Element {
   // element_id could be span_id of a Span or segment_id of a Segment in the context of stream
   string element_id = 1;
-  // timestamp represents
+  // timestamp represents a millisecond
   // 1) either the start time of a Span/Segment,
   // 2) or the timestamp of a log
   google.protobuf.Timestamp timestamp = 2;
@@ -41,7 +41,7 @@ message Element {
   // - duration
   // - service_name
   // - service_instance_id
-  // - end_time_nanoseconds
+  // - end_time_milliseconds
   repeated model.v1.TagFamily tag_families = 3;
 }
 
@@ -55,7 +55,7 @@ message QueryResponse {
 message QueryRequest {
   // metadata is required
   common.v1.Metadata metadata = 1;
-  // time_range is a range query with begin/end time of entities in the timeunit of nanoseconds.
+  // time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
   // In the context of stream, it represents the range of the `startTime` for spans/segments,
   // while in the context of Log, it means the range of the timestamp(s) for logs.
   // it is always recommended to specify time range for performance reason
diff --git a/api/proto/banyandb/stream/v1/write.pb.go b/api/proto/banyandb/stream/v1/write.pb.go
index 1af1637..06bb73f 100644
--- a/api/proto/banyandb/stream/v1/write.pb.go
+++ b/api/proto/banyandb/stream/v1/write.pb.go
@@ -49,7 +49,7 @@ type ElementValue struct {
 
 	// element_id could be span_id of a Span or segment_id of a Segment in the context of stream
 	ElementId string `protobuf:"bytes,1,opt,name=element_id,json=elementId,proto3" json:"element_id,omitempty"`
-	// timestamp_nanoseconds is in the timeunit of nanoseconds. It represents
+	// timestamp is in the timeunit of milliseconds. It represents
 	// 1) either the start time of a Span/Segment,
 	// 2) or the timestamp of a log
 	Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
diff --git a/api/proto/banyandb/stream/v1/write.proto b/api/proto/banyandb/stream/v1/write.proto
index b30961e..cb1b5a5 100644
--- a/api/proto/banyandb/stream/v1/write.proto
+++ b/api/proto/banyandb/stream/v1/write.proto
@@ -29,7 +29,7 @@ import "banyandb/model/v1/common.proto";
 message ElementValue {
   // element_id could be span_id of a Span or segment_id of a Segment in the context of stream
   string element_id = 1;
-  // timestamp_nanoseconds is in the timeunit of nanoseconds. It represents
+  // timestamp is in the timeunit of milliseconds. It represents
   // 1) either the start time of a Span/Segment,
   // 2) or the timestamp of a log
   google.protobuf.Timestamp timestamp = 2;
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 5b777e7..a8f60d9 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -78,7 +78,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
 	if err != nil {
 		return err
 	}
-	t := value.GetTimestamp().AsTime()
+	t := timestamp.MToN(value.GetTimestamp().AsTime())
 	wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
 	if err != nil {
 		if wp != nil {
diff --git a/banyand/measure/measure_write_test.go b/banyand/measure/measure_write_test.go
index 249a31a..22161df 100644
--- a/banyand/measure/measure_write_test.go
+++ b/banyand/measure/measure_write_test.go
@@ -30,6 +30,7 @@ import (
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	"github.com/apache/skywalking-banyandb/banyand/measure"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var _ = Describe("Write service_cpm_minute", func() {
@@ -70,7 +71,7 @@ var _ = Describe("Write service_cpm_minute", func() {
 var dataFS embed.FS
 
 func writeData(dataFile string, measure measure.Measure) (baseTime time.Time) {
-	baseTime = time.Now()
+	baseTime = timestamp.NowMilli()
 	writeDataWithBaseTime(baseTime, dataFile, measure)
 	return baseTime
 }
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index fff833f..48f1bf4 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -46,6 +46,7 @@ import (
 	pb "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/query/logical"
 	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var (
@@ -145,7 +146,7 @@ var dataFS embed.FS
 
 func setUpStreamQueryData(dataFile string, stream stream.Stream) (baseTime time.Time) {
 	var templates []interface{}
-	baseTime = time.Now()
+	baseTime = timestamp.NowMilli()
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
 	Expect(err).ShouldNot(HaveOccurred())
 	Expect(json.Unmarshal(content, &templates)).Should(Succeed())
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index cae08e6..5adb563 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -789,7 +789,7 @@ var dataFS embed.FS
 
 func setupQueryData(dataFile string, stream *stream) (baseTime time.Time) {
 	var templates []interface{}
-	baseTime = time.Now()
+	baseTime = timestamp.NowMilli()
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
 	Expect(err).ShouldNot(HaveOccurred())
 	Expect(json.Unmarshal(content, &templates)).ShouldNot(HaveOccurred())
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 10fcc74..5affbc4 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -69,7 +69,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
 	if err != nil {
 		return err
 	}
-	t := value.GetTimestamp().AsTime()
+	t := timestamp.MToN(value.GetTimestamp().AsTime())
 	wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
 	if err != nil {
 		if wp != nil {
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index 1e07194..ab99f2f 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -19,6 +19,7 @@ package stream
 
 import (
 	"encoding/base64"
+	"time"
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
@@ -27,6 +28,9 @@ import (
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var _ = Describe("Write", func() {
@@ -191,6 +195,45 @@ var _ = Describe("Write", func() {
 			})
 		}
 	})
+	It("Same millisecond data", func() {
+		ele := getEle(
+			"trace_id-xxfff.111323",
+			0,
+			"webapp_id",
+			"10.0.0.1_id",
+			"/home_id",
+			300,
+			1622933202000000000,
+		)
+		err := s.Write(ele)
+		Expect(err).ShouldNot(HaveOccurred())
+		err = s.Write(ele)
+		Expect(err).ShouldNot(HaveOccurred())
+		entity := tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.Entry(convert.Int64ToBytes(0))}
+		Eventually(func() int {
+			ss, err := s.Shards(entity)
+			Expect(err).ShouldNot(HaveOccurred())
+			eleSize := 0
+			for _, shard := range ss {
+				series, errInternal := shard.Series().Get(entity)
+				Expect(errInternal).ShouldNot(HaveOccurred())
+				ts, errInternal := series.Span(timestamp.NewInclusiveTimeRangeDuration(ele.Timestamp.AsTime(), 1*time.Hour))
+				Expect(errInternal).ShouldNot(HaveOccurred())
+				sb, errInternal := ts.SeekerBuilder().Build()
+				Expect(errInternal).ShouldNot(HaveOccurred())
+				iters, errInternal := sb.Seek()
+				Expect(errInternal).ShouldNot(HaveOccurred())
+				for _, iter := range iters {
+					for iter.Next() {
+						eleSize++
+					}
+					iter.Close()
+				}
+				ts.Close()
+			}
+			return eleSize
+		}).Should(Equal(2))
+	})
 })
 
 func getEle(tags ...interface{}) *streamv1.ElementValue {
@@ -201,7 +244,7 @@ func getEle(tags ...interface{}) *streamv1.ElementValue {
 	bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
 	e := &streamv1.ElementValue{
 		ElementId: "1231.dfd.123123ssf",
-		Timestamp: timestamppb.Now(),
+		Timestamp: timestamppb.New(timestamp.NowMilli()),
 		TagFamilies: []*modelv1.TagFamilyForWrite{
 			{
 				Tags: []*modelv1.TagValue{
diff --git a/pkg/query/logical/common_test.go b/pkg/query/logical/common_test.go
index 101d529..8512fb6 100644
--- a/pkg/query/logical/common_test.go
+++ b/pkg/query/logical/common_test.go
@@ -45,6 +45,7 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/test"
 	testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
 	teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 //go:embed testdata/*.json
@@ -53,7 +54,7 @@ var dataFS embed.FS
 func setupQueryData(testing *testing.T, dataFile string, stream stream.Stream) (baseTime time.Time) {
 	t := assert.New(testing)
 	var templates []interface{}
-	baseTime = time.Now()
+	baseTime = timestamp.NowMilli()
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
 	t.NoError(err)
 	t.NoError(json.Unmarshal(content, &templates))
@@ -194,7 +195,7 @@ func setupMeasure(t *require.Assertions) (measure.Measure, metadata.Service, fun
 func setupMeasureQueryData(testing *testing.T, dataFile string, measure measure.Measure) (baseTime time.Time) {
 	t := assert.New(testing)
 	var templates []interface{}
-	baseTime = time.Now()
+	baseTime = timestamp.NowMilli()
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
 	t.NoError(err)
 	t.NoError(json.Unmarshal(content, &templates))
diff --git a/pkg/timestamp/nano.go b/pkg/timestamp/nano.go
new file mode 100644
index 0000000..207f924
--- /dev/null
+++ b/pkg/timestamp/nano.go
@@ -0,0 +1,48 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp
+
+import (
+	"time"
+	// link runtime pkg fastrand
+	_ "unsafe"
+)
+
+var (
+	maxNanoSecond = uint32(time.Millisecond - 1)
+	mSecond       = int64(time.Millisecond)
+)
+
+// FastRandN is a fast thread local random function.
+//go:linkname fastRandN runtime.fastrandn
+func fastRandN(n uint32) uint32
+
+// MToN convert time unix millisends to nanoseconds
+func MToN(ms time.Time) time.Time {
+	ns := ms.UnixNano()
+	if ms.Nanosecond()%int(mSecond) > 0 {
+		ns = ns / mSecond * mSecond
+	}
+	nns := ns + int64(fastRandN(maxNanoSecond))
+	return time.Unix(ms.Unix(), nns%int64(time.Second))
+}
+
+// NowMilli returns a time based on a unix millisecond
+func NowMilli() time.Time {
+	return time.UnixMilli(time.Now().UnixMilli())
+}
diff --git a/pkg/timestamp/nano_test.go b/pkg/timestamp/nano_test.go
new file mode 100644
index 0000000..15f000d
--- /dev/null
+++ b/pkg/timestamp/nano_test.go
@@ -0,0 +1,34 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+func TestMToN(t *testing.T) {
+	m := time.Now()
+	n := timestamp.MToN(m)
+	assert.Equal(t, n.UnixMilli(), m.UnixMilli())
+	assert.GreaterOrEqual(t, n.Nanosecond(), m.Nanosecond())
+}