You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/03 13:53:20 UTC

[skywalking-banyandb] 04/05: Index memtable

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch index-mem
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit d2c9e1b1ed44cd7a8d0323e44d8dcf271c4b5f8e
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Aug 2 22:04:54 2021 +0800

    Index memtable
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/common/metadata.go              |   7 +
 api/event/discovery.go              |   9 +-
 api/proto/banyandb/v1/schema.pb.go  | 246 ++++++++++++++++++++++++++-----
 api/proto/banyandb/v1/schema.proto  |  13 +-
 banyand/index/index.go              | 193 +++++++++++++++++++++++-
 banyand/index/search.go             | 286 ++++++++++++++++++++++++++++++++++++
 banyand/index/tsdb/field_map.go     |   6 +-
 banyand/index/tsdb/mem.go           |  30 ++--
 banyand/index/tsdb/mem_test.go      |  64 ++++----
 banyand/index/tsdb/term_map.go      |  12 +-
 banyand/index/tsdb/tsdb.go          |  30 +++-
 banyand/internal/cmd/standalone.go  |   4 +-
 banyand/query/processor_test.go     |   2 +-
 banyand/series/trace/common_test.go |   7 +-
 banyand/series/trace/service.go     | 147 ++++++++++++++++++
 banyand/series/trace/trace.go       |  93 +-----------
 banyand/series/trace/write.go       |  45 ++++++
 pkg/posting/roaring/roaring.go      |   2 +
 18 files changed, 1005 insertions(+), 191 deletions(-)

diff --git a/api/common/metadata.go b/api/common/metadata.go
index 93ea657..1facc9e 100644
--- a/api/common/metadata.go
+++ b/api/common/metadata.go
@@ -33,3 +33,10 @@ func (md Metadata) Equal(other Metadata) bool {
 		md.Spec.Group == other.Spec.Group &&
 		md.Spec.Name == other.Spec.Name
 }
+
+func NewMetadata(spec *v1.Metadata) *Metadata {
+	return &Metadata{
+		KindVersion: MetadataKindVersion,
+		Spec:        spec,
+	}
+}
diff --git a/api/event/discovery.go b/api/event/discovery.go
index a38298c..10476b8 100644
--- a/api/event/discovery.go
+++ b/api/event/discovery.go
@@ -33,7 +33,9 @@ var (
 		Version: "v1",
 		Kind:    "event-series",
 	}
