You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/03 13:53:20 UTC
[skywalking-banyandb] 04/05: Index memtable
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch index-mem
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit d2c9e1b1ed44cd7a8d0323e44d8dcf271c4b5f8e
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Aug 2 22:04:54 2021 +0800
Index memtable
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
api/common/metadata.go | 7 +
api/event/discovery.go | 9 +-
api/proto/banyandb/v1/schema.pb.go | 246 ++++++++++++++++++++++++++-----
api/proto/banyandb/v1/schema.proto | 13 +-
banyand/index/index.go | 193 +++++++++++++++++++++++-
banyand/index/search.go | 286 ++++++++++++++++++++++++++++++++++++
banyand/index/tsdb/field_map.go | 6 +-
banyand/index/tsdb/mem.go | 30 ++--
banyand/index/tsdb/mem_test.go | 64 ++++----
banyand/index/tsdb/term_map.go | 12 +-
banyand/index/tsdb/tsdb.go | 30 +++-
banyand/internal/cmd/standalone.go | 4 +-
banyand/query/processor_test.go | 2 +-
banyand/series/trace/common_test.go | 7 +-
banyand/series/trace/service.go | 147 ++++++++++++++++++
banyand/series/trace/trace.go | 93 +-----------
banyand/series/trace/write.go | 45 ++++++
pkg/posting/roaring/roaring.go | 2 +
18 files changed, 1005 insertions(+), 191 deletions(-)
diff --git a/api/common/metadata.go b/api/common/metadata.go
index 93ea657..1facc9e 100644
--- a/api/common/metadata.go
+++ b/api/common/metadata.go
@@ -33,3 +33,10 @@ func (md Metadata) Equal(other Metadata) bool {
md.Spec.Group == other.Spec.Group &&
md.Spec.Name == other.Spec.Name
}
+
+func NewMetadata(spec *v1.Metadata) *Metadata {
+ return &Metadata{
+ KindVersion: MetadataKindVersion,
+ Spec: spec,
+ }
+}
diff --git a/api/event/discovery.go b/api/event/discovery.go
index a38298c..10476b8 100644
--- a/api/event/discovery.go
+++ b/api/event/discovery.go
@@ -33,7 +33,9 @@ var (
Version: "v1",
Kind: "event-series",
}
- TopicSeriesEvent = bus.UniTopic(SeriesEventKindVersion.String())
+ TopicSeriesEvent = bus.UniTopic(SeriesEventKindVersion.String())
+ IndexRuleKindVersion = common.KindVersion{Version: "v1", Kind: "index-rule"}
+ TopicIndexRule = bus.UniTopic(IndexRuleKindVersion.String())
)
type Shard struct {
@@ -45,3 +47,8 @@ type Series struct {
common.KindVersion
Payload v1.SeriesEvent
}
+
+type IndexRule struct {
+ common.KindVersion
+ Payload *v1.IndexRuleEvent
+}
diff --git a/api/proto/banyandb/v1/schema.pb.go b/api/proto/banyandb/v1/schema.pb.go
index 69f0218..56f6144 100644
--- a/api/proto/banyandb/v1/schema.pb.go
+++ b/api/proto/banyandb/v1/schema.pb.go
@@ -978,6 +978,132 @@ func (x *IndexRuleBinding) GetUpdatedAt() *timestamppb.Timestamp {
return nil
}
+type IndexRuleEvent struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Series *Metadata `protobuf:"bytes,1,opt,name=series,proto3" json:"series,omitempty"`
+ Rules []*IndexRuleEvent_ShardedIndexRule `protobuf:"bytes,2,rep,name=rules,proto3" json:"rules,omitempty"`
+ Action Action `protobuf:"varint,4,opt,name=action,proto3,enum=banyandb.v1.Action" json:"action,omitempty"`
+ Time *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=time,proto3" json:"time,omitempty"`
+}
+
+func (x *IndexRuleEvent) Reset() {
+ *x = IndexRuleEvent{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_banyandb_v1_schema_proto_msgTypes[10]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *IndexRuleEvent) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*IndexRuleEvent) ProtoMessage() {}
+
+func (x *IndexRuleEvent) ProtoReflect() protoreflect.Message {
+ mi := &file_banyandb_v1_schema_proto_msgTypes[10]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use IndexRuleEvent.ProtoReflect.Descriptor instead.
+func (*IndexRuleEvent) Descriptor() ([]byte, []int) {
+ return file_banyandb_v1_schema_proto_rawDescGZIP(), []int{10}
+}
+
+func (x *IndexRuleEvent) GetSeries() *Metadata {
+ if x != nil {
+ return x.Series
+ }
+ return nil
+}
+
+func (x *IndexRuleEvent) GetRules() []*IndexRuleEvent_ShardedIndexRule {
+ if x != nil {
+ return x.Rules
+ }
+ return nil
+}
+
+func (x *IndexRuleEvent) GetAction() Action {
+ if x != nil {
+ return x.Action
+ }
+ return Action_ACTION_UNSPECIFIED
+}
+
+func (x *IndexRuleEvent) GetTime() *timestamppb.Timestamp {
+ if x != nil {
+ return x.Time
+ }
+ return nil
+}
+
+type IndexRuleEvent_ShardedIndexRule struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ ShardId uint64 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"`
+ Rules []*IndexRule `protobuf:"bytes,2,rep,name=rules,proto3" json:"rules,omitempty"`
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) Reset() {
+ *x = IndexRuleEvent_ShardedIndexRule{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_banyandb_v1_schema_proto_msgTypes[11]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*IndexRuleEvent_ShardedIndexRule) ProtoMessage() {}
+
+func (x *IndexRuleEvent_ShardedIndexRule) ProtoReflect() protoreflect.Message {
+ mi := &file_banyandb_v1_schema_proto_msgTypes[11]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use IndexRuleEvent_ShardedIndexRule.ProtoReflect.Descriptor instead.
+func (*IndexRuleEvent_ShardedIndexRule) Descriptor() ([]byte, []int) {
+ return file_banyandb_v1_schema_proto_rawDescGZIP(), []int{10, 0}
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) GetShardId() uint64 {
+ if x != nil {
+ return x.ShardId
+ }
+ return 0
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) GetRules() []*IndexRule {
+ if x != nil {
+ return x.Rules
+ }
+ return nil
+}
+
var File_banyandb_v1_schema_proto protoreflect.FileDescriptor
var file_banyandb_v1_schema_proto_rawDesc = []byte{
@@ -1119,7 +1245,27 @@ var file_banyandb_v1_schema_proto_rawDesc = []byte{
0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69,
0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64,
- 0x41, 0x74, 0x42, 0x60, 0x0a, 0x1e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
+ 0x41, 0x74, 0x22, 0xbd, 0x02, 0x0a, 0x0e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65,
+ 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2d, 0x0a, 0x06, 0x73, 0x65, 0x72, 0x69, 0x65, 0x73, 0x18,
+ 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
+ 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x06, 0x73, 0x65,
+ 0x72, 0x69, 0x65, 0x73, 0x12, 0x42, 0x0a, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20,
+ 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x76,
+ 0x31, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74,
+ 0x2e, 0x53, 0x68, 0x61, 0x72, 0x64, 0x65, 0x64, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c,
+ 0x65, 0x52, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73, 0x12, 0x2b, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69,
+ 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61,
+ 0x6e, 0x64, 0x62, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x06, 0x61,
+ 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x04, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20,
+ 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52,
+ 0x04, 0x74, 0x69, 0x6d, 0x65, 0x1a, 0x5b, 0x0a, 0x10, 0x53, 0x68, 0x61, 0x72, 0x64, 0x65, 0x64,
+ 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61,
+ 0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61,
+ 0x72, 0x64, 0x49, 0x64, 0x12, 0x2c, 0x0a, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20,
+ 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x76,
+ 0x31, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x52, 0x05, 0x72, 0x75, 0x6c,
+ 0x65, 0x73, 0x42, 0x60, 0x0a, 0x1e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79,
0x61, 0x6e, 0x64, 0x62, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69,
@@ -1141,52 +1287,60 @@ func file_banyandb_v1_schema_proto_rawDescGZIP() []byte {
}
var file_banyandb_v1_schema_proto_enumTypes = make([]protoimpl.EnumInfo, 4)
-var file_banyandb_v1_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
+var file_banyandb_v1_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
var file_banyandb_v1_schema_proto_goTypes = []interface{}{
- (Duration_Duration)(0), // 0: banyandb.v1.Duration.Duration
- (FieldSpec_FieldType)(0), // 1: banyandb.v1.FieldSpec.FieldType
- (IndexObject_IndexType)(0), // 2: banyandb.v1.IndexObject.IndexType
- (Series_Catalog)(0), // 3: banyandb.v1.Series.Catalog
- (*ShardInfo)(nil), // 4: banyandb.v1.ShardInfo
- (*Duration)(nil), // 5: banyandb.v1.Duration
- (*FieldSpec)(nil), // 6: banyandb.v1.FieldSpec
- (*TraceStateMap)(nil), // 7: banyandb.v1.TraceStateMap
- (*TraceFieldMap)(nil), // 8: banyandb.v1.TraceFieldMap
- (*TraceSeries)(nil), // 9: banyandb.v1.TraceSeries
- (*IndexObject)(nil), // 10: banyandb.v1.IndexObject
- (*IndexRule)(nil), // 11: banyandb.v1.IndexRule
- (*Series)(nil), // 12: banyandb.v1.Series
- (*IndexRuleBinding)(nil), // 13: banyandb.v1.IndexRuleBinding
- (*Metadata)(nil), // 14: banyandb.v1.Metadata
- (*timestamppb.Timestamp)(nil), // 15: google.protobuf.Timestamp
+ (Duration_Duration)(0), // 0: banyandb.v1.Duration.Duration
+ (FieldSpec_FieldType)(0), // 1: banyandb.v1.FieldSpec.FieldType
+ (IndexObject_IndexType)(0), // 2: banyandb.v1.IndexObject.IndexType
+ (Series_Catalog)(0), // 3: banyandb.v1.Series.Catalog
+ (*ShardInfo)(nil), // 4: banyandb.v1.ShardInfo
+ (*Duration)(nil), // 5: banyandb.v1.Duration
+ (*FieldSpec)(nil), // 6: banyandb.v1.FieldSpec
+ (*TraceStateMap)(nil), // 7: banyandb.v1.TraceStateMap
+ (*TraceFieldMap)(nil), // 8: banyandb.v1.TraceFieldMap
+ (*TraceSeries)(nil), // 9: banyandb.v1.TraceSeries
+ (*IndexObject)(nil), // 10: banyandb.v1.IndexObject
+ (*IndexRule)(nil), // 11: banyandb.v1.IndexRule
+ (*Series)(nil), // 12: banyandb.v1.Series
+ (*IndexRuleBinding)(nil), // 13: banyandb.v1.IndexRuleBinding
+ (*IndexRuleEvent)(nil), // 14: banyandb.v1.IndexRuleEvent
+ (*IndexRuleEvent_ShardedIndexRule)(nil), // 15: banyandb.v1.IndexRuleEvent.ShardedIndexRule
+ (*Metadata)(nil), // 16: banyandb.v1.Metadata
+ (*timestamppb.Timestamp)(nil), // 17: google.protobuf.Timestamp
+ (Action)(0), // 18: banyandb.v1.Action
}
var file_banyandb_v1_schema_proto_depIdxs = []int32{
0, // 0: banyandb.v1.Duration.unit:type_name -> banyandb.v1.Duration.Duration
1, // 1: banyandb.v1.FieldSpec.type:type_name -> banyandb.v1.FieldSpec.FieldType
7, // 2: banyandb.v1.TraceFieldMap.state:type_name -> banyandb.v1.TraceStateMap
- 14, // 3: banyandb.v1.TraceSeries.metadata:type_name -> banyandb.v1.Metadata
+ 16, // 3: banyandb.v1.TraceSeries.metadata:type_name -> banyandb.v1.Metadata
6, // 4: banyandb.v1.TraceSeries.fields:type_name -> banyandb.v1.FieldSpec
8, // 5: banyandb.v1.TraceSeries.reserved_fields_map:type_name -> banyandb.v1.TraceFieldMap
4, // 6: banyandb.v1.TraceSeries.shard:type_name -> banyandb.v1.ShardInfo
5, // 7: banyandb.v1.TraceSeries.duration:type_name -> banyandb.v1.Duration
- 15, // 8: banyandb.v1.TraceSeries.updated_at:type_name -> google.protobuf.Timestamp
+ 17, // 8: banyandb.v1.TraceSeries.updated_at:type_name -> google.protobuf.Timestamp
2, // 9: banyandb.v1.IndexObject.type:type_name -> banyandb.v1.IndexObject.IndexType
- 14, // 10: banyandb.v1.IndexRule.metadata:type_name -> banyandb.v1.Metadata
+ 16, // 10: banyandb.v1.IndexRule.metadata:type_name -> banyandb.v1.Metadata
10, // 11: banyandb.v1.IndexRule.objects:type_name -> banyandb.v1.IndexObject
- 15, // 12: banyandb.v1.IndexRule.updated_at:type_name -> google.protobuf.Timestamp
+ 17, // 12: banyandb.v1.IndexRule.updated_at:type_name -> google.protobuf.Timestamp
3, // 13: banyandb.v1.Series.catalog:type_name -> banyandb.v1.Series.Catalog
- 14, // 14: banyandb.v1.Series.series:type_name -> banyandb.v1.Metadata
- 14, // 15: banyandb.v1.IndexRuleBinding.metadata:type_name -> banyandb.v1.Metadata
- 14, // 16: banyandb.v1.IndexRuleBinding.rule_ref:type_name -> banyandb.v1.Metadata
+ 16, // 14: banyandb.v1.Series.series:type_name -> banyandb.v1.Metadata
+ 16, // 15: banyandb.v1.IndexRuleBinding.metadata:type_name -> banyandb.v1.Metadata
+ 16, // 16: banyandb.v1.IndexRuleBinding.rule_ref:type_name -> banyandb.v1.Metadata
12, // 17: banyandb.v1.IndexRuleBinding.subjects:type_name -> banyandb.v1.Series
- 15, // 18: banyandb.v1.IndexRuleBinding.begin_at:type_name -> google.protobuf.Timestamp
- 15, // 19: banyandb.v1.IndexRuleBinding.expire_at:type_name -> google.protobuf.Timestamp
- 15, // 20: banyandb.v1.IndexRuleBinding.updated_at:type_name -> google.protobuf.Timestamp
- 21, // [21:21] is the sub-list for method output_type
- 21, // [21:21] is the sub-list for method input_type
- 21, // [21:21] is the sub-list for extension type_name
- 21, // [21:21] is the sub-list for extension extendee
- 0, // [0:21] is the sub-list for field type_name
+ 17, // 18: banyandb.v1.IndexRuleBinding.begin_at:type_name -> google.protobuf.Timestamp
+ 17, // 19: banyandb.v1.IndexRuleBinding.expire_at:type_name -> google.protobuf.Timestamp
+ 17, // 20: banyandb.v1.IndexRuleBinding.updated_at:type_name -> google.protobuf.Timestamp
+ 16, // 21: banyandb.v1.IndexRuleEvent.series:type_name -> banyandb.v1.Metadata
+ 15, // 22: banyandb.v1.IndexRuleEvent.rules:type_name -> banyandb.v1.IndexRuleEvent.ShardedIndexRule
+ 18, // 23: banyandb.v1.IndexRuleEvent.action:type_name -> banyandb.v1.Action
+ 17, // 24: banyandb.v1.IndexRuleEvent.time:type_name -> google.protobuf.Timestamp
+ 11, // 25: banyandb.v1.IndexRuleEvent.ShardedIndexRule.rules:type_name -> banyandb.v1.IndexRule
+ 26, // [26:26] is the sub-list for method output_type
+ 26, // [26:26] is the sub-list for method input_type
+ 26, // [26:26] is the sub-list for extension type_name
+ 26, // [26:26] is the sub-list for extension extendee
+ 0, // [0:26] is the sub-list for field type_name
}
func init() { file_banyandb_v1_schema_proto_init() }
@@ -1316,6 +1470,30 @@ func file_banyandb_v1_schema_proto_init() {
return nil
}
}
+ file_banyandb_v1_schema_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*IndexRuleEvent); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_banyandb_v1_schema_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*IndexRuleEvent_ShardedIndexRule); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -1323,7 +1501,7 @@ func file_banyandb_v1_schema_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_banyandb_v1_schema_proto_rawDesc,
NumEnums: 4,
- NumMessages: 10,
+ NumMessages: 12,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/api/proto/banyandb/v1/schema.proto b/api/proto/banyandb/v1/schema.proto
index 273c155..aeb8e5a 100644
--- a/api/proto/banyandb/v1/schema.proto
+++ b/api/proto/banyandb/v1/schema.proto
@@ -184,4 +184,15 @@ message IndexRuleBinding {
google.protobuf.Timestamp expire_at = 5;
// updated_at_nanoseconds indicates when the IndexRuleBinding is updated
google.protobuf.Timestamp updated_at = 6;
-}
\ No newline at end of file
+}
+
+message IndexRuleEvent {
+ Metadata series = 1;
+ message ShardedIndexRule {
+ uint64 shard_id = 1;
+ repeated IndexRule rules = 2;
+ }
+ repeated ShardedIndexRule rules = 2;
+ Action action = 4;
+ google.protobuf.Timestamp time = 5;
+}
diff --git a/banyand/index/index.go b/banyand/index/index.go
index 6ab47f2..c09f27f 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -19,29 +19,49 @@ package index
import (
"context"
+ "sync"
+
+ "github.com/pkg/errors"
+ "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/event"
apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
- "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/index/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/posting"
"github.com/apache/skywalking-banyandb/pkg/run"
)
+var (
+ ErrShardNotFound = errors.New("series doesn't exist")
+ ErrTraceSeriesNotFound = errors.New("trace series not found")
+ ErrUnknownField = errors.New("the field is unknown")
+)
+
type Condition struct {
Key string
Values [][]byte
Op apiv1.PairQuery_BinaryOp
}
-//go:generate mockgen -destination=./index_mock.go -package=index . Repo
+type Field struct {
+ ChunkID common.ChunkID
+ Name string
+ Value []byte
+}
+
+//go:generate mockgen -destination=./index_mock.go -package=index . Service
type Repo interface {
Search(seriesMeta common.Metadata, shardID uint, startTime, endTime uint64, indexObjectName string, conditions []Condition) (posting.List, error)
+ Insert(seriesMeta common.Metadata, shardID uint, fields *Field) error
}
type Builder interface {
- run.Config
run.PreRunner
+ run.Service
}
type Service interface {
@@ -49,6 +69,169 @@ type Service interface {
Builder
}
-func NewService(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Service, error) {
- return nil, nil
+type series struct {
+ repo map[uint]*shard
+}
+
+type shard struct {
+ meta map[string][]*apiv1.IndexObject
+ store tsdb.GlobalStore
+}
+
+type service struct {
+ meta *indexMeta
+ log *logger.Logger
+ repo discovery.ServiceRepo
+ stopCh chan struct{}
+ indexRuleListener *indexRuleListener
+}
+
+func NewService(_ context.Context, repo discovery.ServiceRepo) (Service, error) {
+ svc := &service{
+ repo: repo,
+ indexRuleListener: &indexRuleListener{},
+ }
+ svc.meta = &indexMeta{
+ meta: make(map[string]*series),
+ }
+ svc.indexRuleListener.indexMeta = svc.meta
+ svc.indexRuleListener.closeFunc = func() {
+ svc.stopCh <- struct{}{}
+ }
+ return svc, nil
+}
+
+func (s *service) Insert(series common.Metadata, shardID uint, field *Field) error {
+ sd, err := s.getShard(series, shardID)
+ if err != nil {
+ return err
+ }
+ objects, ok := sd.meta[field.Name]
+ if !ok {
+ return ErrUnknownField
+ }
+ for _, object := range objects {
+ err = multierr.Append(err, sd.store.Insert(&tsdb.Field{
+ Name: []byte(compositeFieldID(object.GetName(), field.Name)),
+ Value: field.Value,
+ }, field.ChunkID))
+ }
+ return err
+}
+
+func (s *service) getShard(series common.Metadata, shardID uint) (*shard, error) {
+ id := compositeSeriesID(series.Spec)
+ ss, ok := s.meta.meta[id]
+ if !ok {
+ return nil, errors.Wrapf(ErrTraceSeriesNotFound, "identify:%s", id)
+ }
+ sd, existSearcher := ss.repo[shardID]
+ if !existSearcher {
+ return nil, errors.Wrapf(ErrShardNotFound, "shardID:%d", shardID)
+ }
+ return sd, nil
+}
+
+func (s *service) Name() string {
+ return "index"
+}
+
+func (s *service) PreRun() error {
+ //TODO: listen to written data
+ s.log = logger.GetLogger("index")
+ s.indexRuleListener.log = s.log
+ return s.repo.Subscribe(event.TopicIndexRule, s.indexRuleListener)
+}
+
+func (s *service) Serve() error {
+ s.stopCh = make(chan struct{})
+ <-s.stopCh
+ return nil
+}
+
+func (s *service) GracefulStop() {
+ if s.stopCh != nil {
+ close(s.stopCh)
+ }
+}
+
+type indexMeta struct {
+ meta map[string]*series
+ sync.RWMutex
+}
+
+type indexRuleListener struct {
+ log *logger.Logger
+ indexMeta *indexMeta
+ closeFunc func()
+}
+
+func (i *indexRuleListener) Rev(message bus.Message) (resp bus.Message) {
+ indexRuleEvent, ok := message.Data().(*apiv1.IndexRuleEvent)
+ if !ok {
+ i.log.Warn().Msg("invalid event data type")
+ return
+ }
+ i.log.Info().
+ Str("action", apiv1.Action_name[int32(indexRuleEvent.Action)]).
+ Str("series", indexRuleEvent.Series.String()).
+ Msg("received an index rule")
+ i.indexMeta.Lock()
+ defer i.indexMeta.Unlock()
+ switch indexRuleEvent.Action {
+ case apiv1.Action_ACTION_PUT:
+ seriesID := compositeSeriesID(indexRuleEvent.Series)
+ newSeries := &series{
+ repo: make(map[uint]*shard),
+ }
+ for _, rule := range indexRuleEvent.Rules {
+ store := tsdb.NewStore(indexRuleEvent.Series.Name, indexRuleEvent.Series.Group, uint(rule.ShardId))
+ fields := make([]tsdb.FieldSpec, 0, len(rule.Rules))
+ meta := make(map[string][]*apiv1.IndexObject)
+ for _, indexRule := range rule.GetRules() {
+ for _, object := range indexRule.Objects {
+ fieldsSize := len(object.Fields)
+ if fieldsSize > 1 {
+ //TODO: to support composited index
+ i.log.Error().Str("name", object.Name).
+ Msg("index module doesn't support composited index object")
+ i.closeFunc()
+ } else if fieldsSize < 1 {
+ continue
+ }
+ field := object.Fields[0]
+ fieldSpec := tsdb.FieldSpec{
+ Name: compositeFieldID(object.Name, field),
+ }
+ fields = append(fields, fieldSpec)
+ objects, existed := meta[field]
+ if !existed {
+ objects = make([]*apiv1.IndexObject, 0, 1)
+ }
+ objects = append(objects, object)
+ meta[field] = objects
+ }
+ }
+ err := store.Initialize(fields)
+ if err != nil {
+ i.log.Warn().Err(err).Msg("failed to initialize index getShard")
+ }
+ newSeries.repo[uint(rule.ShardId)] = &shard{
+ store: store,
+ meta: meta,
+ }
+ }
+ i.indexMeta.meta[seriesID] = newSeries
+ default:
+ i.log.Warn().Msg("unsupported action")
+ }
+ return
+}
+
// compositeFieldID builds the internal key "<indexObject>:<field>" that
// addresses a single indexed field within an index object.
func compositeFieldID(indexObjectName, field string) string {
	const sep = ":"
	return indexObjectName + sep + field
}
+
+func compositeSeriesID(series *apiv1.Metadata) string {
+ return series.Name + "-" + series.Group
}
diff --git a/banyand/index/search.go b/banyand/index/search.go
new file mode 100644
index 0000000..0c829f6
--- /dev/null
+++ b/banyand/index/search.go
@@ -0,0 +1,286 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+ "github.com/pkg/errors"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/banyand/index/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/bytes"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+var ErrNotRangeOperation = errors.New("this is not an range operation")
+
+type executable interface {
+ execute() (posting.List, error)
+}
+
+type searchTree interface {
+ executable
+}
+
+func (s *service) Search(series common.Metadata, shardID uint, startTime, endTime uint64, indexObjectName string, conditions []Condition) (posting.List, error) {
+ sd, err := s.getShard(series, shardID)
+ if err != nil {
+ return nil, err
+ }
+ store := sd.store
+ searcher, hasData := store.Window(startTime, endTime)
+ if !hasData {
+ return roaring.EmptyPostingList, nil
+ }
+ tree, errBuild := buildSearchTree(searcher, indexObjectName, conditions)
+ if errBuild != nil {
+ return nil, err
+ }
+ return tree.execute()
+}
+
+func buildSearchTree(searcher tsdb.Searcher, indexObject string, conditions []Condition) (searchTree, error) {
+ condMap := toMap(indexObject, conditions)
+ root := &andNode{
+ node: &node{
+ SubNodes: make([]executable, 0),
+ searcher: searcher,
+ },
+ }
+ for key, conds := range condMap {
+ var rangeLeaf *rangeOp
+ for _, cond := range conds {
+ if rangeLeaf != nil && !rangeOP(cond.Op) {
+ return nil, errors.Wrapf(ErrNotRangeOperation, "op:%s", cond.Op.String())
+ }
+ if rangeOP(cond.Op) {
+ if rangeLeaf == nil {
+ rangeLeaf = root.addRangeLeaf(key)
+ }
+ opts := rangeLeaf.Opts
+ switch cond.Op {
+ case apiv1.PairQuery_BINARY_OP_GT:
+ opts.Lower = bytes.Join(cond.Values...)
+ case apiv1.PairQuery_BINARY_OP_GE:
+ opts.Lower = bytes.Join(cond.Values...)
+ opts.IncludesLower = true
+ case apiv1.PairQuery_BINARY_OP_LT:
+ opts.Upper = bytes.Join(cond.Values...)
+ case apiv1.PairQuery_BINARY_OP_LE:
+ opts.Upper = bytes.Join(cond.Values...)
+ opts.IncludesUpper = true
+ }
+ continue
+ }
+ switch cond.Op {
+ case apiv1.PairQuery_BINARY_OP_EQ:
+ root.addEq(key, cond.Values)
+ case apiv1.PairQuery_BINARY_OP_NE:
+ root.addNot(key, root.newEq(key, cond.Values))
+ case apiv1.PairQuery_BINARY_OP_HAVING:
+ n := root.addOrNode(len(cond.Values))
+ for _, v := range cond.Values {
+ n.addEq(key, [][]byte{v})
+ }
+ case apiv1.PairQuery_BINARY_OP_NOT_HAVING:
+ n := root.addOrNode(len(cond.Values))
+ for _, v := range cond.Values {
+ n.addEq(key, [][]byte{v})
+ }
+ root.addNot(key, n)
+ }
+ }
+ }
+ return root, nil
+}
+
+func rangeOP(op apiv1.PairQuery_BinaryOp) bool {
+ switch op {
+ case apiv1.PairQuery_BINARY_OP_GT:
+ case apiv1.PairQuery_BINARY_OP_GE:
+ case apiv1.PairQuery_BINARY_OP_LT:
+ case apiv1.PairQuery_BINARY_OP_LE:
+ return true
+ }
+ return false
+}
+
+func toMap(indexObject string, condition []Condition) map[string][]Condition {
+ result := make(map[string][]Condition)
+ for _, c := range condition {
+ key := compositeFieldID(indexObject, c.Key)
+ l, ok := result[key]
+ if ok {
+ l = append(l, c)
+ result[key] = l
+ continue
+ }
+ result[key] = []Condition{c}
+ }
+ return result
+}
+
+type logicalOP interface {
+ merge(posting.List) error
+}
+
+type node struct {
+ logicalOP
+ executable
+ searcher tsdb.Searcher
+ value posting.List
+ SubNodes []executable `json:"sub_nodes,omitempty"`
+}
+
+func (n *node) newEq(key string, values [][]byte) *eq {
+ return &eq{
+ leaf: &leaf{
+ Key: []byte(key),
+ values: values,
+ searcher: n.searcher,
+ },
+ }
+}
+
+func (n *node) addEq(key string, values [][]byte) {
+ n.SubNodes = append(n.SubNodes, n.newEq(key, values))
+}
+
+func (n *node) addNot(key string, inner executable) {
+ n.SubNodes = append(n.SubNodes, ¬{
+ Key: []byte(key),
+ searcher: n.searcher,
+ Inner: inner,
+ })
+}
+
+func (n *node) addRangeLeaf(key string) *rangeOp {
+ r := &rangeOp{
+ leaf: &leaf{
+ Key: []byte(key),
+ searcher: n.searcher,
+ },
+ Opts: &tsdb.RangeOpts{},
+ }
+ n.SubNodes = append(n.SubNodes, r)
+ return r
+}
+
+func (n *node) addOrNode(size int) *orNode {
+ on := &orNode{
+ node: &node{
+ searcher: n.searcher,
+ SubNodes: make([]executable, 0, size),
+ },
+ }
+ n.SubNodes = append(n.SubNodes, on)
+ return on
+}
+
+func (n *node) pop() (executable, bool) {
+ if len(n.SubNodes) < 1 {
+ return nil, false
+ }
+ sn := n.SubNodes[0]
+ n.SubNodes = n.SubNodes[1:]
+ return sn, true
+}
+
+func (n *node) execute() (posting.List, error) {
+ ex, hasNext := n.pop()
+ if !hasNext {
+ return n.value, nil
+ }
+ r, err := ex.execute()
+ if err != nil {
+ return nil, err
+ }
+ if n.value == nil {
+ n.value = r
+ return n.execute()
+ }
+ err = n.merge(r)
+ if err != nil {
+ return nil, err
+ }
+ if n.value.IsEmpty() {
+ return n.value, nil
+ }
+ return n.execute()
+}
+
+type andNode struct {
+ *node
+}
+
+func (an *andNode) merge(list posting.List) error {
+ return an.value.Intersect(list)
+}
+
+type orNode struct {
+ *node
+}
+
+func (on *orNode) merge(list posting.List) error {
+ return on.value.Union(list)
+}
+
+type leaf struct {
+ executable
+ Key []byte `json:"Key"`
+ values [][]byte
+ searcher tsdb.Searcher
+}
+
+type not struct {
+ executable
+ Key []byte `json:"key"`
+ searcher tsdb.Searcher
+ Inner executable `json:"inner,omitempty"`
+}
+
+func (n *not) execute() (posting.List, error) {
+ all := n.searcher.MatchField(n.Key)
+ list, err := n.Inner.execute()
+ if err != nil {
+ return nil, err
+ }
+ err = all.Difference(list)
+ return all, err
+}
+
+type eq struct {
+ *leaf
+}
+
+func (eq *eq) execute() (posting.List, error) {
+ return eq.searcher.MatchTerms(&tsdb.Field{
+ Name: eq.Key,
+ Value: bytes.Join(eq.values...),
+ }), nil
+}
+
+type rangeOp struct {
+ *leaf
+ Opts *tsdb.RangeOpts `json:"opts"`
+}
+
+func (r *rangeOp) execute() (posting.List, error) {
+ return r.searcher.Range(r.Key, r.Opts), nil
+}
diff --git a/banyand/index/tsdb/field_map.go b/banyand/index/tsdb/field_map.go
index cc80bad..4d89713 100644
--- a/banyand/index/tsdb/field_map.go
+++ b/banyand/index/tsdb/field_map.go
@@ -51,11 +51,11 @@ func (fm *fieldMap) get(key []byte) (*fieldValue, bool) {
}
func (fm *fieldMap) put(fv *Field, id common.ChunkID) error {
- pm, ok := fm.get(fv.name)
+ pm, ok := fm.get(fv.Name)
if !ok {
- return errors.Wrapf(ErrFieldAbsent, "filed name:%s", fv.name)
+ return errors.Wrapf(ErrFieldAbsent, "filed Name:%s", fv.Name)
}
- return pm.value.put(fv.value, id)
+ return pm.value.put(fv.Value, id)
}
type fieldValue struct {
diff --git a/banyand/index/tsdb/mem.go b/banyand/index/tsdb/mem.go
index 39af209..a96b107 100644
--- a/banyand/index/tsdb/mem.go
+++ b/banyand/index/tsdb/mem.go
@@ -27,24 +27,24 @@ import (
var ErrFieldsAbsent = errors.New("fields are absent")
-var emptyPostingList = roaring.NewPostingList()
-
type MemTable struct {
- terms *fieldMap
- name string
- group string
+ terms *fieldMap
+ name string
+ group string
+ shardID uint
}
-func NewMemTable(name, group string) *MemTable {
+func NewMemTable(name, group string, shardID uint) *MemTable {
return &MemTable{
- name: name,
- group: group,
+ name: name,
+ group: group,
+ shardID: shardID,
}
}
type Field struct {
- name []byte
- value []byte
+ Name []byte
+ Value []byte
}
type FieldSpec struct {
@@ -69,23 +69,23 @@ func (m *MemTable) Insert(field *Field, chunkID common.ChunkID) error {
func (m *MemTable) MatchField(fieldName []byte) (list posting.List) {
fieldsValues, ok := m.terms.get(fieldName)
if !ok {
- return emptyPostingList
+ return roaring.EmptyPostingList
}
return fieldsValues.value.allValues()
}
func (m *MemTable) MatchTerms(field *Field) (list posting.List) {
- fieldsValues, ok := m.terms.get(field.name)
+ fieldsValues, ok := m.terms.get(field.Name)
if !ok {
- return emptyPostingList
+ return roaring.EmptyPostingList
}
- return fieldsValues.value.get(field.value)
+ return fieldsValues.value.get(field.Value)
}
func (m *MemTable) Range(fieldName []byte, opts *RangeOpts) (list posting.List) {
fieldsValues, ok := m.terms.get(fieldName)
if !ok {
- return emptyPostingList
+ return roaring.EmptyPostingList
}
return fieldsValues.value.getRange(opts)
}
diff --git a/banyand/index/tsdb/mem_test.go b/banyand/index/tsdb/mem_test.go
index 0ab2af2..fed7655 100644
--- a/banyand/index/tsdb/mem_test.go
+++ b/banyand/index/tsdb/mem_test.go
@@ -58,7 +58,7 @@ func TestMemTable_Initialize(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- m := NewMemTable("sw", "group")
+ m := NewMemTable("sw", "group", 0)
var err error
if err = m.Initialize(tt.args.fields); (err != nil) != tt.wantErr {
t.Errorf("Initialize() error = %v, wantErr %v", err, tt.wantErr)
@@ -76,7 +76,7 @@ func TestMemTable_Range(t *testing.T) {
fieldName []byte
opts *RangeOpts
}
- m := NewMemTable("sw", "group")
+ m := NewMemTable("sw", "group", 0)
setUp(t, m)
tests := []struct {
name string
@@ -93,8 +93,8 @@ func TestMemTable_Range(t *testing.T) {
},
},
wantList: m.MatchTerms(&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(200),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
}),
},
{
@@ -108,8 +108,8 @@ func TestMemTable_Range(t *testing.T) {
},
wantList: union(m,
&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(200),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
},
),
},
@@ -125,12 +125,12 @@ func TestMemTable_Range(t *testing.T) {
},
wantList: union(m,
&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(50),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(50),
},
&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(200),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
},
),
},
@@ -146,12 +146,12 @@ func TestMemTable_Range(t *testing.T) {
},
wantList: union(m,
&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(200),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
},
&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(1000),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(1000),
},
),
},
@@ -168,16 +168,16 @@ func TestMemTable_Range(t *testing.T) {
},
wantList: union(m,
&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(50),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(50),
},
&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(200),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
},
&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(1000),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(1000),
},
),
},
@@ -194,8 +194,8 @@ func TestMemTable_Range(t *testing.T) {
},
wantList: union(m,
&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(200),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
},
),
},
@@ -229,13 +229,13 @@ func setUp(t *testing.T, mt *MemTable) {
for i := 0; i < 100; i++ {
if i%2 == 0 {
assert.NoError(t, mt.Insert(&Field{
- name: []byte("service_name"),
- value: []byte("gateway"),
+ Name: []byte("service_name"),
+ Value: []byte("gateway"),
}, common.ChunkID(i)))
} else {
assert.NoError(t, mt.Insert(&Field{
- name: []byte("service_name"),
- value: []byte("webpage"),
+ Name: []byte("service_name"),
+ Value: []byte("webpage"),
}, common.ChunkID(i)))
}
}
@@ -243,18 +243,18 @@ func setUp(t *testing.T, mt *MemTable) {
switch {
case i%3 == 0:
assert.NoError(t, mt.Insert(&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(50),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(50),
}, common.ChunkID(i)))
case i%3 == 1:
assert.NoError(t, mt.Insert(&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(200),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(200),
}, common.ChunkID(i)))
case i%3 == 2:
assert.NoError(t, mt.Insert(&Field{
- name: []byte("duration"),
- value: convert.Uint16ToBytes(1000),
+ Name: []byte("duration"),
+ Value: convert.Uint16ToBytes(1000),
}, common.ChunkID(i)))
}
}
diff --git a/banyand/index/tsdb/term_map.go b/banyand/index/tsdb/term_map.go
index 77d84ef..f325282 100644
--- a/banyand/index/tsdb/term_map.go
+++ b/banyand/index/tsdb/term_map.go
@@ -49,7 +49,7 @@ func (p *postingMap) put(key []byte, id common.ChunkID) error {
func (p *postingMap) getOrCreate(key []byte) posting.List {
list := p.get(key)
- if list != emptyPostingList {
+ if list != roaring.EmptyPostingList {
return list
}
p.mutex.Lock()
@@ -64,10 +64,12 @@ func (p *postingMap) getOrCreate(key []byte) posting.List {
}
func (p *postingMap) get(key []byte) posting.List {
+ p.mutex.RLock()
+ defer p.mutex.RUnlock()
hashedKey := termHashID(convert.Hash(key))
v, ok := p.repo[hashedKey]
if !ok {
- return emptyPostingList
+ return roaring.EmptyPostingList
}
return v.value
}
@@ -83,13 +85,12 @@ func (p *postingMap) allValues() posting.List {
func (p *postingMap) getRange(opts *RangeOpts) posting.List {
switch bytes.Compare(opts.Upper, opts.Lower) {
case -1:
- return emptyPostingList
+ return roaring.EmptyPostingList
case 0:
if opts.IncludesUpper && opts.IncludesLower {
return p.get(opts.Upper)
- } else {
- return emptyPostingList
}
+ return roaring.EmptyPostingList
}
keys := make(Asc, 0, len(p.repo))
for _, v := range p.repo {
@@ -113,7 +114,6 @@ func (p *postingMap) getRange(opts *RangeOpts) posting.List {
if opts.IncludesUpper {
_ = result.Union(p.repo[termHashID(convert.Hash(k))].value)
}
- break
default:
_ = result.Union(p.repo[termHashID(convert.Hash(k))].value)
}
diff --git a/banyand/index/tsdb/tsdb.go b/banyand/index/tsdb/tsdb.go
index 06c96af..91f114d 100644
--- a/banyand/index/tsdb/tsdb.go
+++ b/banyand/index/tsdb/tsdb.go
@@ -29,9 +29,37 @@ type RangeOpts struct {
IncludesLower bool
}
-type Reader interface {
+type GlobalStore interface {
+ Window(startTime, endTime uint64) (Searcher, bool)
+ Initialize(fields []FieldSpec) error
Insert(field *Field, chunkID common.ChunkID) error
+}
+
+type Searcher interface {
MatchField(fieldNames []byte) (list posting.List)
MatchTerms(field *Field) (list posting.List)
Range(fieldName []byte, opts *RangeOpts) (list posting.List)
}
+
+type store struct {
+ memTable *MemTable
+ //TODO: add data tables
+}
+
+func (s *store) Window(_, _ uint64) (Searcher, bool) {
+ return s.memTable, true
+}
+
+func (s *store) Initialize(fields []FieldSpec) error {
+ return s.memTable.Initialize(fields)
+}
+
+func (s *store) Insert(field *Field, chunkID common.ChunkID) error {
+ return s.memTable.Insert(field, chunkID)
+}
+
+func NewStore(name, group string, shardID uint) GlobalStore {
+ return &store{
+ memTable: NewMemTable(name, group, shardID),
+ }
+}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index b024cbe..d694895 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -57,11 +57,11 @@ func newStandaloneCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate database")
}
- idx, err := index.NewService(ctx, repo, pipeline)
+ idx, err := index.NewService(ctx, repo)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate index builder")
}
- traceSeries, err := trace.NewService(ctx, db, repo)
+ traceSeries, err := trace.NewService(ctx, db, repo, idx)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate trace series")
}
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index 7ebe12f..64e1bb3 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -70,7 +70,7 @@ func setupServices(tester *require.Assertions) (discovery.ServiceRepo, series.Se
tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
// Init `Trace` module
- traceSvc, err := trace.NewService(context.TODO(), db, repo)
+ traceSvc, err := trace.NewService(context.TODO(), db, repo, nil)
tester.NoError(err)
// Init `Query` module
diff --git a/banyand/series/trace/common_test.go b/banyand/series/trace/common_test.go
index 51f1603..f8b40ae 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -26,12 +26,14 @@ import (
"testing"
"time"
+ "github.com/golang/mock/gomock"
googleUUID "github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/banyand/index"
"github.com/apache/skywalking-banyandb/banyand/storage"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -63,7 +65,10 @@ func setup(t *testing.T) (*traceSeries, func()) {
assert.NoError(t, err)
rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
assert.NoError(t, db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
- svc, err := NewService(context.TODO(), db, nil)
+ ctrl := gomock.NewController(t)
+ mockIndex := index.NewMockService(ctrl)
+ mockIndex.EXPECT().Insert(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ svc, err := NewService(context.TODO(), db, nil, mockIndex)
assert.NoError(t, err)
assert.NoError(t, svc.PreRun())
assert.NoError(t, db.PreRun())
diff --git a/banyand/series/trace/service.go b/banyand/series/trace/service.go
new file mode 100644
index 0000000..7049a9d
--- /dev/null
+++ b/banyand/series/trace/service.go
@@ -0,0 +1,147 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "context"
+ "time"
+
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "github.com/apache/skywalking-banyandb/api/event"
+ v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/index"
+ "github.com/apache/skywalking-banyandb/banyand/series"
+ "github.com/apache/skywalking-banyandb/banyand/series/schema"
+ "github.com/apache/skywalking-banyandb/banyand/storage"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pb"
+)
+
+var _ series.Service = (*service)(nil)
+
+type service struct {
+ db storage.Database
+ schemaMap map[string]*traceSeries
+ l *logger.Logger
+ repo discovery.ServiceRepo
+ stopCh chan struct{}
+ idx index.Service
+}
+
+// NewService returns a new service.
+func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo, idx index.Service) (series.Service, error) {
+ return &service{
+ db: db,
+ repo: repo,
+ idx: idx,
+ }, nil
+}
+
+func (s *service) Name() string {
+ return "trace-series"
+}
+
+func (s *service) PreRun() error {
+ schemas, err := s.TraceSeries().List(context.Background(), schema.ListOpt{})
+ if err != nil {
+ return err
+ }
+ s.schemaMap = make(map[string]*traceSeries, len(schemas))
+ s.l = logger.GetLogger(s.Name())
+ for _, sa := range schemas {
+ ts, errTS := newTraceSeries(sa, s.l, s.idx)
+ if errTS != nil {
+ return errTS
+ }
+ s.db.Register(ts)
+ id := formatTraceSeriesID(ts.name, ts.group)
+ s.schemaMap[id] = ts
+ s.l.Info().Str("id", id).Msg("initialize Trace series")
+ }
+ return err
+}
+
+func (s *service) Serve() error {
+ now := time.Now().UnixNano()
+ for _, sMeta := range s.schemaMap {
+ e := pb.NewSeriesEventBuilder().
+ SeriesMetadata(sMeta.group, sMeta.name).
+ FieldNames(sMeta.fieldsNamesCompositeSeriesID...).
+ Time(time.Now()).
+ Action(v1.Action_ACTION_PUT).
+ Build()
+ _, err := s.repo.Publish(event.TopicSeriesEvent, bus.NewMessage(bus.MessageID(now), e))
+ if err != nil {
+ return err
+ }
+ seriesObj := &v1.Series{
+ Series: &v1.Metadata{
+ Name: sMeta.name,
+ Group: sMeta.group,
+ },
+ }
+ rules, errGetRules := s.IndexRules(context.Background(), seriesObj, nil)
+ if errGetRules != nil {
+ return errGetRules
+ }
+ shardedRuleIndex := make([]*v1.IndexRuleEvent_ShardedIndexRule, 0, len(rules)*int(sMeta.shardNum))
+ for i := 0; i < int(sMeta.shardNum); i++ {
+ t := time.Now()
+ e := pb.NewShardEventBuilder().Action(v1.Action_ACTION_PUT).Time(t).
+ Shard(
+ pb.NewShardBuilder().
+ ID(uint64(i)).Total(sMeta.shardNum).SeriesMetadata(sMeta.group, sMeta.name).UpdatedAt(t).CreatedAt(t).
+ Node(pb.NewNodeBuilder().
+ ID(s.repo.NodeID()).CreatedAt(t).UpdatedAt(t).Addr("localhost").
+ Build()).
+ Build()).
+ Build()
+ _, errShard := s.repo.Publish(event.TopicShardEvent, bus.NewMessage(bus.MessageID(now), e))
+ if errShard != nil {
+ return errShard
+ }
+ shardedRuleIndex = append(shardedRuleIndex, &v1.IndexRuleEvent_ShardedIndexRule{
+ ShardId: uint64(i),
+ Rules: rules,
+ })
+ }
+
+ indexRule := &v1.IndexRuleEvent{
+ Series: seriesObj.Series,
+ Rules: shardedRuleIndex,
+ Action: v1.Action_ACTION_PUT,
+ Time: timestamppb.New(time.Now()),
+ }
+ _, errPublishRules := s.repo.Publish(event.TopicIndexRule, bus.NewMessage(bus.MessageID(now), indexRule))
+ if errPublishRules != nil {
+ return errPublishRules
+ }
+ }
+ s.stopCh = make(chan struct{})
+ <-s.stopCh
+ return nil
+}
+
+func (s *service) GracefulStop() {
+ if s.stopCh != nil {
+ close(s.stopCh)
+ }
+}
diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go
index 2551813..1c5f639 100644
--- a/banyand/series/trace/trace.go
+++ b/banyand/series/trace/trace.go
@@ -18,7 +18,6 @@
package trace
import (
- "context"
"strconv"
"time"
@@ -26,14 +25,11 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
- "github.com/apache/skywalking-banyandb/api/event"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
apischema "github.com/apache/skywalking-banyandb/api/schema"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/index"
"github.com/apache/skywalking-banyandb/banyand/series"
- "github.com/apache/skywalking-banyandb/banyand/series/schema"
"github.com/apache/skywalking-banyandb/banyand/storage"
- "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
@@ -76,89 +72,6 @@ const (
StateError = 1
)
-var _ series.Service = (*service)(nil)
-
-type service struct {
- db storage.Database
- schemaMap map[string]*traceSeries
- l *logger.Logger
- repo discovery.ServiceRepo
- stopCh chan struct{}
-}
-
-//NewService returns a new service
-func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo) (series.Service, error) {
- return &service{
- db: db,
- repo: repo,
- }, nil
-}
-
-func (s *service) Name() string {
- return "trace-series"
-}
-
-func (s *service) PreRun() error {
- schemas, err := s.TraceSeries().List(context.Background(), schema.ListOpt{})
- if err != nil {
- return err
- }
- s.schemaMap = make(map[string]*traceSeries, len(schemas))
- s.l = logger.GetLogger(s.Name())
- for _, sa := range schemas {
- ts, errTS := newTraceSeries(sa, s.l)
- if errTS != nil {
- return errTS
- }
- s.db.Register(ts)
- id := formatTraceSeriesID(ts.name, ts.group)
- s.schemaMap[id] = ts
- s.l.Info().Str("id", id).Msg("initialize Trace series")
- }
- return err
-}
-
-func (s *service) Serve() error {
- now := time.Now().UnixNano()
- for _, sMeta := range s.schemaMap {
- e := pb.NewSeriesEventBuilder().
- SeriesMetadata(sMeta.group, sMeta.name).
- FieldNames(sMeta.fieldsNamesCompositeSeriesID...).
- Time(time.Now()).
- Action(v1.Action_ACTION_PUT).
- Build()
- _, err := s.repo.Publish(event.TopicSeriesEvent, bus.NewMessage(bus.MessageID(now), e))
- if err != nil {
- return err
- }
- for i := 0; i < int(sMeta.shardNum); i++ {
- t := time.Now()
- e := pb.NewShardEventBuilder().Action(v1.Action_ACTION_PUT).Time(t).
- Shard(
- pb.NewShardBuilder().
- ID(uint64(i)).Total(sMeta.shardNum).SeriesMetadata(sMeta.group, sMeta.name).UpdatedAt(t).CreatedAt(t).
- Node(pb.NewNodeBuilder().
- ID(s.repo.NodeID()).CreatedAt(t).UpdatedAt(t).Addr("localhost").
- Build()).
- Build()).
- Build()
- _, errShard := s.repo.Publish(event.TopicShardEvent, bus.NewMessage(bus.MessageID(now), e))
- if errShard != nil {
- return errShard
- }
- }
- }
- s.stopCh = make(chan struct{})
- <-s.stopCh
- return nil
-}
-
-func (s *service) GracefulStop() {
- if s.stopCh != nil {
- close(s.stopCh)
- }
-}
-
func (s *service) FetchTrace(traceSeries common.Metadata, traceID string, opt series.ScanOptions) (data.Trace, error) {
ts, err := s.getSeries(traceSeries)
if err != nil {
@@ -227,6 +140,7 @@ type traceSeries struct {
schema apischema.TraceSeries
reader storage.StoreRepo
writePoint storage.GetWritePoint
+ idx index.Service
shardNum uint32
fieldIndex map[string]int
traceIDIndex int
@@ -241,11 +155,12 @@ type traceSeries struct {
fieldsNamesCompositeSeriesID []string
}
-func newTraceSeries(schema apischema.TraceSeries, l *logger.Logger) (*traceSeries, error) {
+func newTraceSeries(schema apischema.TraceSeries, l *logger.Logger, idx index.Service) (*traceSeries, error) {
t := &traceSeries{
schema: schema,
idGen: series.NewIDGen(),
l: l,
+ idx: idx,
}
meta := t.schema.Spec.GetMetadata()
shardInfo := t.schema.Spec.GetShard()
diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go
index 2b4466f..60ed91a 100644
--- a/banyand/series/trace/write.go
+++ b/banyand/series/trace/write.go
@@ -22,10 +22,12 @@ import (
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
+ "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/banyand/index"
bydb_bytes "github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/partition"
@@ -92,9 +94,52 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data.
Uint("trace_shard_id", traceIDShardID).
Uint("shard_id", shardID).
Msg("written to Trace series")
+ id := common.ChunkID(chunkID)
+ for i, field := range entityVal.GetFields() {
+ fieldSpec := t.schema.Spec.GetFields()[i]
+ fieldName := fieldSpec.GetName()
+ switch x := field.ValueType.(type) {
+ case *v1.Field_Str:
+ err = multierr.Append(err, t.writeStrToIndex(shardID, id, fieldName, x.Str.GetValue()))
+ case *v1.Field_Int:
+ err = multierr.Append(err, t.writeIntToIndex(shardID, id, fieldName, x.Int.GetValue()))
+ case *v1.Field_StrArray:
+ for _, s := range x.StrArray.GetValue() {
+ err = multierr.Append(err, t.writeStrToIndex(shardID, id, fieldName, s))
+ }
+ case *v1.Field_IntArray:
+ for _, integer := range x.IntArray.GetValue() {
+ err = multierr.Append(err, t.writeIntToIndex(shardID, id, fieldName, integer))
+ }
+ default:
+ continue
+ }
+ }
return common.ChunkID(chunkID), err
}
+func (t *traceSeries) writeIntToIndex(shardID uint, id common.ChunkID, name string, value int64) error {
+ return t.writeIndex(shardID, id, name, convert.Int64ToBytes(value))
+}
+
+func (t *traceSeries) writeStrToIndex(shardID uint, id common.ChunkID, name string, value string) error {
+ return t.writeIndex(shardID, id, name, []byte(value))
+}
+
+func (t *traceSeries) writeIndex(shardID uint, id common.ChunkID, name string, value []byte) error {
+ return t.idx.Insert(*common.NewMetadata(&v1.Metadata{
+ Name: t.name,
+ Group: t.group,
+ }),
+ shardID,
+ &index.Field{
+ ChunkID: id,
+ Name: name,
+ Value: value,
+ },
+ )
+}
+
// copyEntityValueWithoutDataBinary copies all fields without DataBinary
func copyEntityValueWithoutDataBinary(ev *v1.EntityValue) *v1.EntityValue {
return &v1.EntityValue{
diff --git a/pkg/posting/roaring/roaring.go b/pkg/posting/roaring/roaring.go
index 7c778d0..0593813 100644
--- a/pkg/posting/roaring/roaring.go
+++ b/pkg/posting/roaring/roaring.go
@@ -26,6 +26,8 @@ import (
)
var (
+ EmptyPostingList = NewPostingList()
+
ErrIntersectRoaringOnly = errors.New("Intersect only supported between roaringDocId sets")
ErrUnionRoaringOnly = errors.New("Union only supported between roaringDocId sets")
ErrDifferenceRoaringOnly = errors.New("Difference only supported between roaringDocId sets")