You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/04 07:38:39 UTC
[skywalking-banyandb] branch main updated: Introduce index module
with memtable only (#26)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new e2627b7 Introduce index module with memtable only (#26)
e2627b7 is described below
commit e2627b7a6a53f8485aa051b38536e566c9a1eb39
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Aug 4 15:38:32 2021 +0800
Introduce index module with memtable only (#26)
* Add series event
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Use build method to build topics
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Introduce posting list
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Add name to IndexObject
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Add mem table
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Index memtable
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Test search
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* Fix some nits
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
api/common/metadata.go | 17 +
api/event/discovery.go | 9 +-
api/proto/banyandb/v1/schema.pb.go | 246 ++++++++++++--
api/proto/banyandb/v1/schema.proto | 13 +-
banyand/index/index.go | 203 +++++++++++-
banyand/index/index_test.go | 185 +++++++++++
banyand/index/search.go | 356 +++++++++++++++++++++
banyand/index/search_test.go | 255 +++++++++++++++
.../posting.go => banyand/index/tsdb/field_map.go | 67 ++--
banyand/index/tsdb/mem.go | 91 ++++++
banyand/index/tsdb/mem_test.go | 261 +++++++++++++++
banyand/index/tsdb/term_map.go | 143 +++++++++
banyand/index/tsdb/tsdb.go | 65 ++++
banyand/internal/cmd/standalone.go | 4 +-
banyand/query/processor_test.go | 11 +-
banyand/series/trace/common_test.go | 7 +-
banyand/series/trace/service.go | 147 +++++++++
banyand/series/trace/trace.go | 93 +-----
banyand/series/trace/write.go | 45 +++
pkg/convert/number.go | 4 +
pkg/posting/posting.go | 2 +
pkg/posting/roaring/roaring.go | 12 +
22 files changed, 2064 insertions(+), 172 deletions(-)
diff --git a/api/common/metadata.go b/api/common/metadata.go
index 93ea657..8706aaf 100644
--- a/api/common/metadata.go
+++ b/api/common/metadata.go
@@ -33,3 +33,20 @@ func (md Metadata) Equal(other Metadata) bool {
md.Spec.Group == other.Spec.Group &&
md.Spec.Name == other.Spec.Name
}
+
+// NewMetadata wraps spec into a Metadata tagged with MetadataKindVersion.
+func NewMetadata(spec *v1.Metadata) *Metadata {
+	md := new(Metadata)
+	md.KindVersion = MetadataKindVersion
+	md.Spec = spec
+	return md
+}
+
+// NewMetadataByNameAndGroup builds a Metadata tagged with MetadataKindVersion
+// whose spec carries only the given name and group.
+func NewMetadataByNameAndGroup(name, group string) *Metadata {
+	spec := &v1.Metadata{
+		Name:  name,
+		Group: group,
+	}
+	return NewMetadata(spec)
+}
diff --git a/api/event/discovery.go b/api/event/discovery.go
index a38298c..10476b8 100644
--- a/api/event/discovery.go
+++ b/api/event/discovery.go
@@ -33,7 +33,9 @@ var (
Version: "v1",
Kind: "event-series",
}
- TopicSeriesEvent = bus.UniTopic(SeriesEventKindVersion.String())
+ TopicSeriesEvent = bus.UniTopic(SeriesEventKindVersion.String())
+ IndexRuleKindVersion = common.KindVersion{Version: "v1", Kind: "index-rule"}
+ TopicIndexRule = bus.UniTopic(IndexRuleKindVersion.String())
)
type Shard struct {
@@ -45,3 +47,8 @@ type Series struct {
common.KindVersion
Payload v1.SeriesEvent
}
+
+// IndexRule is an event envelope that pairs a KindVersion with an
+// IndexRuleEvent payload.
+type IndexRule struct {
+ common.KindVersion
+ Payload *v1.IndexRuleEvent
+}
diff --git a/api/proto/banyandb/v1/schema.pb.go b/api/proto/banyandb/v1/schema.pb.go
index 69f0218..56f6144 100644
--- a/api/proto/banyandb/v1/schema.pb.go
+++ b/api/proto/banyandb/v1/schema.pb.go
@@ -978,6 +978,132 @@ func (x *IndexRuleBinding) GetUpdatedAt() *timestamppb.Timestamp {
return nil
}
+type IndexRuleEvent struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Series *Metadata `protobuf:"bytes,1,opt,name=series,proto3" json:"series,omitempty"`
+ Rules []*IndexRuleEvent_ShardedIndexRule `protobuf:"bytes,2,rep,name=rules,proto3" json:"rules,omitempty"`
+ Action Action `protobuf:"varint,4,opt,name=action,proto3,enum=banyandb.v1.Action" json:"action,omitempty"`
+ Time *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=time,proto3" json:"time,omitempty"`
+}
+
+func (x *IndexRuleEvent) Reset() {
+ *x = IndexRuleEvent{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_banyandb_v1_schema_proto_msgTypes[10]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *IndexRuleEvent) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*IndexRuleEvent) ProtoMessage() {}
+
+func (x *IndexRuleEvent) ProtoReflect() protoreflect.Message {
+ mi := &file_banyandb_v1_schema_proto_msgTypes[10]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use IndexRuleEvent.ProtoReflect.Descriptor instead.
+func (*IndexRuleEvent) Descriptor() ([]byte, []int) {
+ return file_banyandb_v1_schema_proto_rawDescGZIP(), []int{10}
+}
+
+func (x *IndexRuleEvent) GetSeries() *Metadata {
+ if x != nil {
+ return x.Series
+ }
+ return nil
+}
+
+func (x *IndexRuleEvent) GetRules() []*IndexRuleEvent_ShardedIndexRule {
+ if x != nil {
+ return x.Rules
+ }
+ return nil
+}
+
+func (x *IndexRuleEvent) GetAction() Action {
+ if x != nil {
+ return x.Action
+ }
+ return Action_ACTION_UNSPECIFIED
+}
+
+func (x *IndexRuleEvent) GetTime() *timestamppb.Timestamp {
+ if x != nil {
+ return x.Time
+ }
+ return nil
+}
+
+type IndexRuleEvent_ShardedIndexRule struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ ShardId uint64 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"`
+ Rules []*IndexRule `protobuf:"bytes,2,rep,name=rules,proto3" json:"rules,omitempty"`
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) Reset() {
+ *x = IndexRuleEvent_ShardedIndexRule{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_banyandb_v1_schema_proto_msgTypes[11]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*IndexRuleEvent_ShardedIndexRule) ProtoMessage() {}
+
+func (x *IndexRuleEvent_ShardedIndexRule) ProtoReflect() protoreflect.Message {
+ mi := &file_banyandb_v1_schema_proto_msgTypes[11]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use IndexRuleEvent_ShardedIndexRule.ProtoReflect.Descriptor instead.
+func (*IndexRuleEvent_ShardedIndexRule) Descriptor() ([]byte, []int) {
+ return file_banyandb_v1_schema_proto_rawDescGZIP(), []int{10, 0}
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) GetShardId() uint64 {
+ if x != nil {
+ return x.ShardId
+ }
+ return 0
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) GetRules() []*IndexRule {
+ if x != nil {
+ return x.Rules
+ }
+ return nil
+}
+
var File_banyandb_v1_schema_proto protoreflect.FileDescriptor
var file_banyandb_v1_schema_proto_rawDesc = []byte{
@@ -1119,7 +1245,27 @@ var file_banyandb_v1_schema_proto_rawDesc = []byte{
0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69,
0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64,
- 0x41, 0x74, 0x42, 0x60, 0x0a, 0x1e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
+ 0x41, 0x74, 0x22, 0xbd, 0x02, 0x0a, 0x0e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65,
+ 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2d, 0x0a, 0x06, 0x73, 0x65, 0x72, 0x69, 0x65, 0x73, 0x18,
+ 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
+ 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x06, 0x73, 0x65,
+ 0x72, 0x69, 0x65, 0x73, 0x12, 0x42, 0x0a, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20,
+ 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x76,
+ 0x31, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74,
+ 0x2e, 0x53, 0x68, 0x61, 0x72, 0x64, 0x65, 0x64, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c,
+ 0x65, 0x52, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73, 0x12, 0x2b, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69,
+ 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61,
+ 0x6e, 0x64, 0x62, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x06, 0x61,
+ 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x04, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20,
+ 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52,
+ 0x04, 0x74, 0x69, 0x6d, 0x65, 0x1a, 0x5b, 0x0a, 0x10, 0x53, 0x68, 0x61, 0x72, 0x64, 0x65, 0x64,
+ 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61,
+ 0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61,
+ 0x72, 0x64, 0x49, 0x64, 0x12, 0x2c, 0x0a, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20,
+ 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x76,
+ 0x31, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x52, 0x05, 0x72, 0x75, 0x6c,
+ 0x65, 0x73, 0x42, 0x60, 0x0a, 0x1e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79,
0x61, 0x6e, 0x64, 0x62, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69,
@@ -1141,52 +1287,60 @@ func file_banyandb_v1_schema_proto_rawDescGZIP() []byte {
}
var file_banyandb_v1_schema_proto_enumTypes = make([]protoimpl.EnumInfo, 4)
-var file_banyandb_v1_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
+var file_banyandb_v1_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
var file_banyandb_v1_schema_proto_goTypes = []interface{}{
- (Duration_Duration)(0), // 0: banyandb.v1.Duration.Duration
- (FieldSpec_FieldType)(0), // 1: banyandb.v1.FieldSpec.FieldType
- (IndexObject_IndexType)(0), // 2: banyandb.v1.IndexObject.IndexType
- (Series_Catalog)(0), // 3: banyandb.v1.Series.Catalog
- (*ShardInfo)(nil), // 4: banyandb.v1.ShardInfo
- (*Duration)(nil), // 5: banyandb.v1.Duration
- (*FieldSpec)(nil), // 6: banyandb.v1.FieldSpec
- (*TraceStateMap)(nil), // 7: banyandb.v1.TraceStateMap
- (*TraceFieldMap)(nil), // 8: banyandb.v1.TraceFieldMap
- (*TraceSeries)(nil), // 9: banyandb.v1.TraceSeries
- (*IndexObject)(nil), // 10: banyandb.v1.IndexObject
- (*IndexRule)(nil), // 11: banyandb.v1.IndexRule
- (*Series)(nil), // 12: banyandb.v1.Series
- (*IndexRuleBinding)(nil), // 13: banyandb.v1.IndexRuleBinding
- (*Metadata)(nil), // 14: banyandb.v1.Metadata
- (*timestamppb.Timestamp)(nil), // 15: google.protobuf.Timestamp
+ (Duration_Duration)(0), // 0: banyandb.v1.Duration.Duration
+ (FieldSpec_FieldType)(0), // 1: banyandb.v1.FieldSpec.FieldType
+ (IndexObject_IndexType)(0), // 2: banyandb.v1.IndexObject.IndexType
+ (Series_Catalog)(0), // 3: banyandb.v1.Series.Catalog
+ (*ShardInfo)(nil), // 4: banyandb.v1.ShardInfo
+ (*Duration)(nil), // 5: banyandb.v1.Duration
+ (*FieldSpec)(nil), // 6: banyandb.v1.FieldSpec
+ (*TraceStateMap)(nil), // 7: banyandb.v1.TraceStateMap
+ (*TraceFieldMap)(nil), // 8: banyandb.v1.TraceFieldMap
+ (*TraceSeries)(nil), // 9: banyandb.v1.TraceSeries
+ (*IndexObject)(nil), // 10: banyandb.v1.IndexObject
+ (*IndexRule)(nil), // 11: banyandb.v1.IndexRule
+ (*Series)(nil), // 12: banyandb.v1.Series
+ (*IndexRuleBinding)(nil), // 13: banyandb.v1.IndexRuleBinding
+ (*IndexRuleEvent)(nil), // 14: banyandb.v1.IndexRuleEvent
+ (*IndexRuleEvent_ShardedIndexRule)(nil), // 15: banyandb.v1.IndexRuleEvent.ShardedIndexRule
+ (*Metadata)(nil), // 16: banyandb.v1.Metadata
+ (*timestamppb.Timestamp)(nil), // 17: google.protobuf.Timestamp
+ (Action)(0), // 18: banyandb.v1.Action
}
var file_banyandb_v1_schema_proto_depIdxs = []int32{
0, // 0: banyandb.v1.Duration.unit:type_name -> banyandb.v1.Duration.Duration
1, // 1: banyandb.v1.FieldSpec.type:type_name -> banyandb.v1.FieldSpec.FieldType
7, // 2: banyandb.v1.TraceFieldMap.state:type_name -> banyandb.v1.TraceStateMap
- 14, // 3: banyandb.v1.TraceSeries.metadata:type_name -> banyandb.v1.Metadata
+ 16, // 3: banyandb.v1.TraceSeries.metadata:type_name -> banyandb.v1.Metadata
6, // 4: banyandb.v1.TraceSeries.fields:type_name -> banyandb.v1.FieldSpec
8, // 5: banyandb.v1.TraceSeries.reserved_fields_map:type_name -> banyandb.v1.TraceFieldMap
4, // 6: banyandb.v1.TraceSeries.shard:type_name -> banyandb.v1.ShardInfo
5, // 7: banyandb.v1.TraceSeries.duration:type_name -> banyandb.v1.Duration
- 15, // 8: banyandb.v1.TraceSeries.updated_at:type_name -> google.protobuf.Timestamp
+ 17, // 8: banyandb.v1.TraceSeries.updated_at:type_name -> google.protobuf.Timestamp
2, // 9: banyandb.v1.IndexObject.type:type_name -> banyandb.v1.IndexObject.IndexType
- 14, // 10: banyandb.v1.IndexRule.metadata:type_name -> banyandb.v1.Metadata
+ 16, // 10: banyandb.v1.IndexRule.metadata:type_name -> banyandb.v1.Metadata
10, // 11: banyandb.v1.IndexRule.objects:type_name -> banyandb.v1.IndexObject
- 15, // 12: banyandb.v1.IndexRule.updated_at:type_name -> google.protobuf.Timestamp
+ 17, // 12: banyandb.v1.IndexRule.updated_at:type_name -> google.protobuf.Timestamp
3, // 13: banyandb.v1.Series.catalog:type_name -> banyandb.v1.Series.Catalog
- 14, // 14: banyandb.v1.Series.series:type_name -> banyandb.v1.Metadata
- 14, // 15: banyandb.v1.IndexRuleBinding.metadata:type_name -> banyandb.v1.Metadata
- 14, // 16: banyandb.v1.IndexRuleBinding.rule_ref:type_name -> banyandb.v1.Metadata
+ 16, // 14: banyandb.v1.Series.series:type_name -> banyandb.v1.Metadata
+ 16, // 15: banyandb.v1.IndexRuleBinding.metadata:type_name -> banyandb.v1.Metadata
+ 16, // 16: banyandb.v1.IndexRuleBinding.rule_ref:type_name -> banyandb.v1.Metadata
12, // 17: banyandb.v1.IndexRuleBinding.subjects:type_name -> banyandb.v1.Series
- 15, // 18: banyandb.v1.IndexRuleBinding.begin_at:type_name -> google.protobuf.Timestamp
- 15, // 19: banyandb.v1.IndexRuleBinding.expire_at:type_name -> google.protobuf.Timestamp
- 15, // 20: banyandb.v1.IndexRuleBinding.updated_at:type_name -> google.protobuf.Timestamp
- 21, // [21:21] is the sub-list for method output_type
- 21, // [21:21] is the sub-list for method input_type
- 21, // [21:21] is the sub-list for extension type_name
- 21, // [21:21] is the sub-list for extension extendee
- 0, // [0:21] is the sub-list for field type_name
+ 17, // 18: banyandb.v1.IndexRuleBinding.begin_at:type_name -> google.protobuf.Timestamp
+ 17, // 19: banyandb.v1.IndexRuleBinding.expire_at:type_name -> google.protobuf.Timestamp
+ 17, // 20: banyandb.v1.IndexRuleBinding.updated_at:type_name -> google.protobuf.Timestamp
+ 16, // 21: banyandb.v1.IndexRuleEvent.series:type_name -> banyandb.v1.Metadata
+ 15, // 22: banyandb.v1.IndexRuleEvent.rules:type_name -> banyandb.v1.IndexRuleEvent.ShardedIndexRule
+ 18, // 23: banyandb.v1.IndexRuleEvent.action:type_name -> banyandb.v1.Action
+ 17, // 24: banyandb.v1.IndexRuleEvent.time:type_name -> google.protobuf.Timestamp
+ 11, // 25: banyandb.v1.IndexRuleEvent.ShardedIndexRule.rules:type_name -> banyandb.v1.IndexRule
+ 26, // [26:26] is the sub-list for method output_type
+ 26, // [26:26] is the sub-list for method input_type
+ 26, // [26:26] is the sub-list for extension type_name
+ 26, // [26:26] is the sub-list for extension extendee
+ 0, // [0:26] is the sub-list for field type_name
}
func init() { file_banyandb_v1_schema_proto_init() }
@@ -1316,6 +1470,30 @@ func file_banyandb_v1_schema_proto_init() {
return nil
}
}
+ file_banyandb_v1_schema_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*IndexRuleEvent); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_banyandb_v1_schema_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*IndexRuleEvent_ShardedIndexRule); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -1323,7 +1501,7 @@ func file_banyandb_v1_schema_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_banyandb_v1_schema_proto_rawDesc,
NumEnums: 4,
- NumMessages: 10,
+ NumMessages: 12,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/api/proto/banyandb/v1/schema.proto b/api/proto/banyandb/v1/schema.proto
index 273c155..aeb8e5a 100644
--- a/api/proto/banyandb/v1/schema.proto
+++ b/api/proto/banyandb/v1/schema.proto
@@ -184,4 +184,15 @@ message IndexRuleBinding {
google.protobuf.Timestamp expire_at = 5;
// updated_at_nanoseconds indicates when the IndexRuleBinding is updated
google.protobuf.Timestamp updated_at = 6;
-}
\ No newline at end of file
+}
+
+// IndexRuleEvent carries the index rules of a series, grouped by shard,
+// together with an action and a timestamp.
+message IndexRuleEvent {
+ // series is the metadata of the series the rules belong to
+ Metadata series = 1;
+ // ShardedIndexRule lists the index rules attached to one shard
+ message ShardedIndexRule {
+ uint64 shard_id = 1;
+ repeated IndexRule rules = 2;
+ }
+ // rules holds one ShardedIndexRule entry per shard
+ repeated ShardedIndexRule rules = 2;
+ // field number 3 is not used by this message
+ Action action = 4;
+ google.protobuf.Timestamp time = 5;
+}
diff --git a/banyand/index/index.go b/banyand/index/index.go
index 6ab47f2..89122e9 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -19,29 +19,49 @@ package index
import (
"context"
+ "sync"
+
+ "github.com/pkg/errors"
+ "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/event"
apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
- "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/index/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/posting"
"github.com/apache/skywalking-banyandb/pkg/run"
)
+var (
+	// ErrShardNotFound is returned when the requested shard ID is not registered for a series.
+	ErrShardNotFound = errors.New("shard doesn't exist")
+	// ErrTraceSeriesNotFound is returned when no index metadata exists for the requested series.
+	ErrTraceSeriesNotFound = errors.New("trace series not found")
+	// ErrUnknownField is returned when no index object refers to the field being inserted.
+	ErrUnknownField = errors.New("the field is unknown")
+)
+
type Condition struct {
Key string
Values [][]byte
Op apiv1.PairQuery_BinaryOp
}
-//go:generate mockgen -destination=./index_mock.go -package=index . Repo
+// Field is the input of Repo.Insert: a field name and its raw value, plus the
+// ChunkID that is handed to the store together with them.
+type Field struct {
+ ChunkID common.ChunkID
+ Name string
+ Value []byte
+}
+
+//go:generate mockgen -destination=./index_mock.go -package=index . Service
type Repo interface {
Search(seriesMeta common.Metadata, shardID uint, startTime, endTime uint64, indexObjectName string, conditions []Condition) (posting.List, error)
+ Insert(seriesMeta common.Metadata, shardID uint, fields *Field) error
}
type Builder interface {
- run.Config
run.PreRunner
+ run.Service
}
type Service interface {
@@ -49,6 +69,179 @@ type Service interface {
Builder
}
-func NewService(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Service, error) {
- return nil, nil
+// series groups the shards of a single series; repo is keyed by shard ID.
+type series struct {
+ repo map[uint]*shard
+}
+
+// shard bundles the index store of one shard with its lookup metadata.
+// meta maps a field name to the index objects that refer to that field.
+type shard struct {
+ meta map[string][]*apiv1.IndexObject
+ store tsdb.GlobalStore
+}
+
+// service implements Service. It keeps the index metadata of every known
+// series and shard and routes inserts to the matching shard store.
+type service struct {
+	meta              *indexMeta
+	log               *logger.Logger
+	repo              discovery.ServiceRepo
+	stopCh            chan struct{} // closed exactly once to release Serve
+	stopOnce          sync.Once     // guards the close of stopCh
+	indexRuleListener *indexRuleListener
+}
+
+// NewService creates the index service bound to the given discovery repo.
+// The stop channel is created here, rather than in Serve, so that a stop
+// request issued before Serve starts can neither block forever nor get lost.
+func NewService(_ context.Context, repo discovery.ServiceRepo) (Service, error) {
+	meta := &indexMeta{
+		meta: make(map[string]*series),
+	}
+	svc := &service{
+		meta:   meta,
+		repo:   repo,
+		stopCh: make(chan struct{}),
+	}
+	svc.indexRuleListener = &indexRuleListener{
+		indexMeta: meta,
+		closeFunc: svc.stop,
+	}
+	return svc, nil
+}
+
+// stop releases Serve. It never blocks and is safe to call multiple times
+// and from multiple goroutines.
+func (s *service) stop() {
+	if s.stopCh == nil {
+		// The service was not built by NewService; there is nothing to release.
+		return
+	}
+	s.stopOnce.Do(func() {
+		close(s.stopCh)
+	})
+}
+
+// Insert writes field into every index object that refers to field.Name in
+// the given shard of the series.
+//
+// It returns a wrapped ErrTraceSeriesNotFound or ErrShardNotFound when the
+// target shard can't be located, and ErrUnknownField when no index object
+// refers to the field. Errors reported by the store are combined with multierr.
+func (s *service) Insert(series common.Metadata, shardID uint, field *Field) error {
+	sd, err := s.getShard(series, shardID)
+	if err != nil {
+		return err
+	}
+	objects, ok := sd.meta[field.Name]
+	if !ok {
+		return ErrUnknownField
+	}
+	for _, object := range objects {
+		err = multierr.Append(err, sd.store.Insert(&tsdb.Field{
+			Name:  []byte(compositeFieldID(object.GetName(), field.Name)),
+			Value: field.Value,
+		}, field.ChunkID))
+	}
+	return err
+}
+
+// getShard resolves the shard registered for the series and shard ID.
+func (s *service) getShard(series common.Metadata, shardID uint) (*shard, error) {
+	ss := s.meta.get(series.Spec)
+	if ss == nil {
+		return nil, errors.Wrapf(ErrTraceSeriesNotFound, "identify:%s", compositeSeriesID(series.Spec))
+	}
+	sd, existSearcher := ss.repo[shardID]
+	if !existSearcher {
+		return nil, errors.Wrapf(ErrShardNotFound, "shardID:%d", shardID)
+	}
+	return sd, nil
+}
+
+// Name returns the name of this service.
+func (s *service) Name() string {
+	return "index"
+}
+
+// PreRun sets up logging and subscribes the index rule listener to
+// event.TopicIndexRule.
+func (s *service) PreRun() error {
+	//TODO: listen to written data
+	s.log = logger.GetLogger("index")
+	s.indexRuleListener.log = s.log
+	return s.repo.Subscribe(event.TopicIndexRule, s.indexRuleListener)
+}
+
+// Serve blocks until the service is stopped, either through GracefulStop or
+// through the index rule listener's closeFunc.
+func (s *service) Serve() error {
+	<-s.stopCh
+	return nil
+}
+
+// GracefulStop releases Serve; calling it more than once is harmless.
+func (s *service) GracefulStop() {
+	s.stop()
+}
+
+// indexMeta is the registry of known series. meta is keyed by the value of
+// compositeSeriesID, and the embedded RWMutex guards access to the map.
+type indexMeta struct {
+ meta map[string]*series
+ sync.RWMutex
+}
+
+// get looks up the series registered under the composite ID of the given
+// metadata. It returns nil when the series is unknown.
+func (i *indexMeta) get(series *apiv1.Metadata) *series {
+	id := compositeSeriesID(series)
+	i.RLock()
+	defer i.RUnlock()
+	// Indexing a missing key yields the zero value, which is a nil *series.
+	return i.meta[id]
+}
+
+// indexRuleListener receives index rule events and updates indexMeta from
+// them. closeFunc is invoked when an unsupported index object is encountered.
+type indexRuleListener struct {
+ log *logger.Logger
+ indexMeta *indexMeta
+ closeFunc func()
+}
+
+// Rev handles an index rule event received from the bus.
+//
+// For ACTION_PUT it builds one store per shard from the event's rules and
+// replaces the registered series with the result. Events with an unexpected
+// payload type, without series metadata, or with another action are logged
+// and ignored. When a composite (multi-field) index object is found, closeFunc
+// is invoked and the event is dropped without touching the registry.
+// The returned message is always the zero value.
+func (i *indexRuleListener) Rev(message bus.Message) (resp bus.Message) {
+	indexRuleEvent, ok := message.Data().(*apiv1.IndexRuleEvent)
+	if !ok {
+		i.log.Warn().Msg("invalid event data type")
+		return
+	}
+	// The series metadata is dereferenced below; GetSeries also covers a
+	// typed-nil event.
+	if indexRuleEvent.GetSeries() == nil {
+		i.log.Warn().Msg("index rule event doesn't carry series metadata")
+		return
+	}
+	i.log.Info().
+		Str("action", apiv1.Action_name[int32(indexRuleEvent.Action)]).
+		Str("series-name", indexRuleEvent.Series.Name).
+		Str("series-group", indexRuleEvent.Series.Group).
+		Msg("received an index rule")
+	i.indexMeta.Lock()
+	defer i.indexMeta.Unlock()
+	switch indexRuleEvent.Action {
+	case apiv1.Action_ACTION_PUT:
+		newSeries := &series{
+			repo: make(map[uint]*shard),
+		}
+		for _, rule := range indexRuleEvent.Rules {
+			fields, meta, supported := i.collectFields(rule)
+			if !supported {
+				// Composite index objects are not supported: request a
+				// shutdown and drop the event instead of indexing Fields[0].
+				if i.closeFunc != nil {
+					i.closeFunc()
+				}
+				return
+			}
+			shardID := uint(rule.ShardId)
+			store := tsdb.NewStore(indexRuleEvent.Series.Name, indexRuleEvent.Series.Group, shardID)
+			if err := store.Initialize(fields); err != nil {
+				// Don't publish a store that failed to initialize.
+				i.log.Warn().Err(err).Msg("failed to initialize index store")
+				continue
+			}
+			newSeries.repo[shardID] = &shard{
+				store: store,
+				meta:  meta,
+			}
+		}
+		i.indexMeta.meta[compositeSeriesID(indexRuleEvent.Series)] = newSeries
+	default:
+		i.log.Warn().Msg("unsupported action")
+	}
+	return
+}
+
+// collectFields derives the store field specs and the field-name lookup table
+// from the rules of one shard. Index objects without fields are skipped. The
+// boolean result is false when a composite index object is encountered.
+func (i *indexRuleListener) collectFields(rule *apiv1.IndexRuleEvent_ShardedIndexRule) ([]tsdb.FieldSpec, map[string][]*apiv1.IndexObject, bool) {
+	fields := make([]tsdb.FieldSpec, 0, len(rule.GetRules()))
+	meta := make(map[string][]*apiv1.IndexObject)
+	for _, indexRule := range rule.GetRules() {
+		for _, object := range indexRule.Objects {
+			switch fieldsSize := len(object.Fields); {
+			case fieldsSize > 1:
+				//TODO: to support composited index
+				i.log.Error().Str("name", object.Name).
+					Msg("index module doesn't support composited index object")
+				return nil, nil, false
+			case fieldsSize < 1:
+				continue
+			}
+			field := object.Fields[0]
+			fields = append(fields, tsdb.FieldSpec{
+				Name: compositeFieldID(object.Name, field),
+			})
+			meta[field] = append(meta[field], object)
+		}
+	}
+	return fields, meta, true
+}
+
+// compositeFieldID joins an index object name and a field name with ":" to
+// form the field name used in the store.
+func compositeFieldID(indexObjectName, field string) string {
+ return indexObjectName + ":" + field
+}
+
+// compositeSeriesID joins the series name and group with "-" to form the key
+// under which a series is registered. series must not be nil.
+func compositeSeriesID(series *apiv1.Metadata) string {
+ return series.Name + "-" + series.Group
+}
diff --git a/banyand/index/index_test.go b/banyand/index/index_test.go
new file mode 100644
index 0000000..44a89af
--- /dev/null
+++ b/banyand/index/index_test.go
@@ -0,0 +1,185 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+ "context"
+ "math"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/event"
+ apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// Test_service_Insert checks that Insert accepts known fields and reports an
+// error for an unknown series, shard or field.
+func Test_service_Insert(t *testing.T) {
+	type args struct {
+		series  common.Metadata
+		shardID uint
+		field   *Field
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "str field",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("sw", "default"),
+				shardID: 0,
+				field: &Field{
+					ChunkID: common.ChunkID(1),
+					Name:    "endpoint",
+					Value:   []byte("/test"),
+				},
+			},
+		},
+		{
+			name: "int field",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("sw", "default"),
+				shardID: 1,
+				field: &Field{
+					ChunkID: common.ChunkID(2),
+					Name:    "duration",
+					Value:   convert.Int64ToBytes(500),
+				},
+			},
+		},
+		{
+			name: "unknown series",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("unknown", "default"),
+				shardID: 0,
+				field: &Field{
+					ChunkID: common.ChunkID(2),
+					Name:    "duration",
+					Value:   convert.Int64ToBytes(500),
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "unknown shard",
+			args: args{
+				series: *common.NewMetadataByNameAndGroup("sw", "default"),
+				// MaxUint32 fits into uint on every platform, unlike
+				// MaxInt64 which overflows a 32-bit uint at compile time.
+				shardID: math.MaxUint32,
+				field: &Field{
+					ChunkID: common.ChunkID(2),
+					Name:    "duration",
+					Value:   convert.Int64ToBytes(500),
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "unknown field",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("sw", "default"),
+				shardID: 0,
+				field: &Field{
+					ChunkID: common.ChunkID(2),
+					Name:    "unknown",
+					Value:   convert.Int64ToBytes(500),
+				},
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Bind the assertions to the sub-test so that setup failures are
+			// reported against the case that triggered them.
+			s := setUpModules(assert.New(t))
+			if err := s.Insert(tt.args.series, tt.args.shardID, tt.args.field); (err != nil) != tt.wantErr {
+				t.Errorf("Insert() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+// Test_service_Init verifies that the published index rule event registers
+// exactly one series with two shards.
+func Test_service_Init(t *testing.T) {
+	tester := assert.New(t)
+	svc := setUpModules(tester)
+	tester.Len(svc.meta.meta, 1)
+	tester.Len(svc.meta.meta["sw-default"].repo, 2)
+}
+
+// setUpModules builds an index service, publishes an index rule event for the
+// series "sw"/"default" with shards 0 and 1, and waits up to ten seconds for
+// the service to register it. On timeout the test is marked as failed and the
+// service is returned as is.
+func setUpModules(tester *assert.Assertions) *service {
+	_ = logger.Bootstrap()
+	repo, err := discovery.NewServiceRepo(context.TODO())
+	tester.NoError(err)
+	svc, err := NewService(context.TODO(), repo)
+	tester.NoError(err)
+	tester.NoError(svc.PreRun())
+
+	rules := []*apiv1.IndexRule{
+		{
+			Objects: []*apiv1.IndexObject{
+				{
+					Name:   "endpoint",
+					Fields: []string{"endpoint"},
+				},
+				{
+					Name:   "duration",
+					Fields: []string{"duration"},
+				},
+			},
+		},
+	}
+	seriesID := &apiv1.Metadata{
+		Name:  "sw",
+		Group: "default",
+	}
+	_, err = repo.Publish(event.TopicIndexRule, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &apiv1.IndexRuleEvent{
+		Series: seriesID,
+		Rules: []*apiv1.IndexRuleEvent_ShardedIndexRule{
+			{
+				ShardId: 0,
+				Rules:   rules,
+			},
+			{
+				ShardId: 1,
+				Rules:   rules,
+			},
+		},
+		Action: apiv1.Action_ACTION_PUT,
+		Time:   timestamppb.Now(),
+	}))
+	tester.NoError(err)
+	s, ok := svc.(*service)
+	tester.True(ok)
+	deadline := time.Now().Add(10 * time.Second)
+	for s.meta.get(seriesID) == nil {
+		if time.Now().After(deadline) {
+			// Fail only records the failure; leave the loop explicitly so
+			// the test can't spin forever.
+			tester.Fail("timeout")
+			break
+		}
+		// Yield between polls instead of busy-waiting on the lock.
+		time.Sleep(10 * time.Millisecond)
+	}
+	return s
+}
diff --git a/banyand/index/search.go b/banyand/index/search.go
new file mode 100644
index 0000000..97da458
--- /dev/null
+++ b/banyand/index/search.go
@@ -0,0 +1,356 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+ "encoding/base64"
+ "encoding/json"
+ "strings"
+
+ "github.com/pkg/errors"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/banyand/index/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/bytes"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+ // ErrNotRangeOperation is returned by buildSearchTree when a non-range
+ // operator follows a range operator on the same field.
+ var ErrNotRangeOperation = errors.New("this is not a range operation")
+
+ // executable is any search-tree element that can be evaluated to a posting list.
+ type executable interface {
+ execute() (posting.List, error)
+ }
+
+ // searchTree is the root of a built search tree; buildSearchTree returns one.
+ type searchTree interface {
+ executable
+ }
+
+ // Search evaluates conditions against the index of the given series shard
+ // within [startTime, endTime] and returns the matching chunk IDs.
+ //
+ // It returns an error when the shard cannot be resolved, when the search tree
+ // cannot be built from conditions, or when executing the tree fails. When the
+ // store has no data for the window, or the tree yields no list, the shared
+ // empty posting list is returned.
+ func (s *service) Search(series common.Metadata, shardID uint, startTime, endTime uint64, indexObjectName string, conditions []Condition) (posting.List, error) {
+ sd, err := s.getShard(series, shardID)
+ if err != nil {
+ return nil, err
+ }
+ searcher, hasData := sd.store.Window(startTime, endTime)
+ if !hasData {
+ return roaring.EmptyPostingList, nil
+ }
+ tree, err := buildSearchTree(searcher, indexObjectName, conditions)
+ if err != nil {
+ // Return the build error itself; the previous code returned the (nil)
+ // getShard error here and silently swallowed the failure.
+ return nil, err
+ }
+ s.log.Debug().Interface("search-tree", tree).Msg("build search tree")
+
+ result, err := tree.execute()
+ if result == nil {
+ return roaring.EmptyPostingList, err
+ }
+ return result, err
+ }
+
+ // buildSearchTree turns conditions into an AND-rooted search tree bound to
+ // searcher. Conditions are first grouped per composite field ID (see toMap).
+ //
+ // For each field, all range operators (GT, GE, LT, LE) are folded into a
+ // single range leaf. Once a range leaf exists for a field, any later
+ // non-range operator on that field is rejected with ErrNotRangeOperation.
+ // EQ adds an equality leaf, NE a negated equality, HAVING an OR over one
+ // equality leaf per value, and NOT_HAVING the negation of such an OR.
+ // Operators matched by neither switch add nothing to the tree.
+ func buildSearchTree(searcher tsdb.Searcher, indexObject string, conditions []Condition) (searchTree, error) {
+ condMap := toMap(indexObject, conditions)
+ root := &andNode{
+ node: &node{
+ SubNodes: make([]executable, 0),
+ searcher: searcher,
+ },
+ }
+ for key, conds := range condMap {
+ // rangeLeaf is created lazily on the first range operator of this field.
+ var rangeLeaf *rangeOp
+ for _, cond := range conds {
+ if rangeLeaf != nil && !rangeOP(cond.Op) {
+ return nil, errors.Wrapf(ErrNotRangeOperation, "op:%s", cond.Op.String())
+ }
+ if rangeOP(cond.Op) {
+ if rangeLeaf == nil {
+ rangeLeaf = root.addRangeLeaf(key)
+ }
+ // opts is a pointer, so the assignments below update the leaf in place.
+ opts := rangeLeaf.Opts
+ switch cond.Op {
+ case apiv1.PairQuery_BINARY_OP_GT:
+ opts.Lower = bytes.Join(cond.Values...)
+ case apiv1.PairQuery_BINARY_OP_GE:
+ opts.Lower = bytes.Join(cond.Values...)
+ opts.IncludesLower = true
+ case apiv1.PairQuery_BINARY_OP_LT:
+ opts.Upper = bytes.Join(cond.Values...)
+ case apiv1.PairQuery_BINARY_OP_LE:
+ opts.Upper = bytes.Join(cond.Values...)
+ opts.IncludesUpper = true
+ }
+ continue
+ }
+ switch cond.Op {
+ case apiv1.PairQuery_BINARY_OP_EQ:
+ root.addEq(key, cond.Values)
+ case apiv1.PairQuery_BINARY_OP_NE:
+ root.addNot(key, root.newEq(key, cond.Values))
+ case apiv1.PairQuery_BINARY_OP_HAVING:
+ n := root.addOrNode(len(cond.Values))
+ for _, v := range cond.Values {
+ n.addEq(key, [][]byte{v})
+ }
+ case apiv1.PairQuery_BINARY_OP_NOT_HAVING:
+ // The OR node is created detached and only attached through the not wrapper.
+ n := root.newOrNode(len(cond.Values))
+ for _, v := range cond.Values {
+ n.addEq(key, [][]byte{v})
+ }
+ root.addNot(key, n)
+ }
+ }
+ }
+ return root, nil
+ }
+
+ // rangeOP reports whether op is one of the four range comparisons
+ // (GT, GE, LT, LE).
+ func rangeOP(op apiv1.PairQuery_BinaryOp) bool {
+ return op == apiv1.PairQuery_BINARY_OP_GT ||
+ op == apiv1.PairQuery_BINARY_OP_GE ||
+ op == apiv1.PairQuery_BINARY_OP_LT ||
+ op == apiv1.PairQuery_BINARY_OP_LE
+ }
+
+ // toMap groups condition by the composite field ID built from indexObject and
+ // each condition's key, keeping the input order within every group.
+ func toMap(indexObject string, condition []Condition) map[string][]Condition {
+ grouped := make(map[string][]Condition)
+ for _, cond := range condition {
+ fieldID := compositeFieldID(indexObject, cond.Key)
+ // Appending to a missing entry starts a new one-element group.
+ grouped[fieldID] = append(grouped[fieldID], cond)
+ }
+ return grouped
+ }
+
+ // logicalOP is an executable node that can also fold the result of one of its
+ // sub nodes into its accumulated value via merge.
+ type logicalOP interface {
+ executable
+ merge(posting.List) error
+ }
+
+ // node is the shared state of the logical nodes (andNode, orNode): the
+ // searcher handed to the leaves it creates, the value accumulated while
+ // executing, and the sub nodes that are still pending.
+ type node struct {
+ searcher tsdb.Searcher
+ value posting.List
+ SubNodes []executable `json:"sub_nodes,omitempty"`
+ }
+
+ // newEq builds a detached equality leaf for key/values that shares this
+ // node's searcher. The leaf is not added to SubNodes.
+ func (n *node) newEq(key string, values [][]byte) *eq {
+ l := &leaf{
+ Key: key,
+ Values: values,
+ searcher: n.searcher,
+ }
+ return &eq{leaf: l}
+ }
+
+ // addEq appends an equality leaf for key/values to this node's sub nodes.
+ func (n *node) addEq(key string, values [][]byte) {
+ eqLeaf := n.newEq(key, values)
+ n.SubNodes = append(n.SubNodes, eqLeaf)
+ }
+
+func (n *node) addNot(key string, inner executable) {
+ n.SubNodes = append(n.SubNodes, ¬{
+ Key: key,
+ searcher: n.searcher,
+ Inner: inner,
+ })
+}
+
+ // addRangeLeaf appends a range leaf for key with empty range options and
+ // returns it so the caller can fill in the bounds.
+ func (n *node) addRangeLeaf(key string) *rangeOp {
+ rangeLeaf := &rangeOp{
+ leaf: &leaf{Key: key, searcher: n.searcher},
+ Opts: &tsdb.RangeOpts{},
+ }
+ n.SubNodes = append(n.SubNodes, rangeLeaf)
+ return rangeLeaf
+ }
+
+ // newOrNode builds a detached OR node sharing this node's searcher, with room
+ // reserved for size sub nodes. It is not added to SubNodes.
+ func (n *node) newOrNode(size int) *orNode {
+ inner := &node{
+ searcher: n.searcher,
+ SubNodes: make([]executable, 0, size),
+ }
+ return &orNode{node: inner}
+ }
+
+ // addOrNode creates an OR node sized for size sub nodes, appends it to this
+ // node's sub nodes and returns it.
+ func (n *node) addOrNode(size int) *orNode {
+ child := n.newOrNode(size)
+ n.SubNodes = append(n.SubNodes, child)
+ return child
+ }
+
+ // pop removes and returns the first pending sub node. The boolean is false
+ // when no sub node is left.
+ func (n *node) pop() (executable, bool) {
+ if len(n.SubNodes) == 0 {
+ return nil, false
+ }
+ head, rest := n.SubNodes[0], n.SubNodes[1:]
+ n.SubNodes = rest
+ return head, true
+ }
+
+ // execute drains the pending sub nodes of n in order and folds their results
+ // into n.value: the first result seeds the value, every later one is combined
+ // through lp.merge. It returns the accumulated value (nil when n had no sub
+ // nodes) or the first error raised by a sub node or by merge.
+ //
+ // Sub nodes are consumed, so a node can only be executed once.
+ func execute(n *node, lp logicalOP) (posting.List, error) {
+ // Only an intersection can stop early on an empty value: nothing can make
+ // it non-empty again. A union must keep going, because a later sub node may
+ // still contribute matches even if all earlier ones were empty.
+ _, isAnd := lp.(*andNode)
+ for {
+ ex, hasNext := n.pop()
+ if !hasNext {
+ return n.value, nil
+ }
+ r, err := ex.execute()
+ if err != nil {
+ return nil, err
+ }
+ if n.value == nil {
+ n.value = r
+ continue
+ }
+ if err = lp.merge(r); err != nil {
+ return nil, err
+ }
+ if isAnd && n.value.IsEmpty() {
+ return n.value, nil
+ }
+ }
+ }
+
+ // andNode combines the results of its sub nodes by intersection.
+ type andNode struct {
+ *node
+ }
+
+ // merge intersects the accumulated value with list.
+ func (an *andNode) merge(list posting.List) error {
+ err := an.value.Intersect(list)
+ return err
+ }
+
+ // execute evaluates the sub nodes through the shared execute helper.
+ func (an *andNode) execute() (posting.List, error) {
+ return execute(an.node, an)
+ }
+
+ // MarshalJSON renders the node as {"and": <pending sub nodes>}.
+ func (an *andNode) MarshalJSON() ([]byte, error) {
+ return json.Marshal(map[string]interface{}{
+ "and": an.node.SubNodes,
+ })
+ }
+
+ // orNode combines the results of its sub nodes by union.
+ type orNode struct {
+ *node
+ }
+
+ // merge unions list into the accumulated value.
+ func (on *orNode) merge(list posting.List) error {
+ err := on.value.Union(list)
+ return err
+ }
+
+ // execute evaluates the sub nodes through the shared execute helper.
+ func (on *orNode) execute() (posting.List, error) {
+ return execute(on.node, on)
+ }
+
+ // MarshalJSON renders the node as {"or": <pending sub nodes>}.
+ func (on *orNode) MarshalJSON() ([]byte, error) {
+ return json.Marshal(map[string]interface{}{
+ "or": on.node.SubNodes,
+ })
+ }
+
+ // leaf is the state shared by the leaf operators (eq, rangeOp): the field
+ // key, the operand values and the searcher used to resolve them.
+ // NOTE(review): the embedded executable is never assigned in this file; the
+ // concrete leaf types define their own execute method.
+ type leaf struct {
+ executable
+ Key string
+ Values [][]byte
+ searcher tsdb.Searcher
+ }
+
+ // not negates Inner within the field identified by Key.
+ type not struct {
+ executable
+ Key string
+ searcher tsdb.Searcher
+ Inner executable
+ }
+
+ // execute returns every chunk ID the searcher reports for the field, minus
+ // the IDs produced by Inner.
+ func (n *not) execute() (posting.List, error) {
+ universe := n.searcher.MatchField([]byte(n.Key))
+ excluded, err := n.Inner.execute()
+ if err != nil {
+ return nil, err
+ }
+ if err = universe.Difference(excluded); err != nil {
+ return universe, err
+ }
+ return universe, nil
+ }
+
+ // MarshalJSON renders the node as {"not": <inner>}.
+ func (n *not) MarshalJSON() ([]byte, error) {
+ return json.Marshal(map[string]interface{}{
+ "not": n.Inner,
+ })
+ }
+
+ // eq matches the chunks whose field value equals the joined leaf values.
+ type eq struct {
+ *leaf
+ }
+
+ // execute asks the searcher for the posting list of the term built from the
+ // leaf key and the joined values. It never returns an error.
+ func (eq *eq) execute() (posting.List, error) {
+ term := &tsdb.Field{
+ Name: []byte(eq.Key),
+ Value: bytes.Join(eq.Values...),
+ }
+ return eq.searcher.MatchTerms(term), nil
+ }
+
+ // MarshalJSON renders the node as {"eq": <leaf>}.
+ func (eq *eq) MarshalJSON() ([]byte, error) {
+ return json.Marshal(map[string]interface{}{
+ "eq": eq.leaf,
+ })
+ }
+
+ // rangeOp matches the chunks whose field value lies within Opts.
+ type rangeOp struct {
+ *leaf
+ Opts *tsdb.RangeOpts
+ }
+
+ // execute asks the searcher for the posting list of the range described by
+ // Opts on the leaf key. It never returns an error.
+ func (r *rangeOp) execute() (posting.List, error) {
+ list := r.searcher.Range([]byte(r.Key), r.Opts)
+ return list, nil
+ }
+
+ // MarshalJSON renders the node as {"key": <key>, "range": <interval>}. The
+ // interval holds the base64-encoded lower and upper bounds separated by a
+ // comma; a bracket is written only on the side whose bound is set, "[" / "]"
+ // for an inclusive bound and "(" / ")" for an exclusive one.
+ func (r *rangeOp) MarshalJSON() ([]byte, error) {
+ var interval strings.Builder
+ if r.Opts.Lower != nil {
+ opening := "("
+ if r.Opts.IncludesLower {
+ opening = "["
+ }
+ interval.WriteString(opening)
+ }
+ interval.WriteString(base64.StdEncoding.EncodeToString(r.Opts.Lower))
+ interval.WriteString(",")
+ interval.WriteString(base64.StdEncoding.EncodeToString(r.Opts.Upper))
+ if r.Opts.Upper != nil {
+ closing := ")"
+ if r.Opts.IncludesUpper {
+ closing = "]"
+ }
+ interval.WriteString(closing)
+ }
+ return json.Marshal(map[string]interface{}{
+ "key": r.Key,
+ "range": interval.String(),
+ })
+ }
diff --git a/banyand/index/search_test.go b/banyand/index/search_test.go
new file mode 100644
index 0000000..d9eb8d9
--- /dev/null
+++ b/banyand/index/search_test.go
@@ -0,0 +1,255 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+ "math"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+ // Test_service_Search runs table-driven searches against shard 0 of series
+ // "sw"/"default", populated by setupData. The cases cover EQ, NE, HAVING and
+ // NOT_HAVING on the string field "endpoint" and the int64 field "duration",
+ // plus exclusive and inclusive ranges on "duration". The expected values are
+ // the chunk IDs inserted by setupData.
+ func Test_service_Search(t *testing.T) {
+ tester := assert.New(t)
+ type args struct {
+ indexObjectName string
+ conditions []Condition
+ }
+ tests := []struct {
+ name string
+ args args
+ want posting.List
+ wantErr bool
+ }{
+ {
+ name: "str equal",
+ args: args{
+ indexObjectName: "endpoint",
+ conditions: []Condition{
+ {
+ Key: "endpoint",
+ Op: apiv1.PairQuery_BINARY_OP_EQ,
+ Values: [][]byte{[]byte("/product")},
+ },
+ },
+ },
+ want: roaring.NewPostingListWithInitialData(1),
+ },
+ {
+ name: "str not equal",
+ args: args{
+ indexObjectName: "endpoint",
+ conditions: []Condition{
+ {
+ Key: "endpoint",
+ Op: apiv1.PairQuery_BINARY_OP_NE,
+ Values: [][]byte{[]byte("/product")},
+ },
+ },
+ },
+ want: roaring.NewPostingListWithInitialData(2, 3),
+ },
+ {
+ name: "str having",
+ args: args{
+ indexObjectName: "endpoint",
+ conditions: []Condition{
+ {
+ Key: "endpoint",
+ Op: apiv1.PairQuery_BINARY_OP_HAVING,
+ Values: [][]byte{[]byte("/product"), []byte("/sales")},
+ },
+ },
+ },
+ want: roaring.NewPostingListWithInitialData(1, 3),
+ },
+ {
+ name: "str not having",
+ args: args{
+ indexObjectName: "endpoint",
+ conditions: []Condition{
+ {
+ Key: "endpoint",
+ Op: apiv1.PairQuery_BINARY_OP_NOT_HAVING,
+ Values: [][]byte{[]byte("/product"), []byte("/sales")},
+ },
+ },
+ },
+ want: roaring.NewPostingListWithInitialData(2),
+ },
+ {
+ name: "int equal",
+ args: args{
+ indexObjectName: "duration",
+ conditions: []Condition{
+ {
+ Key: "duration",
+ Op: apiv1.PairQuery_BINARY_OP_EQ,
+ Values: [][]byte{convert.Int64ToBytes(500)},
+ },
+ },
+ },
+ want: roaring.NewPostingListWithInitialData(12),
+ },
+ {
+ name: "int not equal",
+ args: args{
+ indexObjectName: "duration",
+ conditions: []Condition{
+ {
+ Key: "duration",
+ Op: apiv1.PairQuery_BINARY_OP_NE,
+ Values: [][]byte{convert.Int64ToBytes(500)},
+ },
+ },
+ },
+ want: roaring.NewPostingListWithInitialData(11, 13, 14),
+ },
+ {
+ name: "int having",
+ args: args{
+ indexObjectName: "duration",
+ conditions: []Condition{
+ {
+ Key: "duration",
+ Op: apiv1.PairQuery_BINARY_OP_HAVING,
+ Values: [][]byte{convert.Int64ToBytes(500), convert.Int64ToBytes(50)},
+ },
+ },
+ },
+ want: roaring.NewPostingListWithInitialData(11, 12),
+ },
+ {
+ name: "int not having",
+ args: args{
+ indexObjectName: "duration",
+ conditions: []Condition{
+ {
+ Key: "duration",
+ Op: apiv1.PairQuery_BINARY_OP_NOT_HAVING,
+ Values: [][]byte{convert.Int64ToBytes(500), convert.Int64ToBytes(50)},
+ },
+ },
+ },
+ want: roaring.NewPostingListWithInitialData(13, 14),
+ },
+ {
+ name: "int in range",
+ args: args{
+ indexObjectName: "duration",
+ conditions: []Condition{
+ {
+ Key: "duration",
+ Op: apiv1.PairQuery_BINARY_OP_GT,
+ Values: [][]byte{convert.Int64ToBytes(50)},
+ },
+ {
+ Key: "duration",
+ Op: apiv1.PairQuery_BINARY_OP_LT,
+ Values: [][]byte{convert.Int64ToBytes(5000)},
+ },
+ },
+ },
+ want: roaring.NewPostingListWithInitialData(13, 12),
+ },
+ {
+ name: "int includes edges",
+ args: args{
+ indexObjectName: "duration",
+ conditions: []Condition{
+ {
+ Key: "duration",
+ Op: apiv1.PairQuery_BINARY_OP_GE,
+ Values: [][]byte{convert.Int64ToBytes(50)},
+ },
+ {
+ Key: "duration",
+ Op: apiv1.PairQuery_BINARY_OP_LE,
+ Values: [][]byte{convert.Int64ToBytes(5000)},
+ },
+ },
+ },
+ want: roaring.NewPostingListWithInitialData(13, 12, 11, 14),
+ },
+ }
+ // One service instance and one data set are shared by all sub tests.
+ s := setUpModules(tester)
+ setupData(tester, s)
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := s.Search(*common.NewMetadataByNameAndGroup("sw", "default"), 0, 0, math.MaxInt64, tt.args.indexObjectName, tt.args.conditions)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("Search() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !got.Equal(tt.want) {
+ t.Errorf("Search() got = %v, want %v", got.ToSlice(), tt.want.ToSlice())
+ }
+ })
+ }
+ }
+
+ // setupData inserts the search fixtures into shard 0 of series
+ // "sw"/"default": three "endpoint" values (chunks 1-3) and four "duration"
+ // values (chunks 11-14). Any insert error fails the test through tester.
+ func setupData(tester *assert.Assertions, s *service) {
+ fixtures := []*Field{
+ {ChunkID: common.ChunkID(1), Name: "endpoint", Value: []byte("/product")},
+ {ChunkID: common.ChunkID(2), Name: "endpoint", Value: []byte("/home")},
+ {ChunkID: common.ChunkID(3), Name: "endpoint", Value: []byte("/sales")},
+ {ChunkID: common.ChunkID(11), Name: "duration", Value: convert.Int64ToBytes(50)},
+ {ChunkID: common.ChunkID(12), Name: "duration", Value: convert.Int64ToBytes(500)},
+ {ChunkID: common.ChunkID(13), Name: "duration", Value: convert.Int64ToBytes(100)},
+ {ChunkID: common.ChunkID(14), Name: "duration", Value: convert.Int64ToBytes(5000)},
+ }
+ for i := range fixtures {
+ // NoError is a no-op for a nil error, so no extra guard is needed.
+ tester.NoError(s.Insert(*common.NewMetadataByNameAndGroup("sw", "default"), 0, fixtures[i]))
+ }
+ }
diff --git a/pkg/posting/posting.go b/banyand/index/tsdb/field_map.go
similarity index 51%
copy from pkg/posting/posting.go
copy to banyand/index/tsdb/field_map.go
index 3812162..4d89713 100644
--- a/pkg/posting/posting.go
+++ b/banyand/index/tsdb/field_map.go
@@ -15,55 +15,50 @@
// specific language governing permissions and limitations
// under the License.
-package posting
+package tsdb
import (
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
)
-var ErrListEmpty = errors.New("postings list is empty")
+// ErrFieldAbsent is returned by fieldMap.put when the field name has not been
+// registered with createKey.
+var ErrFieldAbsent = errors.New("field doesn't exist")
-// List is a collection of common.ChunkID.
-type List interface {
- Contains(id common.ChunkID) bool
+// fieldHashID is the convert.Hash of a field name, used as the fieldMap key.
+type fieldHashID uint64
- IsEmpty() bool
-
- Max() (common.ChunkID, error)
-
- Len() int
-
- Iterator() Iterator
-
- Clone() List
-
- Equal(other List) bool
-
- Insert(i common.ChunkID)
-
- Intersect(other List) error
-
- Difference(other List) error
-
- Union(other List) error
-
- UnionMany(others []List) error
-
- AddIterator(iter Iterator) error
-
- AddRange(min, max common.ChunkID) error
+// fieldMap maps the hash of a field name to that field's name and postings.
+type fieldMap struct {
+ repo map[fieldHashID]*fieldValue
+}
- RemoveRange(min, max common.ChunkID) error
+// newFieldMap returns an empty fieldMap pre-sized for initialSize fields.
+func newFieldMap(initialSize int) *fieldMap {
+ return &fieldMap{
+ repo: make(map[fieldHashID]*fieldValue, initialSize),
+ }
+}
- Reset()
+// createKey registers key with a fresh, empty posting map. An existing entry
+// stored under the same hash is replaced.
+func (fm *fieldMap) createKey(key []byte) {
+ fm.repo[fieldHashID(convert.Hash(key))] = &fieldValue{
+ key: key,
+ value: newPostingMap(),
+ }
}
-type Iterator interface {
- Next() bool
+// get looks up the entry stored under the hash of key. The boolean reports
+// whether such an entry exists.
+func (fm *fieldMap) get(key []byte) (*fieldValue, bool) {
+ v, ok := fm.repo[fieldHashID(convert.Hash(key))]
+ return v, ok
+}
- Current() common.ChunkID
+// put records chunk id under the term fv.Value of the field fv.Name. It
+// returns ErrFieldAbsent, wrapped with the field name, when the field has not
+// been registered with createKey.
+func (fm *fieldMap) put(fv *Field, id common.ChunkID) error {
+ pm, ok := fm.get(fv.Name)
+ if !ok {
+ return errors.Wrapf(ErrFieldAbsent, "field Name:%s", fv.Name)
+ }
+ return pm.value.put(fv.Value, id)
+}
- Close() error
+// fieldValue pairs a field name with the posting map of its terms.
+type fieldValue struct {
+ key []byte
+ value *postingMap
}
diff --git a/banyand/index/tsdb/mem.go b/banyand/index/tsdb/mem.go
new file mode 100644
index 0000000..2e2ea9b
--- /dev/null
+++ b/banyand/index/tsdb/mem.go
@@ -0,0 +1,91 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+ "github.com/pkg/errors"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+ // ErrFieldsAbsent is returned by MemTable.Initialize when no field spec is given.
+ var ErrFieldsAbsent = errors.New("fields are absent")
+
+ // MemTable is the in-memory index of one series shard. terms stays nil until
+ // Initialize is called; name, group and shardID identify the shard.
+ type MemTable struct {
+ terms *fieldMap
+ name string
+ group string
+ shardID uint
+ }
+
+ // NewMemTable returns a MemTable for the given series name, group and shard.
+ // Initialize must be called before the table holds any field.
+ func NewMemTable(name, group string, shardID uint) *MemTable {
+ table := new(MemTable)
+ table.name = name
+ table.group = group
+ table.shardID = shardID
+ return table
+ }
+
+ // Field is a single indexed term: the field name and the value to index or match.
+ type Field struct {
+ Name []byte
+ Value []byte
+ }
+
+ // FieldSpec declares a field that a MemTable should index.
+ type FieldSpec struct {
+ Name string
+ }
+
+ // Initialize registers every field in fields with an empty posting map,
+ // replacing any previously registered fields. It returns ErrFieldsAbsent when
+ // fields is empty.
+ func (m *MemTable) Initialize(fields []FieldSpec) error {
+ if len(fields) == 0 {
+ return ErrFieldsAbsent
+ }
+ m.terms = newFieldMap(len(fields))
+ for i := range fields {
+ m.terms.createKey([]byte(fields[i].Name))
+ }
+ return nil
+ }
+
+ // Insert records chunkID under the term described by field. The error comes
+ // from the underlying field map.
+ func (m *MemTable) Insert(field *Field, chunkID common.ChunkID) error {
+ if err := m.terms.put(field, chunkID); err != nil {
+ return err
+ }
+ return nil
+ }
+
+ // MatchField returns the union of the posting lists of every term of
+ // fieldName, or the shared empty posting list when the field is not registered.
+ func (m *MemTable) MatchField(fieldName []byte) (list posting.List) {
+ if fv, found := m.terms.get(fieldName); found {
+ return fv.value.allValues()
+ }
+ return roaring.EmptyPostingList
+ }
+
+ // MatchTerms returns a clone of the posting list stored for field.Value under
+ // field.Name, or the shared empty posting list when the field is not registered.
+ func (m *MemTable) MatchTerms(field *Field) (list posting.List) {
+ if fv, found := m.terms.get(field.Name); found {
+ stored := fv.value.get(field.Value)
+ return stored.Clone()
+ }
+ return roaring.EmptyPostingList
+ }
+
+ // Range returns the postings of every term of fieldName selected by opts, or
+ // the shared empty posting list when the field is not registered.
+ func (m *MemTable) Range(fieldName []byte, opts *RangeOpts) (list posting.List) {
+ if fv, found := m.terms.get(fieldName); found {
+ return fv.value.getRange(opts)
+ }
+ return roaring.EmptyPostingList
+ }
diff --git a/banyand/index/tsdb/mem_test.go b/banyand/index/tsdb/mem_test.go
new file mode 100644
index 0000000..fed7655
--- /dev/null
+++ b/banyand/index/tsdb/mem_test.go
@@ -0,0 +1,261 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+ // TestMemTable_Initialize checks that Initialize registers one entry per
+ // field spec and rejects an empty spec list.
+ func TestMemTable_Initialize(t *testing.T) {
+ type args struct {
+ fields []FieldSpec
+ }
+ tests := []struct {
+ name string
+ args args
+ wantErr bool
+ }{
+ {
+ name: "golden path",
+ args: args{
+ fields: []FieldSpec{
+ {
+ Name: "service_name",
+ },
+ {
+ Name: "duration",
+ },
+ },
+ },
+ },
+ {
+ name: "fields absent",
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := NewMemTable("sw", "group", 0)
+ err := m.Initialize(tt.args.fields)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("Initialize() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ if err != nil {
+ return
+ }
+ // assert.Equal takes (expected, actual); keep that order so a failure
+ // message labels the two numbers correctly.
+ assert.Equal(t, len(tt.args.fields), len(m.terms.repo))
+ })
+ }
+ }
+
+ // TestMemTable_Range runs table-driven range queries on the "duration" field
+ // of a table filled by setUp, which stores the values 50, 200 and 1000. The
+ // expected lists are built from MatchTerms on the values that should fall in
+ // each range.
+ // NOTE(review): results are compared with reflect.DeepEqual rather than
+ // posting.List.Equal, so the check depends on the lists' internal
+ // representation - confirm this is intended.
+ func TestMemTable_Range(t *testing.T) {
+ type args struct {
+ fieldName []byte
+ opts *RangeOpts
+ }
+ m := NewMemTable("sw", "group", 0)
+ setUp(t, m)
+ tests := []struct {
+ name string
+ args args
+ wantList posting.List
+ }{
+ {
+ name: "in range",
+ args: args{
+ fieldName: []byte("duration"),
+ opts: &RangeOpts{
+ Lower: convert.Uint16ToBytes(100),
+ Upper: convert.Uint16ToBytes(500),
+ },
+ },
+ wantList: m.MatchTerms(&Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
+ }),
+ },
+ {
+ name: "excludes edge",
+ args: args{
+ fieldName: []byte("duration"),
+ opts: &RangeOpts{
+ Lower: convert.Uint16ToBytes(50),
+ Upper: convert.Uint16ToBytes(1000),
+ },
+ },
+ wantList: union(m,
+ &Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
+ },
+ ),
+ },
+ {
+ name: "includes lower",
+ args: args{
+ fieldName: []byte("duration"),
+ opts: &RangeOpts{
+ Lower: convert.Uint16ToBytes(50),
+ Upper: convert.Uint16ToBytes(1000),
+ IncludesLower: true,
+ },
+ },
+ wantList: union(m,
+ &Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(50),
+ },
+ &Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
+ },
+ ),
+ },
+ {
+ name: "includes upper",
+ args: args{
+ fieldName: []byte("duration"),
+ opts: &RangeOpts{
+ Lower: convert.Uint16ToBytes(50),
+ Upper: convert.Uint16ToBytes(1000),
+ IncludesUpper: true,
+ },
+ },
+ wantList: union(m,
+ &Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
+ },
+ &Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(1000),
+ },
+ ),
+ },
+ {
+ name: "includes edges",
+ args: args{
+ fieldName: []byte("duration"),
+ opts: &RangeOpts{
+ Lower: convert.Uint16ToBytes(50),
+ Upper: convert.Uint16ToBytes(1000),
+ IncludesUpper: true,
+ IncludesLower: true,
+ },
+ },
+ wantList: union(m,
+ &Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(50),
+ },
+ &Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
+ },
+ &Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(1000),
+ },
+ ),
+ },
+ {
+ name: "match one",
+ args: args{
+ fieldName: []byte("duration"),
+ opts: &RangeOpts{
+ Lower: convert.Uint16ToBytes(200),
+ Upper: convert.Uint16ToBytes(200),
+ IncludesUpper: true,
+ IncludesLower: true,
+ },
+ },
+ wantList: union(m,
+ &Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
+ },
+ ),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if gotList := m.Range(tt.args.fieldName, tt.args.opts); !reflect.DeepEqual(gotList, tt.wantList) {
+ t.Errorf("Range() = %v, want %v", gotList.Len(), tt.wantList.Len())
+ }
+ })
+ }
+ }
+
+ // union returns a new posting list holding the postings of every given term
+ // in memTable. Union errors are ignored.
+ func union(memTable *MemTable, fields ...*Field) posting.List {
+ merged := roaring.NewPostingList()
+ for i := range fields {
+ matched := memTable.MatchTerms(fields[i])
+ _ = merged.Union(matched)
+ }
+ return merged
+ }
+
+func setUp(t *testing.T, mt *MemTable) {
+ assert.NoError(t, mt.Initialize([]FieldSpec{
+ {
+ Name: "service_name",
+ },
+ {
+ Name: "duration",
+ },
+ }))
+ for i := 0; i < 100; i++ {
+ if i%2 == 0 {
+ assert.NoError(t, mt.Insert(&Field{
+ Name: []byte("service_name"),
+ Value: []byte("gateway"),
+ }, common.ChunkID(i)))
+ } else {
+ assert.NoError(t, mt.Insert(&Field{
+ Name: []byte("service_name"),
+ Value: []byte("webpage"),
+ }, common.ChunkID(i)))
+ }
+ }
+ for i := 100; i < 200; i++ {
+ switch {
+ case i%3 == 0:
+ assert.NoError(t, mt.Insert(&Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(50),
+ }, common.ChunkID(i)))
+ case i%3 == 1:
+ assert.NoError(t, mt.Insert(&Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
+ }, common.ChunkID(i)))
+ case i%3 == 2:
+ assert.NoError(t, mt.Insert(&Field{
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(1000),
+ }, common.ChunkID(i)))
+ }
+ }
+}
diff --git a/banyand/index/tsdb/term_map.go b/banyand/index/tsdb/term_map.go
new file mode 100644
index 0000000..20bc492
--- /dev/null
+++ b/banyand/index/tsdb/term_map.go
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+ "bytes"
+ "sort"
+ "sync"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+ // termHashID is the convert.Hash of a term value, used as the postingMap key.
+ type termHashID uint64
+
+ // postingMap maps the hash of a term to the term's bytes and posting list.
+ // mutex guards repo.
+ type postingMap struct {
+ repo map[termHashID]*postingValue
+ mutex sync.RWMutex
+ }
+
+ // newPostingMap returns an empty postingMap ready for use.
+ func newPostingMap() *postingMap {
+ pm := new(postingMap)
+ pm.repo = make(map[termHashID]*postingValue)
+ return pm
+ }
+
+ // put adds id to the posting list of key, creating the list on first use.
+ // It always returns nil.
+ func (p *postingMap) put(key []byte, id common.ChunkID) error {
+ p.getOrCreate(key).Insert(id)
+ return nil
+ }
+
+ // getOrCreate returns the posting list stored for key, creating and
+ // registering an empty one when the key is new.
+ func (p *postingMap) getOrCreate(key []byte) posting.List {
+ hashedKey := termHashID(convert.Hash(key))
+ // Fast path: the term already exists, a read lock is enough.
+ p.mutex.RLock()
+ v, ok := p.repo[hashedKey]
+ p.mutex.RUnlock()
+ if ok {
+ return v.value
+ }
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+ // Check again under the write lock: another writer may have created the
+ // entry in between, and overwriting it would drop the IDs it already holds.
+ if v, ok = p.repo[hashedKey]; ok {
+ return v.value
+ }
+ v = &postingValue{
+ key: key,
+ value: roaring.NewPostingList(),
+ }
+ p.repo[hashedKey] = v
+ return v.value
+ }
+
+ // get returns the posting list stored for key, or the shared empty posting
+ // list when the key is unknown. The returned list is the stored one, not a copy.
+ func (p *postingMap) get(key []byte) posting.List {
+ p.mutex.RLock()
+ defer p.mutex.RUnlock()
+ if entry, found := p.repo[termHashID(convert.Hash(key))]; found {
+ return entry.value
+ }
+ return roaring.EmptyPostingList
+ }
+
+ // allValues returns a new posting list holding the union of the posting
+ // lists of every term in the map.
+ func (p *postingMap) allValues() posting.List {
+ // repo is written under the mutex by getOrCreate, so iterating it needs
+ // at least the read lock.
+ p.mutex.RLock()
+ defer p.mutex.RUnlock()
+ result := roaring.NewPostingList()
+ for _, value := range p.repo {
+ // Union errors are ignored here, as they were before.
+ _ = result.Union(value.value)
+ }
+ return result
+ }
+
+ // getRange returns the union of the posting lists of every term whose key
+ // lies between opts.Lower and opts.Upper, compared bytewise. A bound is
+ // inclusive only when the matching Includes flag is set. A nil bound leaves
+ // that side of the range open.
+ //
+ // The result is never a list owned by the map, so callers may modify it.
+ func (p *postingMap) getRange(opts *RangeOpts) posting.List {
+ hasLower, hasUpper := opts.Lower != nil, opts.Upper != nil
+ if hasLower && hasUpper {
+ switch bytes.Compare(opts.Upper, opts.Lower) {
+ case -1:
+ return roaring.EmptyPostingList
+ case 0:
+ if opts.IncludesUpper && opts.IncludesLower {
+ // Clone, as MemTable.MatchTerms does: handing out the stored
+ // list would let a caller's Intersect/Union corrupt the index.
+ return p.get(opts.Upper).Clone()
+ }
+ return roaring.EmptyPostingList
+ }
+ }
+ p.mutex.RLock()
+ defer p.mutex.RUnlock()
+ // Sort the entries themselves so each posting list is at hand without
+ // hashing its key again.
+ values := make([]*postingValue, 0, len(p.repo))
+ for _, v := range p.repo {
+ values = append(values, v)
+ }
+ sort.Slice(values, func(i, j int) bool {
+ return bytes.Compare(values[i].key, values[j].key) < 0
+ })
+ start := 0
+ if hasLower {
+ start = sort.Search(len(values), func(i int) bool {
+ return bytes.Compare(values[i].key, opts.Lower) >= 0
+ })
+ }
+ result := roaring.NewPostingList()
+ for _, v := range values[start:] {
+ if hasLower && !opts.IncludesLower && bytes.Equal(v.key, opts.Lower) {
+ continue
+ }
+ if hasUpper {
+ cmp := bytes.Compare(v.key, opts.Upper)
+ if cmp > 0 || (cmp == 0 && !opts.IncludesUpper) {
+ // Keys are in ascending order, so nothing further can match.
+ break
+ }
+ }
+ // Union errors are ignored here, as they were before.
+ _ = result.Union(v.value)
+ }
+ return result
+ }
+
+ // Asc sorts byte slices in ascending bytewise order; it implements
+ // sort.Interface.
+ type Asc [][]byte
+
+ // Len returns the number of elements.
+ func (a Asc) Len() int { return len(a) }
+
+ // Less reports whether element i sorts before element j.
+ func (a Asc) Less(i, j int) bool {
+ return bytes.Compare(a[j], a[i]) > 0
+ }
+
+ // Swap exchanges elements i and j.
+ func (a Asc) Swap(i, j int) {
+ tmp := a[i]
+ a[i] = a[j]
+ a[j] = tmp
+ }
+
+ // postingValue pairs a term's original bytes with its posting list. The key
+ // is kept because the map itself is indexed by the term's hash only.
+ type postingValue struct {
+ key []byte
+ value posting.List
+ }
diff --git a/banyand/index/tsdb/tsdb.go b/banyand/index/tsdb/tsdb.go
new file mode 100644
index 0000000..91f114d
--- /dev/null
+++ b/banyand/index/tsdb/tsdb.go
@@ -0,0 +1,65 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+)
+
+ // RangeOpts describes a range query over term values: the two bounds and
+ // whether each bound is itself part of the range.
+ type RangeOpts struct {
+ Upper []byte
+ Lower []byte
+ IncludesUpper bool
+ IncludesLower bool
+ }
+
+ // GlobalStore is the index store of a series shard: it is initialized with
+ // the fields to index, accepts inserts, and hands out a Searcher for a time
+ // window. The boolean returned by Window is read by callers as "has data".
+ type GlobalStore interface {
+ Window(startTime, endTime uint64) (Searcher, bool)
+ Initialize(fields []FieldSpec) error
+ Insert(field *Field, chunkID common.ChunkID) error
+ }
+
+ // Searcher answers index lookups: all postings of a field (MatchField), the
+ // postings of one exact term (MatchTerms), and the postings of the terms
+ // within a range (Range).
+ type Searcher interface {
+ MatchField(fieldNames []byte) (list posting.List)
+ MatchTerms(field *Field) (list posting.List)
+ Range(fieldName []byte, opts *RangeOpts) (list posting.List)
+ }
+
+ // store is the GlobalStore implementation; for now it is backed by a single
+ // in-memory table.
+ type store struct {
+ memTable *MemTable
+ //TODO: add data tables
+ }
+
+ // Window ignores the time range and always returns the mem table together
+ // with true.
+ func (st *store) Window(_, _ uint64) (Searcher, bool) {
+ var searcher Searcher = st.memTable
+ return searcher, true
+ }
+
+ // Initialize registers fields on the mem table.
+ func (st *store) Initialize(fields []FieldSpec) error {
+ table := st.memTable
+ return table.Initialize(fields)
+ }
+
+ // Insert records chunkID for field in the mem table.
+ func (st *store) Insert(field *Field, chunkID common.ChunkID) error {
+ table := st.memTable
+ return table.Insert(field, chunkID)
+ }
+
+ // NewStore returns a GlobalStore for the given series name, group and shard,
+ // backed by a new mem table.
+ func NewStore(name, group string, shardID uint) GlobalStore {
+ table := NewMemTable(name, group, shardID)
+ return &store{memTable: table}
+ }
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index b024cbe..d694895 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -57,11 +57,11 @@ func newStandaloneCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate database")
}
- idx, err := index.NewService(ctx, repo, pipeline)
+ idx, err := index.NewService(ctx, repo)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate index builder")
}
- traceSeries, err := trace.NewService(ctx, db, repo)
+ traceSeries, err := trace.NewService(ctx, db, repo, idx)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate trace series")
}
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index 7ebe12f..7136e0e 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -24,6 +24,7 @@ import (
"testing"
"time"
+ "github.com/golang/mock/gomock"
googleUUID "github.com/google/uuid"
"github.com/stretchr/testify/require"
@@ -31,6 +32,7 @@ import (
"github.com/apache/skywalking-banyandb/api/event"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/index"
"github.com/apache/skywalking-banyandb/banyand/series"
"github.com/apache/skywalking-banyandb/banyand/series/trace"
"github.com/apache/skywalking-banyandb/banyand/storage"
@@ -49,7 +51,7 @@ type entityValue struct {
items []interface{}
}
-func setupServices(tester *require.Assertions) (discovery.ServiceRepo, series.Service, func()) {
+func setupServices(t *testing.T, tester *require.Assertions) (discovery.ServiceRepo, series.Service, func()) {
// Bootstrap logger system
tester.NoError(logger.Init(logger.Logging{
Env: "dev",
@@ -70,7 +72,10 @@ func setupServices(tester *require.Assertions) (discovery.ServiceRepo, series.Se
tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
// Init `Trace` module
- traceSvc, err := trace.NewService(context.TODO(), db, repo)
+ ctrl := gomock.NewController(t)
+ mockIndex := index.NewMockService(ctrl)
+ mockIndex.EXPECT().Insert(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ traceSvc, err := trace.NewService(context.TODO(), db, repo, mockIndex)
tester.NoError(err)
// Init `Query` module
@@ -257,7 +262,7 @@ func TestQueryProcessor(t *testing.T) {
tester := require.New(t)
// setup services
- repo, traceSvc, gracefulStop := setupServices(tester)
+ repo, traceSvc, gracefulStop := setupServices(t, tester)
defer gracefulStop()
baseTs := time.Now()
diff --git a/banyand/series/trace/common_test.go b/banyand/series/trace/common_test.go
index 51f1603..f8b40ae 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -26,12 +26,14 @@ import (
"testing"
"time"
+ "github.com/golang/mock/gomock"
googleUUID "github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/banyand/index"
"github.com/apache/skywalking-banyandb/banyand/storage"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -63,7 +65,10 @@ func setup(t *testing.T) (*traceSeries, func()) {
assert.NoError(t, err)
rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
assert.NoError(t, db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
- svc, err := NewService(context.TODO(), db, nil)
+ ctrl := gomock.NewController(t)
+ mockIndex := index.NewMockService(ctrl)
+ mockIndex.EXPECT().Insert(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ svc, err := NewService(context.TODO(), db, nil, mockIndex)
assert.NoError(t, err)
assert.NoError(t, svc.PreRun())
assert.NoError(t, db.PreRun())
diff --git a/banyand/series/trace/service.go b/banyand/series/trace/service.go
new file mode 100644
index 0000000..7049a9d
--- /dev/null
+++ b/banyand/series/trace/service.go
@@ -0,0 +1,147 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "context"
+ "time"
+
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "github.com/apache/skywalking-banyandb/api/event"
+ v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/index"
+ "github.com/apache/skywalking-banyandb/banyand/series"
+ "github.com/apache/skywalking-banyandb/banyand/series/schema"
+ "github.com/apache/skywalking-banyandb/banyand/storage"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pb"
+)
+
+// Compile-time check that *service implements series.Service.
+var _ series.Service = (*service)(nil)
+
+// service is the trace implementation of series.Service.
+type service struct {
+ // db is the storage database each trace series is registered with in PreRun.
+ db storage.Database
+ // schemaMap holds the trace series built in PreRun, keyed by
+ // formatTraceSeriesID(name, group).
+ schemaMap map[string]*traceSeries
+ // l is the logger, assigned in PreRun.
+ l *logger.Logger
+ // repo is used in Serve to publish events and to read the node ID.
+ repo discovery.ServiceRepo
+ // stopCh is created in Serve and closed in GracefulStop.
+ stopCh chan struct{}
+ // idx is the index service handed to every trace series.
+ idx index.Service
+}
+
+// NewService returns a series.Service for traces that uses db, repo and idx.
+// The context argument is ignored and the returned error is always nil.
+func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo, idx index.Service) (series.Service, error) {
+ return &service{
+ db: db,
+ repo: repo,
+ idx: idx,
+ }, nil
+}
+
+// Name returns the fixed service name "trace-series"; PreRun also uses it to
+// look up the logger.
+func (s *service) Name() string {
+ return "trace-series"
+}
+
+// PreRun lists every trace series schema, builds a traceSeries for each one,
+// registers it with the database and records it in schemaMap under its
+// formatted name/group ID. It stops at, and returns, the first error from
+// listing the schemas or from building a series.
+func (s *service) PreRun() error {
+	specs, listErr := s.TraceSeries().List(context.Background(), schema.ListOpt{})
+	if listErr != nil {
+		return listErr
+	}
+	s.schemaMap = make(map[string]*traceSeries, len(specs))
+	s.l = logger.GetLogger(s.Name())
+	for i := range specs {
+		created, buildErr := newTraceSeries(specs[i], s.l, s.idx)
+		if buildErr != nil {
+			return buildErr
+		}
+		s.db.Register(created)
+		seriesID := formatTraceSeriesID(created.name, created.group)
+		s.schemaMap[seriesID] = created
+		s.l.Info().Str("id", seriesID).Msg("initialize Trace series")
+	}
+	// The listing error was already handled above, so nothing is left to report.
+	return nil
+}
+
+// Serve announces every trace series held in schemaMap and then blocks until
+// stopCh is closed. For each series it publishes, in order:
+//   - one series event on event.TopicSeriesEvent,
+//   - one shard event per shard on event.TopicShardEvent,
+//   - one index rule event on event.TopicIndexRule that pairs every shard ID
+//     with the rules returned by s.IndexRules.
+// The first publish or rule-lookup error aborts Serve and is returned.
+//
+// NOTE(review): stopCh is only created after all publishing has finished. If
+// GracefulStop runs before that point it sees a nil channel and does nothing,
+// so it looks like Serve would then block with nobody left to close the
+// channel; confirm the caller's start/stop ordering.
+func (s *service) Serve() error {
+ // A single timestamp is reused as the message ID of every message below.
+ now := time.Now().UnixNano()
+ for _, sMeta := range s.schemaMap {
+ e := pb.NewSeriesEventBuilder().
+ SeriesMetadata(sMeta.group, sMeta.name).
+ FieldNames(sMeta.fieldsNamesCompositeSeriesID...).
+ Time(time.Now()).
+ Action(v1.Action_ACTION_PUT).
+ Build()
+ _, err := s.repo.Publish(event.TopicSeriesEvent, bus.NewMessage(bus.MessageID(now), e))
+ if err != nil {
+ return err
+ }
+ seriesObj := &v1.Series{
+ Series: &v1.Metadata{
+ Name: sMeta.name,
+ Group: sMeta.group,
+ },
+ }
+ rules, errGetRules := s.IndexRules(context.Background(), seriesObj, nil)
+ if errGetRules != nil {
+ return errGetRules
+ }
+ // NOTE(review): only one element is appended per shard, so the requested
+ // capacity of len(rules)*shardNum is larger than what ends up being used.
+ shardedRuleIndex := make([]*v1.IndexRuleEvent_ShardedIndexRule, 0, len(rules)*int(sMeta.shardNum))
+ for i := 0; i < int(sMeta.shardNum); i++ {
+ t := time.Now()
+ // This e shadows the series event above; here it is the shard event.
+ // The node address is hard-coded to "localhost".
+ e := pb.NewShardEventBuilder().Action(v1.Action_ACTION_PUT).Time(t).
+ Shard(
+ pb.NewShardBuilder().
+ ID(uint64(i)).Total(sMeta.shardNum).SeriesMetadata(sMeta.group, sMeta.name).UpdatedAt(t).CreatedAt(t).
+ Node(pb.NewNodeBuilder().
+ ID(s.repo.NodeID()).CreatedAt(t).UpdatedAt(t).Addr("localhost").
+ Build()).
+ Build()).
+ Build()
+ _, errShard := s.repo.Publish(event.TopicShardEvent, bus.NewMessage(bus.MessageID(now), e))
+ if errShard != nil {
+ return errShard
+ }
+ // Every shard receives the same rule slice.
+ shardedRuleIndex = append(shardedRuleIndex, &v1.IndexRuleEvent_ShardedIndexRule{
+ ShardId: uint64(i),
+ Rules: rules,
+ })
+ }
+
+ indexRule := &v1.IndexRuleEvent{
+ Series: seriesObj.Series,
+ Rules: shardedRuleIndex,
+ Action: v1.Action_ACTION_PUT,
+ Time: timestamppb.New(time.Now()),
+ }
+ _, errPublishRules := s.repo.Publish(event.TopicIndexRule, bus.NewMessage(bus.MessageID(now), indexRule))
+ if errPublishRules != nil {
+ return errPublishRules
+ }
+ }
+ // Block until GracefulStop closes the channel.
+ s.stopCh = make(chan struct{})
+ <-s.stopCh
+ return nil
+}
+
+// GracefulStop closes stopCh, which releases a blocked Serve. It does nothing
+// when Serve has not created the channel yet.
+// NOTE(review): a second call would close an already closed channel, which
+// panics in Go; confirm callers invoke this at most once.
+func (s *service) GracefulStop() {
+ if s.stopCh != nil {
+ close(s.stopCh)
+ }
+}
diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go
index 2551813..1c5f639 100644
--- a/banyand/series/trace/trace.go
+++ b/banyand/series/trace/trace.go
@@ -18,7 +18,6 @@
package trace
import (
- "context"
"strconv"
"time"
@@ -26,14 +25,11 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
- "github.com/apache/skywalking-banyandb/api/event"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
apischema "github.com/apache/skywalking-banyandb/api/schema"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/index"
"github.com/apache/skywalking-banyandb/banyand/series"
- "github.com/apache/skywalking-banyandb/banyand/series/schema"
"github.com/apache/skywalking-banyandb/banyand/storage"
- "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
@@ -76,89 +72,6 @@ const (
StateError = 1
)
-var _ series.Service = (*service)(nil)
-
-type service struct {
- db storage.Database
- schemaMap map[string]*traceSeries
- l *logger.Logger
- repo discovery.ServiceRepo
- stopCh chan struct{}
-}
-
-//NewService returns a new service
-func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo) (series.Service, error) {
- return &service{
- db: db,
- repo: repo,
- }, nil
-}
-
-func (s *service) Name() string {
- return "trace-series"
-}
-
-func (s *service) PreRun() error {
- schemas, err := s.TraceSeries().List(context.Background(), schema.ListOpt{})
- if err != nil {
- return err
- }
- s.schemaMap = make(map[string]*traceSeries, len(schemas))
- s.l = logger.GetLogger(s.Name())
- for _, sa := range schemas {
- ts, errTS := newTraceSeries(sa, s.l)
- if errTS != nil {
- return errTS
- }
- s.db.Register(ts)
- id := formatTraceSeriesID(ts.name, ts.group)
- s.schemaMap[id] = ts
- s.l.Info().Str("id", id).Msg("initialize Trace series")
- }
- return err
-}
-
-func (s *service) Serve() error {
- now := time.Now().UnixNano()
- for _, sMeta := range s.schemaMap {
- e := pb.NewSeriesEventBuilder().
- SeriesMetadata(sMeta.group, sMeta.name).
- FieldNames(sMeta.fieldsNamesCompositeSeriesID...).
- Time(time.Now()).
- Action(v1.Action_ACTION_PUT).
- Build()
- _, err := s.repo.Publish(event.TopicSeriesEvent, bus.NewMessage(bus.MessageID(now), e))
- if err != nil {
- return err
- }
- for i := 0; i < int(sMeta.shardNum); i++ {
- t := time.Now()
- e := pb.NewShardEventBuilder().Action(v1.Action_ACTION_PUT).Time(t).
- Shard(
- pb.NewShardBuilder().
- ID(uint64(i)).Total(sMeta.shardNum).SeriesMetadata(sMeta.group, sMeta.name).UpdatedAt(t).CreatedAt(t).
- Node(pb.NewNodeBuilder().
- ID(s.repo.NodeID()).CreatedAt(t).UpdatedAt(t).Addr("localhost").
- Build()).
- Build()).
- Build()
- _, errShard := s.repo.Publish(event.TopicShardEvent, bus.NewMessage(bus.MessageID(now), e))
- if errShard != nil {
- return errShard
- }
- }
- }
- s.stopCh = make(chan struct{})
- <-s.stopCh
- return nil
-}
-
-func (s *service) GracefulStop() {
- if s.stopCh != nil {
- close(s.stopCh)
- }
-}
-
func (s *service) FetchTrace(traceSeries common.Metadata, traceID string, opt series.ScanOptions) (data.Trace, error) {
ts, err := s.getSeries(traceSeries)
if err != nil {
@@ -227,6 +140,7 @@ type traceSeries struct {
schema apischema.TraceSeries
reader storage.StoreRepo
writePoint storage.GetWritePoint
+ idx index.Service
shardNum uint32
fieldIndex map[string]int
traceIDIndex int
@@ -241,11 +155,12 @@ type traceSeries struct {
fieldsNamesCompositeSeriesID []string
}
-func newTraceSeries(schema apischema.TraceSeries, l *logger.Logger) (*traceSeries, error) {
+func newTraceSeries(schema apischema.TraceSeries, l *logger.Logger, idx index.Service) (*traceSeries, error) {
t := &traceSeries{
schema: schema,
idGen: series.NewIDGen(),
l: l,
+ idx: idx,
}
meta := t.schema.Spec.GetMetadata()
shardInfo := t.schema.Spec.GetShard()
diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go
index 2b4466f..60ed91a 100644
--- a/banyand/series/trace/write.go
+++ b/banyand/series/trace/write.go
@@ -22,10 +22,12 @@ import (
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
+ "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/banyand/index"
bydb_bytes "github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/partition"
@@ -92,9 +94,52 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data.
Uint("trace_shard_id", traceIDShardID).
Uint("shard_id", shardID).
Msg("written to Trace series")
+ id := common.ChunkID(chunkID)
+ for i, field := range entityVal.GetFields() {
+ fieldSpec := t.schema.Spec.GetFields()[i]
+ fieldName := fieldSpec.GetName()
+ switch x := field.ValueType.(type) {
+ case *v1.Field_Str:
+ err = multierr.Append(err, t.writeStrToIndex(shardID, id, fieldName, x.Str.GetValue()))
+ case *v1.Field_Int:
+ err = multierr.Append(err, t.writeIntToIndex(shardID, id, fieldName, x.Int.GetValue()))
+ case *v1.Field_StrArray:
+ for _, s := range x.StrArray.GetValue() {
+ err = multierr.Append(err, t.writeStrToIndex(shardID, id, fieldName, s))
+ }
+ case *v1.Field_IntArray:
+ for _, integer := range x.IntArray.GetValue() {
+ err = multierr.Append(err, t.writeIntToIndex(shardID, id, fieldName, integer))
+ }
+ default:
+ continue
+ }
+ }
return common.ChunkID(chunkID), err
}
+// writeIntToIndex encodes value with convert.Int64ToBytes and passes the
+// resulting bytes to writeIndex under the given field name.
+func (t *traceSeries) writeIntToIndex(shardID uint, id common.ChunkID, name string, value int64) error {
+ return t.writeIndex(shardID, id, name, convert.Int64ToBytes(value))
+}
+
+// writeStrToIndex passes the bytes of value to writeIndex under the given
+// field name.
+func (t *traceSeries) writeStrToIndex(shardID uint, id common.ChunkID, name string, value string) error {
+ return t.writeIndex(shardID, id, name, []byte(value))
+}
+
+// writeIndex inserts one index entry through the index service: the series is
+// identified by this trace series' name and group, and the entry carries the
+// chunk ID, field name and raw value. The error from Insert is returned as is.
+func (t *traceSeries) writeIndex(shardID uint, id common.ChunkID, name string, value []byte) error {
+	seriesMeta := common.NewMetadata(&v1.Metadata{
+		Name:  t.name,
+		Group: t.group,
+	})
+	entry := &index.Field{
+		ChunkID: id,
+		Name:    name,
+		Value:   value,
+	}
+	return t.idx.Insert(*seriesMeta, shardID, entry)
+}
+
// copyEntityValueWithoutDataBinary copies all fields without DataBinary
func copyEntityValueWithoutDataBinary(ev *v1.EntityValue) *v1.EntityValue {
return &v1.EntityValue{
diff --git a/pkg/convert/number.go b/pkg/convert/number.go
index c06d7a4..fa2d688 100644
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@ -43,6 +43,10 @@ func Uint32ToBytes(u uint32) []byte {
return bs
}
+// BytesToInt64 reads the first 8 bytes of b as a big-endian unsigned 64-bit
+// integer and converts the result to int64.
+// NOTE(review): assuming binary is encoding/binary, BigEndian.Uint64 panics
+// when b is shorter than 8 bytes; callers must pass at least 8 bytes.
+func BytesToInt64(b []byte) int64 {
+ return int64(binary.BigEndian.Uint64(b))
+}
+
func BytesToUint64(b []byte) uint64 {
return binary.BigEndian.Uint64(b)
}
diff --git a/pkg/posting/posting.go b/pkg/posting/posting.go
index 3812162..2d7ad16 100644
--- a/pkg/posting/posting.go
+++ b/pkg/posting/posting.go
@@ -58,6 +58,8 @@ type List interface {
RemoveRange(min, max common.ChunkID) error
Reset()
+
+ ToSlice() []common.ChunkID
}
type Iterator interface {
diff --git a/pkg/posting/roaring/roaring.go b/pkg/posting/roaring/roaring.go
index 7c778d0..0b1c9b6 100644
--- a/pkg/posting/roaring/roaring.go
+++ b/pkg/posting/roaring/roaring.go
@@ -26,6 +26,8 @@ import (
)
var (
+ EmptyPostingList = NewPostingList()
+
ErrIntersectRoaringOnly = errors.New("Intersect only supported between roaringDocId sets")
ErrUnionRoaringOnly = errors.New("Union only supported between roaringDocId sets")
ErrDifferenceRoaringOnly = errors.New("Difference only supported between roaringDocId sets")
@@ -192,3 +194,13 @@ func (it *roaringIterator) Close() error {
it.closed = true
return nil
}
+
+// ToSlice walks the list with its iterator and returns the chunk IDs in
+// iteration order as a new slice, pre-sized to the list's length. The
+// iterator is closed before returning; its Close error is discarded.
+func (p *postingsList) ToSlice() []common.ChunkID {
+	it := p.Iterator()
+	defer it.Close()
+	ids := make([]common.ChunkID, 0, p.Len())
+	for more := it.Next(); more; more = it.Next() {
+		ids = append(ids, it.Current())
+	}
+	return ids
+}