You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/15 15:05:43 UTC
[skywalking-banyandb] branch time-series updated: Implements all indices
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/time-series by this push:
new ed5e146 Implements all indices
ed5e146 is described below
commit ed5e146f55a4e11b607eb4a88c8ea1d95ebc7a53
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Sep 15 23:04:04 2021 +0800
Implements all indices
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
api/proto/banyandb/common/v2/common.pb.go | 13 +-
api/proto/banyandb/common/v2/common.proto | 1 +
api/proto/banyandb/model/v2/query.pb.go | 150 +++++++++++++++------
api/proto/banyandb/model/v2/query.proto | 4 +
banyand/index/tsdb/field_map.go | 2 +-
.../schema/data/index_rules/db.instance.json | 1 +
.../metadata/schema/data/index_rules/db.type.json | 1 +
.../metadata/schema/data/index_rules/duration.json | 1 +
.../schema/data/index_rules/endpoint_id.json | 1 +
.../schema/data/index_rules/http.code.json | 1 +
.../schema/data/index_rules/http.method.json | 1 +
.../schema/data/index_rules/mq.broker.json | 1 +
.../metadata/schema/data/index_rules/mq.queue.json | 1 +
.../metadata/schema/data/index_rules/mq.topic.json | 1 +
.../metadata/schema/data/index_rules/trace_id.json | 1 +
banyand/stream/index.go | 20 ++-
banyand/stream/stream_query_test.go | 85 ++++++------
banyand/stream/stream_write.go | 15 ++-
banyand/stream/testdata/multiple_shards.json | 10 +-
banyand/tsdb/block.go | 6 +-
banyand/tsdb/indexdb.go | 26 +++-
banyand/tsdb/series_seek.go | 8 +-
banyand/tsdb/series_seek_filter.go | 12 +-
banyand/tsdb/series_seek_sort.go | 33 ++---
banyand/tsdb/series_write.go | 14 +-
pkg/convert/number.go | 30 ++++-
.../mem_test.go => convert/number_test.go} | 25 ++--
pkg/index/index.go | 77 ++++++++---
pkg/index/inverted/field_map.go | 16 +--
pkg/index/inverted/inverted.go | 108 +++++++++------
pkg/index/inverted/inverted_test.go | 24 ++--
pkg/index/inverted/mem.go | 44 +++---
pkg/index/inverted/mem_test.go | 10 +-
pkg/index/iterator.go | 73 ++++++----
pkg/index/lsm/lsm.go | 22 ++-
pkg/index/lsm/lsm_test.go | 10 +-
pkg/index/lsm/search.go | 20 +--
pkg/index/{lsm/lsm.go => metadata/metadata.go} | 47 ++++---
pkg/index/{test_cases => testcases}/duration.go | 56 ++++----
.../{test_cases => testcases}/service_name.go | 8 +-
pkg/index/tree.go | 2 +-
41 files changed, 629 insertions(+), 352 deletions(-)
diff --git a/api/proto/banyandb/common/v2/common.pb.go b/api/proto/banyandb/common/v2/common.pb.go
index d43c009..232783c 100644
--- a/api/proto/banyandb/common/v2/common.pb.go
+++ b/api/proto/banyandb/common/v2/common.pb.go
@@ -97,6 +97,7 @@ type Metadata struct {
Group string `protobuf:"bytes,1,opt,name=group,proto3" json:"group,omitempty"`
// name of the entity
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
+ Id uint32 `protobuf:"varint,3,opt,name=id,proto3" json:"id,omitempty"`
}
func (x *Metadata) Reset() {
@@ -145,16 +146,24 @@ func (x *Metadata) GetName() string {
return ""
}
+func (x *Metadata) GetId() uint32 {
+ if x != nil {
+ return x.Id
+ }
+ return 0
+}
+
var File_banyandb_common_v2_common_proto protoreflect.FileDescriptor
var file_banyandb_common_v2_common_proto_rawDesc = []byte{
0x0a, 0x1f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f,
0x6e, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x12, 0x12, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d,
- 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x22, 0x34, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74,
+ 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x22, 0x44, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74,
0x61, 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18,
- 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x2a, 0x4b, 0x0a, 0x07, 0x43,
+ 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69,
+ 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x2a, 0x4b, 0x0a, 0x07, 0x43,
0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x41, 0x54, 0x41, 0x4c, 0x4f,
0x47, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12,
0x12, 0x0a, 0x0e, 0x43, 0x41, 0x54, 0x41, 0x4c, 0x4f, 0x47, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41,
diff --git a/api/proto/banyandb/common/v2/common.proto b/api/proto/banyandb/common/v2/common.proto
index 130898d..5461978 100644
--- a/api/proto/banyandb/common/v2/common.proto
+++ b/api/proto/banyandb/common/v2/common.proto
@@ -34,4 +34,5 @@ message Metadata {
string group = 1;
// name of the entity
string name = 2;
+ uint32 id = 3;
}
diff --git a/api/proto/banyandb/model/v2/query.pb.go b/api/proto/banyandb/model/v2/query.pb.go
index 330251a..a58ce57 100644
--- a/api/proto/banyandb/model/v2/query.pb.go
+++ b/api/proto/banyandb/model/v2/query.pb.go
@@ -395,7 +395,7 @@ type Projection struct {
unknownFields protoimpl.UnknownFields
// The key_name refers to the key(s) of Pair(s).
- FagFamilies []*TagFamily `protobuf:"bytes,1,rep,name=fag_families,json=fagFamilies,proto3" json:"fag_families,omitempty"`
+ FagFamilies []*Projection_TagFamily `protobuf:"bytes,1,rep,name=fag_families,json=fagFamilies,proto3" json:"fag_families,omitempty"`
}
func (x *Projection) Reset() {
@@ -430,7 +430,7 @@ func (*Projection) Descriptor() ([]byte, []int) {
return file_banyandb_model_v2_query_proto_rawDescGZIP(), []int{4}
}
-func (x *Projection) GetFagFamilies() []*TagFamily {
+func (x *Projection) GetFagFamilies() []*Projection_TagFamily {
if x != nil {
return x.FagFamilies
}
@@ -494,6 +494,61 @@ func (x *TimeRange) GetEnd() *timestamppb.Timestamp {
return nil
}
+type Projection_TagFamily struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+ Tags []string `protobuf:"bytes,2,rep,name=tags,proto3" json:"tags,omitempty"`
+}
+
+func (x *Projection_TagFamily) Reset() {
+ *x = Projection_TagFamily{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_banyandb_model_v2_query_proto_msgTypes[6]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Projection_TagFamily) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Projection_TagFamily) ProtoMessage() {}
+
+func (x *Projection_TagFamily) ProtoReflect() protoreflect.Message {
+ mi := &file_banyandb_model_v2_query_proto_msgTypes[6]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Projection_TagFamily.ProtoReflect.Descriptor instead.
+func (*Projection_TagFamily) Descriptor() ([]byte, []int) {
+ return file_banyandb_model_v2_query_proto_rawDescGZIP(), []int{4, 0}
+}
+
+func (x *Projection_TagFamily) GetName() string {
+ if x != nil {
+ return x.Name
+ }
+ return ""
+}
+
+func (x *Projection_TagFamily) GetTags() []string {
+ if x != nil {
+ return x.Tags
+ }
+ return nil
+}
+
var File_banyandb_model_v2_query_proto protoreflect.FileDescriptor
var file_banyandb_model_v2_query_proto_rawDesc = []byte{
@@ -543,26 +598,30 @@ var file_banyandb_model_v2_query_proto_rawDesc = []byte{
0x72, 0x74, 0x12, 0x14, 0x0a, 0x10, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45,
0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4f, 0x52, 0x54,
0x5f, 0x44, 0x45, 0x53, 0x43, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x4f, 0x52, 0x54, 0x5f,
- 0x41, 0x53, 0x43, 0x10, 0x02, 0x22, 0x4d, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74,
- 0x69, 0x6f, 0x6e, 0x12, 0x3f, 0x0a, 0x0c, 0x66, 0x61, 0x67, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c,
- 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79,
- 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61,
- 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x52, 0x0b, 0x66, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69,
- 0x6c, 0x69, 0x65, 0x73, 0x22, 0x6b, 0x0a, 0x09, 0x54, 0x69, 0x6d, 0x65, 0x52, 0x61, 0x6e, 0x67,
- 0x65, 0x12, 0x30, 0x0a, 0x05, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b,
- 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
- 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x05, 0x62, 0x65,
- 0x67, 0x69, 0x6e, 0x12, 0x2c, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
- 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
- 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x03, 0x65, 0x6e,
- 0x64, 0x42, 0x6c, 0x0a, 0x27, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
- 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61,
- 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x5a, 0x41, 0x67, 0x69,
- 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f,
- 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61,
- 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61,
- 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x62,
- 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x41, 0x53, 0x43, 0x10, 0x02, 0x22, 0x8d, 0x01, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63,
+ 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x4a, 0x0a, 0x0c, 0x66, 0x61, 0x67, 0x5f, 0x66, 0x61, 0x6d, 0x69,
+ 0x6c, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x62, 0x61, 0x6e,
+ 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x50,
+ 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d,
+ 0x69, 0x6c, 0x79, 0x52, 0x0b, 0x66, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73,
+ 0x1a, 0x33, 0x0a, 0x09, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x12, 0x12, 0x0a,
+ 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d,
+ 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52,
+ 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x6b, 0x0a, 0x09, 0x54, 0x69, 0x6d, 0x65, 0x52, 0x61, 0x6e,
+ 0x67, 0x65, 0x12, 0x30, 0x0a, 0x05, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28,
+ 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+ 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x05, 0x62,
+ 0x65, 0x67, 0x69, 0x6e, 0x12, 0x2c, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28,
+ 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+ 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x03, 0x65,
+ 0x6e, 0x64, 0x42, 0x6c, 0x0a, 0x27, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
+ 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79,
+ 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x5a, 0x41, 0x67,
+ 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
+ 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79,
+ 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62,
+ 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32,
+ 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -578,7 +637,7 @@ func file_banyandb_model_v2_query_proto_rawDescGZIP() []byte {
}
var file_banyandb_model_v2_query_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
-var file_banyandb_model_v2_query_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
+var file_banyandb_model_v2_query_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_banyandb_model_v2_query_proto_goTypes = []interface{}{
(Condition_BinaryOp)(0), // 0: banyandb.model.v2.Condition.BinaryOp
(QueryOrder_Sort)(0), // 1: banyandb.model.v2.QueryOrder.Sort
@@ -588,23 +647,24 @@ var file_banyandb_model_v2_query_proto_goTypes = []interface{}{
(*QueryOrder)(nil), // 5: banyandb.model.v2.QueryOrder
(*Projection)(nil), // 6: banyandb.model.v2.Projection
(*TimeRange)(nil), // 7: banyandb.model.v2.TimeRange
- (*TagValue)(nil), // 8: banyandb.model.v2.TagValue
- (*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp
+ (*Projection_TagFamily)(nil), // 8: banyandb.model.v2.Projection.TagFamily
+ (*TagValue)(nil), // 9: banyandb.model.v2.TagValue
+ (*timestamppb.Timestamp)(nil), // 10: google.protobuf.Timestamp
}
var file_banyandb_model_v2_query_proto_depIdxs = []int32{
- 8, // 0: banyandb.model.v2.Tag.value:type_name -> banyandb.model.v2.TagValue
- 2, // 1: banyandb.model.v2.TagFamily.tags:type_name -> banyandb.model.v2.Tag
- 0, // 2: banyandb.model.v2.Condition.op:type_name -> banyandb.model.v2.Condition.BinaryOp
- 8, // 3: banyandb.model.v2.Condition.value:type_name -> banyandb.model.v2.TagValue
- 1, // 4: banyandb.model.v2.QueryOrder.sort:type_name -> banyandb.model.v2.QueryOrder.Sort
- 3, // 5: banyandb.model.v2.Projection.fag_families:type_name -> banyandb.model.v2.TagFamily
- 9, // 6: banyandb.model.v2.TimeRange.begin:type_name -> google.protobuf.Timestamp
- 9, // 7: banyandb.model.v2.TimeRange.end:type_name -> google.protobuf.Timestamp
- 8, // [8:8] is the sub-list for method output_type
- 8, // [8:8] is the sub-list for method input_type
- 8, // [8:8] is the sub-list for extension type_name
- 8, // [8:8] is the sub-list for extension extendee
- 0, // [0:8] is the sub-list for field type_name
+ 9, // 0: banyandb.model.v2.Tag.value:type_name -> banyandb.model.v2.TagValue
+ 2, // 1: banyandb.model.v2.TagFamily.tags:type_name -> banyandb.model.v2.Tag
+ 0, // 2: banyandb.model.v2.Condition.op:type_name -> banyandb.model.v2.Condition.BinaryOp
+ 9, // 3: banyandb.model.v2.Condition.value:type_name -> banyandb.model.v2.TagValue
+ 1, // 4: banyandb.model.v2.QueryOrder.sort:type_name -> banyandb.model.v2.QueryOrder.Sort
+ 8, // 5: banyandb.model.v2.Projection.fag_families:type_name -> banyandb.model.v2.Projection.TagFamily
+ 10, // 6: banyandb.model.v2.TimeRange.begin:type_name -> google.protobuf.Timestamp
+ 10, // 7: banyandb.model.v2.TimeRange.end:type_name -> google.protobuf.Timestamp
+ 8, // [8:8] is the sub-list for method output_type
+ 8, // [8:8] is the sub-list for method input_type
+ 8, // [8:8] is the sub-list for extension type_name
+ 8, // [8:8] is the sub-list for extension extendee
+ 0, // [0:8] is the sub-list for field type_name
}
func init() { file_banyandb_model_v2_query_proto_init() }
@@ -686,6 +746,18 @@ func file_banyandb_model_v2_query_proto_init() {
return nil
}
}
+ file_banyandb_model_v2_query_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Projection_TagFamily); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -693,7 +765,7 @@ func file_banyandb_model_v2_query_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_banyandb_model_v2_query_proto_rawDesc,
NumEnums: 2,
- NumMessages: 6,
+ NumMessages: 7,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/api/proto/banyandb/model/v2/query.proto b/api/proto/banyandb/model/v2/query.proto
index ed39c61..73262a9 100644
--- a/api/proto/banyandb/model/v2/query.proto
+++ b/api/proto/banyandb/model/v2/query.proto
@@ -76,6 +76,10 @@ message QueryOrder {
// Projection is used to select the names of keys to be returned.
message Projection {
+ message TagFamily {
+ string name = 1;
+ repeated string tags = 2;
+ }
// The key_name refers to the key(s) of Pair(s).
repeated TagFamily fag_families = 1;
}
diff --git a/banyand/index/tsdb/field_map.go b/banyand/index/tsdb/field_map.go
index 6cc52f8..32c9488 100644
--- a/banyand/index/tsdb/field_map.go
+++ b/banyand/index/tsdb/field_map.go
@@ -53,7 +53,7 @@ func (fm *fieldMap) get(key []byte) (*fieldValue, bool) {
func (fm *fieldMap) put(fv *Field, id common.ChunkID) error {
pm, ok := fm.get(fv.Name)
if !ok {
- return errors.Wrapf(ErrFieldAbsent, "filed Term:%s", fv.Name)
+ return errors.Wrapf(ErrFieldAbsent, "filed term:%s", fv.Name)
}
return pm.value.put(fv.Value, id)
}
diff --git a/banyand/metadata/schema/data/index_rules/db.instance.json b/banyand/metadata/schema/data/index_rules/db.instance.json
index 5d5c159..d6bcbb1 100644
--- a/banyand/metadata/schema/data/index_rules/db.instance.json
+++ b/banyand/metadata/schema/data/index_rules/db.instance.json
@@ -1,5 +1,6 @@
{
"metadata": {
+ "id": 1,
"name": "db.instance",
"group": "default"
},
diff --git a/banyand/metadata/schema/data/index_rules/db.type.json b/banyand/metadata/schema/data/index_rules/db.type.json
index 67914ee..b13aded 100644
--- a/banyand/metadata/schema/data/index_rules/db.type.json
+++ b/banyand/metadata/schema/data/index_rules/db.type.json
@@ -1,5 +1,6 @@
{
"metadata": {
+ "id": 2,
"name": "db.type",
"group": "default"
},
diff --git a/banyand/metadata/schema/data/index_rules/duration.json b/banyand/metadata/schema/data/index_rules/duration.json
index 6f18db1..96e2cc6 100644
--- a/banyand/metadata/schema/data/index_rules/duration.json
+++ b/banyand/metadata/schema/data/index_rules/duration.json
@@ -1,5 +1,6 @@
{
"metadata": {
+ "id": 3,
"name": "duration",
"group": "default"
},
diff --git a/banyand/metadata/schema/data/index_rules/endpoint_id.json b/banyand/metadata/schema/data/index_rules/endpoint_id.json
index 7da3b0d..7625890 100644
--- a/banyand/metadata/schema/data/index_rules/endpoint_id.json
+++ b/banyand/metadata/schema/data/index_rules/endpoint_id.json
@@ -1,5 +1,6 @@
{
"metadata": {
+ "id": 4,
"name": "endpoint_id",
"group": "default"
},
diff --git a/banyand/metadata/schema/data/index_rules/http.code.json b/banyand/metadata/schema/data/index_rules/http.code.json
index bd2e86f..31d23ed 100644
--- a/banyand/metadata/schema/data/index_rules/http.code.json
+++ b/banyand/metadata/schema/data/index_rules/http.code.json
@@ -1,5 +1,6 @@
{
"metadata": {
+ "id": 5,
"name": "http.code",
"group": "default"
},
diff --git a/banyand/metadata/schema/data/index_rules/http.method.json b/banyand/metadata/schema/data/index_rules/http.method.json
index 3001154..b27c103 100644
--- a/banyand/metadata/schema/data/index_rules/http.method.json
+++ b/banyand/metadata/schema/data/index_rules/http.method.json
@@ -1,5 +1,6 @@
{
"metadata": {
+ "id": 6,
"name": "http.method",
"group": "default"
},
diff --git a/banyand/metadata/schema/data/index_rules/mq.broker.json b/banyand/metadata/schema/data/index_rules/mq.broker.json
index 0cfbd27..1edfe6e 100644
--- a/banyand/metadata/schema/data/index_rules/mq.broker.json
+++ b/banyand/metadata/schema/data/index_rules/mq.broker.json
@@ -1,5 +1,6 @@
{
"metadata": {
+ "id": 7,
"name": "mq.broker",
"group": "default"
},
diff --git a/banyand/metadata/schema/data/index_rules/mq.queue.json b/banyand/metadata/schema/data/index_rules/mq.queue.json
index 0c589fc..455d712 100644
--- a/banyand/metadata/schema/data/index_rules/mq.queue.json
+++ b/banyand/metadata/schema/data/index_rules/mq.queue.json
@@ -1,5 +1,6 @@
{
"metadata": {
+ "id": 8,
"name": "mq.queue",
"group": "default"
},
diff --git a/banyand/metadata/schema/data/index_rules/mq.topic.json b/banyand/metadata/schema/data/index_rules/mq.topic.json
index 5dc51fc..5770289 100644
--- a/banyand/metadata/schema/data/index_rules/mq.topic.json
+++ b/banyand/metadata/schema/data/index_rules/mq.topic.json
@@ -1,5 +1,6 @@
{
"metadata": {
+ "id": 9,
"name": "mq.topic",
"group": "default"
},
diff --git a/banyand/metadata/schema/data/index_rules/trace_id.json b/banyand/metadata/schema/data/index_rules/trace_id.json
index 2f84bfb..b7bb2d2 100644
--- a/banyand/metadata/schema/data/index_rules/trace_id.json
+++ b/banyand/metadata/schema/data/index_rules/trace_id.json
@@ -1,5 +1,6 @@
{
"metadata": {
+ "id": 10,
"name": "trace_id",
"group": "default"
},
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index 31c0bc9..c0a6c14 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -63,7 +63,7 @@ func (s *stream) bootIndexGenerator() {
//TODO: should listen to pipeline in a distributed cluster
func (s *stream) writeGlobalIndex(ruleIndex indexRule, ref tsdb.GlobalItemID, value *streamv2.ElementValue) error {
- val, err := getIndexValue(ruleIndex, value)
+ val, _, err := getIndexValue(ruleIndex, value)
if err != nil {
return err
}
@@ -87,12 +87,16 @@ func (s *stream) writeGlobalIndex(ruleIndex indexRule, ref tsdb.GlobalItemID, va
switch rule.GetType() {
case databasev2.IndexRule_TYPE_INVERTED:
return indexWriter.WriteInvertedIndex(index.Field{
- Key: []byte(rule.Metadata.Name),
+ Key: index.FieldKey{
+ IndexRuleID: rule.GetMetadata().GetId(),
+ },
Term: val,
})
case databasev2.IndexRule_TYPE_TREE:
return indexWriter.WriteLSMIndex(index.Field{
- Key: []byte(rule.Metadata.Name),
+ Key: index.FieldKey{
+ IndexRuleID: rule.GetMetadata().GetId(),
+ },
Term: val,
})
}
@@ -100,7 +104,7 @@ func (s *stream) writeGlobalIndex(ruleIndex indexRule, ref tsdb.GlobalItemID, va
}
func writeLocalIndex(writer tsdb.Writer, ruleIndex indexRule, value *streamv2.ElementValue) (err error) {
- val, err := getIndexValue(ruleIndex, value)
+ val, _, err := getIndexValue(ruleIndex, value)
if err != nil {
return err
}
@@ -108,12 +112,16 @@ func writeLocalIndex(writer tsdb.Writer, ruleIndex indexRule, value *streamv2.El
switch rule.GetType() {
case databasev2.IndexRule_TYPE_INVERTED:
return writer.WriteInvertedIndex(index.Field{
- Key: []byte(rule.Metadata.Name),
+ Key: index.FieldKey{
+ IndexRuleID: rule.GetMetadata().GetId(),
+ },
Term: val,
})
case databasev2.IndexRule_TYPE_TREE:
return writer.WriteLSMIndex(index.Field{
- Key: []byte(rule.Metadata.Name),
+ Key: index.FieldKey{
+ IndexRuleID: rule.GetMetadata().GetId(),
+ },
Term: val,
})
}
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index f02921b..5a19576 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -117,23 +117,23 @@ func Test_Stream_Series(t *testing.T) {
want: shardsForTest{
{
id: 0,
- location: []string{"series_12243341348514563931", "data_flow_0"},
- elements: []string{"1"},
+ location: []string{"series_16283518706331625322", "data_flow_0"},
+ elements: []string{"4"},
},
{
id: 0,
- location: []string{"series_1671844747554927007", "data_flow_0"},
+ location: []string{"series_4862694201852929188", "data_flow_0"},
elements: []string{"2"},
},
{
id: 1,
- location: []string{"series_2374367181827824198", "data_flow_0"},
- elements: []string{"5", "3"},
+ location: []string{"series_13343478452567673284", "data_flow_0"},
+ elements: []string{"1"},
},
{
id: 1,
- location: []string{"series_8429137420168685297", "data_flow_0"},
- elements: []string{"4"},
+ location: []string{"series_7898679171060804990", "data_flow_0"},
+ elements: []string{"5", "3"},
},
},
},
@@ -147,21 +147,21 @@ func Test_Stream_Series(t *testing.T) {
want: shardsForTest{
{
id: 0,
- location: []string{"series_12243341348514563931", "data_flow_0"},
+ location: []string{"series_16283518706331625322", "data_flow_0"},
+ elements: []string{"4"},
},
{
id: 0,
- location: []string{"series_1671844747554927007", "data_flow_0"},
+ location: []string{"series_4862694201852929188", "data_flow_0"},
},
{
id: 1,
- location: []string{"series_2374367181827824198", "data_flow_0"},
- elements: []string{"5"},
+ location: []string{"series_13343478452567673284", "data_flow_0"},
},
{
id: 1,
- location: []string{"series_8429137420168685297", "data_flow_0"},
- elements: []string{"4"},
+ location: []string{"series_7898679171060804990", "data_flow_0"},
+ elements: []string{"5"},
},
},
},
@@ -173,13 +173,13 @@ func Test_Stream_Series(t *testing.T) {
},
want: shardsForTest{
{
- id: 0,
- location: []string{"series_12243341348514563931", "data_flow_0"},
+ id: 1,
+ location: []string{"series_13343478452567673284", "data_flow_0"},
elements: []string{"1"},
},
{
id: 1,
- location: []string{"series_2374367181827824198", "data_flow_0"},
+ location: []string{"series_7898679171060804990", "data_flow_0"},
elements: []string{"5", "3"},
},
},
@@ -187,13 +187,13 @@ func Test_Stream_Series(t *testing.T) {
{
name: "find a series",
args: queryOpts{
- entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Uint64ToBytes(1)},
+ entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)},
timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
},
want: shardsForTest{
{
id: 1,
- location: []string{"series_2374367181827824198", "data_flow_0"},
+ location: []string{"series_7898679171060804990", "data_flow_0"},
elements: []string{"5", "3"},
},
},
@@ -208,6 +208,7 @@ func Test_Stream_Series(t *testing.T) {
Metadata: &commonv2.Metadata{
Name: "endpoint_id",
Group: "default",
+ Id: 4,
},
Tags: []string{"endpoint_id"},
Type: databasev2.IndexRule_TYPE_INVERTED,
@@ -225,21 +226,21 @@ func Test_Stream_Series(t *testing.T) {
want: shardsForTest{
{
id: 0,
- location: []string{"series_12243341348514563931", "data_flow_0"},
- elements: []string{"1"},
+ location: []string{"series_16283518706331625322", "data_flow_0"},
},
{
id: 0,
- location: []string{"series_1671844747554927007", "data_flow_0"},
+ location: []string{"series_4862694201852929188", "data_flow_0"},
},
{
id: 1,
- location: []string{"series_2374367181827824198", "data_flow_0"},
- elements: []string{"3"},
+ location: []string{"series_13343478452567673284", "data_flow_0"},
+ elements: []string{"1"},
},
{
id: 1,
- location: []string{"series_8429137420168685297", "data_flow_0"},
+ location: []string{"series_7898679171060804990", "data_flow_0"},
+ elements: []string{"3"},
},
},
},
@@ -253,6 +254,7 @@ func Test_Stream_Series(t *testing.T) {
Metadata: &commonv2.Metadata{
Name: "duration",
Group: "default",
+ Id: 3,
},
Tags: []string{"duration"},
Type: databasev2.IndexRule_TYPE_TREE,
@@ -263,23 +265,23 @@ func Test_Stream_Series(t *testing.T) {
want: shardsForTest{
{
id: 0,
- location: []string{"series_12243341348514563931", "data_flow_0"},
- elements: []string{"1"},
+ location: []string{"series_16283518706331625322", "data_flow_0"},
+ elements: []string{"4"},
},
{
id: 0,
- location: []string{"series_1671844747554927007", "data_flow_0"},
+ location: []string{"series_4862694201852929188", "data_flow_0"},
elements: []string{"2"},
},
{
id: 1,
- location: []string{"series_2374367181827824198", "data_flow_0"},
- elements: []string{"3", "5"},
+ location: []string{"series_13343478452567673284", "data_flow_0"},
+ elements: []string{"1"},
},
{
id: 1,
- location: []string{"series_8429137420168685297", "data_flow_0"},
- elements: []string{"4"},
+ location: []string{"series_7898679171060804990", "data_flow_0"},
+ elements: []string{"3", "5"},
},
},
},
@@ -347,7 +349,10 @@ func Test_Stream_Global_Index(t *testing.T) {
err := func() error {
for _, shard := range shards {
itemIDs, err := shard.Index().Seek(index.Field{
- Key: []byte("trace_id"),
+ Key: index.FieldKey{
+ //trace_id
+ IndexRuleID: 10,
+ },
Term: []byte(tt.traceID),
})
if err != nil {
@@ -488,8 +493,8 @@ func setupQueryData(testing *testing.T, dataFile string, stream *stream) (baseTi
t.NoError(json.Unmarshal(content, &templates))
bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
for i, template := range templates {
- rawSearchTagFamily, err := json.Marshal(template)
- t.NoError(err)
+ rawSearchTagFamily, errMarshal := json.Marshal(template)
+ t.NoError(errMarshal)
searchTagFamily := &streamv2.ElementValue_TagFamily{}
t.NoError(jsonpb.UnmarshalString(string(rawSearchTagFamily), searchTagFamily))
e := &streamv2.ElementValue{
@@ -508,12 +513,12 @@ func setupQueryData(testing *testing.T, dataFile string, stream *stream) (baseTi
},
}
e.TagFamilies = append(e.TagFamilies, searchTagFamily)
- entity, err := stream.buildEntity(e)
- t.NoError(err)
- shardID, err := partition.ShardID(entity.Marshal(), stream.schema.GetShardNum())
- t.NoError(err)
- _, err = stream.write(common.ShardID(shardID), e)
- t.NoError(err)
+ entity, errInner := stream.buildEntity(e)
+ t.NoError(errInner)
+ shardID, errInner := partition.ShardID(entity.Marshal(), stream.schema.GetShardNum())
+ t.NoError(errInner)
+ _, errInner = stream.write(common.ShardID(shardID), e)
+ t.NoError(errInner)
}
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 9ce2732..dc536bf 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -128,20 +128,27 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
return &itemID, err
}
-func getIndexValue(ruleIndex indexRule, value *streamv2.ElementValue) (val []byte, err error) {
+func getIndexValue(ruleIndex indexRule, value *streamv2.ElementValue) (val []byte, isInt bool, err error) {
val = make([]byte, 0, len(ruleIndex.tagIndices))
+ var existInt bool
for _, tIndex := range ruleIndex.tagIndices {
tag, err := getTagByOffset(value, tIndex.family, tIndex.tag)
if err != nil {
- return nil, errors.WithMessagef(err, "index rule:%v", ruleIndex.rule.Metadata)
+ return nil, false, errors.WithMessagef(err, "index rule:%v", ruleIndex.rule.Metadata)
+ }
+ if tag.GetInt() != nil {
+ existInt = true
}
v, err := marshalIndexFieldValue(tag)
if err != nil {
- return nil, err
+ return nil, false, err
}
val = append(val, v...)
}
- return val, nil
+ if len(ruleIndex.tagIndices) == 1 && existInt {
+ return val, true, nil
+ }
+ return val, false, nil
}
func marshalIndexFieldValue(tagValue *modelv2.TagValue) ([]byte, error) {
diff --git a/banyand/stream/testdata/multiple_shards.json b/banyand/stream/testdata/multiple_shards.json
index 3f1a721..ec6427e 100644
--- a/banyand/stream/testdata/multiple_shards.json
+++ b/banyand/stream/testdata/multiple_shards.json
@@ -2,7 +2,7 @@
{
"tags": [
{"str":{"value": "1"}},
- {"int":{"value": 0}},
+ {"int":{"value": 1}},
{"str":{"value": "webapp_id"}},
{"str":{"value": "10.0.0.1_id"}},
{"str":{"value": "/home_id"}},
@@ -13,7 +13,7 @@
{
"tags": [
{"str":{"value": "2"}},
- {"int":{"value": 0}},
+ {"int":{"value": 1}},
{"str":{"value": "webapp_id"}},
{"str":{"value": "10.0.0.3_id"}},
{"str":{"value": "/product_id"}},
@@ -24,7 +24,7 @@
{
"tags": [
{"str":{"value": "3"}},
- {"int":{"value": 1}},
+ {"int":{"value": 0}},
{"str":{"value": "webapp_id"}},
{"str":{"value": "10.0.0.1_id"}},
{"str":{"value": "/home_id"}},
@@ -37,7 +37,7 @@
{
"tags": [
{"str":{"value": "4"}},
- {"int":{"value": 1}},
+ {"int":{"value": 0}},
{"str":{"value": "webapp_id"}},
{"str":{"value": "10.0.0.5_id"}},
{"str":{"value": "/price_id"}},
@@ -50,7 +50,7 @@
{
"tags": [
{"str":{"value": "5"}},
- {"int":{"value": 1}},
+ {"int":{"value": 0}},
{"str":{"value": "webapp_id"}},
{"str":{"value": "10.0.0.1_id"}},
{"str":{"value": "/item_id"}},
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 824c861..dac4161 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -86,10 +86,12 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
if !ok || len(rules) == 0 {
return b, nil
}
- b.invertedIndex, err = inverted.NewStore(inverted.StoreOpts{
+ if b.invertedIndex, err = inverted.NewStore(inverted.StoreOpts{
Path: b.path + "/inverted",
Logger: b.l,
- })
+ }); err != nil {
+ return nil, err
+ }
if b.lsmIndex, err = lsm.NewStore(lsm.StoreOpts{
Path: b.path + "/lsm",
Logger: b.l,
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index c9fd9d0..dcd6123 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -54,13 +54,17 @@ type indexDB struct {
lst []*segment
}
-func (i *indexDB) Seek(term index.Field) ([]GlobalItemID, error) {
+func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
result := make([]GlobalItemID, 0)
- err := i.lst[0].globalIndex.GetAll(term.Marshal(), func(rawBytes []byte) error {
+ f, err := field.MarshalStraight()
+ if err != nil {
+ return nil, err
+ }
+ err = i.lst[0].globalIndex.GetAll(f, func(rawBytes []byte) error {
id := &GlobalItemID{}
- err := id.UnMarshal(rawBytes)
- if err != nil {
- return err
+ errUnMarshal := id.UnMarshal(rawBytes)
+ if errUnMarshal != nil {
+ return errUnMarshal
}
result = append(result, *id)
return nil
@@ -136,9 +140,17 @@ type indexWriter struct {
}
func (i *indexWriter) WriteLSMIndex(field index.Field) error {
- return i.seg.globalIndex.PutWithVersion(field.Marshal(), i.itemID.Marshal(), uint64(i.ts.UnixNano()))
+ key, err := field.MarshalStraight()
+ if err != nil {
+ return err
+ }
+ return i.seg.globalIndex.PutWithVersion(key, i.itemID.Marshal(), uint64(i.ts.UnixNano()))
}
func (i *indexWriter) WriteInvertedIndex(field index.Field) error {
- return i.seg.globalIndex.PutWithVersion(field.Marshal(), i.itemID.Marshal(), uint64(i.ts.UnixNano()))
+ key, err := field.MarshalStraight()
+ if err != nil {
+ return err
+ }
+ return i.seg.globalIndex.PutWithVersion(key, i.itemID.Marshal(), uint64(i.ts.UnixNano()))
}
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 4763ec6..5f46390 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -56,7 +56,7 @@ type seekerBuilder struct {
conditions []struct {
indexRuleType databasev2.IndexRule_Type
- indexRule string
+ indexRuleID uint32
condition Condition
}
order modelv2.QueryOrder_Sort
@@ -76,7 +76,11 @@ func (s *seekerBuilder) Build() (Seeker, error) {
if indexFilter != nil {
filters = append(filters, indexFilter)
}
- return newSeeker(s.buildSeries(filters)), nil
+ se, err := s.buildSeries(filters)
+ if err != nil {
+ return nil, err
+ }
+ return newSeeker(se), nil
}
func newSeekerBuilder(s *seriesSpan) SeekerBuilder {
diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go
index 8e92715..eca279b 100644
--- a/banyand/tsdb/series_seek_filter.go
+++ b/banyand/tsdb/series_seek_filter.go
@@ -33,11 +33,11 @@ type Condition map[string][]index.ConditionValue
func (s *seekerBuilder) Filter(indexRule *databasev2.IndexRule, condition Condition) SeekerBuilder {
s.conditions = append(s.conditions, struct {
indexRuleType databasev2.IndexRule_Type
- indexRule string
+ indexRuleID uint32
condition Condition
}{
indexRuleType: indexRule.GetType(),
- indexRule: indexRule.GetMetadata().GetName(),
+ indexRuleID: indexRule.GetMetadata().GetId(),
condition: condition,
})
return s
@@ -55,8 +55,8 @@ func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
}
cond := make(index.Condition)
term := index.FieldKey{
- SeriesID: s.seriesSpan.seriesID,
- IndexRule: condition.indexRule,
+ SeriesID: s.seriesSpan.seriesID,
+ IndexRuleID: condition.indexRuleID,
}
for _, c := range condition.condition {
cond[term] = c
@@ -76,8 +76,8 @@ func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
return err
}
rangeOpts, found := tree.TrimRangeLeaf(index.FieldKey{
- SeriesID: s.seriesSpan.seriesID,
- IndexRule: s.indexRuleForSorting.GetMetadata().GetName(),
+ SeriesID: s.seriesSpan.seriesID,
+ IndexRuleID: s.indexRuleForSorting.GetMetadata().GetId(),
})
if found {
s.rangeOptsForSorting = rangeOpts
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index dbe1947..1529aa7 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -45,7 +45,7 @@ func (s *seekerBuilder) OrderByTime(order modelv2.QueryOrder_Sort) SeekerBuilder
return s
}
-func (s *seekerBuilder) buildSeries(filters []filterFn) []Iterator {
+func (s *seekerBuilder) buildSeries(filters []filterFn) ([]Iterator, error) {
if s.indexRuleForSorting == nil {
return s.buildSeriesByTime(filters)
}
@@ -60,32 +60,32 @@ func (s *seekerBuilder) buildSeries(filters []filterFn) []Iterator {
return s.buildSeriesByIndex(filters)
}
-func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator) {
+func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator, err error) {
for _, b := range s.seriesSpan.blocks {
var inner index.FieldIterator
- var found bool
+ var err error
fieldKey := index.FieldKey{
- SeriesID: s.seriesSpan.seriesID,
- IndexRule: s.indexRuleForSorting.GetMetadata().GetName(),
+ SeriesID: s.seriesSpan.seriesID,
+ IndexRuleID: s.indexRuleForSorting.GetMetadata().GetId(),
}
switch s.indexRuleForSorting.GetType() {
case databasev2.IndexRule_TYPE_TREE:
- inner, found = b.lsmIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
+ inner, err = b.lsmIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
case databasev2.IndexRule_TYPE_INVERTED:
- inner, found = b.invertedIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
- default:
- // only tree index supports sorting
- continue
+ inner, err = b.invertedIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
}
- if found {
+ if err != nil {
+ return nil, err
+ }
+ if inner != nil {
series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
}
}
return
}
-func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
+func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) ([]Iterator, error) {
bb := s.seriesSpan.blocks
switch s.order {
case modelv2.QueryOrder_SORT_ASC:
@@ -108,7 +108,7 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
}
for _, b := range bb {
bTimes = append(bTimes, b.startTime())
- inner, found := b.primaryIndexReader().
+ inner, err := b.primaryIndexReader().
Iterator(
index.FieldKey{
SeriesID: s.seriesSpan.seriesID,
@@ -116,7 +116,10 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
termRange,
s.order,
)
- if found {
+ if err != nil {
+ return nil, err
+ }
+ if inner != nil {
delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
}
}
@@ -126,7 +129,7 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
Uint64("series_id", uint64(s.seriesSpan.seriesID)).
Int("shard_id", int(s.seriesSpan.shardID)).
Msg("seek series by time")
- return []Iterator{newMergedIterator(delegated)}
+ return []Iterator{newMergedIterator(delegated)}, nil
}
var _ Iterator = (*searcherIterator)(nil)
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index ea5404b..cdec388 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -130,20 +130,12 @@ func (w *writer) ItemID() GlobalItemID {
}
func (w *writer) WriteLSMIndex(field index.Field) error {
- t := index.FieldKey{
- SeriesID: w.itemID.SeriesID,
- IndexRule: string(field.Key),
- }
- field.Key = t.Marshal()
+ field.Key.SeriesID = w.itemID.SeriesID
return w.block.writeLSMIndex(field, w.itemID.ID)
}
func (w *writer) WriteInvertedIndex(field index.Field) error {
- t := index.FieldKey{
- SeriesID: w.itemID.SeriesID,
- IndexRule: string(field.Key),
- }
- field.Key = t.Marshal()
+ field.Key.SeriesID = w.itemID.SeriesID
return w.block.writeInvertedIndex(field, w.itemID.ID)
}
@@ -174,7 +166,7 @@ func (w *writer) Write() (GlobalItemID, error) {
return id, w.block.writePrimaryIndex(index.Field{
Key: index.FieldKey{
SeriesID: id.SeriesID,
- }.Marshal(),
+ },
Term: convert.Int64ToBytes(w.ts.UnixNano()),
}, id.ID)
}
diff --git a/pkg/convert/number.go b/pkg/convert/number.go
index fa2d688..bedc5c0 100644
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@ -17,7 +17,9 @@
package convert
-import "encoding/binary"
+import (
+ "encoding/binary"
+)
func Uint64ToBytes(u uint64) []byte {
bs := make([]byte, 8)
@@ -26,9 +28,17 @@ func Uint64ToBytes(u uint64) []byte {
}
func Int64ToBytes(i int64) []byte {
- var buf = make([]byte, 8)
- binary.BigEndian.PutUint64(buf, uint64(i))
- return buf
+ abs := i
+ if i < 0 {
+ abs = -abs
+ }
+ u := uint64(abs)
+ if i >= 0 {
+ u = u | 1<<63
+ } else {
+ u = 1<<63 - u
+ }
+ return Uint64ToBytes(u)
}
func Uint16ToBytes(u uint16) []byte {
@@ -44,7 +54,17 @@ func Uint32ToBytes(u uint32) []byte {
}
func BytesToInt64(b []byte) int64 {
- return int64(binary.BigEndian.Uint64(b))
+ u := binary.BigEndian.Uint64(b)
+ if b[0] >= 128 {
+ u = u ^ 1<<63
+ } else {
+ u = 1<<63 - u
+ }
+ abs := int64(u)
+ if b[0] < 128 {
+ abs = -abs
+ }
+ return abs
}
func BytesToUint64(b []byte) uint64 {
diff --git a/pkg/index/inverted/mem_test.go b/pkg/convert/number_test.go
similarity index 67%
copy from pkg/index/inverted/mem_test.go
copy to pkg/convert/number_test.go
index 9e7c4eb..0db986f 100644
--- a/pkg/index/inverted/mem_test.go
+++ b/pkg/convert/number_test.go
@@ -15,24 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-package inverted
+package convert
import (
+ "bytes"
"testing"
-
- "github.com/stretchr/testify/assert"
-
- "github.com/apache/skywalking-banyandb/pkg/index/test_cases"
)
-func TestMemTable_MatchTerm(t *testing.T) {
- mt := newMemTable()
- test_cases.SetUp(assert.New(t), mt)
- test_cases.RunServiceName(t, mt)
-}
-
-func TestMemTable_Iterator(t *testing.T) {
- mt := newMemTable()
- data := test_cases.SetUpDuration(assert.New(t), mt)
- test_cases.RunDuration(t, data, mt)
+func TestInt64ToBytes(t *testing.T) {
+ vals := []int64{-1 << 63, -100, -2, -1, 0, 1, 2, 100, 1<<63 - 1}
+ for i, v := range vals {
+ if got := BytesToInt64(Int64ToBytes(v)); got != v {
+ t.Errorf("round trip of %d: got %d", v, got)
+ }
+ if i > 0 && bytes.Compare(Int64ToBytes(vals[i-1]), Int64ToBytes(v)) >= 0 {
+ t.Errorf("encoding of %d does not sort after %d", v, vals[i-1])
+ }
+ }
}
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 9be8831..ea7b0a7 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -25,45 +25,90 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
)
var ErrMalformed = errors.New("the data is malformed")
type FieldKey struct {
- SeriesID common.SeriesID
- IndexRule string
+ SeriesID common.SeriesID
+ IndexRuleID uint32
+ EncodeTerm bool
}
func (f FieldKey) Marshal() []byte {
return bytes.Join([][]byte{
f.SeriesID.Marshal(),
- []byte(f.IndexRule),
+ convert.Uint32ToBytes(f.IndexRuleID),
}, nil)
}
+func (f *FieldKey) Unmarshal(raw []byte) error {
+ switch len(raw) {
+ case 12:
+ f.SeriesID = common.SeriesID(convert.BytesToUint64(raw[0:8]))
+ f.IndexRuleID = convert.BytesToUint32(raw[8:])
+ case 4:
+ f.IndexRuleID = convert.BytesToUint32(raw)
+ default:
+ return errors.Wrap(ErrMalformed, "unmarshal field key")
+ }
+ return nil
+}
+
func (f FieldKey) Equal(other FieldKey) bool {
- return f.SeriesID == other.SeriesID && f.IndexRule == other.IndexRule
+ return f.SeriesID == other.SeriesID && f.IndexRuleID == other.IndexRuleID
}
type Field struct {
- Key []byte
+ Key FieldKey
Term []byte
}
-func (f Field) Marshal() []byte {
- return bytes.Join([][]byte{f.Key, f.Term}, []byte(":"))
+func (f Field) MarshalStraight() ([]byte, error) {
+ return bytes.Join([][]byte{f.Key.Marshal(), f.Term}, nil), nil
+}
+
+func (f Field) Marshal(term metadata.Term) ([]byte, error) {
+ var t []byte
+ if f.Key.EncodeTerm {
+ var err error
+ t, err = term.ID(f.Term)
+ if err != nil {
+ return nil, errors.Wrap(err, "get term id")
+ }
+ f.Term = t
+ }
+ return f.MarshalStraight()
+}
+
+func (f *Field) Unmarshal(term metadata.Term, raw []byte) error {
+ err := f.UnmarshalStraight(raw)
+ if err != nil {
+ return err
+ }
+ if !f.Key.EncodeTerm {
+ return nil
+ }
+ t, err := term.Literal(f.Term)
+ if err != nil {
+ return errors.Wrap(err, "get term literal from metadata store")
+ }
+ f.Term = t
+ return nil
}
-func (f *Field) Unmarshal(raw []byte) error {
- bb := bytes.SplitN(raw, []byte(":"), 2)
- if len(bb) < 2 {
- return errors.Wrap(ErrMalformed, "unable to unmarshal the field")
+func (f *Field) UnmarshalStraight(raw []byte) error {
+ if len(raw) < 8 {
+ return errors.Wrap(ErrMalformed, "unmarshal a field: too short")
+ }
+ err := f.Key.Unmarshal(raw[:len(raw)-8])
+ if err != nil {
+ return errors.Wrap(err, "unmarshal a field")
}
- f.Key = make([]byte, len(bb[0]))
- copy(f.Key, bb[0])
- f.Term = make([]byte, len(bb[1]))
- copy(f.Term, bb[1])
+ f.Term = append([]byte(nil), raw[len(raw)-8:]...)
return nil
}
@@ -116,7 +161,7 @@ type Writer interface {
}
type FieldIterable interface {
- Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv2.QueryOrder_Sort) (iter FieldIterator, found bool)
+ Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv2.QueryOrder_Sort) (iter FieldIterator, err error)
}
type Searcher interface {
diff --git a/pkg/index/inverted/field_map.go b/pkg/index/inverted/field_map.go
index 2ee0831..e94df9a 100644
--- a/pkg/index/inverted/field_map.go
+++ b/pkg/index/inverted/field_map.go
@@ -40,25 +40,25 @@ func newFieldMap(initialSize int) *fieldMap {
}
}
-func (fm *fieldMap) createKey(key []byte) *termContainer {
- k := fieldHashID(convert.Hash(key))
+func (fm *fieldMap) createKey(field index.Field) *termContainer {
result := &termContainer{
- key: key,
+ key: field.Key,
value: newPostingMap(),
}
+ k := fieldHashID(convert.Hash(field.Key.Marshal()))
fm.repo[k] = result
fm.lst = append(fm.lst, k)
return result
}
-func (fm *fieldMap) get(key []byte) (*termContainer, bool) {
+func (fm *fieldMap) get(key index.FieldKey) (*termContainer, bool) {
fm.mutex.RLock()
defer fm.mutex.RUnlock()
return fm.getWithoutLock(key)
}
-func (fm *fieldMap) getWithoutLock(key []byte) (*termContainer, bool) {
- v, ok := fm.repo[fieldHashID(convert.Hash(key))]
+func (fm *fieldMap) getWithoutLock(key index.FieldKey) (*termContainer, bool) {
+ v, ok := fm.repo[fieldHashID(convert.Hash(key.Marshal()))]
return v, ok
}
@@ -67,12 +67,12 @@ func (fm *fieldMap) put(fv index.Field, id common.ItemID) error {
defer fm.mutex.Unlock()
pm, ok := fm.getWithoutLock(fv.Key)
if !ok {
- pm = fm.createKey(fv.Key)
+ pm = fm.createKey(fv)
}
return pm.value.put(fv.Term, id)
}
type termContainer struct {
- key []byte
+ key index.FieldKey
value *termMap
}
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index ce50740..1241f98 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -28,6 +28,7 @@ import (
modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -36,6 +37,7 @@ import (
var _ index.Store = (*store)(nil)
type store struct {
+ termMetadata metadata.Term
diskTable kv.IndexStore
memTable *memTable
immutableMemTable *memTable
@@ -48,13 +50,21 @@ type StoreOpts struct {
}
func NewStore(opts StoreOpts) (index.Store, error) {
- diskTable, err := kv.OpenIndexStore(0, opts.Path, kv.IndexWithLogger(opts.Logger))
+ diskTable, err := kv.OpenIndexStore(0, opts.Path+"/table", kv.IndexWithLogger(opts.Logger))
if err != nil {
return nil, err
}
+ var md metadata.Term
+ if md, err = metadata.NewTerm(metadata.TermOpts{
+ Path: opts.Path + "/tmd",
+ Logger: opts.Logger,
+ }); err != nil {
+ return nil, err
+ }
return &store{
- memTable: newMemTable(),
- diskTable: diskTable,
+ memTable: newMemTable(),
+ diskTable: diskTable,
+ termMetadata: md,
}, nil
}
@@ -74,7 +84,7 @@ func (s *store) Flush() error {
s.memTable = newMemTable()
}
err := s.diskTable.
- Handover(s.immutableMemTable.Iter())
+ Handover(s.immutableMemTable.Iter(s.termMetadata))
if err != nil {
return err
}
@@ -87,6 +97,10 @@ func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, error) {
}
func (s *store) MatchTerms(field index.Field) (posting.List, error) {
+ f, err := field.Marshal(s.termMetadata)
+ if err != nil {
+ return nil, err
+ }
result := roaring.NewPostingList()
result, errMem := s.searchInMemTables(result, func(table *memTable) (posting.List, error) {
list, errInner := table.MatchTerms(field)
@@ -98,7 +112,7 @@ func (s *store) MatchTerms(field index.Field) (posting.List, error) {
if errMem != nil {
return nil, errors.Wrap(errMem, "mem table of inverted index")
}
- raw, errTable := s.diskTable.Get(field.Marshal())
+ raw, errTable := s.diskTable.Get(f)
switch {
case errors.Is(errTable, kv.ErrKeyNotFound):
return result, nil
@@ -106,7 +120,7 @@ func (s *store) MatchTerms(field index.Field) (posting.List, error) {
return nil, errors.Wrap(errTable, "disk table of inverted index")
}
list := roaring.NewPostingList()
- err := list.Unmarshall(raw)
+ err = list.Unmarshall(raw)
if err != nil {
return nil, err
}
@@ -118,8 +132,11 @@ func (s *store) MatchTerms(field index.Field) (posting.List, error) {
}
func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) {
- iter, found := s.Iterator(fieldKey, opts, modelv2.QueryOrder_SORT_ASC)
- if !found {
+ iter, err := s.Iterator(fieldKey, opts, modelv2.QueryOrder_SORT_ASC)
+ if err != nil {
+ return roaring.EmptyPostingList, err
+ }
+ if iter == nil {
return roaring.EmptyPostingList, nil
}
list = roaring.NewPostingList()
@@ -131,7 +148,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
}
func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
- order modelv2.QueryOrder_Sort) (index.FieldIterator, bool) {
+ order modelv2.QueryOrder_Sort) (index.FieldIterator, error) {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
tt := []*memTable{s.memTable, s.immutableMemTable}
@@ -140,48 +157,57 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
if table == nil {
continue
}
- it, found := table.Iterator(fieldKey, termRange, order)
- if !found {
- continue
- }
- iters = append(iters, it)
- }
- it := index.NewFieldIteratorTemplate(fieldKey, termRange, order, s.diskTable, func(term, val []byte, delegated kv.Iterator) (*index.PostingValue, error) {
- list := roaring.NewPostingList()
- err := list.Unmarshall(val)
+ it, err := table.Iterator(fieldKey, termRange, order)
if err != nil {
return nil, err
}
-
- pv := &index.PostingValue{
- Term: term,
- Value: list,
+ if it == nil {
+ continue
}
-
- for ; delegated.Valid(); delegated.Next() {
- f := index.Field{}
- err := f.Unmarshal(delegated.Key())
+ iters = append(iters, it)
+ }
+ it, err := index.NewFieldIteratorTemplate(fieldKey, termRange, order, s.diskTable, s.termMetadata,
+ func(term, val []byte, delegated kv.Iterator) (*index.PostingValue, error) {
+ list := roaring.NewPostingList()
+ err := list.Unmarshall(val)
if err != nil {
return nil, err
}
- if !bytes.Equal(f.Term, term) {
- break
- }
- l := roaring.NewPostingList()
- err = l.Unmarshall(delegated.Val())
- if err != nil {
- return nil, err
+
+ pv := &index.PostingValue{
+ Term: term,
+ Value: list,
}
- err = pv.Value.Union(l)
- if err != nil {
- return nil, err
+
+ for ; delegated.Valid(); delegated.Next() {
+ f := index.Field{
+ Key: fieldKey,
+ }
+ err := f.Unmarshal(s.termMetadata, delegated.Key())
+ if err != nil {
+ return nil, err
+ }
+ if !bytes.Equal(f.Term, term) {
+ break
+ }
+ l := roaring.NewPostingList()
+ err = l.Unmarshall(delegated.Val())
+ if err != nil {
+ return nil, err
+ }
+ err = pv.Value.Union(l)
+ if err != nil {
+ return nil, err
+ }
}
- }
- return pv, nil
- })
+ return pv, nil
+ })
+ if err != nil {
+ return nil, err
+ }
iters = append(iters, it)
if len(iters) < 1 {
- return nil, false
+ return nil, nil
}
var fn index.SwitchFn
switch order {
@@ -194,7 +220,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
return bytes.Compare(a, b) < 0
}
}
- return index.NewMergedIterator(iters, fn), true
+ return index.NewMergedIterator(iters, fn), nil
}
func (s *store) searchInMemTables(result posting.List, entityFunc entityFunc) (posting.List, error) {
diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go
index 88a427b..f08e69f 100644
--- a/pkg/index/inverted/inverted_test.go
+++ b/pkg/index/inverted/inverted_test.go
@@ -24,7 +24,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
- "github.com/apache/skywalking-banyandb/pkg/index/test_cases"
+ "github.com/apache/skywalking-banyandb/pkg/index/testcases"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
)
@@ -41,8 +41,8 @@ func TestStore_MatchTerm(t *testing.T) {
fn()
}()
tester.NoError(err)
- test_cases.SetUp(tester, s)
- test_cases.RunServiceName(t, s)
+ testcases.SetUp(tester, s)
+ testcases.RunServiceName(t, s)
}
func TestStore_MatchTerm_AfterFlush(t *testing.T) {
@@ -57,9 +57,9 @@ func TestStore_MatchTerm_AfterFlush(t *testing.T) {
fn()
}()
tester.NoError(err)
- test_cases.SetUp(tester, s)
+ testcases.SetUp(tester, s)
tester.NoError(s.(*store).Flush())
- test_cases.RunServiceName(t, s)
+ testcases.RunServiceName(t, s)
}
func TestStore_Iterator(t *testing.T) {
@@ -74,8 +74,8 @@ func TestStore_Iterator(t *testing.T) {
fn()
}()
tester.NoError(err)
- data := test_cases.SetUpDuration(tester, s)
- test_cases.RunDuration(t, data, s)
+ data := testcases.SetUpDuration(tester, s)
+ testcases.RunDuration(t, data, s)
}
func TestStore_Iterator_AfterFlush(t *testing.T) {
@@ -90,9 +90,9 @@ func TestStore_Iterator_AfterFlush(t *testing.T) {
fn()
}()
tester.NoError(err)
- data := test_cases.SetUpDuration(tester, s)
+ data := testcases.SetUpDuration(tester, s)
tester.NoError(s.(*store).Flush())
- test_cases.RunDuration(t, data, s)
+ testcases.RunDuration(t, data, s)
}
func TestStore_Iterator_Hybrid(t *testing.T) {
@@ -114,7 +114,7 @@ func TestStore_Iterator_Hybrid(t *testing.T) {
1000: nil,
2000: roaring.NewPostingList(),
}
- data1 := test_cases.SetUpPartialDuration(tester, s, r)
+ data1 := testcases.SetUpPartialDuration(tester, s, r)
tester.NoError(s.(*store).Flush())
r = map[int]posting.List{
50: nil,
@@ -123,13 +123,13 @@ func TestStore_Iterator_Hybrid(t *testing.T) {
1000: roaring.NewPostingList(),
2000: nil,
}
- data := test_cases.SetUpPartialDuration(tester, s, r)
+ data := testcases.SetUpPartialDuration(tester, s, r)
for i, list := range data {
if list == nil {
data[i] = data1[i]
}
}
- test_cases.RunDuration(t, data, s)
+ testcases.RunDuration(t, data, s)
}
func setUp(t *assert.Assertions) (tempDir string, deferFunc func()) {
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index cd19027..dc83c73 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -27,6 +27,7 @@ import (
modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
)
@@ -46,8 +47,8 @@ func newMemTable() *memTable {
}
}
-func (m *memTable) Write(field index.Field, chunkID common.ItemID) error {
- return m.fields.put(field, chunkID)
+func (m *memTable) Write(field index.Field, itemID common.ItemID) error {
+ return m.fields.put(field, itemID)
}
var _ index.FieldIterator = (*fIterator)(nil)
@@ -93,10 +94,10 @@ func newFieldIterator(keys [][]byte, fValue *termMap) index.FieldIterator {
}
func (m *memTable) Iterator(fieldKey index.FieldKey, rangeOpts index.RangeOpts,
- order modelv2.QueryOrder_Sort) (iter index.FieldIterator, found bool) {
- fieldsValues, ok := m.fields.get(fieldKey.Marshal())
+ order modelv2.QueryOrder_Sort) (iter index.FieldIterator, err error) {
+ fieldsValues, ok := m.fields.get(fieldKey)
if !ok {
- return nil, false
+ return nil, nil
}
fValue := fieldsValues.value
var terms [][]byte
@@ -110,7 +111,7 @@ func (m *memTable) Iterator(fieldKey index.FieldKey, rangeOpts index.RangeOpts,
}
}
if len(terms) < 1 {
- return nil, false
+ return nil, nil
}
switch order {
case modelv2.QueryOrder_SORT_ASC, modelv2.QueryOrder_SORT_UNSPECIFIED:
@@ -122,7 +123,7 @@ func (m *memTable) Iterator(fieldKey index.FieldKey, rangeOpts index.RangeOpts,
return bytes.Compare(terms[i], terms[j]) > 0
})
}
- return newFieldIterator(terms, fValue), true
+ return newFieldIterator(terms, fValue), nil
}
func (m *memTable) MatchTerms(field index.Field) (posting.List, error) {
@@ -140,13 +141,14 @@ func (m *memTable) MatchTerms(field index.Field) (posting.List, error) {
var _ kv.Iterator = (*flushIterator)(nil)
type flushIterator struct {
- fieldIdx int
- termIdx int
- key []byte
- value []byte
- fields *fieldMap
- valid bool
- err error
+ fieldIdx int
+ termIdx int
+ key []byte
+ value []byte
+ fields *fieldMap
+ valid bool
+ err error
+ termMetadata metadata.Term
}
func (i *flushIterator) Next() {
@@ -216,15 +218,21 @@ func (i *flushIterator) setCurr() bool {
return false
}
i.value = v
- i.key = index.Field{
+ f := index.Field{
Key: term.key,
Term: value.Term,
- }.Marshal()
+ }
+ i.key, err = f.Marshal(i.termMetadata)
+ if err != nil {
+ i.err = multierr.Append(i.err, err)
+ return false
+ }
return true
}
-func (m *memTable) Iter() kv.Iterator {
+func (m *memTable) Iter(termMetadata metadata.Term) kv.Iterator {
return &flushIterator{
- fields: m.fields,
+ fields: m.fields,
+ termMetadata: termMetadata,
}
}
diff --git a/pkg/index/inverted/mem_test.go b/pkg/index/inverted/mem_test.go
index 9e7c4eb..7d9a3e4 100644
--- a/pkg/index/inverted/mem_test.go
+++ b/pkg/index/inverted/mem_test.go
@@ -22,17 +22,17 @@ import (
"github.com/stretchr/testify/assert"
- "github.com/apache/skywalking-banyandb/pkg/index/test_cases"
+ "github.com/apache/skywalking-banyandb/pkg/index/testcases"
)
func TestMemTable_MatchTerm(t *testing.T) {
mt := newMemTable()
- test_cases.SetUp(assert.New(t), mt)
- test_cases.RunServiceName(t, mt)
+ testcases.SetUp(assert.New(t), mt)
+ testcases.RunServiceName(t, mt)
}
func TestMemTable_Iterator(t *testing.T) {
mt := newMemTable()
- data := test_cases.SetUpDuration(assert.New(t), mt)
- test_cases.RunDuration(t, data, mt)
+ data := testcases.SetUpDuration(assert.New(t), mt)
+ testcases.RunDuration(t, data, mt)
}
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index ef41c92..3fbeca7 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -19,44 +19,50 @@ package index
import (
"bytes"
+ "math"
"go.uber.org/multierr"
modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/index/metadata"
)
type CompositePostingValueFn = func(term, value []byte, delegated kv.Iterator) (*PostingValue, error)
var _ FieldIterator = (*FieldIteratorTemplate)(nil)
+var DefaultUpper = convert.Uint64ToBytes(math.MaxUint64)
+var DefaultLower = convert.Uint64ToBytes(0)
type FieldIteratorTemplate struct {
delegated kv.Iterator
- init bool
- curr *PostingValue
- err error
- termRange RangeOpts
- fn CompositePostingValueFn
- reverse bool
- field Field
+ init bool
+ curr *PostingValue
+ err error
+ termRange RangeOpts
+ fn CompositePostingValueFn
+ reverse bool
+ seekKey []byte
+ fieldKeyBytes []byte
+ parseKeyFn func(key []byte) (Field, error)
}
func (f *FieldIteratorTemplate) Next() bool {
if !f.init {
f.init = true
- f.delegated.Seek(f.field.Marshal())
+ f.delegated.Seek(f.seekKey)
}
if !f.delegated.Valid() {
return false
}
- field := &Field{}
- err := field.Unmarshal(f.delegated.Key())
+ field, err := f.parseKeyFn(f.delegated.Key())
if err != nil {
f.err = err
return false
}
- if !bytes.Equal(field.Key, f.field.Key) {
+ if !bytes.Equal(field.Key.Marshal(), f.fieldKeyBytes) {
return false
}
pv, err := f.fn(field.Term, f.delegated.Val(), f.delegated)
@@ -69,15 +75,13 @@ func (f *FieldIteratorTemplate) Next() bool {
case in > 0:
if f.reverse {
return f.Next()
- } else {
- return false
}
+ return false
case in < 0:
if f.reverse {
return false
- } else {
- return f.Next()
}
+ return f.Next()
}
f.curr = pv
return true
@@ -88,10 +92,17 @@ func (f *FieldIteratorTemplate) Val() *PostingValue {
}
func (f *FieldIteratorTemplate) Close() error {
- return f.delegated.Close()
+ return multierr.Append(f.err, f.delegated.Close())
}
-func NewFieldIteratorTemplate(fieldKey FieldKey, termRange RangeOpts, order modelv2.QueryOrder_Sort, iterable kv.Iterable, fn CompositePostingValueFn) *FieldIteratorTemplate {
+func NewFieldIteratorTemplate(fieldKey FieldKey, termRange RangeOpts, order modelv2.QueryOrder_Sort, iterable kv.Iterable,
+ metadata metadata.Term, fn CompositePostingValueFn) (*FieldIteratorTemplate, error) {
+ if termRange.Upper == nil {
+ termRange.Upper = DefaultUpper
+ }
+ if termRange.Lower == nil {
+ termRange.Lower = DefaultLower
+ }
var reverse bool
var term []byte
switch order {
@@ -109,20 +120,32 @@ func NewFieldIteratorTemplate(fieldKey FieldKey, termRange RangeOpts, order mode
Prefix: fieldKey.Marshal(),
Reverse: reverse,
})
- iter = iterable.NewIterator(kv.ScanOpts{
- Prefix: fieldKey.Marshal(),
- Reverse: reverse,
- })
+ field := Field{
+ Key: fieldKey,
+ Term: term,
+ }
+ seekKey, err := field.Marshal(metadata)
+ if err != nil {
+ return nil, multierr.Append(err, iter.Close())
+ }
return &FieldIteratorTemplate{
delegated: iter,
termRange: termRange,
fn: fn,
reverse: reverse,
- field: Field{
- Key: fieldKey.Marshal(),
- Term: term,
+ parseKeyFn: func(key []byte) (Field, error) {
+ f := &Field{
+ Key: fieldKey,
+ }
+ errParseKey := f.Unmarshal(metadata, key)
+ if errParseKey != nil {
+ return *f, errParseKey
+ }
+ return *f, nil
},
- }
+ seekKey: seekKey,
+ fieldKeyBytes: fieldKey.Marshal(),
+ }, nil
}
type SwitchFn = func(a, b []byte) bool
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 14961a8..288d16c 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -22,13 +22,15 @@ import (
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
var _ index.Store = (*store)(nil)
type store struct {
- lsm kv.Store
+ lsm kv.Store
+ termMetadata metadata.Term
}
func (s *store) Close() error {
@@ -36,8 +38,12 @@ func (s *store) Close() error {
}
func (s *store) Write(field index.Field, itemID common.ItemID) error {
+ f, err := field.Marshal(s.termMetadata)
+ if err != nil {
+ return err
+ }
itemIDInt := uint64(itemID)
- return s.lsm.PutWithVersion(field.Marshal(), convert.Uint64ToBytes(itemIDInt), itemIDInt)
+ return s.lsm.PutWithVersion(f, convert.Uint64ToBytes(itemIDInt), itemIDInt)
}
type StoreOpts struct {
@@ -48,10 +54,18 @@ type StoreOpts struct {
func NewStore(opts StoreOpts) (index.Store, error) {
var err error
var lsm kv.Store
- if lsm, err = kv.OpenStore(0, opts.Path, kv.StoreWithLogger(opts.Logger)); err != nil {
+ if lsm, err = kv.OpenStore(0, opts.Path+"/lsm", kv.StoreWithLogger(opts.Logger)); err != nil {
+ return nil, err
+ }
+ var md metadata.Term
+ if md, err = metadata.NewTerm(metadata.TermOpts{
+ Path: opts.Path + "/tmd",
+ Logger: opts.Logger,
+ }); err != nil {
return nil, err
}
return &store{
- lsm: lsm,
+ lsm: lsm,
+ termMetadata: md,
}, nil
}
diff --git a/pkg/index/lsm/lsm_test.go b/pkg/index/lsm/lsm_test.go
index 2e8f8f8..374d8ef 100644
--- a/pkg/index/lsm/lsm_test.go
+++ b/pkg/index/lsm/lsm_test.go
@@ -22,7 +22,7 @@ import (
"github.com/stretchr/testify/assert"
- "github.com/apache/skywalking-banyandb/pkg/index/test_cases"
+ "github.com/apache/skywalking-banyandb/pkg/index/testcases"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
)
@@ -39,8 +39,8 @@ func TestStore_MatchTerm(t *testing.T) {
fn()
}()
tester.NoError(err)
- test_cases.SetUp(tester, s)
- test_cases.RunServiceName(t, s)
+ testcases.SetUp(tester, s)
+ testcases.RunServiceName(t, s)
}
func TestStore_Iterator(t *testing.T) {
@@ -55,8 +55,8 @@ func TestStore_Iterator(t *testing.T) {
fn()
}()
tester.NoError(err)
- data := test_cases.SetUpDuration(tester, s)
- test_cases.RunDuration(t, data, s)
+ data := testcases.SetUpDuration(tester, s)
+ testcases.RunDuration(t, data, s)
}
func setUp(t *assert.Assertions) (tempDir string, deferFunc func()) {
diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go
index 53bf0aa..5a6e344 100644
--- a/pkg/index/lsm/search.go
+++ b/pkg/index/lsm/search.go
@@ -37,8 +37,12 @@ func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err erro
}
func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
+ f, err := field.Marshal(s.termMetadata)
+ if err != nil {
+ return nil, err
+ }
list = roaring.NewPostingList()
- err = s.lsm.GetAll(field.Marshal(), func(itemID []byte) error {
+ err = s.lsm.GetAll(f, func(itemID []byte) error {
list.Insert(common.ItemID(convert.BytesToUint64(itemID)))
return nil
})
@@ -49,9 +53,9 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
}
func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) {
- iter, found := s.Iterator(fieldKey, opts, modelv2.QueryOrder_SORT_ASC)
- if !found {
- return roaring.EmptyPostingList, nil
+ iter, err := s.Iterator(fieldKey, opts, modelv2.QueryOrder_SORT_ASC)
+ if err != nil {
+ return roaring.EmptyPostingList, err
}
list = roaring.NewPostingList()
for iter.Next() {
@@ -61,8 +65,8 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
return
}
-func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv2.QueryOrder_Sort) (index.FieldIterator, bool) {
- return index.NewFieldIteratorTemplate(fieldKey, termRange, order, s.lsm, func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) {
+func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv2.QueryOrder_Sort) (index.FieldIterator, error) {
+ return index.NewFieldIteratorTemplate(fieldKey, termRange, order, s.lsm, s.termMetadata, func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) {
pv := &index.PostingValue{
Term: term,
Value: roaring.NewPostingListWithInitialData(convert.BytesToUint64(value)),
@@ -70,7 +74,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
for ; delegated.Valid(); delegated.Next() {
f := index.Field{}
- err := f.Unmarshal(delegated.Key())
+ err := f.Unmarshal(s.termMetadata, delegated.Key())
if err != nil {
return nil, err
}
@@ -80,5 +84,5 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
pv.Value.Insert(common.ItemID(convert.BytesToUint64(delegated.Val())))
}
return pv, nil
- }), true
+ })
}
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/metadata/metadata.go
similarity index 59%
copy from pkg/index/lsm/lsm.go
copy to pkg/index/metadata/metadata.go
index 14961a8..750e3db 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/metadata/metadata.go
@@ -15,43 +15,52 @@
// specific language governing permissions and limitations
// under the License.
-package lsm
+package metadata
import (
- "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/pkg/errors"
+
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/convert"
- "github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
-var _ index.Store = (*store)(nil)
-
-type store struct {
- lsm kv.Store
+type Term interface { // dictionary between term literals and their IDs
+ ID(term []byte) (id []byte, err error) // in *term: hash-derived ID, recording the literal on first use
+ Literal(id []byte) (term []byte, err error) // reverse lookup of a literal previously recorded via ID
}
-func (s *store) Close() error {
- return s.lsm.Close()
-}
+var _ Term = (*term)(nil)
-func (s *store) Write(field index.Field, itemID common.ItemID) error {
- itemIDInt := uint64(itemID)
- return s.lsm.PutWithVersion(field.Marshal(), convert.Uint64ToBytes(itemIDInt), itemIDInt)
+type term struct {
+ store kv.Store
}
-type StoreOpts struct {
+type TermOpts struct {
Path string
Logger *logger.Logger
}
-func NewStore(opts StoreOpts) (index.Store, error) {
+func NewTerm(opts TermOpts) (Term, error) { // opens the kv store at opts.Path that backs ID and Literal
+ var store kv.Store
var err error
- var lsm kv.Store
- if lsm, err = kv.OpenStore(0, opts.Path, kv.StoreWithLogger(opts.Logger)); err != nil {
+ if store, err = kv.OpenStore(0, opts.Path, kv.StoreWithNamedLogger("term_metadata", opts.Logger)); err != nil {
return nil, err
}
- return &store{
- lsm: lsm,
+ return &term{
+ store: store,
}, nil
}
+
+func (t *term) ID(term []byte) (id []byte, err error) {
+ id = convert.Uint64ToBytes(convert.Hash(term)) // NOTE(review): colliding hashes would alias two terms; the stored literal is never compared
+ _, err = t.store.Get(id)
+ if errors.Is(err, kv.ErrKeyNotFound) {
+ return id, t.store.Put(id, term)
+ }
+ return id, err // surface store failures other than "not found" instead of reporting success; id is only valid when err == nil
+}
+
+func (t *term) Literal(id []byte) (term []byte, err error) {
+ return t.store.Get(id) // store errors, including a missing id, are returned unchanged
+}
diff --git a/pkg/index/test_cases/duration.go b/pkg/index/testcases/duration.go
similarity index 91%
rename from pkg/index/test_cases/duration.go
rename to pkg/index/testcases/duration.go
index e6277c2..59cf50a 100644
--- a/pkg/index/test_cases/duration.go
+++ b/pkg/index/testcases/duration.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package test_cases
+package testcases
import (
"sort"
@@ -34,7 +34,8 @@ import (
var (
duration = index.FieldKey{
- IndexRule: "duration",
+ //duration
+ IndexRuleID: 3,
}
)
@@ -71,14 +72,14 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
},
want: []int{50, 200, 500, 1000, 2000},
},
- //{
- // name: "sort in desc order",
- // args: args{
- // fieldKey: duration,
- // orderType: modelv2.QueryOrder_SORT_DESC,
- // },
- // want: []int{2000, 1000, 500, 200, 50},
- //},
+ {
+ name: "sort in desc order",
+ args: args{
+ fieldKey: duration,
+ orderType: modelv2.QueryOrder_SORT_DESC,
+ },
+ want: []int{2000, 1000, 500, 200, 50},
+ },
{
name: "scan in (lower, upper) and sort in asc order",
args: args{
@@ -143,18 +144,18 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
},
want: []int{200, 500, 1000, 2000},
},
- //{
- // name: "scan in [lower, undefined) and sort in desc order",
- // args: args{
- // fieldKey: duration,
- // orderType: modelv2.QueryOrder_SORT_DESC,
- // termRange: index.RangeOpts{
- // Lower: convert.Int64ToBytes(200),
- // IncludesLower: true,
- // },
- // },
- // want: []int{2000, 1000, 500, 200},
- //},
+ {
+ name: "scan in [lower, undefined) and sort in desc order",
+ args: args{
+ fieldKey: duration,
+ orderType: modelv2.QueryOrder_SORT_DESC,
+ termRange: index.RangeOpts{
+ Lower: convert.Int64ToBytes(200),
+ IncludesLower: true,
+ },
+ },
+ want: []int{2000, 1000, 500, 200},
+ },
{
name: "scan in (undefined, upper] and sort in asc order",
args: args{
@@ -239,7 +240,7 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
name: "unknown field key",
args: args{
fieldKey: index.FieldKey{
- IndexRule: "unknown",
+ IndexRuleID: 0,
},
},
},
@@ -263,8 +264,9 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- iter, found := store.Iterator(tt.args.fieldKey, tt.args.termRange, tt.args.orderType)
- if !found {
+ iter, err := store.Iterator(tt.args.fieldKey, tt.args.termRange, tt.args.orderType)
+ is.NoError(err)
+ if iter == nil {
tester.Empty(tt.want)
return
}
@@ -322,7 +324,7 @@ func SetUpDuration(t *assert.Assertions, store index.Writer) map[int]posting.Lis
func SetUpPartialDuration(t *assert.Assertions, store index.Writer, r map[int]posting.List) map[int]posting.List {
idx := make([]int, 0, len(r))
- for key, _ := range r {
+ for key := range r {
idx = append(idx, key)
}
sort.Ints(idx)
@@ -333,7 +335,7 @@ func SetUpPartialDuration(t *assert.Assertions, store index.Writer, r map[int]po
continue
}
t.NoError(store.Write(index.Field{
- Key: duration.Marshal(),
+ Key: duration,
Term: convert.Int64ToBytes(int64(term)),
}, id))
r[term].Insert(id)
diff --git a/pkg/index/test_cases/service_name.go b/pkg/index/testcases/service_name.go
similarity index 97%
rename from pkg/index/test_cases/service_name.go
rename to pkg/index/testcases/service_name.go
index 40a0714..c6e174d 100644
--- a/pkg/index/test_cases/service_name.go
+++ b/pkg/index/testcases/service_name.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package test_cases
+package testcases
import (
"testing"
@@ -29,8 +29,10 @@ import (
)
var serviceName = index.FieldKey{
- IndexRule: "service_name",
-}.Marshal()
+ // ID 6 is the http_method rule; NOTE(review): the variable is still named serviceName — confirm this reuse is intentional
+ IndexRuleID: 6,
+ EncodeTerm: true, // NOTE(review): presumably routes terms through the term metadata dictionary — confirm
+}
func RunServiceName(t *testing.T, store SimpleStore) {
tester := assert.New(t)
diff --git a/pkg/index/tree.go b/pkg/index/tree.go
index 1f5bda1..6ac7022 100644
--- a/pkg/index/tree.go
+++ b/pkg/index/tree.go
@@ -296,7 +296,7 @@ type eq struct {
func (eq *eq) Execute() (posting.List, error) {
return eq.searcher.MatchTerms(Field{
- Key: eq.Key.Marshal(),
+ Key: eq.Key, // raw FieldKey; the searcher now marshals the Field itself (lsm's MatchTerms does so with its term metadata)
Term: bytes.Join(eq.Values, nil),
})
}