You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/20 02:54:22 UTC
[skywalking-banyandb] 01/01: Check input time range
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch patch-time
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c5eea4dd58694c1b0280fec636d21feeb74ba188
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Apr 20 02:52:45 2022 +0000
Check input time range
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/liaison/grpc/measure.go | 13 ++++++++++
banyand/liaison/grpc/measure_test.go | 5 ++--
banyand/liaison/grpc/stream.go | 13 ++++++++++
banyand/liaison/grpc/stream_test.go | 5 ++--
banyand/measure/measure_write.go | 6 ++++-
banyand/stream/stream_write.go | 6 ++++-
banyand/tsdb/shard_test.go | 50 +++++++++++++++---------------------
banyand/tsdb/tsdb_suite_test.go | 3 +++
pkg/timestamp/nano.go | 35 +++++++++++++++++++++++++
9 files changed, 101 insertions(+), 35 deletions(-)
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index f8ca54d..3cfc97f 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -22,10 +22,13 @@ import (
"io"
"time"
+ "github.com/pkg/errors"
+
"github.com/apache/skywalking-banyandb/api/data"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type measureService struct {
@@ -42,6 +45,10 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
if err != nil {
return err
}
+ if errTime := timestamp.Check(writeRequest.DataPoint.Timestamp.AsTime()); errTime != nil {
+ ms.log.Error().Err(errTime).Msg("the data point time is invalid")
+ continue
+ }
entity, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
if err != nil {
ms.log.Error().Err(err).Msg("failed to navigate to the write target")
@@ -63,6 +70,12 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
}
func (ms *measureService) Query(_ context.Context, entityCriteria *measurev1.QueryRequest) (*measurev1.QueryResponse, error) {
+ if err := timestamp.Check(entityCriteria.GetTimeRange().Begin.AsTime()); err != nil {
+ return nil, errors.WithMessage(err, "the begin of time range is invalid")
+ }
+ if err := timestamp.Check(entityCriteria.GetTimeRange().End.AsTime()); err != nil {
+ return nil, errors.WithMessage(err, "the end of time range is invalid")
+ }
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
feat, errQuery := ms.pipeline.Publish(data.TopicMeasureQuery, message)
if errQuery != nil {
diff --git a/banyand/liaison/grpc/measure_test.go b/banyand/liaison/grpc/measure_test.go
index b25b8a7..27fd27d 100644
--- a/banyand/liaison/grpc/measure_test.go
+++ b/banyand/liaison/grpc/measure_test.go
@@ -34,6 +34,7 @@ import (
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/test"
testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var _ = Describe("Measure", func() {
@@ -104,7 +105,7 @@ var _ = Describe("Measure", func() {
func writeMeasureData() *measurev1.WriteRequest {
return pbv1.NewMeasureWriteRequestBuilder().
Metadata("sw_metric", "service_cpm_minute").
- Timestamp(time.Now()).
+ Timestamp(timestamp.NowMilli()).
TagFamily(
pbv1.ID("1"),
"entity_1",
@@ -154,7 +155,7 @@ func measureWrite(conn *grpclib.ClientConn) {
func measureQuery(conn *grpclib.ClientConn) (int, error) {
c := measurev1.NewMeasureServiceClient(conn)
ctx := context.Background()
- resp, err := c.Query(ctx, queryMeasureCriteria(time.Now()))
+ resp, err := c.Query(ctx, queryMeasureCriteria(timestamp.NowMilli()))
return len(resp.GetDataPoints()), err
}
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 8d125b9..af2c1f5 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -22,10 +22,13 @@ import (
"io"
"time"
+ "github.com/pkg/errors"
+
"github.com/apache/skywalking-banyandb/api/data"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type streamService struct {
@@ -42,6 +45,10 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
if err != nil {
return err
}
+ if errTime := timestamp.Check(writeEntity.GetElement().Timestamp.AsTime()); errTime != nil {
+ s.log.Error().Err(errTime).Msg("the element time is invalid")
+ continue
+ }
entity, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
if err != nil {
s.log.Error().Err(err).Msg("failed to navigate to the write target")
@@ -63,6 +70,12 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
}
func (s *streamService) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
+ if err := timestamp.Check(entityCriteria.GetTimeRange().Begin.AsTime()); err != nil {
+ return nil, errors.WithMessage(err, "the begin of time range is invalid")
+ }
+ if err := timestamp.Check(entityCriteria.GetTimeRange().End.AsTime()); err != nil {
+ return nil, errors.WithMessage(err, "the end of time range is invalid")
+ }
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
if errQuery != nil {
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 994639d..8f02215 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -41,6 +41,7 @@ import (
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/test"
teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var _ = Describe("Stream", func() {
@@ -163,7 +164,7 @@ func writeStreamData() *streamv1.WriteRequest {
return pbv1.NewStreamWriteRequestBuilder().
ID("1").
Metadata("default", "sw").
- Timestamp(time.Now()).
+ Timestamp(timestamp.NowMilli()).
TagFamily(bb).
TagFamily(
"trace_id-xxfff.111",
@@ -209,7 +210,7 @@ func streamWrite(conn *grpclib.ClientConn) {
func streamQuery(conn *grpclib.ClientConn) (int, error) {
c := streamv1.NewStreamServiceClient(conn)
ctx := context.Background()
- resp, err := c.Query(ctx, queryStreamCriteria(time.Now()))
+ resp, err := c.Query(ctx, queryStreamCriteria(timestamp.NowMilli()))
return len(resp.GetElements()), err
}
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index a8f60d9..d5f3db8 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -62,6 +62,10 @@ func (s *measure) Write(value *measurev1.DataPointValue) error {
}
func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue, cb index.CallbackFn) error {
+ tp := value.GetTimestamp().AsTime()
+ if err := timestamp.Check(tp); err != nil {
+ return errors.WithMessage(err, "writing stream")
+ }
sm := s.schema
fLen := len(value.GetTagFamilies())
if fLen < 1 {
@@ -78,7 +82,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
if err != nil {
return err
}
- t := timestamp.MToN(value.GetTimestamp().AsTime())
+ t := timestamp.MToN(tp)
wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
if err != nil {
if wp != nil {
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 5affbc4..d73130e 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -53,6 +53,10 @@ func (s *stream) Write(value *streamv1.ElementValue) error {
}
func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *streamv1.ElementValue, cb index.CallbackFn) error {
+ tp := value.GetTimestamp().AsTime()
+ if err := timestamp.Check(tp); err != nil {
+ return errors.WithMessage(err, "writing stream")
+ }
sm := s.schema
fLen := len(value.GetTagFamilies())
if fLen < 1 {
@@ -69,7 +73,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
if err != nil {
return err
}
- t := timestamp.MToN(value.GetTimestamp().AsTime())
+ t := timestamp.MToN(tp)
wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
if err != nil {
if wp != nil {
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 9a3005f..0573e66 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -63,11 +63,11 @@ var _ = Describe("Shard", func() {
2,
)
Expect(err).NotTo(HaveOccurred())
- By("1st block is opened")
+ By("01/01 00:00 1st block is opened")
t1 := clock.Now()
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }).Should(Equal([]tsdb.BlockState{
+ }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -76,13 +76,12 @@ var _ = Describe("Shard", func() {
TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
},
}))
- By("2nd block is opened")
- // 01 10:00
+ By("01/01 10:00 2nd block is opened")
clock.Add(10 * time.Hour)
t2 := clock.Now().Add(2 * time.Hour)
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }).Should(Equal([]tsdb.BlockState{
+ }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -100,25 +99,23 @@ var _ = Describe("Shard", func() {
}))
Eventually(func() []tsdb.BlockID {
return shard.State().OpenBlocks
- }).Should(Equal([]tsdb.BlockID{}))
- By("moves to the 2nd block")
- // 01 13:00
+ }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{}))
+ By("01/01 13:00 moves to the 2nd block")
clock.Add(3 * time.Hour)
Eventually(func() []tsdb.BlockID {
return shard.State().OpenBlocks
- }).Should(Equal([]tsdb.BlockID{
+ }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
},
}))
- By("3rd block is opened")
- // 01 22:00
+ By("01/01 22:00 3rd block is opened")
clock.Add(9 * time.Hour)
t3 := clock.Now().Add(2 * time.Hour)
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }).Should(Equal([]tsdb.BlockState{
+ }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -141,8 +138,7 @@ var _ = Describe("Shard", func() {
TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
},
}))
- By("moves to 3rd block")
- // 02 01:00
+ By("01/02 01:00 moves to 3rd block")
clock.Add(3 * time.Hour)
Eventually(func() []tsdb.BlockID {
return shard.State().OpenBlocks
@@ -156,13 +152,12 @@ var _ = Describe("Shard", func() {
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
},
}))
- By("4th block is opened")
- // 02 10:00
+ By("01/02 10:00 4th block is opened")
clock.Add(9 * time.Hour)
t4 := clock.Now().Add(2 * time.Hour)
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }).Should(Equal([]tsdb.BlockState{
+ }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -192,12 +187,11 @@ var _ = Describe("Shard", func() {
TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
},
}))
- By("moves to 4th block")
- // 02 13:00
+ By("01/02 13:00 moves to 4th block")
clock.Add(3 * time.Hour)
Eventually(func() []tsdb.BlockID {
return shard.State().OpenBlocks
- }).Should(Equal([]tsdb.BlockID{
+ }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
@@ -211,13 +205,12 @@ var _ = Describe("Shard", func() {
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
},
}))
- By("5th block is opened")
- // 02 22:00
+ By("01/02 22:00 5th block is opened")
clock.Add(9 * time.Hour)
t5 := clock.Now().Add(2 * time.Hour)
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }).Should(Equal([]tsdb.BlockState{
+ }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -254,12 +247,11 @@ var _ = Describe("Shard", func() {
TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
},
}))
- By("close 1st block by adding 5th block")
- // 03 01:00
+ By("01/03 01:00 close 1st block by adding 5th block")
clock.Add(3 * time.Hour)
Eventually(func() []tsdb.BlockID {
return shard.State().OpenBlocks
- }).Should(Equal([]tsdb.BlockID{
+ }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 00),
@@ -275,7 +267,7 @@ var _ = Describe("Shard", func() {
}))
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }).Should(Equal([]tsdb.BlockState{
+ }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -326,7 +318,7 @@ var _ = Describe("Shard", func() {
Expect(err).NotTo(HaveOccurred())
Eventually(func() []tsdb.BlockID {
return shard.State().OpenBlocks
- }).Should(Equal([]tsdb.BlockID{
+ }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
@@ -342,7 +334,7 @@ var _ = Describe("Shard", func() {
}))
Eventually(func() []tsdb.BlockState {
return shard.State().Blocks
- }).Should(Equal([]tsdb.BlockState{
+ }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
{
ID: tsdb.BlockID{
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go
index c761503..cd1109d 100644
--- a/banyand/tsdb/tsdb_suite_test.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -19,6 +19,7 @@ package tsdb_test
import (
"testing"
+ "time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -26,6 +27,8 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
)
+var defaultEventallyTimeout = 30 * time.Second
+
func TestTsdb(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Tsdb Suite")
diff --git a/pkg/timestamp/nano.go b/pkg/timestamp/nano.go
index 207f924..ee6e926 100644
--- a/pkg/timestamp/nano.go
+++ b/pkg/timestamp/nano.go
@@ -18,9 +18,13 @@
package timestamp
import (
+ "math"
"time"
+
// link runtime pkg fastrand
_ "unsafe"
+
+ "github.com/pkg/errors"
)
var (
@@ -46,3 +50,34 @@ func MToN(ms time.Time) time.Time {
func NowMilli() time.Time {
return time.UnixMilli(time.Now().UnixMilli())
}
+
+const (
+ // MinNanoTime is the smallest accepted timestamp in nanoseconds since the Unix epoch (math.MinInt64 + 2).
+ //
+ // 1677-09-21 00:12:43.145224194 +0000 UTC
+ MinNanoTime = int64(math.MinInt64) + 2
+
+ // MaxNanoTime is the largest accepted timestamp in nanoseconds since the Unix epoch (math.MaxInt64 - 1).
+ //
+ // 2262-04-11 23:47:16.854775806 +0000 UTC
+ MaxNanoTime = int64(math.MaxInt64) - 1
+)
+
+var (
+ minNanoTime = time.Unix(0, MinNanoTime).UTC() // MinNanoTime as a time.Time; lower bound used by Check
+ maxNanoTime = time.Unix(0, MaxNanoTime).UTC() // MaxNanoTime as a time.Time; upper bound used by Check
+
+ ErrTimeOutOfRange = errors.Errorf("time is out of range %d - %d", MinNanoTime, MaxNanoTime) // ErrTimeOutOfRange is returned by Check when the time is before minNanoTime or after maxNanoTime.
+ ErrTimeNotMillisecond = errors.Errorf("time is not millisecond precision") // ErrTimeNotMillisecond is returned by Check when the nanosecond part is not a multiple of mSecond.
+)
+
+// Check validates t: it returns ErrTimeOutOfRange when t is before minNanoTime or after maxNanoTime, ErrTimeNotMillisecond when t.Nanosecond() is not a multiple of mSecond, and nil otherwise.
+func Check(t time.Time) error {
+ if t.Before(minNanoTime) || t.After(maxNanoTime) {
+ return ErrTimeOutOfRange
+ }
+ if t.Nanosecond()%int(mSecond) > 0 { // NOTE(review): presumably mSecond is one millisecond in nanoseconds, so any remainder means sub-millisecond digits are set; confirm against its declaration
+ return ErrTimeNotMillisecond
+ }
+ return nil
+}