You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/05 07:36:36 UTC

[skywalking-banyandb] branch main updated: Refactor index startup and remove index field check (#29)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new c98656e  Refactor index startup and remove index field check (#29)
c98656e is described below

commit c98656e5971d4c5d12131c0fab5f56686308895d
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Thu Aug 5 15:34:31 2021 +0800

    Refactor index startup and remove index field check (#29)
    
    * refactor index startup and remove index field check
    
    Signed-off-by: Megrez Lu <lu...@gmail.com>
    
    * add log and break iteration asap
    
    Signed-off-by: Megrez Lu <lu...@gmail.com>
    
    * Update banyand/index/index.go
    
    polish condition check
    
    Co-authored-by: Gao Hongtao <ha...@gmail.com>
    
    Co-authored-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/index/index.go          | 44 +++++++++++++++++++++++++++++++++++++++--
 banyand/index/index_test.go     | 15 +++++---------
 banyand/query/processor_test.go | 11 +++++++----
 3 files changed, 54 insertions(+), 16 deletions(-)

diff --git a/banyand/index/index.go b/banyand/index/index.go
index 89122e9..98a049a 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -38,7 +38,6 @@ import (
 var (
 	ErrShardNotFound       = errors.New("series doesn't exist")
 	ErrTraceSeriesNotFound = errors.New("trace series not found")
-	ErrUnknownField        = errors.New("the field is unknown")
 )
 
 type Condition struct {
@@ -64,9 +63,25 @@ type Builder interface {
 	run.Service
 }
 
+// ReadyOption is a readiness predicate evaluated against the index metadata map.
+type ReadyOption func(map[string]*series) bool
+
+// MetaExists returns a ReadyOption that is true when the map has an entry for group/name.
+func MetaExists(group, name string) ReadyOption {
+	seriesID := &apiv1.Metadata{
+		Name:  name,
+		Group: group,
+	}
+	return func(m map[string]*series) bool {
+		_, ok := m[compositeSeriesID(seriesID)]
+		return ok
+	}
+}
+
 type Service interface {
 	Repo
 	Builder
+	Ready(context.Context, ...ReadyOption) bool // true once the index metadata satisfies every option; false if the context ends first
 }
 
 type series struct {
@@ -108,7 +123,8 @@ func (s *service) Insert(series common.Metadata, shardID uint, field *Field) err
 	}
 	objects, ok := sd.meta[field.Name]
 	if !ok {
-		return ErrUnknownField
+		s.log.Debug().Str("field", field.Name).Msg("field is not indexed")
+		return nil
 	}
 	for _, object := range objects {
 		err = multierr.Append(err, sd.store.Insert(&tsdb.Field{
@@ -154,6 +170,30 @@ func (s *service) GracefulStop() {
 	}
 }
 
+// Ready spins until the index metadata is non-empty and accepted by every
+// option, returning true; it returns false once ctx is cancelled or expires.
+// NOTE(review): the loop still busy-waits; a backoff would need a timer import.
+func (s *service) Ready(ctx context.Context, options ...ReadyOption) bool {
+	ready := func() bool {
+		// The map may be updated concurrently, so only read it under the lock.
+		s.meta.RLock()
+		defer s.meta.RUnlock()
+		for _, opt := range options {
+			if !opt(s.meta.meta) {
+				return false
+			}
+		}
+		return len(s.meta.meta) > 0
+	}
+	// ctx.Err is non-nil exactly when Done is closed, so this replaces the select.
+	for ctx.Err() == nil {
+		if ready() {
+			return true
+		}
+	}
+	return false
+}
+
 type indexMeta struct {
 	meta map[string]*series
 	sync.RWMutex
diff --git a/banyand/index/index_test.go b/banyand/index/index_test.go
index 44a89af..cd09710 100644
--- a/banyand/index/index_test.go
+++ b/banyand/index/index_test.go
@@ -108,7 +108,7 @@ func Test_service_Insert(t *testing.T) {
 					Value:   convert.Int64ToBytes(500),
 				},
 			},
-			wantErr: true,
+			wantErr: false,
 		},
 	}
 	for _, tt := range tests {
@@ -172,14 +172,9 @@ func setUpModules(tester *assert.Assertions) *service {
 	tester.NoError(err)
 	s, ok := svc.(*service)
 	tester.True(ok)
-	deadline := time.Now().Add(10 * time.Second)
-	for {
-		if s.meta.get(seriesID) != nil {
-			break
-		}
-		if time.Now().After(deadline) {
-			tester.Fail("timeout")
-		}
-	}
+
+	ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancelFunc()
+	tester.True(svc.Ready(ctx, MetaExists("default", "sw")))
 	return s
 }
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index 54d2f65..dbea2ae 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -111,6 +111,10 @@ func setupServices(t *testing.T, tester *require.Assertions) (discovery.ServiceR
 		tester.NoError(err)
 	}()
 
+	ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancelFunc()
+	tester.True(indexSvc.Ready(ctx, index.MetaExists("default", "sw")))
+
 	return repo, traceSvc, func() {
 		db.GracefulStop()
 		_ = os.RemoveAll(rootPath)
@@ -269,10 +273,9 @@ func setupData(tester *require.Assertions, baseTs time.Time, svc series.Service)
 	}
 
 	for _, ev := range entityValues {
-		_, _ = svc.Write(metadata, ev.ts, ev.seriesID, ev.entityID, ev.dataBinary, ev.items...)
-		// TODO: every field should be indexed?
-		//tester.True(ok)
-		//tester.NoError(err)
+		ok, err := svc.Write(metadata, ev.ts, ev.seriesID, ev.entityID, ev.dataBinary, ev.items...)
+		tester.True(ok)
+		tester.NoError(err)
 	}
 }