-	TopicSeriesEvent = bus.UniTopic(SeriesEventKindVersion.String())
+	TopicSeriesEvent     = bus.UniTopic(SeriesEventKindVersion.String())
+	IndexRuleKindVersion = common.KindVersion{Version: "v1", Kind: "index-rule"}
+	TopicIndexRule       = bus.UniTopic(IndexRuleKindVersion.String())
 )
 
 type Shard struct {
@@ -45,3 +47,8 @@ type Series struct {
 	common.KindVersion
 	Payload v1.SeriesEvent
 }
+
+type IndexRule struct {
+	common.KindVersion
+	Payload *v1.IndexRuleEvent
+}
diff --git a/api/proto/banyandb/v1/schema.pb.go b/api/proto/banyandb/v1/schema.pb.go
index 69f0218..56f6144 100644
--- a/api/proto/banyandb/v1/schema.pb.go
+++ b/api/proto/banyandb/v1/schema.pb.go
@@ -978,6 +978,132 @@ func (x *IndexRuleBinding) GetUpdatedAt() *timestamppb.Timestamp {
 	return nil
 }
 
+type IndexRuleEvent struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Series *Metadata                          `protobuf:"bytes,1,opt,name=series,proto3" json:"series,omitempty"`
+	Rules  []*IndexRuleEvent_ShardedIndexRule `protobuf:"bytes,2,rep,name=rules,proto3" json:"rules,omitempty"`
+	Action Action                             `protobuf:"varint,4,opt,name=action,proto3,enum=banyandb.v1.Action" json:"action,omitempty"`
+	Time   *timestamppb.Timestamp             `protobuf:"bytes,5,opt,name=time,proto3" json:"time,omitempty"`
+}
+
+func (x *IndexRuleEvent) Reset() {
+	*x = IndexRuleEvent{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_banyandb_v1_schema_proto_msgTypes[10]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *IndexRuleEvent) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*IndexRuleEvent) ProtoMessage() {}
+
+func (x *IndexRuleEvent) ProtoReflect() protoreflect.Message {
+	mi := &file_banyandb_v1_schema_proto_msgTypes[10]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use IndexRuleEvent.ProtoReflect.Descriptor instead.
+func (*IndexRuleEvent) Descriptor() ([]byte, []int) {
+	return file_banyandb_v1_schema_proto_rawDescGZIP(), []int{10}
+}
+
+func (x *IndexRuleEvent) GetSeries() *Metadata {
+	if x != nil {
+		return x.Series
+	}
+	return nil
+}
+
+func (x *IndexRuleEvent) GetRules() []*IndexRuleEvent_ShardedIndexRule {
+	if x != nil {
+		return x.Rules
+	}
+	return nil
+}
+
+func (x *IndexRuleEvent) GetAction() Action {
+	if x != nil {
+		return x.Action
+	}
+	return Action_ACTION_UNSPECIFIED
+}
+
+func (x *IndexRuleEvent) GetTime() *timestamppb.Timestamp {
+	if x != nil {
+		return x.Time
+	}
+	return nil
+}
+
+type IndexRuleEvent_ShardedIndexRule struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	ShardId uint64       `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"`
+	Rules   []*IndexRule `protobuf:"bytes,2,rep,name=rules,proto3" json:"rules,omitempty"`
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) Reset() {
+	*x = IndexRuleEvent_ShardedIndexRule{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_banyandb_v1_schema_proto_msgTypes[11]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*IndexRuleEvent_ShardedIndexRule) ProtoMessage() {}
+
+func (x *IndexRuleEvent_ShardedIndexRule) ProtoReflect() protoreflect.Message {
+	mi := &file_banyandb_v1_schema_proto_msgTypes[11]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use IndexRuleEvent_ShardedIndexRule.ProtoReflect.Descriptor instead.
+func (*IndexRuleEvent_ShardedIndexRule) Descriptor() ([]byte, []int) {
+	return file_banyandb_v1_schema_proto_rawDescGZIP(), []int{10, 0}
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) GetShardId() uint64 {
+	if x != nil {
+		return x.ShardId
+	}
+	return 0
+}
+
+func (x *IndexRuleEvent_ShardedIndexRule) GetRules() []*IndexRule {
+	if x != nil {
+		return x.Rules
+	}
+	return nil
+}
+
 var File_banyandb_v1_schema_proto protoreflect.FileDescriptor
 
 var file_banyandb_v1_schema_proto_rawDesc = []byte{
@@ -1119,7 +1245,27 @@ var file_banyandb_v1_schema_proto_rawDesc = []byte{
 	0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f,
 	0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69,
 	0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64,
-	0x41, 0x74, 0x42, 0x60, 0x0a, 0x1e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
+	0x41, 0x74, 0x22, 0xbd, 0x02, 0x0a, 0x0e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65,
+	0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2d, 0x0a, 0x06, 0x73, 0x65, 0x72, 0x69, 0x65, 0x73, 0x18,
+	0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
+	0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x06, 0x73, 0x65,
+	0x72, 0x69, 0x65, 0x73, 0x12, 0x42, 0x0a, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20,
+	0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x76,
+	0x31, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74,
+	0x2e, 0x53, 0x68, 0x61, 0x72, 0x64, 0x65, 0x64, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c,
+	0x65, 0x52, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73, 0x12, 0x2b, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69,
+	0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61,
+	0x6e, 0x64, 0x62, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x06, 0x61,
+	0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x04, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20,
+	0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
+	0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52,
+	0x04, 0x74, 0x69, 0x6d, 0x65, 0x1a, 0x5b, 0x0a, 0x10, 0x53, 0x68, 0x61, 0x72, 0x64, 0x65, 0x64,
+	0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x68, 0x61,
+	0x72, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x68, 0x61,
+	0x72, 0x64, 0x49, 0x64, 0x12, 0x2c, 0x0a, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20,
+	0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x76,
+	0x31, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x52, 0x05, 0x72, 0x75, 0x6c,
+	0x65, 0x73, 0x42, 0x60, 0x0a, 0x1e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
 	0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79,
 	0x61, 0x6e, 0x64, 0x62, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
 	0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69,
@@ -1141,52 +1287,60 @@ func file_banyandb_v1_schema_proto_rawDescGZIP() []byte {
 }
 
 var file_banyandb_v1_schema_proto_enumTypes = make([]protoimpl.EnumInfo, 4)
-var file_banyandb_v1_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
+var file_banyandb_v1_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
 var file_banyandb_v1_schema_proto_goTypes = []interface{}{
-	(Duration_Duration)(0),        // 0: banyandb.v1.Duration.Duration
-	(FieldSpec_FieldType)(0),      // 1: banyandb.v1.FieldSpec.FieldType
-	(IndexObject_IndexType)(0),    // 2: banyandb.v1.IndexObject.IndexType
-	(Series_Catalog)(0),           // 3: banyandb.v1.Series.Catalog
-	(*ShardInfo)(nil),             // 4: banyandb.v1.ShardInfo
-	(*Duration)(nil),              // 5: banyandb.v1.Duration
-	(*FieldSpec)(nil),             // 6: banyandb.v1.FieldSpec
-	(*TraceStateMap)(nil),         // 7: banyandb.v1.TraceStateMap
-	(*TraceFieldMap)(nil),         // 8: banyandb.v1.TraceFieldMap
-	(*TraceSeries)(nil),           // 9: banyandb.v1.TraceSeries
-	(*IndexObject)(nil),           // 10: banyandb.v1.IndexObject
-	(*IndexRule)(nil),             // 11: banyandb.v1.IndexRule
-	(*Series)(nil),                // 12: banyandb.v1.Series
-	(*IndexRuleBinding)(nil),      // 13: banyandb.v1.IndexRuleBinding
-	(*Metadata)(nil),              // 14: banyandb.v1.Metadata
-	(*timestamppb.Timestamp)(nil), // 15: google.protobuf.Timestamp
+	(Duration_Duration)(0),                  // 0: banyandb.v1.Duration.Duration
+	(FieldSpec_FieldType)(0),                // 1: banyandb.v1.FieldSpec.FieldType
+	(IndexObject_IndexType)(0),              // 2: banyandb.v1.IndexObject.IndexType
+	(Series_Catalog)(0),                     // 3: banyandb.v1.Series.Catalog
+	(*ShardInfo)(nil),                       // 4: banyandb.v1.ShardInfo
+	(*Duration)(nil),                        // 5: banyandb.v1.Duration
+	(*FieldSpec)(nil),                       // 6: banyandb.v1.FieldSpec
+	(*TraceStateMap)(nil),                   // 7: banyandb.v1.TraceStateMap
+	(*TraceFieldMap)(nil),                   // 8: banyandb.v1.TraceFieldMap
+	(*TraceSeries)(nil),                     // 9: banyandb.v1.TraceSeries
+	(*IndexObject)(nil),                     // 10: banyandb.v1.IndexObject
+	(*IndexRule)(nil),                       // 11: banyandb.v1.IndexRule
+	(*Series)(nil),                          // 12: banyandb.v1.Series
+	(*IndexRuleBinding)(nil),                // 13: banyandb.v1.IndexRuleBinding
+	(*IndexRuleEvent)(nil),                  // 14: banyandb.v1.IndexRuleEvent
+	(*IndexRuleEvent_ShardedIndexRule)(nil), // 15: banyandb.v1.IndexRuleEvent.ShardedIndexRule
+	(*Metadata)(nil),                        // 16: banyandb.v1.Metadata
+	(*timestamppb.Timestamp)(nil),           // 17: google.protobuf.Timestamp
+	(Action)(0),                             // 18: banyandb.v1.Action
 }
 var file_banyandb_v1_schema_proto_depIdxs = []int32{
 	0,  // 0: banyandb.v1.Duration.unit:type_name -> banyandb.v1.Duration.Duration
 	1,  // 1: banyandb.v1.FieldSpec.type:type_name -> banyandb.v1.FieldSpec.FieldType
 	7,  // 2: banyandb.v1.TraceFieldMap.state:type_name -> banyandb.v1.TraceStateMap
-	14, // 3: banyandb.v1.TraceSeries.metadata:type_name -> banyandb.v1.Metadata
+	16, // 3: banyandb.v1.TraceSeries.metadata:type_name -> banyandb.v1.Metadata
 	6,  // 4: banyandb.v1.TraceSeries.fields:type_name -> banyandb.v1.FieldSpec
 	8,  // 5: banyandb.v1.TraceSeries.reserved_fields_map:type_name -> banyandb.v1.TraceFieldMap
 	4,  // 6: banyandb.v1.TraceSeries.shard:type_name -> banyandb.v1.ShardInfo
 	5,  // 7: banyandb.v1.TraceSeries.duration:type_name -> banyandb.v1.Duration
-	15, // 8: banyandb.v1.TraceSeries.updated_at:type_name -> google.protobuf.Timestamp
+	17, // 8: banyandb.v1.TraceSeries.updated_at:type_name -> google.protobuf.Timestamp
 	2,  // 9: banyandb.v1.IndexObject.type:type_name -> banyandb.v1.IndexObject.IndexType
-	14, // 10: banyandb.v1.IndexRule.metadata:type_name -> banyandb.v1.Metadata
+	16, // 10: banyandb.v1.IndexRule.metadata:type_name -> banyandb.v1.Metadata
 	10, // 11: banyandb.v1.IndexRule.objects:type_name -> banyandb.v1.IndexObject
-	15, // 12: banyandb.v1.IndexRule.updated_at:type_name -> google.protobuf.Timestamp
+	17, // 12: banyandb.v1.IndexRule.updated_at:type_name -> google.protobuf.Timestamp
 	3,  // 13: banyandb.v1.Series.catalog:type_name -> banyandb.v1.Series.Catalog
-	14, // 14: banyandb.v1.Series.series:type_name -> banyandb.v1.Metadata
-	14, // 15: banyandb.v1.IndexRuleBinding.metadata:type_name -> banyandb.v1.Metadata
-	14, // 16: banyandb.v1.IndexRuleBinding.rule_ref:type_name -> banyandb.v1.Metadata
+	16, // 14: banyandb.v1.Series.series:type_name -> banyandb.v1.Metadata
+	16, // 15: banyandb.v1.IndexRuleBinding.metadata:type_name -> banyandb.v1.Metadata
+	16, // 16: banyandb.v1.IndexRuleBinding.rule_ref:type_name -> banyandb.v1.Metadata
 	12, // 17: banyandb.v1.IndexRuleBinding.subjects:type_name -> banyandb.v1.Series
-	15, // 18: banyandb.v1.IndexRuleBinding.begin_at:type_name -> google.protobuf.Timestamp
-	15, // 19: banyandb.v1.IndexRuleBinding.expire_at:type_name -> google.protobuf.Timestamp
-	15, // 20: banyandb.v1.IndexRuleBinding.updated_at:type_name -> google.protobuf.Timestamp
-	21, // [21:21] is the sub-list for method output_type
-	21, // [21:21] is the sub-list for method input_type
-	21, // [21:21] is the sub-list for extension type_name
-	21, // [21:21] is the sub-list for extension extendee
-	0,  // [0:21] is the sub-list for field type_name
+	17, // 18: banyandb.v1.IndexRuleBinding.begin_at:type_name -> google.protobuf.Timestamp
+	17, // 19: banyandb.v1.IndexRuleBinding.expire_at:type_name -> google.protobuf.Timestamp
+	17, // 20: banyandb.v1.IndexRuleBinding.updated_at:type_name -> google.protobuf.Timestamp
+	16, // 21: banyandb.v1.IndexRuleEvent.series:type_name -> banyandb.v1.Metadata
+	15, // 22: banyandb.v1.IndexRuleEvent.rules:type_name -> banyandb.v1.IndexRuleEvent.ShardedIndexRule
+	18, // 23: banyandb.v1.IndexRuleEvent.action:type_name -> banyandb.v1.Action
+	17, // 24: banyandb.v1.IndexRuleEvent.time:type_name -> google.protobuf.Timestamp
+	11, // 25: banyandb.v1.IndexRuleEvent.ShardedIndexRule.rules:type_name -> banyandb.v1.IndexRule
+	26, // [26:26] is the sub-list for method output_type
+	26, // [26:26] is the sub-list for method input_type
+	26, // [26:26] is the sub-list for extension type_name
+	26, // [26:26] is the sub-list for extension extendee
+	0,  // [0:26] is the sub-list for field type_name
 }
 
 func init() { file_banyandb_v1_schema_proto_init() }
@@ -1316,6 +1470,30 @@ func file_banyandb_v1_schema_proto_init() {
 				return nil
 			}
 		}
+		file_banyandb_v1_schema_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*IndexRuleEvent); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_banyandb_v1_schema_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*IndexRuleEvent_ShardedIndexRule); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
 	}
 	type x struct{}
 	out := protoimpl.TypeBuilder{
@@ -1323,7 +1501,7 @@ func file_banyandb_v1_schema_proto_init() {
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 			RawDescriptor: file_banyandb_v1_schema_proto_rawDesc,
 			NumEnums:      4,
-			NumMessages:   10,
+			NumMessages:   12,
 			NumExtensions: 0,
 			NumServices:   0,
 		},
diff --git a/api/proto/banyandb/v1/schema.proto b/api/proto/banyandb/v1/schema.proto
index 273c155..aeb8e5a 100644
--- a/api/proto/banyandb/v1/schema.proto
+++ b/api/proto/banyandb/v1/schema.proto
@@ -184,4 +184,15 @@ message IndexRuleBinding {
   google.protobuf.Timestamp expire_at = 5;
   // updated_at_nanoseconds indicates when the IndexRuleBinding is updated
   google.protobuf.Timestamp updated_at = 6;
-}
\ No newline at end of file
+}
+
+message IndexRuleEvent {
+  Metadata series = 1;
+  message ShardedIndexRule {
+    uint64 shard_id = 1;
+    repeated IndexRule rules  = 2;
+  }
+  repeated ShardedIndexRule rules = 2;
+  Action action = 4;
+  google.protobuf.Timestamp time = 5;
+}
diff --git a/banyand/index/index.go b/banyand/index/index.go
index 6ab47f2..c09f27f 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -19,29 +19,49 @@ package index
 
 import (
 	"context"
+	"sync"
+
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/api/event"
 	apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
-	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/index/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/posting"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+var (
+	ErrShardNotFound       = errors.New("series doesn't exist")
+	ErrTraceSeriesNotFound = errors.New("trace series not found")
+	ErrUnknownField        = errors.New("the field is unknown")
+)
+
 type Condition struct {
 	Key    string
 	Values [][]byte
 	Op     apiv1.PairQuery_BinaryOp
 }
 
