You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/10/13 04:29:57 UTC
[skywalking-banyandb] 01/01: Fixes some issues found by Java client
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch client
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5267d6c13c2ef820c766cc551c371c2a82c8c823
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Oct 13 04:27:49 2022 +0000
Fixes some issues found by Java client
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/liaison/grpc/server.go | 18 ++++++++++--------
banyand/liaison/http/server.go | 23 +++++++++++++----------
banyand/tsdb/index/writer.go | 4 +++-
bydbctl/internal/cmd/cmd_suite_test.go | 9 +++++++++
pkg/partition/index.go | 4 +++-
pkg/query/logical/common.go | 4 +++-
pkg/query/logical/index_filter.go | 8 ++++----
pkg/query/logical/tag_filter.go | 9 ++++++---
test/cases/stream/data/input/global_index.yaml | 15 +++++++++------
test/cases/stream/stream.go | 2 +-
10 files changed, 61 insertions(+), 35 deletions(-)
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index aa23da8..66cc2b4 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -178,13 +178,6 @@ func (s *Server) Validate() error {
}
func (s *Server) Serve() run.StopNotify {
- lis, err := net.Listen("tcp", s.addr)
- if err != nil {
- s.log.Fatal().Err(err).Msg("Failed to listen")
- }
- if errValidate := s.Validate(); errValidate != nil {
- s.log.Fatal().Err(errValidate).Msg("Failed to validate data")
- }
var opts []grpclib.ServerOption
if s.tls {
opts = []grpclib.ServerOption{grpclib.Creds(s.creds)}
@@ -208,8 +201,17 @@ func (s *Server) Serve() run.StopNotify {
s.stopCh = make(chan struct{})
go func() {
+ lis, err := net.Listen("tcp", s.addr)
+ if err != nil {
+ s.log.Error().Err(err).Msg("Failed to listen")
+ close(s.stopCh)
+ return
+ }
s.log.Info().Str("addr", s.addr).Msg("Listening to")
- _ = s.ser.Serve(lis)
+ err = s.ser.Serve(lis)
+ if err != nil {
+ s.log.Error().Err(err).Msg("server is interrupted")
+ }
close(s.stopCh)
}()
return s.stopCh
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index 7800296..e416286 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -91,7 +91,14 @@ func (p *service) PreRun() error {
fileServer := stdhttp.FileServer(stdhttp.FS(fSys))
serveIndex := serveFileContents("index.html", httpFS)
p.mux.Mount("/", intercept404(fileServer, serveIndex))
+ p.srv = &stdhttp.Server{
+ Addr: p.listenAddr,
+ Handler: p.mux,
+ }
+ return nil
+}
+func (p *service) Serve() run.StopNotify {
var ctx context.Context
ctx, p.clientCloser = context.WithCancel(context.Background())
opts := []grpc.DialOption{
@@ -100,7 +107,9 @@ func (p *service) PreRun() error {
}
client, err := newHealthCheckClient(ctx, p.l, p.grpcAddr, opts)
if err != nil {
- return err
+ p.l.Error().Err(err).Msg("Failed to health check client")
+ close(p.stopCh)
+ return p.stopCh
}
gwMux := runtime.NewServeMux(runtime.WithHealthzEndpoint(client))
err = multierr.Combine(
@@ -114,17 +123,11 @@ func (p *service) PreRun() error {
property_v1.RegisterPropertyServiceHandlerFromEndpoint(ctx, gwMux, p.grpcAddr, opts),
)
if err != nil {
- return err
+ p.l.Error().Err(err).Msg("Failed to register endpoints")
+ close(p.stopCh)
+ return p.stopCh
}
p.mux.Mount("/api", http.StripPrefix("/api", gwMux))
- p.srv = &stdhttp.Server{
- Addr: p.listenAddr,
- Handler: p.mux,
- }
- return nil
-}
-
-func (p *service) Serve() run.StopNotify {
go func() {
p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start liaison http server")
if err := p.srv.ListenAndServe(); err != http.ErrServerClosed {
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index a7b72f4..95dcf48 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -242,7 +242,9 @@ func getIndexValue(ruleIndex *partition.IndexRuleLocator, value Value) (val [][]
val = make([][]byte, 0)
var existInt bool
if len(ruleIndex.TagIndices) != 1 {
- return nil, false, errors.Wrap(ErrUnsupportedIndexType, "the index rule didn't support composited tags")
+ return nil, false, errors.WithMessagef(ErrUnsupportedIndexType,
+ "the index rule %s(%v) didn't support composited tags",
+ ruleIndex.Rule.Metadata.Name, ruleIndex.Rule.Tags)
}
tIndex := ruleIndex.TagIndices[0]
tag, err := partition.GetTagByOffset(value.TagFamilies, tIndex.FamilyOffset, tIndex.TagOffset)
diff --git a/bydbctl/internal/cmd/cmd_suite_test.go b/bydbctl/internal/cmd/cmd_suite_test.go
index 36313e2..6dc56ae 100644
--- a/bydbctl/internal/cmd/cmd_suite_test.go
+++ b/bydbctl/internal/cmd/cmd_suite_test.go
@@ -22,9 +22,18 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
+
+ "github.com/apache/skywalking-banyandb/pkg/logger"
)
func TestCmd(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Cmd Suite")
}
+
+var _ = BeforeSuite(func() {
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "warn",
+ })).To(Succeed())
+})
diff --git a/pkg/partition/index.go b/pkg/partition/index.go
index a6f8b81..0cf38b3 100644
--- a/pkg/partition/index.go
+++ b/pkg/partition/index.go
@@ -36,7 +36,9 @@ func ParseIndexRuleLocators(families []*databasev1.TagFamilySpec, indexRules []*
tagIndices = append(tagIndices, TagLocator{FamilyOffset: fIndex, TagOffset: tIndex})
}
}
- locators = append(locators, &IndexRuleLocator{Rule: rule, TagIndices: tagIndices})
+ if len(tagIndices) > 0 {
+ locators = append(locators, &IndexRuleLocator{Rule: rule, TagIndices: tagIndices})
+ }
}
return locators
}
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index a80b0fc..3537ce7 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -31,7 +31,9 @@ import (
var (
ErrTagNotDefined = errors.New("tag is not defined")
ErrFieldNotDefined = errors.New("field is not defined")
- ErrInvalidConditionType = errors.New("invalid pair type")
+ ErrUnsupportedConditionOp = errors.New("unsupported condition operation")
+ ErrUnsupportedConditionValue = errors.New("unsupported condition value type")
+ ErrInvalidCriteriaType = errors.New("invalid criteria type")
ErrIncompatibleQueryCondition = errors.New("incompatible query condition type")
ErrIndexNotDefined = errors.New("index is not define for the tag")
ErrMultipleGlobalIndexes = errors.New("multiple global indexes are not supported")
diff --git a/pkg/query/logical/index_filter.go b/pkg/query/logical/index_filter.go
index 059ef34..c92ecf8 100644
--- a/pkg/query/logical/index_filter.go
+++ b/pkg/query/logical/index_filter.go
@@ -96,7 +96,7 @@ func BuildLocalFilter(criteria *model_v1.Criteria, schema Schema, entityDict map
}
}
- return nil, nil, ErrInvalidConditionType
+ return nil, nil, ErrInvalidCriteriaType
}
func parseCondition(cond *model_v1.Condition, indexRule *database_v1.IndexRule, expr LiteralExpr, entity tsdb.Entity) (index.Filter, []tsdb.Entity, error) {
@@ -140,7 +140,7 @@ func parseCondition(cond *model_v1.Condition, indexRule *database_v1.IndexRule,
}
return newNot(indexRule, and), []tsdb.Entity{entity}, nil
}
- return nil, nil, ErrInvalidConditionType
+ return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp, "index filter parses %v", cond)
}
func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *model_v1.Condition) (LiteralExpr, tsdb.Entity, error) {
@@ -148,7 +148,7 @@ func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *mode
copy(parsedEntity, entity)
entityIdx, ok := entityDict[cond.Name]
if ok && cond.Op != model_v1.Condition_BINARY_OP_EQ {
- return nil, nil, errors.WithMessagef(ErrInvalidConditionType, "tag belongs to the entity only supports EQ operation in condition(%v)", cond)
+ return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag belongs to the entity only supports EQ operation in condition(%v)", cond)
}
switch v := cond.Value.Value.(type) {
case *model_v1.TagValue_Str:
@@ -183,7 +183,7 @@ func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *mode
case *model_v1.TagValue_Null:
return nullLiteralExpr, nil, nil
}
- return nil, nil, ErrInvalidConditionType
+ return nil, nil, errors.WithMessagef(ErrUnsupportedConditionValue, "index filter parses %v", cond)
}
func parseEntities(op model_v1.LogicalExpression_LogicalOp, input tsdb.Entity, left, right []tsdb.Entity) []tsdb.Entity {
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index fd0c58a..5e0ef95 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -35,6 +35,9 @@ type TagFilter interface {
}
func BuildTagFilter(criteria *model_v1.Criteria, entityDict map[string]int, schema Schema, hasGlobalIndex bool) (TagFilter, error) {
+ if criteria == nil {
+ return BypassFilter, nil
+ }
switch criteria.GetExp().(type) {
case *model_v1.Criteria_Condition:
cond := criteria.GetCondition()
@@ -77,7 +80,7 @@ func BuildTagFilter(criteria *model_v1.Criteria, entityDict map[string]int, sche
}
}
- return nil, ErrInvalidConditionType
+ return nil, ErrInvalidCriteriaType
}
func parseFilter(cond *model_v1.Condition, expr ComparableExpr) (TagFilter, error) {
@@ -109,7 +112,7 @@ func parseFilter(cond *model_v1.Condition, expr ComparableExpr) (TagFilter, erro
case model_v1.Condition_BINARY_OP_NOT_HAVING:
return newNotTag(newHavingTag(cond.Name, expr)), nil
}
- return nil, ErrInvalidConditionType
+ return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag filter parses %v", cond)
}
func parseExpr(value *model_v1.TagValue) (ComparableExpr, error) {
@@ -133,7 +136,7 @@ func parseExpr(value *model_v1.TagValue) (ComparableExpr, error) {
case *model_v1.TagValue_Null:
return nullLiteralExpr, nil
}
- return nil, ErrInvalidConditionType
+ return nil, errors.WithMessagef(ErrUnsupportedConditionValue, "tag filter parses %v", value)
}
var BypassFilter = new(emptyFilter)
diff --git a/test/cases/stream/data/input/global_index.yaml b/test/cases/stream/data/input/global_index.yaml
index 84267ef..fb1b2b0 100644
--- a/test/cases/stream/data/input/global_index.yaml
+++ b/test/cases/stream/data/input/global_index.yaml
@@ -25,9 +25,12 @@ projection:
- name: "data"
tags: ["data_binary"]
criteria:
- condition:
- name: "trace_id"
- op: "BINARY_OP_EQ"
- value:
- str:
- value: "1"
+ le:
+ op: "LOGICAL_OP_AND"
+ left:
+ condition:
+ name: "trace_id"
+ op: "BINARY_OP_EQ"
+ value:
+ str:
+ value: "1"
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index 210b973..d99d11c 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -48,7 +48,7 @@ var _ = g.DescribeTable("Scanning Streams", verify,
End: timestamppb.New(time.Unix(0, math.MaxInt64).Truncate(time.Millisecond)),
}),
g.Entry("sort desc", helpers.Args{Input: "sort_desc", Duration: 1 * time.Hour}),
- g.Entry("global index", helpers.Args{Input: "global_index", Duration: 1 * time.Hour}),
+ g.FEntry("global index", helpers.Args{Input: "global_index", Duration: 1 * time.Hour}),
g.Entry("filter by non-indexed tag", helpers.Args{Input: "filter_tag", Duration: 1 * time.Hour}),
g.Entry("get empty result by non-indexed tag", helpers.Args{Input: "filter_tag_empty", Duration: 1 * time.Hour, WantEmpty: true}),
g.Entry("numeric local index: less", helpers.Args{Input: "less", Duration: 1 * time.Hour}),