You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/25 00:15:26 UTC

[skywalking-banyandb] branch main updated: Add more functions to support test (#52)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 60f753b  Add more functions to support test (#52)
60f753b is described below

commit 60f753b48d2a4b2ce03e4479cb1b852573ff5b7d
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Sep 25 08:15:22 2021 +0800

    Add more functions to support test (#52)
    
    * Add more functions to support test
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
    
    * Remove redundant line
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/stream/index.go             |  5 ++++
 banyand/stream/service.go           |  1 -
 banyand/stream/stream_query.go      |  6 +++++
 banyand/stream/stream_query_test.go | 47 +++++--------------------------------
 banyand/stream/stream_write.go      | 44 +++++++++++++++++++++++++---------
 banyand/stream/stream_write_test.go |  4 ++--
 6 files changed, 52 insertions(+), 55 deletions(-)

diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index c0a6c14..aa797f5 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -30,10 +30,12 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 )
 
+type callbackFn func()
 type indexMessage struct {
 	localWriter tsdb.Writer
 	blockCloser io.Closer
 	value       *streamv2.ElementValue
+	cb          callbackFn
 }
 
 func (s *stream) bootIndexGenerator() {
@@ -57,6 +59,9 @@ func (s *stream) bootIndexGenerator() {
 			if err != nil {
 				s.l.Error().Err(err).Msg("encounter some errors when generating indices")
 			}
+			if m.cb != nil {
+				m.cb()
+			}
 		}
 	}()
 }
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index d7d1284..9cb37a6 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -100,7 +100,6 @@ func (s *service) PreRun() error {
 		}
 		id := formatStreamID(sm.name, sm.group)
 		s.schemaMap[id] = sm
-		s.writeListener.schemaMap[id] = sm
 		s.l.Info().Str("id", id).Msg("initialize stream")
 	}
 	s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index f8b7665..ea58fd1 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -18,6 +18,8 @@
 package stream
 
 import (
+	"io"
+
 	"github.com/golang/protobuf/proto"
 	"github.com/pkg/errors"
 
@@ -39,8 +41,12 @@ type Query interface {
 }
 
 type Stream interface {
+	io.Closer
+	Write(value *streamv2.ElementValue) error
 	Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
 	Shard(id common.ShardID) (tsdb.Shard, error)
+	ParseTagFamily(family string, item tsdb.Item) (*modelv2.TagFamily, error)
+	ParseElementID(item tsdb.Item) (string, error)
 }
 
 var _ Stream = (*stream)(nil)
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index 8319487..b608c14 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -19,7 +19,6 @@ package stream
 
 import (
 	"bytes"
-	"context"
 	"embed"
 	_ "embed"
 	"encoding/base64"
@@ -34,7 +33,6 @@ import (
 	"github.com/golang/protobuf/jsonpb"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"github.com/apache/skywalking-banyandb/api/common"
@@ -45,7 +43,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/partition"
 )
 
 type shardStruct struct {
@@ -511,12 +508,13 @@ func Test_Stream_Series(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got, err := queryData(tester, s, tt.args)
+			ast := assert.New(t)
+			got, err := queryData(ast, s, tt.args)
 			if tt.wantErr {
-				tester.Error(err)
+				ast.Error(err)
 				return
 			}
-			tester.NoError(err)
+			ast.NoError(err)
 			sort.SliceStable(got, func(i, j int) bool {
 				a := got[i]
 				b := got[j]
@@ -531,7 +529,7 @@ func Test_Stream_Series(t *testing.T) {
 				}
 				return true
 			})
-			tester.Equal(tt.want, got)
+			ast.Equal(tt.want, got)
 		})
 	}
 
@@ -740,41 +738,8 @@ func setupQueryData(testing *testing.T, dataFile string, stream *stream) (baseTi
 			},
 		}
 		e.TagFamilies = append(e.TagFamilies, searchTagFamily)
-		entity, errInner := stream.buildEntity(e)
-		t.NoError(errInner)
-		shardID, errInner := partition.ShardID(entity.Marshal(), stream.schema.GetShardNum())
-		t.NoError(errInner)
-		_, errInner = stream.write(common.ShardID(shardID), e)
+		errInner := stream.Write(e)
 		t.NoError(errInner)
 	}
