You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/05 07:36:36 UTC
[skywalking-banyandb] branch main updated: Refactor index startup
and remove index field check (#29)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new c98656e Refactor index startup and remove index field check (#29)
c98656e is described below
commit c98656e5971d4c5d12131c0fab5f56686308895d
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Thu Aug 5 15:34:31 2021 +0800
Refactor index startup and remove index field check (#29)
* refactor index startup and remove index field check
Signed-off-by: Megrez Lu <lu...@gmail.com>
* add log and break iteration asap
Signed-off-by: Megrez Lu <lu...@gmail.com>
* Update banyand/index/index.go
polish condition check
Co-authored-by: Gao Hongtao <ha...@gmail.com>
Co-authored-by: Gao Hongtao <ha...@gmail.com>
---
banyand/index/index.go | 44 +++++++++++++++++++++++++++++++++++++++--
banyand/index/index_test.go | 15 +++++---------
banyand/query/processor_test.go | 11 +++++++----
3 files changed, 54 insertions(+), 16 deletions(-)
diff --git a/banyand/index/index.go b/banyand/index/index.go
index 89122e9..98a049a 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -38,7 +38,6 @@ import (
var (
ErrShardNotFound = errors.New("series doesn't exist")
ErrTraceSeriesNotFound = errors.New("trace series not found")
- ErrUnknownField = errors.New("the field is unknown")
)
type Condition struct {
@@ -64,9 +63,25 @@ type Builder interface {
run.Service
}
+type ReadyOption func(map[string]*series) bool
+
+func MetaExists(group, name string) ReadyOption {
+ seriesID := &apiv1.Metadata{
+ Name: "sw",
+ Group: "default",
+ }
+ return func(m map[string]*series) bool {
+ if _, ok := m[compositeSeriesID(seriesID)]; ok {
+ return true
+ }
+ return false
+ }
+}
+
type Service interface {
Repo
Builder
+ Ready(context.Context, ...ReadyOption) bool
}
type series struct {
@@ -108,7 +123,8 @@ func (s *service) Insert(series common.Metadata, shardID uint, field *Field) err
}
objects, ok := sd.meta[field.Name]
if !ok {
- return ErrUnknownField
+ s.log.Debug().Str("field", field.Name).Msg("field is not indexed")
+ return nil
}
for _, object := range objects {
err = multierr.Append(err, sd.store.Insert(&tsdb.Field{
@@ -154,6 +170,30 @@ func (s *service) GracefulStop() {
}
}
+func (s *service) Ready(ctx context.Context, options ...ReadyOption) bool {
+ options = append(options, func(m map[string]*series) bool {
+ return len(m) > 0
+ })
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err() == nil
+ default:
+ allMatches := true
+ for _, opt := range options {
+ if allMatches = opt(s.meta.meta); !allMatches {
+ break
+ }
+ }
+ if !allMatches {
+ continue
+ }
+ return true
+ }
+ }
+}
+
type indexMeta struct {
meta map[string]*series
sync.RWMutex
diff --git a/banyand/index/index_test.go b/banyand/index/index_test.go
index 44a89af..cd09710 100644
--- a/banyand/index/index_test.go
+++ b/banyand/index/index_test.go
@@ -108,7 +108,7 @@ func Test_service_Insert(t *testing.T) {
Value: convert.Int64ToBytes(500),
},
},
- wantErr: true,
+ wantErr: false,
},
}
for _, tt := range tests {
@@ -172,14 +172,9 @@ func setUpModules(tester *assert.Assertions) *service {
tester.NoError(err)
s, ok := svc.(*service)
tester.True(ok)
- deadline := time.Now().Add(10 * time.Second)
- for {
- if s.meta.get(seriesID) != nil {
- break
- }
- if time.Now().After(deadline) {
- tester.Fail("timeout")
- }
- }
+
+ ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancelFunc()
+ tester.True(svc.Ready(ctx, MetaExists("default", "sw")))
return s
}
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index 54d2f65..dbea2ae 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -111,6 +111,10 @@ func setupServices(t *testing.T, tester *require.Assertions) (discovery.ServiceR
tester.NoError(err)
}()
+ ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancelFunc()
+ tester.True(indexSvc.Ready(ctx, index.MetaExists("default", "sw")))
+
return repo, traceSvc, func() {
db.GracefulStop()
_ = os.RemoveAll(rootPath)
@@ -269,10 +273,9 @@ func setupData(tester *require.Assertions, baseTs time.Time, svc series.Service)
}
for _, ev := range entityValues {
- _, _ = svc.Write(metadata, ev.ts, ev.seriesID, ev.entityID, ev.dataBinary, ev.items...)
- // TODO: every field should be indexed?
- //tester.True(ok)
- //tester.NoError(err)
+ ok, err := svc.Write(metadata, ev.ts, ev.seriesID, ev.entityID, ev.dataBinary, ev.items...)
+ tester.True(ok)
+ tester.NoError(err)
}
}