You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/01/04 06:25:49 UTC

[skywalking-banyandb] branch kv created (now a2ac6f0)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch kv
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at a2ac6f0  Correct the failure of opening existing blocks

This branch includes the following new commits:

     new a2ac6f0  Correct the failure of opening existing blocks

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/01: Correct the failure of opening existing blocks

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch kv
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit a2ac6f00334027ea2a2a84713f579bbf358f951a
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Jan 4 06:25:07 2023 +0000

    Correct the failure of opening existing blocks
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/proto/banyandb/measure/v1/query.proto          |   2 +-
 api/proto/banyandb/model/v1/query.proto            |   2 +-
 banyand/tsdb/block_ctrl.go                         |   4 +-
 banyand/tsdb/segment_ctrl.go                       |   4 +-
 banyand/tsdb/shard.go                              |   4 +-
 banyand/tsdb/tsdb.go                               |  12 ++
 pkg/query/logical/index_filter.go                  |  79 ++++++++++--
 .../entity.yaml => input/all_only_fields.yaml}     |  17 +--
 test/cases/measure/data/input/entity.yaml          |   2 +-
 .../data/input/{entity.yaml => entity_in.yaml}     |   6 +-
 test/cases/measure/data/want/all_only_fields.yaml  | 138 +++++++++++++++++++++
 test/cases/measure/data/want/entity.yaml           |   4 -
 .../data/want/{entity.yaml => entity_in.yaml}      |  34 +++--
 test/cases/measure/measure.go                      |   4 +-
 14 files changed, 259 insertions(+), 53 deletions(-)

diff --git a/api/proto/banyandb/measure/v1/query.proto b/api/proto/banyandb/measure/v1/query.proto
index 53df387..8350ba8 100644
--- a/api/proto/banyandb/measure/v1/query.proto
+++ b/api/proto/banyandb/measure/v1/query.proto
@@ -57,7 +57,7 @@ message QueryRequest {
   // tag_families are indexed.
   model.v1.Criteria criteria = 4;
   // tag_projection can be used to select tags of the data points in the response
-  model.v1.TagProjection tag_projection = 5 [(validate.rules).message.required = true];
+  model.v1.TagProjection tag_projection = 5;
   message FieldProjection {
     repeated string names = 1;
   }
diff --git a/api/proto/banyandb/model/v1/query.proto b/api/proto/banyandb/model/v1/query.proto
index cb4a7a1..2fd325b 100644
--- a/api/proto/banyandb/model/v1/query.proto
+++ b/api/proto/banyandb/model/v1/query.proto
@@ -110,7 +110,7 @@ message TagProjection {
     string name = 1;
     repeated string tags = 2;
   }
-  repeated TagFamily tag_families = 1 [(validate.rules).repeated.min_items = 1];
+  repeated TagFamily tag_families = 1;
 }
 
 // TimeRange is a range query for uint64,