-	ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancelFunc()
-	err = ready(ctx, t, stream, queryOpts{
-		entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
-		timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
-	})
-	require.NoError(testing, err)
 	return baseTime
 }
-
-func ready(ctx context.Context, t *assert.Assertions, stream *stream, options queryOpts) error {
-	for {
-	loop:
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-			data, err := queryData(t, stream, options)
-			if err != nil {
-				return err
-			}
-			for _, d := range data {
-				if len(d.elements) < 1 {
-					time.Sleep(300 * time.Millisecond)
-					break loop
-				}
-			}
-			return nil
-		}
-	}
-}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 610ef31..42360f8 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -32,6 +32,7 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/partition"
 )
 
 var (
@@ -39,26 +40,47 @@ var (
 	ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value")
 )
 
-func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*tsdb.GlobalItemID, error) {
+func (s *stream) Write(value *streamv2.ElementValue) error {
+	entity, err := s.buildEntity(value)
+	if err != nil {
+		return err
+	}
+	shardID, err := partition.ShardID(entity.Marshal(), s.schema.GetShardNum())
+	if err != nil {
+		return err
+	}
+	waitCh := make(chan struct{})
+	err = s.write(common.ShardID(shardID), value, func() {
+		close(waitCh)
+	})
+	if err != nil {
+		close(waitCh)
+		return err
+	}
+	<-waitCh
+	return nil
+}
+
+func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue, cb callbackFn) error {
 	sm := s.schema
 	fLen := len(value.GetTagFamilies())
 	if fLen < 1 {
-		return nil, errors.Wrap(ErrMalformedElement, "no tag family")
+		return errors.Wrap(ErrMalformedElement, "no tag family")
 	}
 	if fLen > len(sm.TagFamilies) {
-		return nil, errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
+		return errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
 	}
 	shard, err := s.db.Shard(shardID)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	entity, err := s.buildEntity(value)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	series, err := shard.Series().Get(entity)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	t := value.GetTimestamp().AsTime()
 	wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0))
@@ -66,7 +88,7 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
 		if wp != nil {
 			_ = wp.Close()
 		}
-		return nil, err
+		return err
 	}
 	writeFn := func() (tsdb.Writer, error) {
 		builder := wp.WriterBuilder().Time(t)
@@ -109,12 +131,13 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
 	writer, err := writeFn()
 	if err != nil {
 		_ = wp.Close()
-		return nil, err
+		return err
 	}
 	m := indexMessage{
 		localWriter: writer,
 		value:       value,
 		blockCloser: wp,
+		cb:          cb,
 	}
 	go func(m indexMessage) {
 		defer func() {
@@ -124,8 +147,7 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
 		}()
 		s.indexCh <- m
 	}(m)
-	itemID := writer.ItemID()
-	return &itemID, err
+	return err
 }
 
 func getIndexValue(ruleIndex indexRule, value *streamv2.ElementValue) (val []byte, isInt bool, err error) {
@@ -219,7 +241,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
 	}
 	sm := writeEvent.WriteRequest.GetMetadata()
 	id := formatStreamID(sm.GetName(), sm.GetGroup())
-	_, err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement())
+	err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement(), nil)
 	if err != nil {
 		w.l.Debug().Err(err)
 	}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index b0d8880..74ae262 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -197,7 +197,7 @@ func Test_Stream_Write(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			_, err := s.write(common.ShardID(tt.args.shardID), tt.args.ele)
+			err := s.write(common.ShardID(tt.args.shardID), tt.args.ele, nil)
 			if tt.wantErr {
 				tester.Error(err)
 				return
@@ -211,7 +211,7 @@ func Test_Stream_Write(t *testing.T) {
 func setup(t *assert.Assertions) (*stream, func()) {
 	t.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "trace",
+		Level: "info",
 	}))
 	tempDir, deferFunc := test.Space(t)
 	streamRepo, err := schema.NewStream()