You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/10/13 04:29:57 UTC

[skywalking-banyandb] 01/01: Fixes some issues found by Java client

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch client
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5267d6c13c2ef820c766cc551c371c2a82c8c823
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Oct 13 04:27:49 2022 +0000

    Fixes some issues found by Java client
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/liaison/grpc/server.go                 | 18 ++++++++++--------
 banyand/liaison/http/server.go                 | 23 +++++++++++++----------
 banyand/tsdb/index/writer.go                   |  4 +++-
 bydbctl/internal/cmd/cmd_suite_test.go         |  9 +++++++++
 pkg/partition/index.go                         |  4 +++-
 pkg/query/logical/common.go                    |  4 +++-
 pkg/query/logical/index_filter.go              |  8 ++++----
 pkg/query/logical/tag_filter.go                |  9 ++++++---
 test/cases/stream/data/input/global_index.yaml | 15 +++++++++------
 test/cases/stream/stream.go                    |  2 +-
 10 files changed, 61 insertions(+), 35 deletions(-)

diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index aa23da8..66cc2b4 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -178,13 +178,6 @@ func (s *Server) Validate() error {
 }
 
 func (s *Server) Serve() run.StopNotify {
-	lis, err := net.Listen("tcp", s.addr)
-	if err != nil {
-		s.log.Fatal().Err(err).Msg("Failed to listen")
-	}
-	if errValidate := s.Validate(); errValidate != nil {
-		s.log.Fatal().Err(errValidate).Msg("Failed to validate data")
-	}
 	var opts []grpclib.ServerOption
 	if s.tls {
 		opts = []grpclib.ServerOption{grpclib.Creds(s.creds)}
@@ -208,8 +201,17 @@ func (s *Server) Serve() run.StopNotify {
 
 	s.stopCh = make(chan struct{})
 	go func() {
+		lis, err := net.Listen("tcp", s.addr)
+		if err != nil {
+			s.log.Error().Err(err).Msg("Failed to listen")
+			close(s.stopCh)
+			return
+		}
 		s.log.Info().Str("addr", s.addr).Msg("Listening to")
-		_ = s.ser.Serve(lis)
+		err = s.ser.Serve(lis)
+		if err != nil {
+			s.log.Error().Err(err).Msg("server is interrupted")
+		}
 		close(s.stopCh)
 	}()
 	return s.stopCh
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index 7800296..e416286 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -91,7 +91,14 @@ func (p *service) PreRun() error {
 	fileServer := stdhttp.FileServer(stdhttp.FS(fSys))
 	serveIndex := serveFileContents("index.html", httpFS)
 	p.mux.Mount("/", intercept404(fileServer, serveIndex))
+	p.srv = &stdhttp.Server{
+		Addr:    p.listenAddr,
+		Handler: p.mux,
+	}
+	return nil
+}
 
+func (p *service) Serve() run.StopNotify {
 	var ctx context.Context
 	ctx, p.clientCloser = context.WithCancel(context.Background())
 	opts := []grpc.DialOption{
@@ -100,7 +107,9 @@ func (p *service) PreRun() error {
 	}
 	client, err := newHealthCheckClient(ctx, p.l, p.grpcAddr, opts)
 	if err != nil {
-		return err
+		p.l.Error().Err(err).Msg("Failed to health check client")
+		close(p.stopCh)
+		return p.stopCh
 	}
 	gwMux := runtime.NewServeMux(runtime.WithHealthzEndpoint(client))
 	err = multierr.Combine(
@@ -114,17 +123,11 @@ func (p *service) PreRun() error {
 		property_v1.RegisterPropertyServiceHandlerFromEndpoint(ctx, gwMux, p.grpcAddr, opts),
 	)
 	if err != nil {
-		return err
+		p.l.Error().Err(err).Msg("Failed to register endpoints")
+		close(p.stopCh)
+		return p.stopCh
 	}
 	p.mux.Mount("/api", http.StripPrefix("/api", gwMux))
-	p.srv = &stdhttp.Server{
-		Addr:    p.listenAddr,
-		Handler: p.mux,
-	}
-	return nil
-}
-
-func (p *service) Serve() run.StopNotify {
 	go func() {
 		p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start liaison http server")
 		if err := p.srv.ListenAndServe(); err != http.ErrServerClosed {
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index a7b72f4..95dcf48 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -242,7 +242,9 @@ func getIndexValue(ruleIndex *partition.IndexRuleLocator, value Value) (val [][]
 	val = make([][]byte, 0)
 	var existInt bool
 	if len(ruleIndex.TagIndices) != 1 {
-		return nil, false, errors.Wrap(ErrUnsupportedIndexType, "the index rule didn't support composited tags")
+		return nil, false, errors.WithMessagef(ErrUnsupportedIndexType,
+			"the index rule %s(%v) didn't support composited tags",
+			ruleIndex.Rule.Metadata.Name, ruleIndex.Rule.Tags)
 	}
 	tIndex := ruleIndex.TagIndices[0]
 	tag, err := partition.GetTagByOffset(value.TagFamilies, tIndex.FamilyOffset, tIndex.TagOffset)
diff --git a/bydbctl/internal/cmd/cmd_suite_test.go b/bydbctl/internal/cmd/cmd_suite_test.go
index 36313e2..6dc56ae 100644
--- a/bydbctl/internal/cmd/cmd_suite_test.go
+++ b/bydbctl/internal/cmd/cmd_suite_test.go
@@ -22,9 +22,18 @@ import (
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 func TestCmd(t *testing.T) {
 	RegisterFailHandler(Fail)
 	RunSpecs(t, "Cmd Suite")
 }
+
+var _ = BeforeSuite(func() {
+	Expect(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "warn",
+	})).To(Succeed())
+})
diff --git a/pkg/partition/index.go b/pkg/partition/index.go
index a6f8b81..0cf38b3 100644
--- a/pkg/partition/index.go
+++ b/pkg/partition/index.go
@@ -36,7 +36,9 @@ func ParseIndexRuleLocators(families []*databasev1.TagFamilySpec, indexRules []*
 				tagIndices = append(tagIndices, TagLocator{FamilyOffset: fIndex, TagOffset: tIndex})
 			}
 		}
-		locators = append(locators, &IndexRuleLocator{Rule: rule, TagIndices: tagIndices})
+		if len(tagIndices) > 0 {
+			locators = append(locators, &IndexRuleLocator{Rule: rule, TagIndices: tagIndices})
+		}
 	}
 	return locators
 }
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index a80b0fc..3537ce7 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -31,7 +31,9 @@ import (
 var (
 	ErrTagNotDefined              = errors.New("tag is not defined")
 	ErrFieldNotDefined            = errors.New("field is not defined")
-	ErrInvalidConditionType       = errors.New("invalid pair type")
+	ErrUnsupportedConditionOp     = errors.New("unsupported condition operation")
+	ErrUnsupportedConditionValue  = errors.New("unsupported condition value type")
+	ErrInvalidCriteriaType        = errors.New("invalid criteria type")
 	ErrIncompatibleQueryCondition = errors.New("incompatible query condition type")
 	ErrIndexNotDefined            = errors.New("index is not define for the tag")
 	ErrMultipleGlobalIndexes      = errors.New("multiple global indexes are not supported")
diff --git a/pkg/query/logical/index_filter.go b/pkg/query/logical/index_filter.go
index 059ef34..c92ecf8 100644
--- a/pkg/query/logical/index_filter.go
+++ b/pkg/query/logical/index_filter.go
@@ -96,7 +96,7 @@ func BuildLocalFilter(criteria *model_v1.Criteria, schema Schema, entityDict map
 		}
 
 	}
-	return nil, nil, ErrInvalidConditionType
+	return nil, nil, ErrInvalidCriteriaType
 }
 
 func parseCondition(cond *model_v1.Condition, indexRule *database_v1.IndexRule, expr LiteralExpr, entity tsdb.Entity) (index.Filter, []tsdb.Entity, error) {
@@ -140,7 +140,7 @@ func parseCondition(cond *model_v1.Condition, indexRule *database_v1.IndexRule,
 		}
 		return newNot(indexRule, and), []tsdb.Entity{entity}, nil
 	}
-	return nil, nil, ErrInvalidConditionType
+	return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp, "index filter parses %v", cond)
 }
 
 func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *model_v1.Condition) (LiteralExpr, tsdb.Entity, error) {
@@ -148,7 +148,7 @@ func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *mode
 	copy(parsedEntity, entity)
 	entityIdx, ok := entityDict[cond.Name]
 	if ok && cond.Op != model_v1.Condition_BINARY_OP_EQ {
-		return nil, nil, errors.WithMessagef(ErrInvalidConditionType, "tag belongs to the entity only supports EQ operation in condition(%v)", cond)
+		return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag belongs to the entity only supports EQ operation in condition(%v)", cond)
 	}
 	switch v := cond.Value.Value.(type) {
 	case *model_v1.TagValue_Str:
@@ -183,7 +183,7 @@ func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *mode
 	case *model_v1.TagValue_Null:
 		return nullLiteralExpr, nil, nil
 	}
-	return nil, nil, ErrInvalidConditionType
+	return nil, nil, errors.WithMessagef(ErrUnsupportedConditionValue, "index filter parses %v", cond)
 }
 
 func parseEntities(op model_v1.LogicalExpression_LogicalOp, input tsdb.Entity, left, right []tsdb.Entity) []tsdb.Entity {
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index fd0c58a..5e0ef95 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -35,6 +35,9 @@ type TagFilter interface {
 }
 
 func BuildTagFilter(criteria *model_v1.Criteria, entityDict map[string]int, schema Schema, hasGlobalIndex bool) (TagFilter, error) {
+	if criteria == nil {
+		return BypassFilter, nil
+	}
 	switch criteria.GetExp().(type) {
 	case *model_v1.Criteria_Condition:
 		cond := criteria.GetCondition()
@@ -77,7 +80,7 @@ func BuildTagFilter(criteria *model_v1.Criteria, entityDict map[string]int, sche
 		}
 
 	}
-	return nil, ErrInvalidConditionType
+	return nil, ErrInvalidCriteriaType
 }
 
 func parseFilter(cond *model_v1.Condition, expr ComparableExpr) (TagFilter, error) {
@@ -109,7 +112,7 @@ func parseFilter(cond *model_v1.Condition, expr ComparableExpr) (TagFilter, erro
 	case model_v1.Condition_BINARY_OP_NOT_HAVING:
 		return newNotTag(newHavingTag(cond.Name, expr)), nil
 	}
-	return nil, ErrInvalidConditionType
+	return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag filter parses %v", cond)
 }
 
 func parseExpr(value *model_v1.TagValue) (ComparableExpr, error) {
@@ -133,7 +136,7 @@ func parseExpr(value *model_v1.TagValue) (ComparableExpr, error) {
 	case *model_v1.TagValue_Null:
 		return nullLiteralExpr, nil
 	}
-	return nil, ErrInvalidConditionType
+	return nil, errors.WithMessagef(ErrUnsupportedConditionValue, "tag filter parses %v", value)
 }
 
 var BypassFilter = new(emptyFilter)
diff --git a/test/cases/stream/data/input/global_index.yaml b/test/cases/stream/data/input/global_index.yaml
index 84267ef..fb1b2b0 100644
--- a/test/cases/stream/data/input/global_index.yaml
+++ b/test/cases/stream/data/input/global_index.yaml
@@ -25,9 +25,12 @@ projection:
   - name: "data"
     tags: ["data_binary"]
 criteria:
-  condition:
-    name: "trace_id"
-    op: "BINARY_OP_EQ"
-    value:
-      str:
-        value: "1"
+  le:
+    op: "LOGICAL_OP_AND"
+    left:
+      condition:
+        name: "trace_id"
+        op: "BINARY_OP_EQ"
+        value:
+          str:
+            value: "1"
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index 210b973..d99d11c 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -48,7 +48,7 @@ var _ = g.DescribeTable("Scanning Streams", verify,
 		End:   timestamppb.New(time.Unix(0, math.MaxInt64).Truncate(time.Millisecond)),
 	}),
 	g.Entry("sort desc", helpers.Args{Input: "sort_desc", Duration: 1 * time.Hour}),
-	g.Entry("global index", helpers.Args{Input: "global_index", Duration: 1 * time.Hour}),
+	g.FEntry("global index", helpers.Args{Input: "global_index", Duration: 1 * time.Hour}),
 	g.Entry("filter by non-indexed tag", helpers.Args{Input: "filter_tag", Duration: 1 * time.Hour}),
 	g.Entry("get empty result by non-indexed tag", helpers.Args{Input: "filter_tag_empty", Duration: 1 * time.Hour, WantEmpty: true}),
 	g.Entry("numeric local index: less", helpers.Args{Input: "less", Duration: 1 * time.Hour}),