You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/16 10:31:52 UTC
[skywalking-banyandb] branch main updated: Make sure index rule ID is set (#104)
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 0f78d0d Make sure index rule ID is set (#104)
0f78d0d is described below
commit 0f78d0df112f17193812a21461bcbaea9ebd806a
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Apr 16 18:31:48 2022 +0800
Make sure index rule ID is set (#104)
* Make sure index rule ID is set
* Add a default ID value derived from a 32-bit CRC
* Check the ID when getting and listing index rules
* Improve metadata event handler
* Add a new gRPC error code
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/liaison/grpc/grpc_suite_test.go | 3 +++
banyand/liaison/grpc/measure_test.go | 4 +--
banyand/liaison/grpc/stream_test.go | 4 +--
banyand/metadata/schema/error.go | 2 ++
banyand/metadata/schema/index.go | 15 +++++++++++-
banyand/tsdb/shard_test.go | 14 ++++++-----
pkg/schema/metadata.go | 43 +++++++++++++++++++--------------
7 files changed, 56 insertions(+), 29 deletions(-)
diff --git a/banyand/liaison/grpc/grpc_suite_test.go b/banyand/liaison/grpc/grpc_suite_test.go
index b247b7d..d62547f 100644
--- a/banyand/liaison/grpc/grpc_suite_test.go
+++ b/banyand/liaison/grpc/grpc_suite_test.go
@@ -19,6 +19,7 @@ package grpc_test
import (
"testing"
+ "time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -26,6 +27,8 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
)
+var defaultEventallyTimeout = 30 * time.Second
+
func TestGrpc(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Grpc Suite")
diff --git a/banyand/liaison/grpc/measure_test.go b/banyand/liaison/grpc/measure_test.go
index 4d44bf1..f54a172 100644
--- a/banyand/liaison/grpc/measure_test.go
+++ b/banyand/liaison/grpc/measure_test.go
@@ -71,7 +71,7 @@ var _ = Describe("Measure", func() {
return 0
}
return num
- }).Should(Equal(1))
+ }, defaultEventallyTimeout).Should(Equal(1))
})
It("is a TLS server", func() {
flags := []string{"--tls=true", "--measure-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
@@ -91,7 +91,7 @@ var _ = Describe("Measure", func() {
measureWrite(conn)
Eventually(func() (int, error) {
return measureQuery(conn)
- }).Should(Equal(1))
+ }, defaultEventallyTimeout).Should(Equal(1))
})
AfterEach(func() {
_ = conn.Close()
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index dbc271d..269f9c7 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -78,7 +78,7 @@ var _ = Describe("Stream", func() {
return 0
}
return num
- }).Should(Equal(1))
+ }, defaultEventallyTimeout).Should(Equal(1))
})
It("is a TLS server", func() {
flags := []string{"--tls=true", "--stream-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
@@ -98,7 +98,7 @@ var _ = Describe("Stream", func() {
streamWrite(conn)
Eventually(func() (int, error) {
return streamQuery(conn)
- }).Should(Equal(1))
+ }, defaultEventallyTimeout).Should(Equal(1))
})
AfterEach(func() {
_ = conn.Close()
diff --git a/banyand/metadata/schema/error.go b/banyand/metadata/schema/error.go
index 8853e0f..d9cf005 100644
--- a/banyand/metadata/schema/error.go
+++ b/banyand/metadata/schema/error.go
@@ -31,6 +31,8 @@ var (
ErrGRPCResourceNotFound = statusGRPCResourceNotFound.Err()
statusGRPCAlreadyExists = status.New(codes.AlreadyExists, "banyandb: resource already exists")
ErrGRPCAlreadyExists = statusGRPCAlreadyExists.Err()
+ statusDataLoss = status.New(codes.DataLoss, "banyandb: resource corrupts.")
+ ErrGRPCDataLoss = statusDataLoss.Err()
)
func IsNotFound(err error) bool {
diff --git a/banyand/metadata/schema/index.go b/banyand/metadata/schema/index.go
index c79ebfc..f3bf415 100644
--- a/banyand/metadata/schema/index.go
+++ b/banyand/metadata/schema/index.go
@@ -19,6 +19,7 @@ package schema
import (
"context"
+ "hash/crc32"
"google.golang.org/protobuf/proto"
@@ -93,6 +94,9 @@ func (e *etcdSchemaRegistry) GetIndexRule(ctx context.Context, metadata *commonv
if err := e.get(ctx, formatIndexRuleKey(metadata), &entity); err != nil {
return nil, err
}
+ if entity.Metadata.Id == 0 {
+ return nil, ErrGRPCDataLoss
+ }
return &entity, nil
}
@@ -108,12 +112,21 @@ func (e *etcdSchemaRegistry) ListIndexRule(ctx context.Context, opt ListOpt) ([]
}
entities := make([]*databasev1.IndexRule, 0, len(messages))
for _, message := range messages {
- entities = append(entities, message.(*databasev1.IndexRule))
+ entity := message.(*databasev1.IndexRule)
+ if entity.Metadata.Id == 0 {
+ return nil, ErrGRPCDataLoss
+ }
+ entities = append(entities, entity)
}
return entities, nil
}
func (e *etcdSchemaRegistry) CreateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error {
+ if indexRule.Metadata.Id == 0 {
+ buf := []byte(indexRule.Metadata.Group)
+ buf = append(buf, indexRule.Metadata.Name...)
+ indexRule.Metadata.Id = crc32.ChecksumIEEE(buf)
+ }
return e.create(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindIndexRule,
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 1ed4126..647970e 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -32,6 +32,8 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test"
)
+var defaultEventallyTimeout = time.Minute
+
var _ = Describe("Shard", func() {
Describe("Generate segments and blocks", func() {
var tmp string
@@ -73,7 +75,7 @@ var _ = Describe("Shard", func() {
})
Expect(err).NotTo(HaveOccurred())
return num
- }).WithTimeout(30 * time.Second).Should(BeNumerically(">=", 3))
+ }).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 3))
for _, d := range segDirectories {
Eventually(func() int {
num := 0
@@ -83,7 +85,7 @@ var _ = Describe("Shard", func() {
})
Expect(err).NotTo(HaveOccurred())
return num
- }).WithTimeout(30 * time.Second).Should(BeNumerically(">=", 3))
+ }).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 3))
}
})
It("closes blocks", func() {
@@ -112,7 +114,7 @@ var _ = Describe("Shard", func() {
})
Expect(errInternal).NotTo(HaveOccurred())
return num
- }).WithTimeout(30 * time.Second).Should(BeNumerically(">=", 1))
+ }).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
Eventually(func() int {
num := 0
errInternal := tsdb.WalkDir(segDirectory, "block-", func(suffix, absolutePath string) error {
@@ -123,7 +125,7 @@ var _ = Describe("Shard", func() {
})
Expect(errInternal).NotTo(HaveOccurred())
return num
- }).WithTimeout(30 * time.Second).Should(BeNumerically(">=", 1))
+ }).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
})
It("reopens closed blocks", func() {
var err error
@@ -147,7 +149,7 @@ var _ = Describe("Shard", func() {
}
}
return num
- }).WithTimeout(30 * time.Second).Should(BeNumerically(">=", 2))
+ }).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 2))
var closedBlocks []tsdb.BlockState
Eventually(func() int {
closedBlocks = nil
@@ -157,7 +159,7 @@ var _ = Describe("Shard", func() {
}
}
return len(closedBlocks)
- }).WithTimeout(30 * time.Second).Should(BeNumerically(">=", 1))
+ }).WithTimeout(defaultEventallyTimeout).Should(BeNumerically(">=", 1))
series, err := shard.Series().GetByID(common.SeriesID(11))
Expect(err).NotTo(HaveOccurred())
writeFn := func(bs tsdb.BlockState) {
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index cd77ae9..31a69b8 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -148,27 +148,34 @@ func (sr *schemaRepo) Watcher() {
}()
for {
select {
- case evt := <-sr.eventCh:
- var err error
- switch evt.Typ {
- case EventAddOrUpdate:
- switch evt.Kind {
- case EventKindGroup:
- _, err = sr.StoreGroup(evt.Metadata)
- case EventKindResource:
- _, err = sr.storeResource(evt.Metadata)
+ case evt, more := <-sr.eventCh:
+ if !more {
+ return
+ }
+ sr.l.Info().Interface("event", evt).Msg("received an event")
+ for i := 0; i < 10; i++ {
+ var err error
+ switch evt.Typ {
+ case EventAddOrUpdate:
+ switch evt.Kind {
+ case EventKindGroup:
+ _, err = sr.StoreGroup(evt.Metadata)
+ case EventKindResource:
+ _, err = sr.storeResource(evt.Metadata)
+ }
+ case EventDelete:
+ switch evt.Kind {
+ case EventKindGroup:
+ err = sr.deleteGroup(evt.Metadata)
+ case EventKindResource:
+ err = sr.deleteResource(evt.Metadata)
+ }
}
- case EventDelete:
- switch evt.Kind {
- case EventKindGroup:
- err = sr.deleteGroup(evt.Metadata)
- case EventKindResource:
- err = sr.deleteResource(evt.Metadata)
+ if err == nil {
+ break
}
- }
- if err != nil {
+ time.Sleep(time.Second)
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...")
- sr.eventCh <- evt
}
case <-sr.workerStopCh:
return