You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/25 00:15:26 UTC
[skywalking-banyandb] branch main updated: Add more functions to support test (#52)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 60f753b Add more functions to support test (#52)
60f753b is described below
commit 60f753b48d2a4b2ce03e4479cb1b852573ff5b7d
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Sep 25 08:15:22 2021 +0800
Add more functions to support test (#52)
* Add more functions to support test
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Remove redundant line
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/stream/index.go | 5 ++++
banyand/stream/service.go | 1 -
banyand/stream/stream_query.go | 6 +++++
banyand/stream/stream_query_test.go | 47 +++++--------------------------------
banyand/stream/stream_write.go | 44 +++++++++++++++++++++++++---------
banyand/stream/stream_write_test.go | 4 ++--
6 files changed, 52 insertions(+), 55 deletions(-)
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index c0a6c14..aa797f5 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -30,10 +30,12 @@ import (
"github.com/apache/skywalking-banyandb/pkg/partition"
)
+type callbackFn func()
type indexMessage struct {
localWriter tsdb.Writer
blockCloser io.Closer
value *streamv2.ElementValue
+ cb callbackFn
}
func (s *stream) bootIndexGenerator() {
@@ -57,6 +59,9 @@ func (s *stream) bootIndexGenerator() {
if err != nil {
s.l.Error().Err(err).Msg("encounter some errors when generating indices")
}
+ if m.cb != nil {
+ m.cb()
+ }
}
}()
}
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index d7d1284..9cb37a6 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -100,7 +100,6 @@ func (s *service) PreRun() error {
}
id := formatStreamID(sm.name, sm.group)
s.schemaMap[id] = sm
- s.writeListener.schemaMap[id] = sm
s.l.Info().Str("id", id).Msg("initialize stream")
}
s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index f8b7665..ea58fd1 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -18,6 +18,8 @@
package stream
import (
+ "io"
+
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
@@ -39,8 +41,12 @@ type Query interface {
}
type Stream interface {
+ io.Closer
+ Write(value *streamv2.ElementValue) error
Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
Shard(id common.ShardID) (tsdb.Shard, error)
+ ParseTagFamily(family string, item tsdb.Item) (*modelv2.TagFamily, error)
+ ParseElementID(item tsdb.Item) (string, error)
}
var _ Stream = (*stream)(nil)
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index 8319487..b608c14 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -19,7 +19,6 @@ package stream
import (
"bytes"
- "context"
"embed"
_ "embed"
"encoding/base64"
@@ -34,7 +33,6 @@ import (
"github.com/golang/protobuf/jsonpb"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
@@ -45,7 +43,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/partition"
)
type shardStruct struct {
@@ -511,12 +508,13 @@ func Test_Stream_Series(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, err := queryData(tester, s, tt.args)
+ ast := assert.New(t)
+ got, err := queryData(ast, s, tt.args)
if tt.wantErr {
- tester.Error(err)
+ ast.Error(err)
return
}
- tester.NoError(err)
+ ast.NoError(err)
sort.SliceStable(got, func(i, j int) bool {
a := got[i]
b := got[j]
@@ -531,7 +529,7 @@ func Test_Stream_Series(t *testing.T) {
}
return true
})
- tester.Equal(tt.want, got)
+ ast.Equal(tt.want, got)
})
}
@@ -740,41 +738,8 @@ func setupQueryData(testing *testing.T, dataFile string, stream *stream) (baseTi
},
}
e.TagFamilies = append(e.TagFamilies, searchTagFamily)
- entity, errInner := stream.buildEntity(e)
- t.NoError(errInner)
- shardID, errInner := partition.ShardID(entity.Marshal(), stream.schema.GetShardNum())
- t.NoError(errInner)
- _, errInner = stream.write(common.ShardID(shardID), e)
+ errInner := stream.Write(e)
t.NoError(errInner)
}
- ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancelFunc()
- err = ready(ctx, t, stream, queryOpts{
- entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
- })
- require.NoError(testing, err)
return baseTime
}
-
-func ready(ctx context.Context, t *assert.Assertions, stream *stream, options queryOpts) error {
- for {
- loop:
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- data, err := queryData(t, stream, options)
- if err != nil {
- return err
- }
- for _, d := range data {
- if len(d.elements) < 1 {
- time.Sleep(300 * time.Millisecond)
- break loop
- }
- }
- return nil
- }
- }
-}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 610ef31..42360f8 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -32,6 +32,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/partition"
)
var (
@@ -39,26 +40,47 @@ var (
ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value")
)
-func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*tsdb.GlobalItemID, error) {
+func (s *stream) Write(value *streamv2.ElementValue) error {
+ entity, err := s.buildEntity(value)
+ if err != nil {
+ return err
+ }
+ shardID, err := partition.ShardID(entity.Marshal(), s.schema.GetShardNum())
+ if err != nil {
+ return err
+ }
+ waitCh := make(chan struct{})
+ err = s.write(common.ShardID(shardID), value, func() {
+ close(waitCh)
+ })
+ if err != nil {
+ close(waitCh)
+ return err
+ }
+ <-waitCh
+ return nil
+}
+
+func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue, cb callbackFn) error {
sm := s.schema
fLen := len(value.GetTagFamilies())
if fLen < 1 {
- return nil, errors.Wrap(ErrMalformedElement, "no tag family")
+ return errors.Wrap(ErrMalformedElement, "no tag family")
}
if fLen > len(sm.TagFamilies) {
- return nil, errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
+ return errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
}
shard, err := s.db.Shard(shardID)
if err != nil {
- return nil, err
+ return err
}
entity, err := s.buildEntity(value)
if err != nil {
- return nil, err
+ return err
}
series, err := shard.Series().Get(entity)
if err != nil {
- return nil, err
+ return err
}
t := value.GetTimestamp().AsTime()
wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0))
@@ -66,7 +88,7 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
if wp != nil {
_ = wp.Close()
}
- return nil, err
+ return err
}
writeFn := func() (tsdb.Writer, error) {
builder := wp.WriterBuilder().Time(t)
@@ -109,12 +131,13 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
writer, err := writeFn()
if err != nil {
_ = wp.Close()
- return nil, err
+ return err
}
m := indexMessage{
localWriter: writer,
value: value,
blockCloser: wp,
+ cb: cb,
}
go func(m indexMessage) {
defer func() {
@@ -124,8 +147,7 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
}()
s.indexCh <- m
}(m)
- itemID := writer.ItemID()
- return &itemID, err
+ return err
}
func getIndexValue(ruleIndex indexRule, value *streamv2.ElementValue) (val []byte, isInt bool, err error) {
@@ -219,7 +241,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
}
sm := writeEvent.WriteRequest.GetMetadata()
id := formatStreamID(sm.GetName(), sm.GetGroup())
- _, err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement())
+ err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement(), nil)
if err != nil {
w.l.Debug().Err(err)
}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index b0d8880..74ae262 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -197,7 +197,7 @@ func Test_Stream_Write(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- _, err := s.write(common.ShardID(tt.args.shardID), tt.args.ele)
+ err := s.write(common.ShardID(tt.args.shardID), tt.args.ele, nil)
if tt.wantErr {
tester.Error(err)
return
@@ -211,7 +211,7 @@ func Test_Stream_Write(t *testing.T) {
func setup(t *assert.Assertions) (*stream, func()) {
t.NoError(logger.Init(logger.Logging{
Env: "dev",
- Level: "trace",
+ Level: "info",
}))
tempDir, deferFunc := test.Space(t)
streamRepo, err := schema.NewStream()