diff --git a/banyand/tsdb/block_ctrl.go b/banyand/tsdb/block_ctrl.go
index b1071d3..8d251c4 100644
--- a/banyand/tsdb/block_ctrl.go
+++ b/banyand/tsdb/block_ctrl.go
@@ -226,7 +226,7 @@ func (bc *blockController) closeBlock(ctx context.Context, blockID SectionID) er
 func (bc *blockController) open() error {
 	bc.Lock()
 	defer bc.Unlock()
-	return loadSections(bc.location, bc, bc.blockSize, func(start, end time.Time) error {
+	return loadSections(bc.location, blockPathPrefix, bc, bc.blockSize, func(start, end time.Time) error {
 		_, err := bc.load(start, end, bc.location)
 		return err
 	})
@@ -261,7 +261,7 @@ func (bc *blockController) create(start time.Time) (*block, error) {
 	if end.After(bc.segTimeRange.End) {
 		end = bc.segTimeRange.End
 	}
-	_, err := mkdir(blockTemplate, bc.location, bc.Format(start))
+	_, err := mkdirIfNotExist(blockTemplate, bc.location, bc.Format(start))
 	if err != nil {
 		return nil, err
 	}
diff --git a/banyand/tsdb/segment_ctrl.go b/banyand/tsdb/segment_ctrl.go
index 98653a2..487295f 100644
--- a/banyand/tsdb/segment_ctrl.go
+++ b/banyand/tsdb/segment_ctrl.go
@@ -185,7 +185,7 @@ func (sc *segmentController) Parse(value string) (time.Time, error) {
 func (sc *segmentController) open() error {
 	sc.Lock()
 	defer sc.Unlock()
-	return loadSections(sc.location, sc, sc.segmentSize, func(start, end time.Time) error {
+	return loadSections(sc.location, segPathPrefix, sc, sc.segmentSize, func(start, end time.Time) error {
 		_, err := sc.load(start, end, sc.location)
 		if errors.Is(err, errEndOfSegment) {
 			return nil
@@ -214,7 +214,7 @@ func (sc *segmentController) create(start time.Time) (*segment, error) {
 	} else {
 		end = stdEnd
 	}
-	_, err := mkdir(segTemplate, sc.location, sc.Format(start))
+	_, err := mkdirIfNotExist(segTemplate, sc.location, sc.Format(start))
 	if err != nil {
 		return nil, err
 	}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 07b89a7..2bd94a7 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -228,11 +228,11 @@ type parser interface {
 	Parse(value string) (time.Time, error)
 }
 
-func loadSections(root string, parser parser, intervalRule IntervalRule, loadFn func(start, end time.Time) error) error {
+func loadSections(root, prefix string, parser parser, intervalRule IntervalRule, loadFn func(start, end time.Time) error) error {
 	var startTimeLst []time.Time
 	if err := walkDir(
 		root,
-		segPathPrefix,
+		prefix,
 		func(suffix string) error {
 			startTime, err := parser.Parse(suffix)
 			if err != nil {
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 9e65920..acc50f9 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -350,3 +350,15 @@ func mkdir(format string, a ...interface{}) (path string, err error) {
 	}
 	return path, err
 }
+
+func mkdirIfNotExist(format string, a ...interface{}) (path string, err error) {
+	path = fmt.Sprintf(format, a...)
+	if _, err = os.Stat(path); errors.Is(err, os.ErrNotExist) {
+		if err = os.MkdirAll(path, dirPerm); err != nil {
+			return "", errors.Wrapf(err, "failed to create %s", path)
+		}
+	} else {
+		return "", os.ErrExist
+	}
+	return path, err
+}
diff --git a/pkg/query/logical/index_filter.go b/pkg/query/logical/index_filter.go
index f7f2882..edd86a5 100644
--- a/pkg/query/logical/index_filter.go
+++ b/pkg/query/logical/index_filter.go
@@ -59,7 +59,7 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[
 			return nil, nil, err
 		}
 		if parsedEntity != nil {
-			return nil, []tsdb.Entity{parsedEntity}, nil
+			return nil, parsedEntity, nil
 		}
 		if ok, indexRule := schema.IndexDefined(cond.Name); ok {
 			if indexRule.Location == databasev1.IndexRule_LOCATION_GLOBAL {
@@ -168,40 +168,64 @@ func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, ex
 	return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "index filter parses %v", cond)
 }
 
-func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *modelv1.Condition) (LiteralExpr, tsdb.Entity, error) {
-	parsedEntity := make(tsdb.Entity, len(entity))
-	copy(parsedEntity, entity)
+func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *modelv1.Condition) (LiteralExpr, []tsdb.Entity, error) {
 	entityIdx, ok := entityDict[cond.Name]
-	if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ {
-		return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "tag belongs to the entity only supports EQ operation in condition(%v)", cond)
+	if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op != modelv1.Condition_BINARY_OP_IN {
+		return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "tag belongs to the entity only supports EQ or IN operation in condition(%v)", cond)
 	}
 	switch v := cond.Value.Value.(type) {
 	case *modelv1.TagValue_Str:
 		if ok {
+			parsedEntity := make(tsdb.Entity, len(entity))
+			copy(parsedEntity, entity)
 			parsedEntity[entityIdx] = []byte(v.Str.GetValue())
-			return nil, parsedEntity, nil
+			return nil, []tsdb.Entity{parsedEntity}, nil
 		}
 		return str(v.Str.GetValue()), nil, nil
 	case *modelv1.TagValue_Id:
 		if ok {
+			parsedEntity := make(tsdb.Entity, len(entity))
+			copy(parsedEntity, entity)
 			parsedEntity[entityIdx] = []byte(v.Id.GetValue())
-			return nil, parsedEntity, nil
+			return nil, []tsdb.Entity{parsedEntity}, nil
 		}
 		return id(v.Id.GetValue()), nil, nil
 
 	case *modelv1.TagValue_StrArray:
+		if ok && cond.Op == modelv1.Condition_BINARY_OP_IN {
+			entities := make([]tsdb.Entity, len(v.StrArray.Value))
+			for i, va := range v.StrArray.Value {
+				parsedEntity := make(tsdb.Entity, len(entity))
+				copy(parsedEntity, entity)
+				parsedEntity[entityIdx] = []byte(va)
+				entities[i] = parsedEntity
+			}
+			return nil, entities, nil
+		}
 		return &strArrLiteral{
 			arr: v.StrArray.GetValue(),
 		}, nil, nil
 	case *modelv1.TagValue_Int:
 		if ok {
+			parsedEntity := make(tsdb.Entity, len(entity))
+			copy(parsedEntity, entity)
 			parsedEntity[entityIdx] = convert.Int64ToBytes(v.Int.GetValue())
-			return nil, parsedEntity, nil
+			return nil, []tsdb.Entity{parsedEntity}, nil
 		}
 		return &int64Literal{
 			int64: v.Int.GetValue(),
 		}, nil, nil
 	case *modelv1.TagValue_IntArray:
+		if ok && cond.Op == modelv1.Condition_BINARY_OP_IN {
+			entities := make([]tsdb.Entity, len(v.IntArray.Value))
+			for i, va := range v.IntArray.Value {
+				parsedEntity := make(tsdb.Entity, len(entity))
+				copy(parsedEntity, entity)
+				parsedEntity[entityIdx] = convert.Int64ToBytes(va)
+				entities[i] = parsedEntity
+			}
+			return nil, entities, nil
+		}
 		return &int64ArrLiteral{
 			arr: v.IntArray.GetValue(),
 		}, nil, nil
@@ -214,11 +238,32 @@ func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *mode
 func parseEntities(op modelv1.LogicalExpression_LogicalOp, input tsdb.Entity, left, right []tsdb.Entity) []tsdb.Entity {
 	count := len(input)
 	result := make(tsdb.Entity, count)
+	anyEntity := func(entities []tsdb.Entity) bool {
+		for _, entity := range entities {
+			for _, entry := range entity {
+				if !bytes.Equal(entry, tsdb.AnyEntry) {
+					return false
+
+				}
+			}
+		}
+		return true
+	}
+	leftAny := anyEntity(left)
+	rightAny := anyEntity(right)
+
 	mergedEntities := make([]tsdb.Entity, 0, len(left)+len(right))
-	mergedEntities = append(mergedEntities, left...)
-	mergedEntities = append(mergedEntities, right...)
+
 	switch op {
 	case modelv1.LogicalExpression_LOGICAL_OP_AND:
+		if leftAny && !rightAny {
+			return right
+		}
+		if !leftAny && rightAny {
+			return left
+		}
+		mergedEntities = append(mergedEntities, left...)
+		mergedEntities = append(mergedEntities, right...)
 		for i := 0; i < count; i++ {
 			entry := tsdb.AnyEntry
 			for j := 0; j < len(mergedEntities); j++ {
@@ -235,6 +280,14 @@ func parseEntities(op modelv1.LogicalExpression_LogicalOp, input tsdb.Entity, le
 			result[i] = entry
 		}
 	case modelv1.LogicalExpression_LOGICAL_OP_OR:
+		if leftAny {
+			return left
+		}
+		if rightAny {
+			return right
+		}
+		mergedEntities = append(mergedEntities, left...)
+		mergedEntities = append(mergedEntities, right...)
 		for i := 0; i < count; i++ {
 			entry := tsdb.AnyEntry
 			for j := 0; j < len(mergedEntities); j++ {
@@ -321,7 +374,7 @@ func newAnd(size int) *andNode {
 func (an *andNode) merge(list ...posting.List) (posting.List, error) {
 	var result posting.List
 	for _, l := range list {
-		if _, ok := l.(bypassList); ok {
+		if _, ok := l.(*bypassList); ok {
 			continue
 		}
 		if result == nil {
@@ -366,7 +419,7 @@ func (on *orNode) merge(list ...posting.List) (posting.List, error) {
 	for _, l := range list {
 		// If a predicator is not indexed, all predicator are ignored.
 		// The tagFilter will take up this job to filter this items.
-		if _, ok := l.(bypassList); ok {
+		if _, ok := l.(*bypassList); ok {
 			return bList, nil
 		}
 		if result == nil {
diff --git a/test/cases/measure/data/want/entity.yaml b/test/cases/measure/data/input/all_only_fields.yaml
similarity index 79%
copy from test/cases/measure/data/want/entity.yaml
copy to test/cases/measure/data/input/all_only_fields.yaml
index 7876c40..b3256df 100644
--- a/test/cases/measure/data/want/entity.yaml
+++ b/test/cases/measure/data/input/all_only_fields.yaml
@@ -15,15 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-dataPoints:
-  - tagFamilies:
-    - name: default
-      tags:
-      - key: id
-        value:
-          id:
-            value: "1"
-      - key: service_id
-        value:
-          str:
-            value: service_1
\ No newline at end of file
+metadata:
+  name: "service_cpm_minute"
+  group: "sw_metric"
+fieldProjection:
+  names: ["total", "value"]
diff --git a/test/cases/measure/data/input/entity.yaml b/test/cases/measure/data/input/entity.yaml
index 8a16c65..fde1b0c 100644
--- a/test/cases/measure/data/input/entity.yaml
+++ b/test/cases/measure/data/input/entity.yaml
@@ -21,7 +21,7 @@ metadata:
 tagProjection:
   tagFamilies:
   - name: "default"
-    tags: ["id", "service_id"]
+    tags: ["service_id"]
 criteria:
   le:
     op: "LOGICAL_OP_AND"
diff --git a/test/cases/measure/data/input/entity.yaml b/test/cases/measure/data/input/entity_in.yaml
similarity index 93%
copy from test/cases/measure/data/input/entity.yaml
copy to test/cases/measure/data/input/entity_in.yaml
index 8a16c65..07c938d 100644
--- a/test/cases/measure/data/input/entity.yaml
+++ b/test/cases/measure/data/input/entity_in.yaml
@@ -28,7 +28,7 @@ criteria:
     left:
       condition:
         name: "id"
-        op: "BINARY_OP_EQ"
+        op: "BINARY_OP_IN"
         value:
-          str:
-            value: "1"
+          str_array:
+            value: ["1", "2"]
diff --git a/test/cases/measure/data/want/all_only_fields.yaml b/test/cases/measure/data/want/all_only_fields.yaml
new file mode 100644
index 0000000..9e23c73
--- /dev/null
+++ b/test/cases/measure/data/want/all_only_fields.yaml
@@ -0,0 +1,138 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+  - name: total
+    value:
+      int:
+        value: "100"
+  - name: value
+    value:
+      int:
+        value: "1"
+- fields:
+  - name: total
+    value:
+      int:
+        value: "100"
+  - name: value
+    value:
+      int:
+        value: "10"
+- fields:
+  - name: total
+    value:
+      int:
+        value: "100"
+  - name: value
+    value:
+      int:
+        value: "2"
+- fields:
+  - name: total
+    value:
+      int:
+        value: "100"
+  - name: value
+    value:
+      int:
+        value: "9"
+- fields:
+  - name: total
+    value:
+      int:
+        value: "100"
+  - name: value
+    value:
+      int:
+        value: "3"
+- fields:
+  - name: total
+    value:
+      int:
+        value: "100"
+  - name: value
+    value:
+      int:
+        value: "8"
+- fields:
+  - name: total
+    value:
+      int:
+        value: "100"
+  - name: value
+    value:
+      int:
+        value: "5"
+- fields:
+  - name: total
+    value:
+      int:
+        value: "100"
+  - name: value
+    value:
+      int:
+        value: "11"
+- fields:
+  - name: total
+    value:
+      int:
+        value: "50"
+  - name: value
+    value:
+      int:
+        value: "4"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        id:
+          value: "2"
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
+  timestamp: "2022-10-17T12:53:45.912Z"
+- fields:
+  - name: total
+    value:
+      int:
+        value: "50"
+  - name: value
+    value:
+      int:
+        value: "12"
+- fields:
+  - name: total
+    value:
+      int:
+        value: "300"
+  - name: value
+    value:
+      int:
+        value: "6"
+- fields:
+  - name: total
+    value:
+      int:
+        value: "300"
+  - name: value
+    value:
+      int:
+        value: "7"
diff --git a/test/cases/measure/data/want/entity.yaml b/test/cases/measure/data/want/entity.yaml
index 7876c40..8586f55 100644
--- a/test/cases/measure/data/want/entity.yaml
+++ b/test/cases/measure/data/want/entity.yaml
@@ -19,10 +19,6 @@ dataPoints:
   - tagFamilies:
     - name: default
       tags:
-      - key: id
-        value:
-          id:
-            value: "1"
       - key: service_id
         value:
           str:
diff --git a/test/cases/measure/data/want/entity.yaml b/test/cases/measure/data/want/entity_in.yaml
similarity index 69%
copy from test/cases/measure/data/want/entity.yaml
copy to test/cases/measure/data/want/entity_in.yaml
index 7876c40..5719b1e 100644
--- a/test/cases/measure/data/want/entity.yaml
+++ b/test/cases/measure/data/want/entity_in.yaml
@@ -16,14 +16,26 @@
 # under the License.
 
 dataPoints:
-  - tagFamilies:
-    - name: default
-      tags:
-      - key: id
-        value:
-          id:
-            value: "1"
-      - key: service_id
-        value:
-          str:
-            value: service_1
\ No newline at end of file
+- tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        id:
+          value: "1"
+    - key: service_id
+      value:
+        str:
+          value: service_1
+- tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        id:
+          value: "2"
+    - key: service_id
+      value:
+        str:
+          value: service_2
+          
\ No newline at end of file
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 97ed283..900bbeb 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -41,6 +41,7 @@ var (
 
 var _ = g.DescribeTable("Scanning Measures", verify,
 	g.Entry("all", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("all_only_fields", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("filter by a integer tag", helpers.Args{Input: "tag_filter_int", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("filter by an unknown tag", helpers.Args{Input: "tag_filter_unknown", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantEmpty: true}),
@@ -53,7 +54,8 @@ var _ = g.DescribeTable("Scanning Measures", verify,
 	g.Entry("limit 3,2", helpers.Args{Input: "limit", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("match a node", helpers.Args{Input: "match_node", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("match nodes", helpers.Args{Input: "match_nodes", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
-	g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.FEntry("filter by entity id", helpers.Args{Input: "entity", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("filter by several entity ids", helpers.Args{Input: "entity_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("filter by entity id and service id", helpers.Args{Input: "entity_service", Want: "entity", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("invalid logical expression", helpers.Args{Input: "err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantErr: true}),