You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/16 10:31:52 UTC

[skywalking-banyandb] branch main updated: Make sure index rule ID is set (#104)

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f78d0d  Make sure index rule ID is set (#104)
0f78d0d is described below

commit 0f78d0df112f17193812a21461bcbaea9ebd806a
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Apr 16 18:31:48 2022 +0800

    Make sure index rule ID is set (#104)
    
    * Make sure index rule ID is set
    * Default the ID to a 32-bit CRC (CRC-32 of group + name) when it is unset
    * Check the ID on getting and listing index rules
    * Improve metadata event handler
    * Add a new gRPC error code
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/liaison/grpc/grpc_suite_test.go |  3 +++
 banyand/liaison/grpc/measure_test.go    |  4 +--
 banyand/liaison/grpc/stream_test.go     |  4 +--
 banyand/metadata/schema/error.go        |  2 ++
 banyand/metadata/schema/index.go        | 15 +++++++++++-
 banyand/tsdb/shard_test.go              | 14 ++++++-----
 pkg/schema/metadata.go                  | 43 +++++++++++++++++++--------------
 7 files changed, 56 insertions(+), 29 deletions(-)

diff --git a/banyand/liaison/grpc/grpc_suite_test.go b/banyand/liaison/grpc/grpc_suite_test.go
index b247b7d..d62547f 100644
--- a/banyand/liaison/grpc/grpc_suite_test.go
+++ b/banyand/liaison/grpc/grpc_suite_test.go
@@ -19,6 +19,7 @@ package grpc_test
 
 import (
 	"testing"
+	"time"
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
@@ -26,6 +27,8 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+var defaultEventallyTimeout = 30 * time.Second
+
 func TestGrpc(t *testing.T) {
 	RegisterFailHandler(Fail)
 	RunSpecs(t, "Grpc Suite")
diff --git a/banyand/liaison/grpc/measure_test.go b/banyand/liaison/grpc/measure_test.go
index 4d44bf1..f54a172 100644
--- a/banyand/liaison/grpc/measure_test.go
+++ b/banyand/liaison/grpc/measure_test.go
@@ -71,7 +71,7 @@ var _ = Describe("Measure", func() {
 				return 0
 			}
 			return num
-		}).Should(Equal(1))
+		}, defaultEventallyTimeout).Should(Equal(1))
 	})
 	It("is a TLS server", func() {
 		flags := []string{"--tls=true", "--measure-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
@@ -91,7 +91,7 @@ var _ = Describe("Measure", func() {
 		measureWrite(conn)
 		Eventually(func() (int, error) {
 			return measureQuery(conn)
-		}).Should(Equal(1))
+		}, defaultEventallyTimeout).Should(Equal(1))
 	})
 	AfterEach(func() {
 		_ = conn.Close()
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index dbc271d..269f9c7 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -78,7 +78,7 @@ var _ = Describe("Stream", func() {
 				return 0
 			}
 			return num
-		}).Should(Equal(1))
+		}, defaultEventallyTimeout).Should(Equal(1))
 	})
 	It("is a TLS server", func() {
 		flags := []string{"--tls=true", "--stream-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
@@ -98,7 +98,7 @@ var _ = Describe("Stream", func() {
 		streamWrite(conn)
 		Eventually(func() (int, error) {
 			return streamQuery(conn)
-		}).Should(Equal(1))
+		}, defaultEventallyTimeout).Should(Equal(1))
 	})
 	AfterEach(func() {
 		_ = conn.Close()
diff --git a/banyand/metadata/schema/error.go b/banyand/metadata/schema/error.go
index 8853e0f..d9cf005 100644
--- a/banyand/metadata/schema/error.go
+++ b/banyand/metadata/schema/error.go
@@ -31,6 +31,8 @@ var (
 	ErrGRPCResourceNotFound    = statusGRPCResourceNotFound.Err()
 	statusGRPCAlreadyExists    = status.New(codes.AlreadyExists, "banyandb: resource already exists")
 	ErrGRPCAlreadyExists       = statusGRPCAlreadyExists.Err()
+	statusDataLoss             = status.New(codes.DataLoss, "banyandb: resource corrupts.")
+	ErrGRPCDataLoss            = statusDataLoss.Err()
 )
 
 func IsNotFound(err error) bool {
diff --git a/banyand/metadata/schema/index.go b/banyand/metadata/schema/index.go
index c79ebfc..f3bf415 100644
--- a/banyand/metadata/schema/index.go
+++ b/banyand/metadata/schema/index.go
@@ -19,6 +19,7 @@ package schema
 
 import (
 	"context"
+	"hash/crc32"
 
 	"google.golang.org/protobuf/proto"
 
@@ -93,6 +94,9 @@ func (e *etcdSchemaRegistry) GetIndexRule(ctx context.Context, metadata *commonv
 	if err := e.get(ctx, formatIndexRuleKey(metadata), &entity); err != nil {
 		return nil, err
 	}
+	if entity.Metadata.Id == 0 {
+		return nil, ErrGRPCDataLoss
+	}
 	return &entity, nil
 }
 
@@ -108,12 +112,21 @@ func (e *etcdSchemaRegistry) ListIndexRule(ctx context.Context, opt ListOpt) ([]
 	}
 	entities := make([]*databasev1.IndexRule, 0, len(messages))
 	for _, message := range messages {
-		entities = append(entities, message.(*databasev1.IndexRule))
+		entity := message.(*databasev1.IndexRule)
+		if entity.Metadata.Id == 0 {
+			return nil, ErrGRPCDataLoss
+		}
+		entities = append(entities, entity)
 	}
 	return entities, nil
 }
 
 func (e *etcdSchemaRegistry) CreateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error {
+	if indexRule.Metadata.Id == 0 {
+		buf := []byte(indexRule.Metadata.Group)
+		buf = append(buf, indexRule.Metadata.Name...)
+		indexRule.Metadata.Id = crc32.ChecksumIEEE(buf)
+	}
 	return e.create(ctx, Metadata{
 		TypeMeta: TypeMeta{
 			Kind:  KindIndexRule,
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 1ed4126..647970e 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -32,6 +32,8 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/test"
 )
 
+var defaultEventallyTimeout = time.Minute
+
 var _ = Describe("Shard", func() {
 	Describe("Generate segments and blocks", func() {
 		var tmp string
@@ -73,7 +75,7 @@ var _ = Describe("Shard", func() {
 				})
 				Expect(err).NotTo(HaveOccurred())
 				return num
-			}).WithTimeout(30 * time.Second).Should(BeNumerically(">=", 3))
+			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 3))
 			for _, d := range segDirectories {
 				Eventually(func() int {
 					num := 0
@@ -83,7 +85,7 @@ var _ = Describe("Shard", func() {
 					})
 					Expect(err).NotTo(HaveOccurred())
 					return num
-				}).WithTimeout(30 * time.Second).Should(BeNumerically(">=", 3))
+				}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 3))
 			}
 		})
 		It("closes blocks", func() {
@@ -112,7 +114,7 @@ var _ = Describe("Shard", func() {
 				})
 				Expect(errInternal).NotTo(HaveOccurred())
 				return num
-			}).WithTimeout(30 * time.Second).Should(BeNumerically(">=", 1))
+			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
 			Eventually(func() int {
 				num := 0
 				errInternal := tsdb.WalkDir(segDirectory, "block-", func(suffix, absolutePath string) error {
@@ -123,7 +125,7 @@ var _ = Describe("Shard", func() {
 				})
 				Expect(errInternal).NotTo(HaveOccurred())
 				return num
-			}).WithTimeout(30 * time.Second).Should(BeNumerically(">=", 1))
+			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
 		})
 		It("reopens closed blocks", func() {
 			var err error
@@ -147,7 +149,7 @@ var _ = Describe("Shard", func() {
 					}
 				}
 				return num
-			}).WithTimeout(30 * time.Second).Should(BeNumerically(">=", 2))
+			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 2))
 			var closedBlocks []tsdb.BlockState
 			Eventually(func() int {
 				closedBlocks = nil
@@ -157,7 +159,7 @@ var _ = Describe("Shard", func() {
 					}
 				}
 				return len(closedBlocks)
-			}).WithTimeout(30 * time.Second).Should(BeNumerically(">=", 1))
+			}).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
 			series, err := shard.Series().GetByID(common.SeriesID(11))
 			Expect(err).NotTo(HaveOccurred())
 			writeFn := func(bs tsdb.BlockState) {
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index cd77ae9..31a69b8 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -148,27 +148,34 @@ func (sr *schemaRepo) Watcher() {
 	}()
 	for {
 		select {
-		case evt := <-sr.eventCh:
-			var err error
-			switch evt.Typ {
-			case EventAddOrUpdate:
-				switch evt.Kind {
-				case EventKindGroup:
-					_, err = sr.StoreGroup(evt.Metadata)
-				case EventKindResource:
-					_, err = sr.storeResource(evt.Metadata)
+		case evt, more := <-sr.eventCh:
+			if !more {
+				return
+			}
+			sr.l.Info().Interface("event", evt).Msg("received an event")
+			for i := 0; i < 10; i++ {
+				var err error
+				switch evt.Typ {
+				case EventAddOrUpdate:
+					switch evt.Kind {
+					case EventKindGroup:
+						_, err = sr.StoreGroup(evt.Metadata)
+					case EventKindResource:
+						_, err = sr.storeResource(evt.Metadata)
+					}
+				case EventDelete:
+					switch evt.Kind {
+					case EventKindGroup:
+						err = sr.deleteGroup(evt.Metadata)
+					case EventKindResource:
+						err = sr.deleteResource(evt.Metadata)
+					}
 				}
-			case EventDelete:
-				switch evt.Kind {
-				case EventKindGroup:
-					err = sr.deleteGroup(evt.Metadata)
-				case EventKindResource:
-					err = sr.deleteResource(evt.Metadata)
+				if err == nil {
+					break
 				}
-			}
-			if err != nil {
+				time.Sleep(time.Second)
 				sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...")
-				sr.eventCh <- evt
 			}
 		case <-sr.workerStopCh:
 			return