You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/20 02:55:40 UTC

[skywalking-banyandb] branch patch-time updated (c5eea4d -> 5c266b5)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch patch-time
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


 discard c5eea4d  Check input time range
     new 5c266b5  Check input time range

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (c5eea4d)
            \
             N -- N -- N   refs/heads/patch-time (5c266b5)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pkg/timestamp/nano.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)


[skywalking-banyandb] 01/01: Check input time range

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch patch-time
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5c266b59d445fce14581d51f61056b8b321f90bd
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Apr 20 02:52:45 2022 +0000

    Check input time range
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/liaison/grpc/measure.go      | 13 ++++++++++
 banyand/liaison/grpc/measure_test.go |  5 ++--
 banyand/liaison/grpc/stream.go       | 13 ++++++++++
 banyand/liaison/grpc/stream_test.go  |  5 ++--
 banyand/measure/measure_write.go     |  6 ++++-
 banyand/stream/stream_write.go       |  6 ++++-
 banyand/tsdb/shard_test.go           | 50 +++++++++++++++---------------------
 banyand/tsdb/tsdb_suite_test.go      |  3 +++
 pkg/timestamp/nano.go                | 35 +++++++++++++++++++++++++
 9 files changed, 101 insertions(+), 35 deletions(-)

diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index f8ca54d..3cfc97f 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -22,10 +22,13 @@ import (
 	"io"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"github.com/apache/skywalking-banyandb/api/data"
 	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type measureService struct {
@@ -42,6 +45,10 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
 		if err != nil {
 			return err
 		}
+		if errTime := timestamp.Check(writeRequest.DataPoint.Timestamp.AsTime()); errTime != nil {
+			ms.log.Error().Err(errTime).Msg("the data point time is invalid")
+			continue
+		}
 		entity, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
 		if err != nil {
 			ms.log.Error().Err(err).Msg("failed to navigate to the write target")
@@ -63,6 +70,12 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
 }
 
 func (ms *measureService) Query(_ context.Context, entityCriteria *measurev1.QueryRequest) (*measurev1.QueryResponse, error) {
+	if err := timestamp.Check(entityCriteria.GetTimeRange().Begin.AsTime()); err != nil {
+		return nil, errors.WithMessage(err, "the begin of time range is invalid")
+	}
+	if err := timestamp.Check(entityCriteria.GetTimeRange().End.AsTime()); err != nil {
+		return nil, errors.WithMessage(err, "the end of time range is invalid")
+	}
 	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
 	feat, errQuery := ms.pipeline.Publish(data.TopicMeasureQuery, message)
 	if errQuery != nil {
diff --git a/banyand/liaison/grpc/measure_test.go b/banyand/liaison/grpc/measure_test.go
index b25b8a7..27fd27d 100644
--- a/banyand/liaison/grpc/measure_test.go
+++ b/banyand/liaison/grpc/measure_test.go
@@ -34,6 +34,7 @@ import (
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/test"
 	testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var _ = Describe("Measure", func() {
@@ -104,7 +105,7 @@ var _ = Describe("Measure", func() {
 func writeMeasureData() *measurev1.WriteRequest {
 	return pbv1.NewMeasureWriteRequestBuilder().
 		Metadata("sw_metric", "service_cpm_minute").
-		Timestamp(time.Now()).
+		Timestamp(timestamp.NowMilli()).
 		TagFamily(
 			pbv1.ID("1"),
 			"entity_1",
@@ -154,7 +155,7 @@ func measureWrite(conn *grpclib.ClientConn) {
 func measureQuery(conn *grpclib.ClientConn) (int, error) {
 	c := measurev1.NewMeasureServiceClient(conn)
 	ctx := context.Background()
-	resp, err := c.Query(ctx, queryMeasureCriteria(time.Now()))
+	resp, err := c.Query(ctx, queryMeasureCriteria(timestamp.NowMilli()))
 
 	return len(resp.GetDataPoints()), err
 }
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 8d125b9..af2c1f5 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -22,10 +22,13 @@ import (
 	"io"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"github.com/apache/skywalking-banyandb/api/data"
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type streamService struct {
@@ -42,6 +45,10 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
 		if err != nil {
 			return err
 		}
+		if errTime := timestamp.Check(writeEntity.GetElement().Timestamp.AsTime()); errTime != nil {
+			s.log.Error().Err(errTime).Msg("the element time is invalid")
+			continue
+		}
 		entity, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
 		if err != nil {
 			s.log.Error().Err(err).Msg("failed to navigate to the write target")
@@ -63,6 +70,12 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
 }
 
 func (s *streamService) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
+	if err := timestamp.Check(entityCriteria.GetTimeRange().Begin.AsTime()); err != nil {
+		return nil, errors.WithMessage(err, "the begin of time range is invalid")
+	}
+	if err := timestamp.Check(entityCriteria.GetTimeRange().End.AsTime()); err != nil {
+		return nil, errors.WithMessage(err, "the end of time range is invalid")
+	}
 	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
 	feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
 	if errQuery != nil {
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 994639d..8f02215 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -41,6 +41,7 @@ import (
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/test"
 	teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var _ = Describe("Stream", func() {
@@ -163,7 +164,7 @@ func writeStreamData() *streamv1.WriteRequest {
 	return pbv1.NewStreamWriteRequestBuilder().
 		ID("1").
 		Metadata("default", "sw").
-		Timestamp(time.Now()).
+		Timestamp(timestamp.NowMilli()).
 		TagFamily(bb).
 		TagFamily(
 			"trace_id-xxfff.111",
@@ -209,7 +210,7 @@ func streamWrite(conn *grpclib.ClientConn) {
 func streamQuery(conn *grpclib.ClientConn) (int, error) {
 	c := streamv1.NewStreamServiceClient(conn)
 	ctx := context.Background()
-	resp, err := c.Query(ctx, queryStreamCriteria(time.Now()))
+	resp, err := c.Query(ctx, queryStreamCriteria(timestamp.NowMilli()))
 
 	return len(resp.GetElements()), err
 }
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index a8f60d9..d5f3db8 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -62,6 +62,10 @@ func (s *measure) Write(value *measurev1.DataPointValue) error {
 }
 
 func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue, cb index.CallbackFn) error {
+	tp := value.GetTimestamp().AsTime()
+	if err := timestamp.Check(tp); err != nil {
+		return errors.WithMessage(err, "writing stream")
+	}
 	sm := s.schema
 	fLen := len(value.GetTagFamilies())
 	if fLen < 1 {
@@ -78,7 +82,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
 	if err != nil {
 		return err
 	}
-	t := timestamp.MToN(value.GetTimestamp().AsTime())
+	t := timestamp.MToN(tp)
 	wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
 	if err != nil {
 		if wp != nil {
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 5affbc4..d73130e 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -53,6 +53,10 @@ func (s *stream) Write(value *streamv1.ElementValue) error {
 }
 
 func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *streamv1.ElementValue, cb index.CallbackFn) error {
+	tp := value.GetTimestamp().AsTime()
+	if err := timestamp.Check(tp); err != nil {
+		return errors.WithMessage(err, "writing stream")
+	}
 	sm := s.schema
 	fLen := len(value.GetTagFamilies())
 	if fLen < 1 {
@@ -69,7 +73,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
 	if err != nil {
 		return err
 	}
-	t := timestamp.MToN(value.GetTimestamp().AsTime())
+	t := timestamp.MToN(tp)
 	wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
 	if err != nil {
 		if wp != nil {
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 9a3005f..0573e66 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -63,11 +63,11 @@ var _ = Describe("Shard", func() {
 				2,
 			)
 			Expect(err).NotTo(HaveOccurred())
-			By("1st block is opened")
+			By("01/01 00:00 1st block is opened")
 			t1 := clock.Now()
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -76,13 +76,12 @@ var _ = Describe("Shard", func() {
 					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
 				},
 			}))
-			By("2nd block is opened")
-			// 01 10:00
+			By("01/01 10:00 2nd block is opened")
 			clock.Add(10 * time.Hour)
 			t2 := clock.Now().Add(2 * time.Hour)
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -100,25 +99,23 @@ var _ = Describe("Shard", func() {
 			}))
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}).Should(Equal([]tsdb.BlockID{}))
-			By("moves to the 2nd block")
-			// 01 13:00
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{}))
+			By("01/01 13:00 moves to the 2nd block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}).Should(Equal([]tsdb.BlockID{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
 				},
 			}))
-			By("3rd block is opened")
-			// 01 22:00
+			By("01/01 22:00 3rd block is opened")
 			clock.Add(9 * time.Hour)
 			t3 := clock.Now().Add(2 * time.Hour)
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -141,8 +138,7 @@ var _ = Describe("Shard", func() {
 					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
 				},
 			}))
-			By("moves to 3rd block")
-			// 02 01:00
+			By("01/02 01:00 moves to 3rd block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
@@ -156,13 +152,12 @@ var _ = Describe("Shard", func() {
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
 				},
 			}))
-			By("4th block is opened")
-			// 02 10:00
+			By("01/02 10:00 4th block is opened")
 			clock.Add(9 * time.Hour)
 			t4 := clock.Now().Add(2 * time.Hour)
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -192,12 +187,11 @@ var _ = Describe("Shard", func() {
 					TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
 				},
 			}))
-			By("moves to 4th block")
-			// 02 13:00
+			By("01/02 13:00 moves to 4th block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}).Should(Equal([]tsdb.BlockID{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
@@ -211,13 +205,12 @@ var _ = Describe("Shard", func() {
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
 				},
 			}))
-			By("5th block is opened")
-			// 02 22:00
+			By("01/02 22:00 5th block is opened")
 			clock.Add(9 * time.Hour)
 			t5 := clock.Now().Add(2 * time.Hour)
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -254,12 +247,11 @@ var _ = Describe("Shard", func() {
 					TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
 				},
 			}))
-			By("close 1st block by adding 5th block")
-			// 03 01:00
+			By("01/03 01:00 close 1st block by adding 5th block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}).Should(Equal([]tsdb.BlockID{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
@@ -275,7 +267,7 @@ var _ = Describe("Shard", func() {
 			}))
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -326,7 +318,7 @@ var _ = Describe("Shard", func() {
 			Expect(err).NotTo(HaveOccurred())
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}).Should(Equal([]tsdb.BlockID{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
@@ -342,7 +334,7 @@ var _ = Describe("Shard", func() {
 			}))
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go
index c761503..cd1109d 100644
--- a/banyand/tsdb/tsdb_suite_test.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -19,6 +19,7 @@ package tsdb_test
 
 import (
 	"testing"
+	"time"
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
@@ -26,6 +27,8 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+var defaultEventallyTimeout = 30 * time.Second
+
 func TestTsdb(t *testing.T) {
 	RegisterFailHandler(Fail)
 	RunSpecs(t, "Tsdb Suite")
diff --git a/pkg/timestamp/nano.go b/pkg/timestamp/nano.go
index 207f924..19b4c9c 100644
--- a/pkg/timestamp/nano.go
+++ b/pkg/timestamp/nano.go
@@ -18,9 +18,13 @@
 package timestamp
 
 import (
+	"math"
 	"time"
+
 	// link runtime pkg fastrand
 	_ "unsafe"
+
+	"github.com/pkg/errors"
 )
 
 var (
@@ -46,3 +50,34 @@ func MToN(ms time.Time) time.Time {
 func NowMilli() time.Time {
 	return time.UnixMilli(time.Now().UnixMilli())
 }
+
+const (
+	// MinNanoTime is the minimum time that can be represented.
+	//
+	// 1677-09-21 00:12:43.145224192 +0000 UTC
+	MinNanoTime = int64(math.MinInt64)
+
+	// MaxNanoTime is the maximum time that can be represented.
+	//
+	// 2262-04-11 23:47:16.854775807 +0000 UTC
+	MaxNanoTime = int64(math.MaxInt64)
+)
+
+var (
+	minNanoTime = time.Unix(0, MinNanoTime).UTC()
+	maxNanoTime = time.Unix(0, MaxNanoTime).UTC()
+
+	ErrTimeOutOfRange     = errors.Errorf("time is out of range %d - %d", MinNanoTime, MaxNanoTime)
+	ErrTimeNotMillisecond = errors.Errorf("time is not millisecond precision")
+)
+
+// Check checks that a time is valid
+func Check(t time.Time) error {
+	if t.Before(minNanoTime) || t.After(maxNanoTime) {
+		return ErrTimeOutOfRange
+	}
+	if t.Nanosecond()%int(mSecond) > 0 {
+		return ErrTimeNotMillisecond
+	}
+	return nil
+}