You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/20 02:54:22 UTC

[skywalking-banyandb] 01/01: Check input time range

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch patch-time
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c5eea4dd58694c1b0280fec636d21feeb74ba188
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Apr 20 02:52:45 2022 +0000

    Check input time range
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/liaison/grpc/measure.go      | 13 ++++++++++
 banyand/liaison/grpc/measure_test.go |  5 ++--
 banyand/liaison/grpc/stream.go       | 13 ++++++++++
 banyand/liaison/grpc/stream_test.go  |  5 ++--
 banyand/measure/measure_write.go     |  6 ++++-
 banyand/stream/stream_write.go       |  6 ++++-
 banyand/tsdb/shard_test.go           | 50 +++++++++++++++---------------------
 banyand/tsdb/tsdb_suite_test.go      |  3 +++
 pkg/timestamp/nano.go                | 35 +++++++++++++++++++++++++
 9 files changed, 101 insertions(+), 35 deletions(-)

diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index f8ca54d..3cfc97f 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -22,10 +22,13 @@ import (
 	"io"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"github.com/apache/skywalking-banyandb/api/data"
 	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type measureService struct {
@@ -42,6 +45,10 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
 		if err != nil {
 			return err
 		}
+		if errTime := timestamp.Check(writeRequest.DataPoint.Timestamp.AsTime()); errTime != nil {
+			ms.log.Error().Err(errTime).Msg("the data point time is invalid")
+			continue
+		}
 		entity, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
 		if err != nil {
 			ms.log.Error().Err(err).Msg("failed to navigate to the write target")
@@ -63,6 +70,12 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
 }
 
 func (ms *measureService) Query(_ context.Context, entityCriteria *measurev1.QueryRequest) (*measurev1.QueryResponse, error) {
+	if err := timestamp.Check(entityCriteria.GetTimeRange().Begin.AsTime()); err != nil {
+		return nil, errors.WithMessage(err, "the begin of time range is invalid")
+	}
+	if err := timestamp.Check(entityCriteria.GetTimeRange().End.AsTime()); err != nil {
+		return nil, errors.WithMessage(err, "the end of time range is invalid")
+	}
 	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
 	feat, errQuery := ms.pipeline.Publish(data.TopicMeasureQuery, message)
 	if errQuery != nil {
diff --git a/banyand/liaison/grpc/measure_test.go b/banyand/liaison/grpc/measure_test.go
index b25b8a7..27fd27d 100644
--- a/banyand/liaison/grpc/measure_test.go
+++ b/banyand/liaison/grpc/measure_test.go
@@ -34,6 +34,7 @@ import (
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/test"
 	testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var _ = Describe("Measure", func() {
@@ -104,7 +105,7 @@ var _ = Describe("Measure", func() {
 func writeMeasureData() *measurev1.WriteRequest {
 	return pbv1.NewMeasureWriteRequestBuilder().
 		Metadata("sw_metric", "service_cpm_minute").
-		Timestamp(time.Now()).
+		Timestamp(timestamp.NowMilli()).
 		TagFamily(
 			pbv1.ID("1"),
 			"entity_1",
@@ -154,7 +155,7 @@ func measureWrite(conn *grpclib.ClientConn) {
 func measureQuery(conn *grpclib.ClientConn) (int, error) {
 	c := measurev1.NewMeasureServiceClient(conn)
 	ctx := context.Background()
-	resp, err := c.Query(ctx, queryMeasureCriteria(time.Now()))
+	resp, err := c.Query(ctx, queryMeasureCriteria(timestamp.NowMilli()))
 
 	return len(resp.GetDataPoints()), err
 }
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 8d125b9..af2c1f5 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -22,10 +22,13 @@ import (
 	"io"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"github.com/apache/skywalking-banyandb/api/data"
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type streamService struct {
@@ -42,6 +45,10 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
 		if err != nil {
 			return err
 		}
+		if errTime := timestamp.Check(writeEntity.GetElement().Timestamp.AsTime()); errTime != nil {
+			s.log.Error().Err(errTime).Msg("the element time is invalid")
+			continue
+		}
 		entity, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
 		if err != nil {
 			s.log.Error().Err(err).Msg("failed to navigate to the write target")
@@ -63,6 +70,12 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
 }
 
 func (s *streamService) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
+	if err := timestamp.Check(entityCriteria.GetTimeRange().Begin.AsTime()); err != nil {
+		return nil, errors.WithMessage(err, "the begin of time range is invalid")
+	}
+	if err := timestamp.Check(entityCriteria.GetTimeRange().End.AsTime()); err != nil {
+		return nil, errors.WithMessage(err, "the end of time range is invalid")
+	}
 	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
 	feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
 	if errQuery != nil {
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 994639d..8f02215 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -41,6 +41,7 @@ import (
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/test"
 	teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var _ = Describe("Stream", func() {
@@ -163,7 +164,7 @@ func writeStreamData() *streamv1.WriteRequest {
 	return pbv1.NewStreamWriteRequestBuilder().
 		ID("1").
 		Metadata("default", "sw").
-		Timestamp(time.Now()).
+		Timestamp(timestamp.NowMilli()).
 		TagFamily(bb).
 		TagFamily(
 			"trace_id-xxfff.111",
@@ -209,7 +210,7 @@ func streamWrite(conn *grpclib.ClientConn) {
 func streamQuery(conn *grpclib.ClientConn) (int, error) {
 	c := streamv1.NewStreamServiceClient(conn)
 	ctx := context.Background()
-	resp, err := c.Query(ctx, queryStreamCriteria(time.Now()))
+	resp, err := c.Query(ctx, queryStreamCriteria(timestamp.NowMilli()))
 
 	return len(resp.GetElements()), err
 }
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index a8f60d9..d5f3db8 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -62,6 +62,10 @@ func (s *measure) Write(value *measurev1.DataPointValue) error {
 }
 
 func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue, cb index.CallbackFn) error {
+	tp := value.GetTimestamp().AsTime()
+	if err := timestamp.Check(tp); err != nil {
+		return errors.WithMessage(err, "writing stream")
+	}
 	sm := s.schema
 	fLen := len(value.GetTagFamilies())
 	if fLen < 1 {
@@ -78,7 +82,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
 	if err != nil {
 		return err
 	}
-	t := timestamp.MToN(value.GetTimestamp().AsTime())
+	t := timestamp.MToN(tp)
 	wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
 	if err != nil {
 		if wp != nil {
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 5affbc4..d73130e 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -53,6 +53,10 @@ func (s *stream) Write(value *streamv1.ElementValue) error {
 }
 
 func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *streamv1.ElementValue, cb index.CallbackFn) error {
+	tp := value.GetTimestamp().AsTime()
+	if err := timestamp.Check(tp); err != nil {
+		return errors.WithMessage(err, "writing stream")
+	}
 	sm := s.schema
 	fLen := len(value.GetTagFamilies())
 	if fLen < 1 {
@@ -69,7 +73,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
 	if err != nil {
 		return err
 	}
-	t := timestamp.MToN(value.GetTimestamp().AsTime())
+	t := timestamp.MToN(tp)
 	wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
 	if err != nil {
 		if wp != nil {
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 9a3005f..0573e66 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -63,11 +63,11 @@ var _ = Describe("Shard", func() {
 				2,
 			)
 			Expect(err).NotTo(HaveOccurred())
-			By("1st block is opened")
+			By("01/01 00:00 1st block is opened")
 			t1 := clock.Now()
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -76,13 +76,12 @@ var _ = Describe("Shard", func() {
 					TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
 				},
 			}))
-			By("2nd block is opened")
-			// 01 10:00
+			By("01/01 10:00 2nd block is opened")
 			clock.Add(10 * time.Hour)
 			t2 := clock.Now().Add(2 * time.Hour)
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -100,25 +99,23 @@ var _ = Describe("Shard", func() {
 			}))
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}).Should(Equal([]tsdb.BlockID{}))
-			By("moves to the 2nd block")
-			// 01 13:00
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{}))
+			By("01/01 13:00 moves to the 2nd block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}).Should(Equal([]tsdb.BlockID{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
 				},
 			}))
-			By("3rd block is opened")
-			// 01 22:00
+			By("01/01 22:00 3rd block is opened")
 			clock.Add(9 * time.Hour)
 			t3 := clock.Now().Add(2 * time.Hour)
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -141,8 +138,7 @@ var _ = Describe("Shard", func() {
 					TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
 				},
 			}))
-			By("moves to 3rd block")
-			// 02 01:00
+			By("01/02 01:00 moves to 3rd block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
@@ -156,13 +152,12 @@ var _ = Describe("Shard", func() {
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
 				},
 			}))
-			By("4th block is opened")
-			// 02 10:00
+			By("01/02 10:00 4th block is opened")
 			clock.Add(9 * time.Hour)
 			t4 := clock.Now().Add(2 * time.Hour)
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -192,12 +187,11 @@ var _ = Describe("Shard", func() {
 					TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
 				},
 			}))
-			By("moves to 4th block")
-			// 02 13:00
+			By("01/02 13:00 moves to 4th block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}).Should(Equal([]tsdb.BlockID{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
@@ -211,13 +205,12 @@ var _ = Describe("Shard", func() {
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
 				},
 			}))
-			By("5th block is opened")
-			// 02 22:00
+			By("01/02 22:00 5th block is opened")
 			clock.Add(9 * time.Hour)
 			t5 := clock.Now().Add(2 * time.Hour)
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -254,12 +247,11 @@ var _ = Describe("Shard", func() {
 					TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
 				},
 			}))
-			By("close 1st block by adding 5th block")
-			// 03 01:00
+			By("01/03 01:00 close 1st block by adding 5th block")
 			clock.Add(3 * time.Hour)
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}).Should(Equal([]tsdb.BlockID{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
@@ -275,7 +267,7 @@ var _ = Describe("Shard", func() {
 			}))
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -326,7 +318,7 @@ var _ = Describe("Shard", func() {
 			Expect(err).NotTo(HaveOccurred())
 			Eventually(func() []tsdb.BlockID {
 				return shard.State().OpenBlocks
-			}).Should(Equal([]tsdb.BlockID{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
 				{
 					SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700102),
 					BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
@@ -342,7 +334,7 @@ var _ = Describe("Shard", func() {
 			}))
 			Eventually(func() []tsdb.BlockState {
 				return shard.State().Blocks
-			}).Should(Equal([]tsdb.BlockState{
+			}, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
 				{
 					ID: tsdb.BlockID{
 						SegID:   tsdb.GenerateInternalID(tsdb.DAY, 19700101),
diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go
index c761503..cd1109d 100644
--- a/banyand/tsdb/tsdb_suite_test.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -19,6 +19,7 @@ package tsdb_test
 
 import (
 	"testing"
+	"time"
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
@@ -26,6 +27,8 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+var defaultEventallyTimeout = 30 * time.Second
+
 func TestTsdb(t *testing.T) {
 	RegisterFailHandler(Fail)
 	RunSpecs(t, "Tsdb Suite")
diff --git a/pkg/timestamp/nano.go b/pkg/timestamp/nano.go
index 207f924..ee6e926 100644
--- a/pkg/timestamp/nano.go
+++ b/pkg/timestamp/nano.go
@@ -18,9 +18,13 @@
 package timestamp
 
 import (
+	"math"
 	"time"
+
 	// link runtime pkg fastrand
 	_ "unsafe"
+
+	"github.com/pkg/errors"
 )
 
 var (
@@ -46,3 +50,34 @@ func MToN(ms time.Time) time.Time {
 func NowMilli() time.Time {
 	return time.UnixMilli(time.Now().UnixMilli())
 }
+
+const (
+	// MinNanoTime is the minimum time that can be represented.
+	//
+	// 1677-09-21 00:12:43.145224194 +0000 UTC
+	MinNanoTime = int64(math.MinInt64) + 2
+
+	// MaxNanoTime is the maximum time that can be represented.
+	//
+	// 2262-04-11 23:47:16.854775806 +0000 UTC
+	MaxNanoTime = int64(math.MaxInt64) - 1
+)
+
+var (
+	minNanoTime = time.Unix(0, MinNanoTime).UTC()
+	maxNanoTime = time.Unix(0, MaxNanoTime).UTC()
+
+	ErrTimeOutOfRange     = errors.Errorf("time is out of range %d - %d", MinNanoTime, MaxNanoTime)
+	ErrTimeNotMillisecond = errors.Errorf("time is not millisecond precision")
+)
+
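+// Check returns ErrTimeOutOfRange if t is outside [MinNanoTime, MaxNanoTime], ErrTimeNotMillisecond if t is finer than millisecond precision, and nil otherwise.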
+func Check(t time.Time) error {
+	if t.Before(minNanoTime) || t.After(maxNanoTime) {
+		return ErrTimeOutOfRange
+	}
+	if t.Nanosecond()%int(mSecond) > 0 {
+		return ErrTimeNotMillisecond
+	}
+	return nil
+}