-//go:generate mockgen -destination=./index_mock.go -package=index . Repo
+type Field struct {
+	ChunkID common.ChunkID
+	Name    string
+	Value   []byte
+}
+
+//go:generate mockgen -destination=./index_mock.go -package=index . Service
 type Repo interface {
 	Search(seriesMeta common.Metadata, shardID uint, startTime, endTime uint64, indexObjectName string, conditions []Condition) (posting.List, error)
+	Insert(seriesMeta common.Metadata, shardID uint, fields *Field) error
 }
 
 type Builder interface {
-	run.Config
 	run.PreRunner
+	run.Service
 }
 
 type Service interface {
@@ -49,6 +69,169 @@ type Service interface {
 	Builder
 }
 
-func NewService(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Service, error) {
-	return nil, nil
+type series struct {
+	repo map[uint]*shard
+}
+
+type shard struct {
+	meta  map[string][]*apiv1.IndexObject
+	store tsdb.GlobalStore
+}
+
+type service struct {
+	meta              *indexMeta
+	log               *logger.Logger
+	repo              discovery.ServiceRepo
+	stopCh            chan struct{}
+	indexRuleListener *indexRuleListener
+}
+
+func NewService(_ context.Context, repo discovery.ServiceRepo) (Service, error) {
+	svc := &service{
+		repo:              repo,
+		indexRuleListener: &indexRuleListener{},
+	}
+	svc.meta = &indexMeta{
+		meta: make(map[string]*series),
+	}
+	svc.indexRuleListener.indexMeta = svc.meta
+	svc.indexRuleListener.closeFunc = func() {
+		svc.stopCh <- struct{}{}
+	}
+	return svc, nil
+}
+
+func (s *service) Insert(series common.Metadata, shardID uint, field *Field) error {
+	sd, err := s.getShard(series, shardID)
+	if err != nil {
+		return err
+	}
+	objects, ok := sd.meta[field.Name]
+	if !ok {
+		return ErrUnknownField
+	}
+	for _, object := range objects {
+		err = multierr.Append(err, sd.store.Insert(&tsdb.Field{
+			Name:  []byte(compositeFieldID(object.GetName(), field.Name)),
+			Value: field.Value,
+		}, field.ChunkID))
+	}
+	return err
+}
+
+func (s *service) getShard(series common.Metadata, shardID uint) (*shard, error) {
+	id := compositeSeriesID(series.Spec)
+	ss, ok := s.meta.meta[id]
+	if !ok {
+		return nil, errors.Wrapf(ErrTraceSeriesNotFound, "identify:%s", id)
+	}
+	sd, existSearcher := ss.repo[shardID]
+	if !existSearcher {
+		return nil, errors.Wrapf(ErrShardNotFound, "shardID:%d", shardID)
+	}
+	return sd, nil
+}
+
+func (s *service) Name() string {
+	return "index"
+}
+
+func (s *service) PreRun() error {
+	//TODO: listen to written data
+	s.log = logger.GetLogger("index")
+	s.indexRuleListener.log = s.log
+	return s.repo.Subscribe(event.TopicIndexRule, s.indexRuleListener)
+}
+
+func (s *service) Serve() error {
+	s.stopCh = make(chan struct{})
+	<-s.stopCh
+	return nil
+}
+
+func (s *service) GracefulStop() {
+	if s.stopCh != nil {
+		close(s.stopCh)
+	}
+}
+
+type indexMeta struct {
+	meta map[string]*series
+	sync.RWMutex
+}
+
+type indexRuleListener struct {
+	log       *logger.Logger
+	indexMeta *indexMeta
+	closeFunc func()
+}
+
+func (i *indexRuleListener) Rev(message bus.Message) (resp bus.Message) {
+	indexRuleEvent, ok := message.Data().(*apiv1.IndexRuleEvent)
+	if !ok {
+		i.log.Warn().Msg("invalid event data type")
+		return
+	}
+	i.log.Info().
+		Str("action", apiv1.Action_name[int32(indexRuleEvent.Action)]).
+		Str("series", indexRuleEvent.Series.String()).
+		Msg("received an index rule")
+	i.indexMeta.Lock()
+	defer i.indexMeta.Unlock()
+	switch indexRuleEvent.Action {
+	case apiv1.Action_ACTION_PUT:
+		seriesID := compositeSeriesID(indexRuleEvent.Series)
+		newSeries := &series{
+			repo: make(map[uint]*shard),
+		}
+		for _, rule := range indexRuleEvent.Rules {
+			store := tsdb.NewStore(indexRuleEvent.Series.Name, indexRuleEvent.Series.Group, uint(rule.ShardId))
+			fields := make([]tsdb.FieldSpec, 0, len(rule.Rules))
+			meta := make(map[string][]*apiv1.IndexObject)
+			for _, indexRule := range rule.GetRules() {
+				for _, object := range indexRule.Objects {
+					fieldsSize := len(object.Fields)
+					if fieldsSize > 1 {
+						//TODO: to support composited index
+						i.log.Error().Str("name", object.Name).
+							Msg("index module doesn't support composited index object")
+						i.closeFunc()
+					} else if fieldsSize < 1 {
+						continue
+					}
+					field := object.Fields[0]
+					fieldSpec := tsdb.FieldSpec{
+						Name: compositeFieldID(object.Name, field),
+					}
+					fields = append(fields, fieldSpec)
+					objects, existed := meta[field]
+					if !existed {
+						objects = make([]*apiv1.IndexObject, 0, 1)
+					}
+					objects = append(objects, object)
+					meta[field] = objects
+				}
+			}
+			err := store.Initialize(fields)
+			if err != nil {
+				i.log.Warn().Err(err).Msg("failed to initialize index getShard")
+			}
+			newSeries.repo[uint(rule.ShardId)] = &shard{
+				store: store,
+				meta:  meta,
+			}
+		}
+		i.indexMeta.meta[seriesID] = newSeries
+	default:
+		i.log.Warn().Msg("unsupported action")
+	}
+	return
+}
+
+func compositeFieldID(indexObjectName, field string) string {
+	return indexObjectName + ":" + field
+}
+
+func compositeSeriesID(series *apiv1.Metadata) string {
+	return series.Name + "-" + series.Group
 }
diff --git a/banyand/index/search.go b/banyand/index/search.go
new file mode 100644
index 0000000..0c829f6
--- /dev/null
+++ b/banyand/index/search.go
@@ -0,0 +1,286 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	"github.com/apache/skywalking-banyandb/banyand/index/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/bytes"
+	"github.com/apache/skywalking-banyandb/pkg/posting"
+	"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+var ErrNotRangeOperation = errors.New("this is not an range operation")
+
+type executable interface {
+	execute() (posting.List, error)
+}
+
+type searchTree interface {
+	executable
+}
+
+func (s *service) Search(series common.Metadata, shardID uint, startTime, endTime uint64, indexObjectName string, conditions []Condition) (posting.List, error) {
+	sd, err := s.getShard(series, shardID)
+	if err != nil {
+		return nil, err
+	}
+	store := sd.store
+	searcher, hasData := store.Window(startTime, endTime)
+	if !hasData {
+		return roaring.EmptyPostingList, nil
+	}
+	tree, errBuild := buildSearchTree(searcher, indexObjectName, conditions)
+	if errBuild != nil {
+		return nil, err
+	}
+	return tree.execute()
+}
+
+func buildSearchTree(searcher tsdb.Searcher, indexObject string, conditions []Condition) (searchTree, error) {
+	condMap := toMap(indexObject, conditions)
+	root := &andNode{
+		node: &node{
+			SubNodes: make([]executable, 0),
+			searcher: searcher,
+		},
+	}
+	for key, conds := range condMap {
+		var rangeLeaf *rangeOp
+		for _, cond := range conds {
+			if rangeLeaf != nil && !rangeOP(cond.Op) {
+				return nil, errors.Wrapf(ErrNotRangeOperation, "op:%s", cond.Op.String())
+			}
+			if rangeOP(cond.Op) {
+				if rangeLeaf == nil {
+					rangeLeaf = root.addRangeLeaf(key)
+				}
+				opts := rangeLeaf.Opts
+				switch cond.Op {
+				case apiv1.PairQuery_BINARY_OP_GT:
+					opts.Lower = bytes.Join(cond.Values...)
+				case apiv1.PairQuery_BINARY_OP_GE:
+					opts.Lower = bytes.Join(cond.Values...)
+					opts.IncludesLower = true
+				case apiv1.PairQuery_BINARY_OP_LT:
+					opts.Upper = bytes.Join(cond.Values...)
+				case apiv1.PairQuery_BINARY_OP_LE:
+					opts.Upper = bytes.Join(cond.Values...)
+					opts.IncludesUpper = true
+				}
+				continue
+			}
+			switch cond.Op {
+			case apiv1.PairQuery_BINARY_OP_EQ:
+				root.addEq(key, cond.Values)
+			case apiv1.PairQuery_BINARY_OP_NE:
+				root.addNot(key, root.newEq(key, cond.Values))
+			case apiv1.PairQuery_BINARY_OP_HAVING:
+				n := root.addOrNode(len(cond.Values))
+				for _, v := range cond.Values {
+					n.addEq(key, [][]byte{v})
+				}
+			case apiv1.PairQuery_BINARY_OP_NOT_HAVING:
+				n := root.addOrNode(len(cond.Values))
+				for _, v := range cond.Values {
+					n.addEq(key, [][]byte{v})
+				}
+				root.addNot(key, n)
+			}
+		}
+	}
+	return root, nil
+}
+
+func rangeOP(op apiv1.PairQuery_BinaryOp) bool {
+	switch op {
+	case apiv1.PairQuery_BINARY_OP_GT:
+	case apiv1.PairQuery_BINARY_OP_GE:
+	case apiv1.PairQuery_BINARY_OP_LT:
+	case apiv1.PairQuery_BINARY_OP_LE:
+		return true
+	}
+	return false
+}
+
+func toMap(indexObject string, condition []Condition) map[string][]Condition {
+	result := make(map[string][]Condition)
+	for _, c := range condition {
+		key := compositeFieldID(indexObject, c.Key)
+		l, ok := result[key]
+		if ok {
+			l = append(l, c)
+			result[key] = l
+			continue
+		}
+		result[key] = []Condition{c}
+	}
+	return result
+}
+
+type logicalOP interface {
+	merge(posting.List) error
+}
+
+type node struct {
+	logicalOP
+	executable
+	searcher tsdb.Searcher
+	value    posting.List
+	SubNodes []executable `json:"sub_nodes,omitempty"`
+}
+
+func (n *node) newEq(key string, values [][]byte) *eq {
+	return &eq{
+		leaf: &leaf{
+			Key:      []byte(key),
+			values:   values,
+			searcher: n.searcher,
+		},
+	}
+}
+
+func (n *node) addEq(key string, values [][]byte) {
+	n.SubNodes = append(n.SubNodes, n.newEq(key, values))
+}
+
+func (n *node) addNot(key string, inner executable) {
+	n.SubNodes = append(n.SubNodes, &not{
+		Key:      []byte(key),
+		searcher: n.searcher,
+		Inner:    inner,
+	})
+}
+
+func (n *node) addRangeLeaf(key string) *rangeOp {
+	r := &rangeOp{
+		leaf: &leaf{
+			Key:      []byte(key),
+			searcher: n.searcher,
+		},
+		Opts: &tsdb.RangeOpts{},
+	}
+	n.SubNodes = append(n.SubNodes, r)
+	return r
+}
+
+func (n *node) addOrNode(size int) *orNode {
+	on := &orNode{
+		node: &node{
+			searcher: n.searcher,
+			SubNodes: make([]executable, 0, size),
+		},
+	}
+	n.SubNodes = append(n.SubNodes, on)
+	return on
+}
+
+func (n *node) pop() (executable, bool) {
+	if len(n.SubNodes) < 1 {
+		return nil, false
+	}
+	sn := n.SubNodes[0]
+	n.SubNodes = n.SubNodes[1:]
+	return sn, true
+}
+
+func (n *node) execute() (posting.List, error) {
+	ex, hasNext := n.pop()
+	if !hasNext {
+		return n.value, nil
+	}
+	r, err := ex.execute()
+	if err != nil {
+		return nil, err
+	}
+	if n.value == nil {
+		n.value = r
+		return n.execute()
+	}
+	err = n.merge(r)
+	if err != nil {
+		return nil, err
+	}
+	if n.value.IsEmpty() {
+		return n.value, nil
+	}
+	return n.execute()
+}
+
+type andNode struct {
+	*node
+}
+
+func (an *andNode) merge(list posting.List) error {
+	return an.value.Intersect(list)
+}
+
+type orNode struct {
+	*node
+}
+
+func (on *orNode) merge(list posting.List) error {
+	return on.value.Union(list)
+}
+
+type leaf struct {
+	executable
+	Key      []byte `json:"Key"`
+	values   [][]byte
+	searcher tsdb.Searcher
+}
+
+type not struct {
+	executable
+	Key      []byte `json:"key"`
+	searcher tsdb.Searcher
+	Inner    executable `json:"inner,omitempty"`
+}
+
+func (n *not) execute() (posting.List, error) {
+	all := n.searcher.MatchField(n.Key)
+	list, err := n.Inner.execute()
+	if err != nil {
+		return nil, err
+	}
+	err = all.Difference(list)
+	return all, err
+}
+
+type eq struct {
+	*leaf
+}
+
+func (eq *eq) execute() (posting.List, error) {
+	return eq.searcher.MatchTerms(&tsdb.Field{
+		Name:  eq.Key,
+		Value: bytes.Join(eq.values...),
+	}), nil
+}
+
+type rangeOp struct {
+	*leaf
+	Opts *tsdb.RangeOpts `json:"opts"`
+}
+
+func (r *rangeOp) execute() (posting.List, error) {
+	return r.searcher.Range(r.Key, r.Opts), nil
+}
diff --git a/banyand/index/tsdb/field_map.go b/banyand/index/tsdb/field_map.go
index cc80bad..4d89713 100644
--- a/banyand/index/tsdb/field_map.go
+++ b/banyand/index/tsdb/field_map.go
@@ -51,11 +51,11 @@ func (fm *fieldMap) get(key []byte) (*fieldValue, bool) {
 }
 
 func (fm *fieldMap) put(fv *Field, id common.ChunkID) error {
-	pm, ok := fm.get(fv.name)
+	pm, ok := fm.get(fv.Name)
 	if !ok {
-		return errors.Wrapf(ErrFieldAbsent, "filed name:%s", fv.name)
+		return errors.Wrapf(ErrFieldAbsent, "filed Name:%s", fv.Name)
 	}
-	return pm.value.put(fv.value, id)
+	return pm.value.put(fv.Value, id)
 }
 
 type fieldValue struct {
diff --git a/banyand/index/tsdb/mem.go b/banyand/index/tsdb/mem.go
index 39af209..a96b107 100644
--- a/banyand/index/tsdb/mem.go
+++ b/banyand/index/tsdb/mem.go
@@ -27,24 +27,24 @@ import (
 
 var ErrFieldsAbsent = errors.New("fields are absent")
 
-var emptyPostingList = roaring.NewPostingList()
-
 type MemTable struct {
-	terms *fieldMap
-	name  string
-	group string
+	terms   *fieldMap
+	name    string
+	group   string
+	shardID uint
 }
 
-func NewMemTable(name, group string) *MemTable {
+func NewMemTable(name, group string, shardID uint) *MemTable {
 	return &MemTable{
-		name:  name,
-		group: group,
+		name:    name,
+		group:   group,
+		shardID: shardID,
 	}
 }
 
 type Field struct {
-	name  []byte
-	value []byte
+	Name  []byte
+	Value []byte
 }
 
 type FieldSpec struct {
@@ -69,23 +69,23 @@ func (m *MemTable) Insert(field *Field, chunkID common.ChunkID) error {
 func (m *MemTable) MatchField(fieldName []byte) (list posting.List) {
 	fieldsValues, ok := m.terms.get(fieldName)
 	if !ok {
-		return emptyPostingList
+		return roaring.EmptyPostingList
 	}
 	return fieldsValues.value.allValues()
 }
 
 func (m *MemTable) MatchTerms(field *Field) (list posting.List) {
-	fieldsValues, ok := m.terms.get(field.name)
+	fieldsValues, ok := m.terms.get(field.Name)
 	if !ok {
-		return emptyPostingList
+		return roaring.EmptyPostingList
 	}
-	return fieldsValues.value.get(field.value)
+	return fieldsValues.value.get(field.Value)
 }
 
 func (m *MemTable) Range(fieldName []byte, opts *RangeOpts) (list posting.List) {
 	fieldsValues, ok := m.terms.get(fieldName)
 	if !ok {
-		return emptyPostingList
+		return roaring.EmptyPostingList
 	}
 	return fieldsValues.value.getRange(opts)
 }
diff --git a/banyand/index/tsdb/mem_test.go b/banyand/index/tsdb/mem_test.go
index 0ab2af2..fed7655 100644
--- a/banyand/index/tsdb/mem_test.go
+++ b/banyand/index/tsdb/mem_test.go
@@ -58,7 +58,7 @@ func TestMemTable_Initialize(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			m := NewMemTable("sw", "group")
+			m := NewMemTable("sw", "group", 0)
 			var err error
 			if err = m.Initialize(tt.args.fields); (err != nil) != tt.wantErr {
 				t.Errorf("Initialize() error = %v, wantErr %v", err, tt.wantErr)
@@ -76,7 +76,7 @@ func TestMemTable_Range(t *testing.T) {
 		fieldName []byte
 		opts      *RangeOpts
 	}
-	m := NewMemTable("sw", "group")
+	m := NewMemTable("sw", "group", 0)
 	setUp(t, m)
 	tests := []struct {
 		name     string
@@ -93,8 +93,8 @@ func TestMemTable_Range(t *testing.T) {
 				},
 			},
 			wantList: m.MatchTerms(&Field{
-				name:  []byte("duration"),
-				value: convert.Uint16ToBytes(200),
+				Name:  []byte("duration"),
+				Value: convert.Uint16ToBytes(200),
 			}),
 		},
 		{
@@ -108,8 +108,8 @@ func TestMemTable_Range(t *testing.T) {
 			},
 			wantList: union(m,
 				&Field{
-					name:  []byte("duration"),
-					value: convert.Uint16ToBytes(200),
+					Name:  []byte("duration"),
+					Value: convert.Uint16ToBytes(200),
 				},
 			),
 		},
@@ -125,12 +125,12 @@ func TestMemTable_Range(t *testing.T) {
 			},
 			wantList: union(m,
 				&Field{
-					name:  []byte("duration"),
-					value: convert.Uint16ToBytes(50),
+					Name:  []byte("duration"),
+					Value: convert.Uint16ToBytes(50),
 				},
 				&Field{
-					name:  []byte("duration"),
-					value: convert.Uint16ToBytes(200),
+					Name:  []byte("duration"),
+					Value: convert.Uint16ToBytes(200),
 				},
 			),
 		},
@@ -146,12 +146,12 @@ func TestMemTable_Range(t *testing.T) {
 			},
 			wantList: union(m,
 				&Field{
-					name:  []byte("duration"),
-					value: convert.Uint16ToBytes(200),
+					Name:  []byte("duration"),
+					Value: convert.Uint16ToBytes(200),
 				},
 				&Field{
-					name:  []byte("duration"),
-					value: convert.Uint16ToBytes(1000),
+					Name:  []byte("duration"),
+					Value: convert.Uint16ToBytes(1000),
 				},
 			),
 		},
@@ -168,16 +168,16 @@ func TestMemTable_Range(t *testing.T) {
 			},
 			wantList: union(m,
 				&Field{
-					name:  []byte("duration"),
-					value: convert.Uint16ToBytes(50),
+					Name:  []byte("duration"),
+					Value: convert.Uint16ToBytes(50),
 				},
 				&Field{
-					name:  []byte("duration"),
-					value: convert.Uint16ToBytes(200),
+					Name:  []byte("duration"),
+					Value: convert.Uint16ToBytes(200),
 				},
 				&Field{
-					name:  []byte("duration"),
-					value: convert.Uint16ToBytes(1000),
+					Name:  []byte("duration"),
+					Value: convert.Uint16ToBytes(1000),
 				},
 			),
 		},
@@ -194,8 +194,8 @@ func TestMemTable_Range(t *testing.T) {
 			},
 			wantList: union(m,
 				&Field{
-					name:  []byte("duration"),
-					value: convert.Uint16ToBytes(200),
+					Name:  []byte("duration"),
+					Value: convert.Uint16ToBytes(200),
 				},
 			),
 		},
@@ -229,13 +229,13 @@ func setUp(t *testing.T, mt *MemTable) {
 	for i := 0; i < 100; i++ {
 		if i%2 == 0 {
 			assert.NoError(t, mt.Insert(&Field{
-				name:  []byte("service_name"),
-				value: []byte("gateway"),
+				Name:  []byte("service_name"),
+				Value: []byte("gateway"),
 			}, common.ChunkID(i)))
 		} else {
 			assert.NoError(t, mt.Insert(&Field{
-				name:  []byte("service_name"),
-				value: []byte("webpage"),
+				Name:  []byte("service_name"),
+				Value: []byte("webpage"),
 			}, common.ChunkID(i)))
 		}
 	}
@@ -243,18 +243,18 @@ func setUp(t *testing.T, mt *MemTable) {
 		switch {
 		case i%3 == 0:
 			assert.NoError(t, mt.Insert(&Field{
-				name:  []byte("duration"),
-				value: convert.Uint16ToBytes(50),
+				Name:  []byte("duration"),
+				Value: convert.Uint16ToBytes(50),
 			}, common.ChunkID(i)))
 		case i%3 == 1:
 			assert.NoError(t, mt.Insert(&Field{
-				name:  []byte("duration"),
-				value: convert.Uint16ToBytes(200),
+				Name:  []byte("duration"),
+				Value: convert.Uint16ToBytes(200),
 			}, common.ChunkID(i)))
 		case i%3 == 2:
 			assert.NoError(t, mt.Insert(&Field{
-				name:  []byte("duration"),
-				value: convert.Uint16ToBytes(1000),
+				Name:  []byte("duration"),
+				Value: convert.Uint16ToBytes(1000),
 			}, common.ChunkID(i)))
 		}
 	}
diff --git a/banyand/index/tsdb/term_map.go b/banyand/index/tsdb/term_map.go
index 77d84ef..f325282 100644
--- a/banyand/index/tsdb/term_map.go
+++ b/banyand/index/tsdb/term_map.go
@@ -49,7 +49,7 @@ func (p *postingMap) put(key []byte, id common.ChunkID) error {
 
 func (p *postingMap) getOrCreate(key []byte) posting.List {
 	list := p.get(key)
-	if list != emptyPostingList {
+	if list != roaring.EmptyPostingList {
 		return list
 	}
 	p.mutex.Lock()
@@ -64,10 +64,12 @@ func (p *postingMap) getOrCreate(key []byte) posting.List {
 }
 
 func (p *postingMap) get(key []byte) posting.List {
+	p.mutex.RLock()
+	defer p.mutex.RUnlock()
 	hashedKey := termHashID(convert.Hash(key))
 	v, ok := p.repo[hashedKey]
 	if !ok {
-		return emptyPostingList
+		return roaring.EmptyPostingList
 	}
 	return v.value
 }
@@ -83,13 +85,12 @@ func (p *postingMap) allValues() posting.List {
 func (p *postingMap) getRange(opts *RangeOpts) posting.List {
 	switch bytes.Compare(opts.Upper, opts.Lower) {
 	case -1:
-		return emptyPostingList
+		return roaring.EmptyPostingList
 	case 0:
 		if opts.IncludesUpper && opts.IncludesLower {
 			return p.get(opts.Upper)
-		} else {
-			return emptyPostingList
 		}
+		return roaring.EmptyPostingList
 	}
 	keys := make(Asc, 0, len(p.repo))
 	for _, v := range p.repo {
@@ -113,7 +114,6 @@ func (p *postingMap) getRange(opts *RangeOpts) posting.List {
 			if opts.IncludesUpper {
 				_ = result.Union(p.repo[termHashID(convert.Hash(k))].value)
 			}
-			break
 		default:
 			_ = result.Union(p.repo[termHashID(convert.Hash(k))].value)
 		}
diff --git a/banyand/index/tsdb/tsdb.go b/banyand/index/tsdb/tsdb.go
index 06c96af..91f114d 100644
--- a/banyand/index/tsdb/tsdb.go
+++ b/banyand/index/tsdb/tsdb.go
@@ -29,9 +29,37 @@ type RangeOpts struct {
 	IncludesLower bool
 }
 
-type Reader interface {
+type GlobalStore interface {
+	Window(startTime, endTime uint64) (Searcher, bool)
+	Initialize(fields []FieldSpec) error
 	Insert(field *Field, chunkID common.ChunkID) error
+}
+
+type Searcher interface {
 	MatchField(fieldNames []byte) (list posting.List)
 	MatchTerms(field *Field) (list posting.List)
 	Range(fieldName []byte, opts *RangeOpts) (list posting.List)
 }
+
+type store struct {
+	memTable *MemTable
+	//TODO: add data tables
+}
+
+func (s *store) Window(_, _ uint64) (Searcher, bool) {
+	return s.memTable, true
+}
+
+func (s *store) Initialize(fields []FieldSpec) error {
+	return s.memTable.Initialize(fields)
+}
+
+func (s *store) Insert(field *Field, chunkID common.ChunkID) error {
+	return s.memTable.Insert(field, chunkID)
+}
+
+func NewStore(name, group string, shardID uint) GlobalStore {
+	return &store{
+		memTable: NewMemTable(name, group, shardID),
+	}
+}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index b024cbe..d694895 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -57,11 +57,11 @@ func newStandaloneCmd() *cobra.Command {
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate database")
 	}
-	idx, err := index.NewService(ctx, repo, pipeline)
+	idx, err := index.NewService(ctx, repo)
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate index builder")
 	}
-	traceSeries, err := trace.NewService(ctx, db, repo)
+	traceSeries, err := trace.NewService(ctx, db, repo, idx)
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate trace series")
 	}
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index 7ebe12f..64e1bb3 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -70,7 +70,7 @@ func setupServices(tester *require.Assertions) (discovery.ServiceRepo, series.Se
 	tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
 
 	// Init `Trace` module
-	traceSvc, err := trace.NewService(context.TODO(), db, repo)
+	traceSvc, err := trace.NewService(context.TODO(), db, repo, nil)
 	tester.NoError(err)
 
 	// Init `Query` module
diff --git a/banyand/series/trace/common_test.go b/banyand/series/trace/common_test.go
index 51f1603..f8b40ae 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -26,12 +26,14 @@ import (
 	"testing"
 	"time"
 
+	"github.com/golang/mock/gomock"
 	googleUUID "github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/data"
 	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	"github.com/apache/skywalking-banyandb/banyand/index"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -63,7 +65,10 @@ func setup(t *testing.T) (*traceSeries, func()) {
 	assert.NoError(t, err)
 	rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
 	assert.NoError(t, db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
-	svc, err := NewService(context.TODO(), db, nil)
+	ctrl := gomock.NewController(t)
+	mockIndex := index.NewMockService(ctrl)
+	mockIndex.EXPECT().Insert(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+	svc, err := NewService(context.TODO(), db, nil, mockIndex)
 	assert.NoError(t, err)
 	assert.NoError(t, svc.PreRun())
 	assert.NoError(t, db.PreRun())
diff --git a/banyand/series/trace/service.go b/banyand/series/trace/service.go
new file mode 100644
index 0000000..7049a9d
--- /dev/null
+++ b/banyand/series/trace/service.go
@@ -0,0 +1,147 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+	"context"
+	"time"
+
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	"github.com/apache/skywalking-banyandb/api/event"
+	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/index"
+	"github.com/apache/skywalking-banyandb/banyand/series"
+	"github.com/apache/skywalking-banyandb/banyand/series/schema"
+	"github.com/apache/skywalking-banyandb/banyand/storage"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/pb"
+)
+
+var _ series.Service = (*service)(nil)
+
+type service struct {
+	db        storage.Database
+	schemaMap map[string]*traceSeries
+	l         *logger.Logger
+	repo      discovery.ServiceRepo
+	stopCh    chan struct{}
+	idx       index.Service
+}
+
+//NewService returns a new service
+func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo, idx index.Service) (series.Service, error) {
+	return &service{
+		db:   db,
+		repo: repo,
+		idx:  idx,
+	}, nil
+}
+
+func (s *service) Name() string {
+	return "trace-series"
+}
+
+func (s *service) PreRun() error {
+	schemas, err := s.TraceSeries().List(context.Background(), schema.ListOpt{})
+	if err != nil {
+		return err
+	}
+	s.schemaMap = make(map[string]*traceSeries, len(schemas))
+	s.l = logger.GetLogger(s.Name())
+	for _, sa := range schemas {
+		ts, errTS := newTraceSeries(sa, s.l, s.idx)
+		if errTS != nil {
+			return errTS
+		}
+		s.db.Register(ts)
+		id := formatTraceSeriesID(ts.name, ts.group)
+		s.schemaMap[id] = ts
+		s.l.Info().Str("id", id).Msg("initialize Trace series")
+	}
+	return err
+}
+
+func (s *service) Serve() error {
+	now := time.Now().UnixNano()
+	for _, sMeta := range s.schemaMap {
+		e := pb.NewSeriesEventBuilder().
+			SeriesMetadata(sMeta.group, sMeta.name).
+			FieldNames(sMeta.fieldsNamesCompositeSeriesID...).
+			Time(time.Now()).
+			Action(v1.Action_ACTION_PUT).
+			Build()
+		_, err := s.repo.Publish(event.TopicSeriesEvent, bus.NewMessage(bus.MessageID(now), e))
+		if err != nil {
+			return err
+		}
+		seriesObj := &v1.Series{
+			Series: &v1.Metadata{
+				Name:  sMeta.name,
+				Group: sMeta.group,
+			},
+		}
+		rules, errGetRules := s.IndexRules(context.Background(), seriesObj, nil)
+		if errGetRules != nil {
+			return errGetRules
+		}
+		shardedRuleIndex := make([]*v1.IndexRuleEvent_ShardedIndexRule, 0, len(rules)*int(sMeta.shardNum))
+		for i := 0; i < int(sMeta.shardNum); i++ {
+			t := time.Now()
+			e := pb.NewShardEventBuilder().Action(v1.Action_ACTION_PUT).Time(t).
+				Shard(
+					pb.NewShardBuilder().
+						ID(uint64(i)).Total(sMeta.shardNum).SeriesMetadata(sMeta.group, sMeta.name).UpdatedAt(t).CreatedAt(t).
+						Node(pb.NewNodeBuilder().
+							ID(s.repo.NodeID()).CreatedAt(t).UpdatedAt(t).Addr("localhost").
+							Build()).
+						Build()).
+				Build()
+			_, errShard := s.repo.Publish(event.TopicShardEvent, bus.NewMessage(bus.MessageID(now), e))
+			if errShard != nil {
+				return errShard
+			}
+			shardedRuleIndex = append(shardedRuleIndex, &v1.IndexRuleEvent_ShardedIndexRule{
+				ShardId: uint64(i),
+				Rules:   rules,
+			})
+		}
+
+		indexRule := &v1.IndexRuleEvent{
+			Series: seriesObj.Series,
+			Rules:  shardedRuleIndex,
+			Action: v1.Action_ACTION_PUT,
+			Time:   timestamppb.New(time.Now()),
+		}
+		_, errPublishRules := s.repo.Publish(event.TopicIndexRule, bus.NewMessage(bus.MessageID(now), indexRule))
+		if errPublishRules != nil {
+			return errPublishRules
+		}
+	}
+	s.stopCh = make(chan struct{})
+	<-s.stopCh
+	return nil
+}
+
+func (s *service) GracefulStop() {
+	if s.stopCh != nil {
+		close(s.stopCh)
+	}
+}
diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go
index 2551813..1c5f639 100644
--- a/banyand/series/trace/trace.go
+++ b/banyand/series/trace/trace.go
@@ -18,7 +18,6 @@
 package trace
 
 import (
-	"context"
 	"strconv"
 	"time"
 
@@ -26,14 +25,11 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/data"
-	"github.com/apache/skywalking-banyandb/api/event"
 	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
 	apischema "github.com/apache/skywalking-banyandb/api/schema"
-	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/index"
 	"github.com/apache/skywalking-banyandb/banyand/series"
-	"github.com/apache/skywalking-banyandb/banyand/series/schema"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
-	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
@@ -76,89 +72,6 @@ const (
 	StateError   = 1
 )
 
-var _ series.Service = (*service)(nil)
-
-type service struct {
-	db        storage.Database
-	schemaMap map[string]*traceSeries
-	l         *logger.Logger
-	repo      discovery.ServiceRepo
-	stopCh    chan struct{}
-}
-
-//NewService returns a new service
-func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo) (series.Service, error) {
-	return &service{
-		db:   db,
-		repo: repo,
-	}, nil
-}
-
-func (s *service) Name() string {
-	return "trace-series"
-}
-
-func (s *service) PreRun() error {
-	schemas, err := s.TraceSeries().List(context.Background(), schema.ListOpt{})
-	if err != nil {
-		return err
-	}
-	s.schemaMap = make(map[string]*traceSeries, len(schemas))
-	s.l = logger.GetLogger(s.Name())
-	for _, sa := range schemas {
-		ts, errTS := newTraceSeries(sa, s.l)
-		if errTS != nil {
-			return errTS
-		}
-		s.db.Register(ts)
-		id := formatTraceSeriesID(ts.name, ts.group)
-		s.schemaMap[id] = ts
-		s.l.Info().Str("id", id).Msg("initialize Trace series")
-	}
-	return err
-}
-
-func (s *service) Serve() error {
-	now := time.Now().UnixNano()
-	for _, sMeta := range s.schemaMap {
-		e := pb.NewSeriesEventBuilder().
-			SeriesMetadata(sMeta.group, sMeta.name).
-			FieldNames(sMeta.fieldsNamesCompositeSeriesID...).
-			Time(time.Now()).
-			Action(v1.Action_ACTION_PUT).
-			Build()
-		_, err := s.repo.Publish(event.TopicSeriesEvent, bus.NewMessage(bus.MessageID(now), e))
-		if err != nil {
-			return err
-		}
-		for i := 0; i < int(sMeta.shardNum); i++ {
-			t := time.Now()
-			e := pb.NewShardEventBuilder().Action(v1.Action_ACTION_PUT).Time(t).
-				Shard(
-					pb.NewShardBuilder().
-						ID(uint64(i)).Total(sMeta.shardNum).SeriesMetadata(sMeta.group, sMeta.name).UpdatedAt(t).CreatedAt(t).
-						Node(pb.NewNodeBuilder().
-							ID(s.repo.NodeID()).CreatedAt(t).UpdatedAt(t).Addr("localhost").
-							Build()).
-						Build()).
-				Build()
-			_, errShard := s.repo.Publish(event.TopicShardEvent, bus.NewMessage(bus.MessageID(now), e))
-			if errShard != nil {
-				return errShard
-			}
-		}
-	}
-	s.stopCh = make(chan struct{})
-	<-s.stopCh
-	return nil
-}
-
-func (s *service) GracefulStop() {
-	if s.stopCh != nil {
-		close(s.stopCh)
-	}
-}
-
 func (s *service) FetchTrace(traceSeries common.Metadata, traceID string, opt series.ScanOptions) (data.Trace, error) {
 	ts, err := s.getSeries(traceSeries)
 	if err != nil {
@@ -227,6 +140,7 @@ type traceSeries struct {
 	schema                       apischema.TraceSeries
 	reader                       storage.StoreRepo
 	writePoint                   storage.GetWritePoint
+	idx                          index.Service
 	shardNum                     uint32
 	fieldIndex                   map[string]int
 	traceIDIndex                 int
@@ -241,11 +155,12 @@ type traceSeries struct {
 	fieldsNamesCompositeSeriesID []string
 }
 
-func newTraceSeries(schema apischema.TraceSeries, l *logger.Logger) (*traceSeries, error) {
+func newTraceSeries(schema apischema.TraceSeries, l *logger.Logger, idx index.Service) (*traceSeries, error) {
 	t := &traceSeries{
 		schema: schema,
 		idGen:  series.NewIDGen(),
 		l:      l,
+		idx:    idx,
 	}
 	meta := t.schema.Spec.GetMetadata()
 	shardInfo := t.schema.Spec.GetShard()
diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go
index 2b4466f..60ed91a 100644
--- a/banyand/series/trace/write.go
+++ b/banyand/series/trace/write.go
@@ -22,10 +22,12 @@ import (
 
 	"github.com/golang/protobuf/proto"
 	"github.com/pkg/errors"
+	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/data"
 	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	"github.com/apache/skywalking-banyandb/banyand/index"
 	bydb_bytes "github.com/apache/skywalking-banyandb/pkg/bytes"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
@@ -92,9 +94,52 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data.
 		Uint("trace_shard_id", traceIDShardID).
 		Uint("shard_id", shardID).
 		Msg("written to Trace series")
+	id := common.ChunkID(chunkID)
+	for i, field := range entityVal.GetFields() {
+		fieldSpec := t.schema.Spec.GetFields()[i]
+		fieldName := fieldSpec.GetName()
+		switch x := field.ValueType.(type) {
+		case *v1.Field_Str:
+			err = multierr.Append(err, t.writeStrToIndex(shardID, id, fieldName, x.Str.GetValue()))
+		case *v1.Field_Int:
+			err = multierr.Append(err, t.writeIntToIndex(shardID, id, fieldName, x.Int.GetValue()))
+		case *v1.Field_StrArray:
+			for _, s := range x.StrArray.GetValue() {
+				err = multierr.Append(err, t.writeStrToIndex(shardID, id, fieldName, s))
+			}
+		case *v1.Field_IntArray:
+			for _, integer := range x.IntArray.GetValue() {
+				err = multierr.Append(err, t.writeIntToIndex(shardID, id, fieldName, integer))
+			}
+		default:
+			continue
+		}
+	}
 	return common.ChunkID(chunkID), err
 }
 
+func (t *traceSeries) writeIntToIndex(shardID uint, id common.ChunkID, name string, value int64) error {
+	return t.writeIndex(shardID, id, name, convert.Int64ToBytes(value))
+}
+
+func (t *traceSeries) writeStrToIndex(shardID uint, id common.ChunkID, name string, value string) error {
+	return t.writeIndex(shardID, id, name, []byte(value))
+}
+
+func (t *traceSeries) writeIndex(shardID uint, id common.ChunkID, name string, value []byte) error {
+	return t.idx.Insert(*common.NewMetadata(&v1.Metadata{
+		Name:  t.name,
+		Group: t.group,
+	}),
+		shardID,
+		&index.Field{
+			ChunkID: id,
+			Name:    name,
+			Value:   value,
+		},
+	)
+}
+
 // copyEntityValueWithoutDataBinary copies all fields without DataBinary
 func copyEntityValueWithoutDataBinary(ev *v1.EntityValue) *v1.EntityValue {
 	return &v1.EntityValue{
diff --git a/pkg/posting/roaring/roaring.go b/pkg/posting/roaring/roaring.go
index 7c778d0..0593813 100644
--- a/pkg/posting/roaring/roaring.go
+++ b/pkg/posting/roaring/roaring.go
@@ -26,6 +26,8 @@ import (
 )
 
 var (
+	EmptyPostingList = NewPostingList()
+
 	ErrIntersectRoaringOnly  = errors.New("Intersect only supported between roaringDocId sets")
 	ErrUnionRoaringOnly      = errors.New("Union only supported between roaringDocId sets")
 	ErrDifferenceRoaringOnly = errors.New("Difference only supported between roaringDocId sets")