You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/03 17:40:52 UTC

[skywalking-banyandb] branch time-series updated (30a74a6 -> 5c0e83c)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


 discard 30a74a6  Add stream moduel
     new bbf89b1  Add stream moduel
     new 5c0e83c  Introduce indices generation

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (30a74a6)
            \
             N -- N -- N   refs/heads/time-series (5c0e83c)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 api/common/id.go                            |   1 +
 api/proto/banyandb/database/v2/schema.pb.go |  37 +++----
 api/proto/banyandb/database/v2/schema.proto |   1 +
 banyand/liaison/grpc/trace_test.go          |   2 +-
 banyand/metadata/schema/data/stream.json    |   2 +-
 banyand/series/trace/common_test.go         |   4 +-
 banyand/series/trace/query_test.go          |   6 +-
 banyand/series/trace/write_test.go          |   4 +-
 banyand/stream/index.go                     | 107 +++++++++++++++++++++
 banyand/stream/service.go                   |  11 ++-
 banyand/stream/stream.go                    | 105 +++++++++++++-------
 banyand/stream/stream_write.go              | 112 ++++++++++++++++++----
 banyand/stream/stream_write_test.go         | 144 +++++++++++++++++++++++++++-
 banyand/tsdb/block.go                       |  21 ++++
 banyand/tsdb/indexdb.go                     | 104 +++++++++++++++++++-
 banyand/tsdb/segment.go                     |  35 ++++++-
 banyand/tsdb/series.go                      |  91 ++++++++++++------
 banyand/tsdb/seriesdb.go                    |  27 +++---
 banyand/tsdb/seriesdb_test.go               |   4 +-
 banyand/tsdb/shard.go                       |  34 ++++++-
 banyand/tsdb/tsdb.go                        |  20 ++--
 21 files changed, 723 insertions(+), 149 deletions(-)
 create mode 100644 banyand/stream/index.go

[skywalking-banyandb] 01/02: Add stream moduel

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit bbf89b1146ae04c11bf8d0946b10d8cf6f16ade0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Sep 2 00:23:29 2021 +0800

    Add stream moduel
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 .../model/v2/write.proto => data/stream.go}        |  36 ++-
 api/proto/banyandb/database/v2/schema.pb.go        |  37 +--
 api/proto/banyandb/database/v2/schema.proto        |   1 +
 api/proto/banyandb/model/v2/write.pb.go            |  38 ++-
 api/proto/banyandb/model/v2/write.proto            |   1 +
 api/proto/banyandb/stream/v2/write.pb.go           | 169 +++++++-----
 api/proto/banyandb/stream/v2/write.proto           |  14 +-
 banyand/metadata/metadata.go                       |  21 +-
 banyand/metadata/schema/data/stream.json           |   2 +-
 banyand/stream/service.go                          | 102 ++++++++
 banyand/stream/stream.go                           | 104 ++++++++
 banyand/stream/stream_write.go                     | 152 +++++++++++
 banyand/stream/stream_write_test.go                | 282 +++++++++++++++++++++
 banyand/stream/testdata/shard0.json                |  18 ++
 banyand/tsdb/block.go                              |  57 ++++-
 banyand/tsdb/series.go                             | 168 +++++++++++-
 banyand/tsdb/seriesdb.go                           | 113 +++++----
 banyand/tsdb/seriesdb_test.go                      |  56 ++--
 banyand/tsdb/shard.go                              |   2 +-
 banyand/tsdb/tsdb.go                               |  32 ++-
 go.mod                                             |   1 +
 21 files changed, 1205 insertions(+), 201 deletions(-)

diff --git a/api/proto/banyandb/model/v2/write.proto b/api/data/stream.go
similarity index 56%
copy from api/proto/banyandb/model/v2/write.proto
copy to api/data/stream.go
index 836423a..65fc2ef 100644
--- a/api/proto/banyandb/model/v2/write.proto
+++ b/api/data/stream.go
@@ -15,22 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-syntax = "proto3";
+package data
 
-option java_package = "org.apache.skywalking.banyandb.model.v2";
-option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2";
+import (
+	"github.com/apache/skywalking-banyandb/api/common"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+)
 
-package banyandb.model.v2;
+var StreamKindVersion = common.KindVersion{Version: "v2", Kind: "data-stream"}
 
-import "google/protobuf/struct.proto";
-import "banyandb/model/v2/common.proto";
+var StreamWriteEventKindVersion = common.KindVersion{
+	Version: "v2",
+	Kind:    "stream-write",
+}
+
+var TopicStreamWriteEvent = bus.UniTopic(StreamWriteEventKindVersion.String())
+
+type Stream struct {
+	common.KindVersion
+	Entities []Entity
+}
 
-message Tag {
-    oneof value_type {
-        google.protobuf.NullValue null = 1;
-        Str str = 2;
-        StrArray str_array = 3;
-        Int int = 4;
-        IntArray int_array = 5;
-    }
+type StreamWriteData struct {
+	ShardID      uint
+	SeriesID     uint64
+	WriteRequest *streamv2.WriteRequest
 }
diff --git a/api/proto/banyandb/database/v2/schema.pb.go b/api/proto/banyandb/database/v2/schema.pb.go
index 5f8168e..f2fed85 100644
--- a/api/proto/banyandb/database/v2/schema.pb.go
+++ b/api/proto/banyandb/database/v2/schema.pb.go
@@ -49,6 +49,7 @@ const (
 	TagType_TAG_TYPE_INT          TagType = 2
 	TagType_TAG_TYPE_STRING_ARRAY TagType = 3
 	TagType_TAG_TYPE_INT_ARRAY    TagType = 4
+	TagType_TAG_TYPE_DATA_BINARY  TagType = 5
 )
 
 // Enum value maps for TagType.
@@ -59,6 +60,7 @@ var (
 		2: "TAG_TYPE_INT",
 		3: "TAG_TYPE_STRING_ARRAY",
 		4: "TAG_TYPE_INT_ARRAY",
+		5: "TAG_TYPE_DATA_BINARY",
 	}
 	TagType_value = map[string]int32{
 		"TAG_TYPE_UNSPECIFIED":  0,
@@ -66,6 +68,7 @@ var (
 		"TAG_TYPE_INT":          2,
 		"TAG_TYPE_STRING_ARRAY": 3,
 		"TAG_TYPE_INT_ARRAY":    4,
+		"TAG_TYPE_DATA_BINARY":  5,
 	}
 )
 
@@ -912,22 +915,24 @@ var file_banyandb_database_v2_schema_proto_rawDesc = []byte{
 	0x64, 0x5f, 0x61, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f,
 	0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d,
 	0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41,
-	0x74, 0x2a, 0x7d, 0x0a, 0x07, 0x54, 0x61, 0x67, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14,
-	0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49,
-	0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59,
-	0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x54,
-	0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x10, 0x02, 0x12, 0x19, 0x0a,
-	0x15, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47,
-	0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x03, 0x12, 0x16, 0x0a, 0x12, 0x54, 0x41, 0x47, 0x5f,
-	0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x04,
-	0x42, 0x72, 0x0a, 0x2a, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73,
-	0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e,
-	0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x5a, 0x44,
-	0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68,
-	0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e,
-	0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f,
-	0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73,
-	0x65, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x74, 0x2a, 0x97, 0x01, 0x0a, 0x07, 0x54, 0x61, 0x67, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
+	0x14, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43,
+	0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x41, 0x47, 0x5f, 0x54,
+	0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c,
+	0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x10, 0x02, 0x12, 0x19,
+	0x0a, 0x15, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e,
+	0x47, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x03, 0x12, 0x16, 0x0a, 0x12, 0x54, 0x41, 0x47,
+	0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10,
+	0x04, 0x12, 0x18, 0x0a, 0x14, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x44, 0x41,
+	0x54, 0x41, 0x5f, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x10, 0x05, 0x42, 0x72, 0x0a, 0x2a, 0x6f,
+	0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c,
+	0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61,
+	0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x5a, 0x44, 0x67, 0x69, 0x74, 0x68, 0x75,
+	0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79,
+	0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
+	0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61,
+	0x6e, 0x64, 0x62, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2f, 0x76, 0x32, 0x62,
+	0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
diff --git a/api/proto/banyandb/database/v2/schema.proto b/api/proto/banyandb/database/v2/schema.proto
index c3745ff..cb358aa 100644
--- a/api/proto/banyandb/database/v2/schema.proto
+++ b/api/proto/banyandb/database/v2/schema.proto
@@ -44,6 +44,7 @@ enum TagType {
     TAG_TYPE_INT = 2;
     TAG_TYPE_STRING_ARRAY = 3;
     TAG_TYPE_INT_ARRAY = 4;
+    TAG_TYPE_DATA_BINARY = 5;
 }
 
 message TagFamily {
diff --git a/api/proto/banyandb/model/v2/write.pb.go b/api/proto/banyandb/model/v2/write.pb.go
index 0d36e62..1640f11 100644
--- a/api/proto/banyandb/model/v2/write.pb.go
+++ b/api/proto/banyandb/model/v2/write.pb.go
@@ -50,6 +50,7 @@ type Tag struct {
 	//	*Tag_StrArray
 	//	*Tag_Int
 	//	*Tag_IntArray
+	//	*Tag_BinaryData
 	ValueType isTag_ValueType `protobuf_oneof:"value_type"`
 }
 
@@ -127,6 +128,13 @@ func (x *Tag) GetIntArray() *IntArray {
 	return nil
 }
 
+func (x *Tag) GetBinaryData() []byte {
+	if x, ok := x.GetValueType().(*Tag_BinaryData); ok {
+		return x.BinaryData
+	}
+	return nil
+}
+
 type isTag_ValueType interface {
 	isTag_ValueType()
 }
@@ -151,6 +159,10 @@ type Tag_IntArray struct {
 	IntArray *IntArray `protobuf:"bytes,5,opt,name=int_array,json=intArray,proto3,oneof"`
 }
 
+type Tag_BinaryData struct {
+	BinaryData []byte `protobuf:"bytes,6,opt,name=binary_data,json=binaryData,proto3,oneof"`
+}
+
 func (*Tag_Null) isTag_ValueType() {}
 
 func (*Tag_Str) isTag_ValueType() {}
@@ -161,6 +173,8 @@ func (*Tag_Int) isTag_ValueType() {}
 
 func (*Tag_IntArray) isTag_ValueType() {}
 
+func (*Tag_BinaryData) isTag_ValueType() {}
+
 var File_banyandb_model_v2_write_proto protoreflect.FileDescriptor
 
 var file_banyandb_model_v2_write_proto_rawDesc = []byte{
@@ -171,7 +185,7 @@ var file_banyandb_model_v2_write_proto_rawDesc = []byte{
 	0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
 	0x1a, 0x1e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c,
 	0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
-	0x22, 0x95, 0x02, 0x0a, 0x03, 0x54, 0x61, 0x67, 0x12, 0x30, 0x0a, 0x04, 0x6e, 0x75, 0x6c, 0x6c,
+	0x22, 0xb8, 0x02, 0x0a, 0x03, 0x54, 0x61, 0x67, 0x12, 0x30, 0x0a, 0x04, 0x6e, 0x75, 0x6c, 0x6c,
 	0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
 	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x4e, 0x75, 0x6c, 0x6c, 0x56, 0x61, 0x6c,
 	0x75, 0x65, 0x48, 0x00, 0x52, 0x04, 0x6e, 0x75, 0x6c, 0x6c, 0x12, 0x2a, 0x0a, 0x03, 0x73, 0x74,
@@ -187,15 +201,18 @@ var file_banyandb_model_v2_write_proto_rawDesc = []byte{
 	0x0a, 0x09, 0x69, 0x6e, 0x74, 0x5f, 0x61, 0x72, 0x72, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28,
 	0x0b, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64,
 	0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x49, 0x6e, 0x74, 0x41, 0x72, 0x72, 0x61, 0x79, 0x48, 0x00,
-	0x52, 0x08, 0x69, 0x6e, 0x74, 0x41, 0x72, 0x72, 0x61, 0x79, 0x42, 0x0c, 0x0a, 0x0a, 0x76, 0x61,
-	0x6c, 0x75, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x42, 0x6c, 0x0a, 0x27, 0x6f, 0x72, 0x67, 0x2e,
-	0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e,
-	0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c,
-	0x2e, 0x76, 0x32, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
-	0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e,
-	0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70,
-	0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f,
-	0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x52, 0x08, 0x69, 0x6e, 0x74, 0x41, 0x72, 0x72, 0x61, 0x79, 0x12, 0x21, 0x0a, 0x0b, 0x62, 0x69,
+	0x6e, 0x61, 0x72, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x48,
+	0x00, 0x52, 0x0a, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x44, 0x61, 0x74, 0x61, 0x42, 0x0c, 0x0a,
+	0x0a, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x42, 0x6c, 0x0a, 0x27, 0x6f,
+	0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c,
+	0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f,
+	0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
+	0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c,
+	0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70,
+	0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
+	0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+	0x33,
 }
 
 var (
@@ -258,6 +275,7 @@ func file_banyandb_model_v2_write_proto_init() {
 		(*Tag_StrArray)(nil),
 		(*Tag_Int)(nil),
 		(*Tag_IntArray)(nil),
+		(*Tag_BinaryData)(nil),
 	}
 	type x struct{}
 	out := protoimpl.TypeBuilder{
diff --git a/api/proto/banyandb/model/v2/write.proto b/api/proto/banyandb/model/v2/write.proto
index 836423a..4636bd3 100644
--- a/api/proto/banyandb/model/v2/write.proto
+++ b/api/proto/banyandb/model/v2/write.proto
@@ -32,5 +32,6 @@ message Tag {
         StrArray str_array = 3;
         Int int = 4;
         IntArray int_array = 5;
+        bytes binary_data = 6;
     }
 }
diff --git a/api/proto/banyandb/stream/v2/write.pb.go b/api/proto/banyandb/stream/v2/write.pb.go
index 664078b..ecc7655 100644
--- a/api/proto/banyandb/stream/v2/write.pb.go
+++ b/api/proto/banyandb/stream/v2/write.pb.go
@@ -31,8 +31,8 @@ import (
 	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
 	timestamppb "google.golang.org/protobuf/types/known/timestamppb"
 
-	v21 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
-	v2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	v2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+	v21 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 )
 
 const (
@@ -53,13 +53,8 @@ type ElementValue struct {
 	// 1) either the start time of a Span/Segment,
 	// 2) or the timestamp of a log
 	Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
-	// binary representation of segments, including tags, spans...
-	DataBinary []byte `protobuf:"bytes,3,opt,name=data_binary,json=dataBinary,proto3" json:"data_binary,omitempty"`
-	// support all of indexed tags in the tags.
-	// Tag only has value, as the value of value_type match with the key
-	// by the index rules and index rule bindings of Metadata group.
-	// indexed tags of multiple entities are compression in the tags.
-	Tags []*v2.Tag `protobuf:"bytes,4,rep,name=tags,proto3" json:"tags,omitempty"`
+	// the order of tag_families' items match the stream schema
+	TagFamilies []*ElementValue_TagFamily `protobuf:"bytes,3,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
 }
 
 func (x *ElementValue) Reset() {
@@ -108,16 +103,9 @@ func (x *ElementValue) GetTimestamp() *timestamppb.Timestamp {
 	return nil
 }
 
-func (x *ElementValue) GetDataBinary() []byte {
+func (x *ElementValue) GetTagFamilies() []*ElementValue_TagFamily {
 	if x != nil {
-		return x.DataBinary
-	}
-	return nil
-}
-
-func (x *ElementValue) GetTags() []*v2.Tag {
-	if x != nil {
-		return x.Tags
+		return x.TagFamilies
 	}
 	return nil
 }
@@ -128,7 +116,7 @@ type WriteRequest struct {
 	unknownFields protoimpl.UnknownFields
 
 	// the metadata is only required in the first write.
-	Metadata *v21.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
+	Metadata *v2.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
 	// the element is required.
 	Element *ElementValue `protobuf:"bytes,2,opt,name=element,proto3" json:"element,omitempty"`
 }
@@ -165,7 +153,7 @@ func (*WriteRequest) Descriptor() ([]byte, []int) {
 	return file_banyandb_stream_v2_write_proto_rawDescGZIP(), []int{1}
 }
 
-func (x *WriteRequest) GetMetadata() *v21.Metadata {
+func (x *WriteRequest) GetMetadata() *v2.Metadata {
 	if x != nil {
 		return x.Metadata
 	}
@@ -217,6 +205,53 @@ func (*WriteResponse) Descriptor() ([]byte, []int) {
 	return file_banyandb_stream_v2_write_proto_rawDescGZIP(), []int{2}
 }
 
+type ElementValue_TagFamily struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Tags []*v21.Tag `protobuf:"bytes,1,rep,name=tags,proto3" json:"tags,omitempty"`
+}
+
+func (x *ElementValue_TagFamily) Reset() {
+	*x = ElementValue_TagFamily{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_banyandb_stream_v2_write_proto_msgTypes[3]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *ElementValue_TagFamily) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ElementValue_TagFamily) ProtoMessage() {}
+
+func (x *ElementValue_TagFamily) ProtoReflect() protoreflect.Message {
+	mi := &file_banyandb_stream_v2_write_proto_msgTypes[3]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use ElementValue_TagFamily.ProtoReflect.Descriptor instead.
+func (*ElementValue_TagFamily) Descriptor() ([]byte, []int) {
+	return file_banyandb_stream_v2_write_proto_rawDescGZIP(), []int{0, 0}
+}
+
+func (x *ElementValue_TagFamily) GetTags() []*v21.Tag {
+	if x != nil {
+		return x.Tags
+	}
+	return nil
+}
+
 var File_banyandb_stream_v2_write_proto protoreflect.FileDescriptor
 
 var file_banyandb_stream_v2_write_proto_rawDesc = []byte{
@@ -229,35 +264,39 @@ var file_banyandb_stream_v2_write_proto_rawDesc = []byte{
 	0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e,
 	0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
 	0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x2f, 0x77, 0x72, 0x69, 0x74, 0x65, 0x2e,
-	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb4, 0x01, 0x0a, 0x0c, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
+	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xef, 0x01, 0x0a, 0x0c, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
 	0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
 	0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6c, 0x65, 0x6d,
 	0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
 	0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
 	0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
 	0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12,
-	0x1f, 0x0a, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x18, 0x03,
-	0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x42, 0x69, 0x6e, 0x61, 0x72, 0x79,
-	0x12, 0x2a, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16,
-	0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e,
-	0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x84, 0x01, 0x0a,
-	0x0c, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a,
-	0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
-	0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f,
-	0x6e, 0x2e, 0x76, 0x32, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d,
-	0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x3a, 0x0a, 0x07, 0x65, 0x6c, 0x65, 0x6d, 0x65,
-	0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61,
-	0x6e, 0x64, 0x62, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6c,
-	0x65, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x07, 0x65, 0x6c, 0x65, 0x6d,
-	0x65, 0x6e, 0x74, 0x22, 0x0f, 0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70,
-	0x6f, 0x6e, 0x73, 0x65, 0x42, 0x6e, 0x0a, 0x28, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
-	0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61,
-	0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32,
-	0x5a, 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61,
-	0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62,
-	0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74,
-	0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61,
-	0x6d, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x4d, 0x0a, 0x0c, 0x74, 0x61, 0x67, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x18,
+	0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
+	0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6c, 0x65, 0x6d, 0x65,
+	0x6e, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x2e, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c,
+	0x79, 0x52, 0x0b, 0x74, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x1a, 0x37,
+	0x0a, 0x09, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x12, 0x2a, 0x0a, 0x04, 0x74,
+	0x61, 0x67, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x62, 0x61, 0x6e, 0x79,
+	0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61,
+	0x67, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x84, 0x01, 0x0a, 0x0c, 0x57, 0x72, 0x69, 0x74,
+	0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61,
+	0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e,
+	0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x2e,
+	0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61,
+	0x74, 0x61, 0x12, 0x3a, 0x0a, 0x07, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20,
+	0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x73,
+	0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74,
+	0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x07, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x0f,
+	0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42,
+	0x6e, 0x0a, 0x28, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b,
+	0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64,
+	0x62, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x5a, 0x42, 0x67, 0x69, 0x74,
+	0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73,
+	0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e,
+	0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e,
+	0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x76, 0x32, 0x62,
+	0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
@@ -272,25 +311,27 @@ func file_banyandb_stream_v2_write_proto_rawDescGZIP() []byte {
 	return file_banyandb_stream_v2_write_proto_rawDescData
 }
 
-var file_banyandb_stream_v2_write_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
+var file_banyandb_stream_v2_write_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
 var file_banyandb_stream_v2_write_proto_goTypes = []interface{}{
-	(*ElementValue)(nil),          // 0: banyandb.stream.v2.ElementValue
-	(*WriteRequest)(nil),          // 1: banyandb.stream.v2.WriteRequest
-	(*WriteResponse)(nil),         // 2: banyandb.stream.v2.WriteResponse
-	(*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp
-	(*v2.Tag)(nil),                // 4: banyandb.model.v2.Tag
-	(*v21.Metadata)(nil),          // 5: banyandb.common.v2.Metadata
+	(*ElementValue)(nil),           // 0: banyandb.stream.v2.ElementValue
+	(*WriteRequest)(nil),           // 1: banyandb.stream.v2.WriteRequest
+	(*WriteResponse)(nil),          // 2: banyandb.stream.v2.WriteResponse
+	(*ElementValue_TagFamily)(nil), // 3: banyandb.stream.v2.ElementValue.TagFamily
+	(*timestamppb.Timestamp)(nil),  // 4: google.protobuf.Timestamp
+	(*v2.Metadata)(nil),            // 5: banyandb.common.v2.Metadata
+	(*v21.Tag)(nil),                // 6: banyandb.model.v2.Tag
 }
 var file_banyandb_stream_v2_write_proto_depIdxs = []int32{
-	3, // 0: banyandb.stream.v2.ElementValue.timestamp:type_name -> google.protobuf.Timestamp
-	4, // 1: banyandb.stream.v2.ElementValue.tags:type_name -> banyandb.model.v2.Tag
+	4, // 0: banyandb.stream.v2.ElementValue.timestamp:type_name -> google.protobuf.Timestamp
+	3, // 1: banyandb.stream.v2.ElementValue.tag_families:type_name -> banyandb.stream.v2.ElementValue.TagFamily
 	5, // 2: banyandb.stream.v2.WriteRequest.metadata:type_name -> banyandb.common.v2.Metadata
 	0, // 3: banyandb.stream.v2.WriteRequest.element:type_name -> banyandb.stream.v2.ElementValue
-	4, // [4:4] is the sub-list for method output_type
-	4, // [4:4] is the sub-list for method input_type
-	4, // [4:4] is the sub-list for extension type_name
-	4, // [4:4] is the sub-list for extension extendee
-	0, // [0:4] is the sub-list for field type_name
+	6, // 4: banyandb.stream.v2.ElementValue.TagFamily.tags:type_name -> banyandb.model.v2.Tag
+	5, // [5:5] is the sub-list for method output_type
+	5, // [5:5] is the sub-list for method input_type
+	5, // [5:5] is the sub-list for extension type_name
+	5, // [5:5] is the sub-list for extension extendee
+	0, // [0:5] is the sub-list for field type_name
 }
 
 func init() { file_banyandb_stream_v2_write_proto_init() }
@@ -335,6 +376,18 @@ func file_banyandb_stream_v2_write_proto_init() {
 				return nil
 			}
 		}
+		file_banyandb_stream_v2_write_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*ElementValue_TagFamily); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
 	}
 	type x struct{}
 	out := protoimpl.TypeBuilder{
@@ -342,7 +395,7 @@ func file_banyandb_stream_v2_write_proto_init() {
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 			RawDescriptor: file_banyandb_stream_v2_write_proto_rawDesc,
 			NumEnums:      0,
-			NumMessages:   3,
+			NumMessages:   4,
 			NumExtensions: 0,
 			NumServices:   0,
 		},
diff --git a/api/proto/banyandb/stream/v2/write.proto b/api/proto/banyandb/stream/v2/write.proto
index 59b0eef..39ef649 100644
--- a/api/proto/banyandb/stream/v2/write.proto
+++ b/api/proto/banyandb/stream/v2/write.proto
@@ -26,6 +26,8 @@ import "google/protobuf/timestamp.proto";
 import "banyandb/common/v2/common.proto";
 import "banyandb/model/v2/write.proto";
 
+
+
 message ElementValue {
   // element_id could be span_id of a Span or segment_id of a Segment in the context of stream
   string element_id = 1;
@@ -33,13 +35,11 @@ message ElementValue {
   // 1) either the start time of a Span/Segment,
   // 2) or the timestamp of a log
   google.protobuf.Timestamp timestamp = 2;
-  // binary representation of segments, including tags, spans...
-  bytes data_binary = 3;
-  // support all of indexed tags in the tags.
-  // Tag only has value, as the value of value_type match with the key
-  // by the index rules and index rule bindings of Metadata group.
-  // indexed tags of multiple entities are compression in the tags.
-  repeated model.v2.Tag tags = 4;
+  message TagFamily {
+    repeated model.v2.Tag tags = 1;
+  }
+  // the order of tag_families' items match the stream schema
+  repeated TagFamily tag_families = 3;
 }
 
 message WriteRequest {
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 57e1fd9..7bf3fc8 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -35,8 +35,13 @@ type IndexFilter interface {
 	IndexRules(ctx context.Context, subject *commonv2.Metadata) ([]*databasev2.IndexRule, error)
 }
 
-type Metadata interface {
+type Repo interface {
 	IndexFilter
+	Stream() schema.Stream
+}
+
+type Service interface {
+	Repo
 	run.Unit
 }
 
@@ -46,11 +51,7 @@ type service struct {
 	indexRuleBinding schema.IndexRuleBinding
 }
 
-func (s *service) Name() string {
-	return "metadata"
-}
-
-func NewService(_ context.Context) (Metadata, error) {
+func NewService(_ context.Context) (Repo, error) {
 	stream, err := schema.NewStream()
 	if err != nil {
 		return nil, err
@@ -70,6 +71,14 @@ func NewService(_ context.Context) (Metadata, error) {
 	}, nil
 }
 
+func (s *service) Stream() schema.Stream {
+	return s.stream
+}
+
+func (s *service) Name() string {
+	return "metadata"
+}
+
 func (s *service) IndexRules(ctx context.Context, subject *commonv2.Metadata) ([]*databasev2.IndexRule, error) {
 	bindings, err := s.indexRuleBinding.List(ctx, schema.ListOpt{Group: subject.Group})
 	if err != nil {
diff --git a/banyand/metadata/schema/data/stream.json b/banyand/metadata/schema/data/stream.json
index c34ec65..41b3b52 100644
--- a/banyand/metadata/schema/data/stream.json
+++ b/banyand/metadata/schema/data/stream.json
@@ -9,7 +9,7 @@
       "tags": [
         {
           "name": "data_binary",
-          "type": "TAG_TYPE_UNSPECIFIED"
+          "type": "TAG_TYPE_DATA_BINARY"
         }
       ]
     },
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
new file mode 100644
index 0000000..a187f29
--- /dev/null
+++ b/banyand/stream/service.go
@@ -0,0 +1,102 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"context"
+
+	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+var ErrEmptyRootPath = errors.New("root path is empty")
+
+type Service interface {
+	run.PreRunner
+	run.Config
+	run.Service
+}
+
+var _ Service = (*service)(nil)
+
+type service struct {
+	schemaMap     map[string]*stream
+	writeListener *writeCallback
+	l             *logger.Logger
+	metadata      metadata.Repo
+	root          string
+}
+
+func (s *service) FlagSet() *run.FlagSet {
+	flagS := run.NewFlagSet("storage")
+	flagS.StringVar(&s.root, "root-path", "/tmp", "the root path of database")
+	return flagS
+}
+
+func (s *service) Validate() error {
+	if s.root == "" {
+		return ErrEmptyRootPath
+	}
+	return nil
+}
+
+func (s *service) Name() string {
+	return "stream"
+}
+
+func (s *service) PreRun() error {
+	schemas, err := s.metadata.Stream().List(context.Background(), schema.ListOpt{})
+	if err != nil {
+		return err
+	}
+
+	s.schemaMap = make(map[string]*stream, len(schemas))
+	s.l = logger.GetLogger(s.Name())
+	for _, sa := range schemas {
+		sm, errTS := openStream(s.root, sa, s.l)
+		if errTS != nil {
+			return errTS
+		}
+		id := formatStreamID(sm.name, sm.group)
+		s.schemaMap[id] = sm
+		s.writeListener.schemaMap[id] = sm
+		s.l.Info().Str("id", id).Msg("initialize stream")
+	}
+	s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+	return err
+}
+
+func (s *service) Serve() error {
+	panic("implement me")
+}
+
+func (s *service) GracefulStop() {
+	panic("implement me")
+}
+
+// NewService returns a new service.
+func NewService(_ context.Context, metadata metadata.Repo, pipeline queue.Queue) (Service, error) {
+	return &service{
+		metadata: metadata,
+	}, nil
+}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
new file mode 100644
index 0000000..88da951
--- /dev/null
+++ b/banyand/stream/stream.go
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"context"
+
+	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+type stream struct {
+	name        string
+	group       string
+	l           *logger.Logger
+	schema      *databasev2.Stream
+	db          tsdb.Database
+	entityIndex []struct {
+		family int
+		tag    int
+	}
+}
+
+func (s *stream) Close() error {
+	return s.db.Close()
+}
+
+func (s *stream) parseSchema() {
+	sm := s.schema
+	meta := sm.GetMetadata()
+	s.name, s.group = meta.GetName(), meta.GetGroup()
+	for _, tagInEntity := range sm.Entity.GetTagNames() {
+	nextEntityTag:
+		for fi, family := range sm.GetTagFamilies() {
+			for ti, tag := range family.Tags {
+				if tagInEntity == tag.GetName() {
+					s.entityIndex = append(s.entityIndex, struct {
+						family int
+						tag    int
+					}{family: fi, tag: ti})
+					break nextEntityTag
+				}
+			}
+		}
+	}
+}
+
+func openStream(root string, schema *databasev2.Stream, l *logger.Logger) (*stream, error) {
+	sm := &stream{
+		schema: schema,
+		l:      l,
+	}
+	sm.parseSchema()
+	db, err := tsdb.OpenDatabase(
+		context.WithValue(context.Background(), logger.ContextKey, l),
+		tsdb.DatabaseOpts{
+			Location: root,
+			ShardNum: uint(schema.GetShardNum()),
+		})
+	if err != nil {
+		return nil, err
+	}
+	sm.db = db
+	return sm, nil
+}
+
+func formatStreamID(name, group string) string {
+	return name + ":" + group
+}
+
+func tagValueTypeConv(tag *modelv2.Tag) (tagType databasev2.TagType, isNull bool) {
+	switch tag.GetValueType().(type) {
+	case *modelv2.Tag_Int:
+		return databasev2.TagType_TAG_TYPE_INT, false
+	case *modelv2.Tag_Str:
+		return databasev2.TagType_TAG_TYPE_STRING, false
+	case *modelv2.Tag_IntArray:
+		return databasev2.TagType_TAG_TYPE_INT_ARRAY, false
+	case *modelv2.Tag_StrArray:
+		return databasev2.TagType_TAG_TYPE_STRING_ARRAY, false
+	case *modelv2.Tag_BinaryData:
+		return databasev2.TagType_TAG_TYPE_DATA_BINARY, false
+	case *modelv2.Tag_Null:
+		return databasev2.TagType_TAG_TYPE_UNSPECIFIED, true
+	}
+	return databasev2.TagType_TAG_TYPE_UNSPECIFIED, false
+}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
new file mode 100644
index 0000000..c16e75b
--- /dev/null
+++ b/banyand/stream/stream_write.go
@@ -0,0 +1,152 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"github.com/pkg/errors"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-banyandb/api/data"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+	ErrMalformedElement          = errors.New("element is malformed")
+	ErrUnsupportedTagTypeAsEntry = errors.New("the tag type can not be as an entry in an entity")
+)
+
+func (s *stream) Write(shardID uint, value *streamv2.ElementValue) error {
+	sm := s.schema
+	fLen := len(value.GetTagFamilies())
+	if fLen < 1 {
+		return errors.Wrap(ErrMalformedElement, "no tag family")
+	}
+	if fLen > len(sm.TagFamilies) {
+		return errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
+	}
+	shard, err := s.db.Shard(shardID)
+	if err != nil {
+		return err
+	}
+	entity, err := s.buildEntity(value)
+	if err != nil {
+		return err
+	}
+	series, err := shard.Series().Get(entity)
+	if err != nil {
+		return err
+	}
+	t := value.GetTimestamp().AsTime()
+	wp, err := series.Span(tsdb.TimeRange{
+		Start:    t,
+		Duration: 0,
+	})
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_ = wp.Close()
+	}()
+	builder := wp.WriterBuilder().Time(t)
+	for fi, family := range value.GetTagFamilies() {
+		familySpec := sm.GetTagFamilies()[fi]
+		if len(family.GetTags()) > len(familySpec.GetTags()) {
+			return errors.Wrap(ErrMalformedElement, "tag number is more than expected")
+		}
+		for ti, tag := range family.GetTags() {
+			tagSpec := familySpec.GetTags()[ti]
+			tType, isNull := tagValueTypeConv(tag)
+			if isNull {
+				continue
+			}
+			if tType != tagSpec.GetType() {
+				return errors.Wrapf(ErrMalformedElement, "tag %s type is unexpected", tagSpec.GetName())
+			}
+		}
+		bb, errMarshal := proto.Marshal(family)
+		if errMarshal != nil {
+			return errMarshal
+		}
+		builder.Family(sm.GetTagFamilies()[fi].GetName(), bb)
+	}
+	writer, err := builder.Build()
+	if err != nil {
+		return err
+	}
+	_, err = writer.Write()
+	return err
+}
+
+func (s *stream) buildEntity(value *streamv2.ElementValue) (entity tsdb.Entity, err error) {
+	for _, index := range s.entityIndex {
+		family := value.GetTagFamilies()[index.family]
+		if index.tag >= len(family.GetTags()) {
+			return nil, errors.Wrap(ErrMalformedElement, "the tag which composite the entity doesn't exist ")
+		}
+		entry, err := tagConvEntry(family.GetTags()[index.tag])
+		if err != nil {
+			return nil, err
+		}
+		entity = append(entity, entry)
+	}
+	return entity, nil
+}
+
+func tagConvEntry(tag *modelv2.Tag) (tsdb.Entry, error) {
+	switch tag.GetValueType().(type) {
+	case *modelv2.Tag_Str:
+		return tsdb.Entry(tag.GetStr().GetValue()), nil
+	case *modelv2.Tag_Int:
+		return convert.Int64ToBytes(tag.GetInt().GetValue()), nil
+	default:
+		return nil, ErrUnsupportedTagTypeAsEntry
+	}
+}
+
+type writeCallback struct {
+	l         *logger.Logger
+	schemaMap map[string]*stream
+}
+
+func setUpWriteCallback(l *logger.Logger, schemaMap map[string]*stream) *writeCallback {
+	wcb := &writeCallback{
+		l:         l,
+		schemaMap: schemaMap,
+	}
+	return wcb
+}
+
+func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
+	writeEvent, ok := message.Data().(data.StreamWriteData)
+	if !ok {
+		w.l.Warn().Msg("invalid event data type")
+		return
+	}
+	sm := writeEvent.WriteRequest.GetMetadata()
+	id := formatStreamID(sm.GetName(), sm.GetGroup())
+	err := w.schemaMap[id].Write(writeEvent.ShardID, writeEvent.WriteRequest.GetElement())
+	if err != nil {
+		w.l.Debug().Err(err)
+	}
+	return
+}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
new file mode 100644
index 0000000..dac9281
--- /dev/null
+++ b/banyand/stream/stream_write_test.go
@@ -0,0 +1,282 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"context"
+	"encoding/base64"
+	"math"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func Test_Stream_Write(t *testing.T) {
+	tester := assert.New(t)
+	s, deferFunc := setup(tester)
+	defer deferFunc()
+
+	type args struct {
+		shardID uint
+		ele     *streamv2.ElementValue
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "golden path",
+			args: args{
+				shardID: 0,
+				ele: getEle(
+					"trace_id-xxfff.111323",
+					0,
+					"webapp_id",
+					"10.0.0.1_id",
+					"/home_id",
+					300,
+					1622933202000000000,
+				),
+			},
+		},
+		{
+			name: "minimal",
+			args: args{
+				shardID: 1,
+				ele: getEle(
+					nil,
+					1,
+					"webapp_id",
+					"10.0.0.1_id",
+				),
+			},
+		},
+		{
+			name: "http",
+			args: args{
+				shardID: 0,
+				ele: getEle(
+					"trace_id-xxfff.111323",
+					0,
+					"webapp_id",
+					"10.0.0.1_id",
+					"/home_id",
+					300,
+					1622933202000000000,
+					"GET",
+					"200",
+				),
+			},
+		},
+		{
+			name: "database",
+			args: args{
+				shardID: 0,
+				ele: getEle(
+					"trace_id-xxfff.111323",
+					0,
+					"webapp_id",
+					"10.0.0.1_id",
+					"/home_id",
+					300,
+					1622933202000000000,
+					nil,
+					nil,
+					"MySQL",
+					"10.1.1.2",
+				),
+			},
+		},
+		{
+			name: "mq",
+			args: args{
+				shardID: 0,
+				ele: getEle(
+					"trace_id-xxfff.111323",
+					1,
+					"webapp_id",
+					"10.0.0.1_id",
+					"/home_id",
+					300,
+					1622933202000000000,
+					nil,
+					nil,
+					nil,
+					nil,
+					"test_topic",
+					"10.0.0.1",
+					"broker",
+				),
+			},
+		},
+		{
+			name: "invalid trace id",
+			args: args{
+				shardID: 1,
+				ele: getEle(
+					1212323,
+					1,
+					"webapp_id",
+					"10.0.0.1_id",
+				),
+			},
+			wantErr: true,
+		},
+		{
+			name:    "empty input",
+			args:    args{},
+			wantErr: true,
+		},
+		{
+			name: "invalid shard id",
+			args: args{
+				shardID: math.MaxUint64,
+				ele: getEle(
+					"trace_id-xxfff.111323",
+					0,
+					"webapp_id",
+					"10.0.0.1_id",
+					"/home_id",
+					300,
+					1622933202000000000,
+				),
+			},
+			wantErr: true,
+		},
+		{
+			name: "unknown tags",
+			args: args{
+				shardID: 0,
+				ele: getEle(
+					"trace_id-xxfff.111323",
+					1,
+					"webapp_id",
+					"10.0.0.1_id",
+					"/home_id",
+					300,
+					1622933202000000000,
+					nil,
+					nil,
+					nil,
+					nil,
+					"test_topic",
+					"10.0.0.1",
+					"broker",
+					"unknown",
+				),
+			},
+			wantErr: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := s.Write(tt.args.shardID, tt.args.ele)
+			if tt.wantErr {
+				tester.Error(err)
+				return
+			}
+			tester.NoError(err)
+		})
+	}
+
+}
+
+func setup(t *assert.Assertions) (*stream, func()) {
+	t.NoError(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "info",
+	}))
+	tempDir, deferFunc := test.Space(t)
+	streamRepo, err := schema.NewStream()
+	t.NoError(err)
+	streamSpec, err := streamRepo.Get(context.TODO(), &commonv2.Metadata{
+		Name:  "sw",
+		Group: "default",
+	})
+	t.NoError(err)
+	s, err := openStream(tempDir, streamSpec, logger.GetLogger("test"))
+	t.NoError(err)
+	return s, func() {
+		_ = s.Close()
+		deferFunc()
+	}
+}
+
+func getEle(tags ...interface{}) *streamv2.ElementValue {
+	searchableTags := make([]*modelv2.Tag, 0)
+	for _, tag := range tags {
+		searchableTags = append(searchableTags, getTag(tag))
+	}
+	bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+	e := &streamv2.ElementValue{
+		ElementId: "1231.dfd.123123ssf",
+		Timestamp: timestamppb.Now(),
+		TagFamilies: []*streamv2.ElementValue_TagFamily{
+			{
+				Tags: []*modelv2.Tag{
+					{
+						ValueType: &modelv2.Tag_BinaryData{
+							BinaryData: bb,
+						},
+					},
+				},
+			},
+			{
+				Tags: searchableTags,
+			},
+		},
+	}
+	return e
+}
+
+func getTag(tag interface{}) *modelv2.Tag {
+	if tag == nil {
+		return &modelv2.Tag{
+			ValueType: &modelv2.Tag_Null{},
+		}
+	}
+	switch t := tag.(type) {
+	case int:
+		return &modelv2.Tag{
+			ValueType: &modelv2.Tag_Int{
+				Int: &modelv2.Int{
+					Value: int64(t),
+				},
+			},
+		}
+	case string:
+		return &modelv2.Tag{
+			ValueType: &modelv2.Tag_Str{
+				Str: &modelv2.Str{
+					Value: t,
+				},
+			},
+		}
+	}
+	return nil
+}
diff --git a/banyand/stream/testdata/shard0.json b/banyand/stream/testdata/shard0.json
new file mode 100644
index 0000000..28de2f3
--- /dev/null
+++ b/banyand/stream/testdata/shard0.json
@@ -0,0 +1,18 @@
+[
+  {
+    "element_id": "1",
+    "timestamp": "2021-04-15T01:30:15.01Z",
+    "tag_families": [
+      {
+        "tags": [
+          {"binary_data": "YWJjMTIzIT8kKiYoKSctPUB+"}
+        ]
+      },
+      {
+        "tags": [
+          {"str": ""}
+        ]
+      }
+    ]
+  }
+]
\ No newline at end of file
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index dcab966..6505582 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -20,6 +20,9 @@ package tsdb
 import (
 	"context"
 	"io"
+	"time"
+
+	"github.com/dgraph-io/ristretto/z"
 
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -28,10 +31,14 @@ import (
 type block struct {
 	path string
 	l    *logger.Logger
+	ref  *z.Closer
 
 	store       kv.TimeSeriesStore
 	treeIndex   kv.Store
 	closableLst []io.Closer
+	endTime     time.Time
+	startTime   time.Time
+
 	//revertedIndex kv.Store
 }
 
@@ -43,7 +50,9 @@ type blockOpts struct {
 
 func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	b = &block{
-		path: opts.path,
+		path:      opts.path,
+		ref:       z.NewCloser(1),
+		startTime: time.Now(),
 	}
 	parentLogger := ctx.Value(logger.ContextKey)
 	if parentLogger != nil {
@@ -62,8 +71,54 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	return b, nil
 }
 
+func (b *block) delegate() blockDelegate {
+	b.incRef()
+	return &bDelegate{
+		delegate: b,
+	}
+}
+
+func (b *block) dscRef() {
+	b.ref.Done()
+}
+
+func (b *block) incRef() {
+	b.ref.AddRunning(1)
+}
+
 func (b *block) close() {
+	b.dscRef()
+	b.ref.SignalAndWait()
 	for _, closer := range b.closableLst {
 		_ = closer.Close()
 	}
 }
+
+type blockDelegate interface {
+	io.Closer
+	contains(ts time.Time) bool
+	write(key []byte, val []byte, ts time.Time) error
+}
+
+var _ blockDelegate = (*bDelegate)(nil)
+
+type bDelegate struct {
+	delegate *block
+}
+
+func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
+	return d.delegate.store.Put(key, val, uint64(ts.UnixNano()))
+}
+
+func (d *bDelegate) contains(ts time.Time) bool {
+	greaterAndEqualStart := d.delegate.startTime.Equal(ts) || d.delegate.startTime.Before(ts)
+	if d.delegate.endTime.IsZero() {
+		return greaterAndEqualStart
+	}
+	return greaterAndEqualStart && d.delegate.endTime.After(ts)
+}
+
+func (d *bDelegate) Close() error {
+	d.delegate.dscRef()
+	return nil
+}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 810aae0..ee8ce57 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -18,12 +18,20 @@
 package tsdb
 
 import (
+	"bytes"
+	"io"
 	"time"
 
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
 	"github.com/apache/skywalking-banyandb/api/common"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 )
 
+var ErrEmptySeriesSpan = errors.New("there is no data in such time range")
+
 type Iterator interface {
 	Next() bool
 	Val() Item
@@ -43,6 +51,10 @@ type ConditionValue struct {
 type Condition map[string][]ConditionValue
 
 type ItemID struct {
+	//shardID int
+	//segID   []byte
+	//blockID []byte
+	id []byte
 }
 
 type TimeRange struct {
@@ -57,21 +69,23 @@ type Series interface {
 }
 
 type SeriesSpan interface {
+	io.Closer
 	WriterBuilder() WriterBuilder
 	Iterator() Iterator
 	SeekerBuilder() SeekerBuilder
 }
 
 type WriterBuilder interface {
-	Family(name string) WriterBuilder
+	Family(name string, val []byte) WriterBuilder
 	Time(ts time.Time) WriterBuilder
 	Val(val []byte) WriterBuilder
-	OrderBy(order modelv2.QueryOrder_Sort) WriterBuilder
-	Build() Writer
+	Build() (Writer, error)
 }
 
 type Writer interface {
-	Write() ItemID
+	Write() (ItemID, error)
+	WriteLSMIndex(name string, val []byte) error
+	WriteInvertedIndex(name string, val []byte) error
 }
 
 type SeekerBuilder interface {
@@ -88,12 +102,14 @@ type Seeker interface {
 var _ Series = (*series)(nil)
 
 type series struct {
-	id common.SeriesID
+	id      common.SeriesID
+	blockDB blockDatabase
 }
 
-func newSeries(id common.SeriesID) *series {
+func newSeries(id common.SeriesID, blockDB blockDatabase) *series {
 	return &series{
-		id: id,
+		id:      id,
+		blockDB: blockDB,
 	}
 }
 
@@ -102,9 +118,145 @@ func (s *series) ID() common.SeriesID {
 }
 
 func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
-	panic("implement me")
+	blocks := s.blockDB.span(timeRange)
+	if len(blocks) < 1 {
+		return nil, ErrEmptySeriesSpan
+	}
+	return newSeriesSpan(blocks, s.id), nil
 }
 
 func (s *series) Get(id ItemID) (Item, error) {
+	panic("not implemented")
+}
+
+var _ SeriesSpan = (*seriesSpan)(nil)
+
+type seriesSpan struct {
+	blocks   []blockDelegate
+	seriesID common.SeriesID
+}
+
+func (s *seriesSpan) Close() (err error) {
+	for _, delegate := range s.blocks {
+		err = multierr.Append(err, delegate.Close())
+	}
+	return err
+}
+
+func (s *seriesSpan) WriterBuilder() WriterBuilder {
+	return newWriterBuilder(s)
+}
+
+func (s *seriesSpan) Iterator() Iterator {
 	panic("implement me")
 }
+
+func (s *seriesSpan) SeekerBuilder() SeekerBuilder {
+	panic("implement me")
+}
+
+func newSeriesSpan(blocks []blockDelegate, id common.SeriesID) *seriesSpan {
+	return &seriesSpan{
+		blocks:   blocks,
+		seriesID: id,
+	}
+}
+
+var _ WriterBuilder = (*writerBuilder)(nil)
+
+type writerBuilder struct {
+	series *seriesSpan
+	block  blockDelegate
+	values []struct {
+		family []byte
+		val    []byte
+	}
+	ts            time.Time
+	seriesIDBytes []byte
+}
+
+func (w *writerBuilder) Family(name string, val []byte) WriterBuilder {
+	w.values = append(w.values, struct {
+		family []byte
+		val    []byte
+	}{family: bytes.Join([][]byte{w.seriesIDBytes, hash([]byte(name))}, nil), val: val})
+	return w
+}
+
+func (w *writerBuilder) Time(ts time.Time) WriterBuilder {
+	w.ts = ts
+	for _, b := range w.series.blocks {
+		if b.contains(ts) {
+			w.block = b
+			break
+		}
+	}
+	return w
+}
+
+func (w *writerBuilder) Val(val []byte) WriterBuilder {
+	w.values = append(w.values, struct {
+		family []byte
+		val    []byte
+	}{val: val})
+	return w
+}
+
+var ErrNoTime = errors.New("no time specified")
+var ErrNoVal = errors.New("no value specified")
+
+func (w *writerBuilder) Build() (Writer, error) {
+	if w.block == nil {
+		return nil, ErrNoTime
+	}
+	if len(w.values) < 1 {
+		return nil, ErrNoVal
+	}
+	wt := &writer{
+		block:    w.block,
+		ts:       w.ts,
+		seriesID: w.seriesIDBytes,
+		itemID:   bytes.Join([][]byte{w.seriesIDBytes, convert.Int64ToBytes(w.ts.UnixNano())}, nil),
+		columns:  w.values,
+	}
+	return wt, nil
+}
+
+func newWriterBuilder(seriesSpan *seriesSpan) WriterBuilder {
+	return &writerBuilder{
+		series:        seriesSpan,
+		seriesIDBytes: convert.Uint64ToBytes(uint64(seriesSpan.seriesID)),
+	}
+}
+
+var _ Writer = (*writer)(nil)
+
+type writer struct {
+	block    blockDelegate
+	ts       time.Time
+	seriesID []byte
+	columns  []struct {
+		family []byte
+		val    []byte
+	}
+	itemID []byte
+}
+
+func (w *writer) WriteLSMIndex(name string, val []byte) error {
+	panic("implement me")
+}
+
+func (w *writer) WriteInvertedIndex(name string, val []byte) error {
+	panic("implement me")
+}
+
+func (w *writer) Write() (id ItemID, err error) {
+	for _, c := range w.columns {
+		err = w.block.write(c.family, c.val, w.ts)
+		if err != nil {
+			return id, err
+		}
+	}
+	id.id = w.itemID
+	return id, nil
+}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index b38c30f..aa9ec22 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -80,11 +80,16 @@ func NewPath(entries []Entry) Path {
 
 type SeriesDatabase interface {
 	io.Closer
-	Create(entity Entity) error
+	Get(entity Entity) (Series, error)
 	List(path Path) (SeriesList, error)
 }
 
+type blockDatabase interface {
+	span(timeRange TimeRange) []blockDelegate
+}
+
 var _ SeriesDatabase = (*seriesDB)(nil)
+var _ blockDatabase = (*seriesDB)(nil)
 
 type seriesDB struct {
 	sync.Mutex
@@ -94,59 +99,23 @@ type seriesDB struct {
 	seriesMetadata kv.Store
 }
 
-func (s *seriesDB) Close() error {
-	for _, seg := range s.lst {
-		seg.close()
-	}
-	return s.seriesMetadata.Close()
-}
-
-func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
-	sdb := &seriesDB{}
-	parentLogger := ctx.Value(logger.ContextKey)
-	if parentLogger == nil {
-		return nil, logger.ErrNoLoggerInContext
-	}
-	if pl, ok := parentLogger.(*logger.Logger); ok {
-		sdb.l = pl.Named("series")
-	}
-	var err error
-	sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
-	if err != nil {
-		return nil, err
-	}
-	segPath, err := mkdir(segTemplate, path, time.Now().Format(segFormat))
-	if err != nil {
-		return nil, err
-	}
-	seg, err := newSegment(ctx, segPath)
-	if err != nil {
-		return nil, err
-	}
-	{
-		sdb.Lock()
-		defer sdb.Unlock()
-		sdb.lst = append(sdb.lst, seg)
-	}
-	return sdb, nil
-}
-
-func (s *seriesDB) Create(entity Entity) error {
+func (s *seriesDB) Get(entity Entity) (Series, error) {
 	key := hashEntity(entity)
-	_, err := s.seriesMetadata.Get(key)
+	seriesID, err := s.seriesMetadata.Get(key)
 	if err != nil && err != kv.ErrKeyNotFound {
-		return err
+		return nil, err
 	}
 	if err == nil {
-		return nil
+		return newSeries(bytesConvSeriesID(seriesID), s), nil
 	}
 	s.Lock()
 	defer s.Unlock()
-	err = s.seriesMetadata.Put(key, hash(key))
+	seriesID = hash(key)
+	err = s.seriesMetadata.Put(key, seriesID)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	return nil
+	return newSeries(bytesConvSeriesID(seriesID), s), nil
 }
 
 func (s *seriesDB) List(path Path) (SeriesList, error) {
@@ -156,7 +125,7 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
 			return nil, err
 		}
 		if err == nil {
-			return []Series{newSeries(common.SeriesID(convert.BytesToUint64(id)))}, nil
+			return []Series{newSeries(bytesConvSeriesID(id), s)}, nil
 		}
 		return nil, nil
 	}
@@ -173,7 +142,7 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
 				err = multierr.Append(err, errGetVal)
 				return nil
 			}
-			result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id))))
+			result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id)), s))
 		}
 		return nil
 	})
@@ -183,6 +152,52 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
 	return result, err
 }
 
+func (s *seriesDB) span(_ TimeRange) []blockDelegate {
+	// TODO: return correct blocks
+	result := make([]blockDelegate, 0, len(s.lst[0].lst))
+	for _, b := range s.lst[0].lst {
+		result = append(result, b.delegate())
+	}
+	return result
+}
+
+func (s *seriesDB) Close() error {
+	for _, seg := range s.lst {
+		seg.close()
+	}
+	return s.seriesMetadata.Close()
+}
+
+func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
+	sdb := &seriesDB{}
+	parentLogger := ctx.Value(logger.ContextKey)
+	if parentLogger == nil {
+		return nil, logger.ErrNoLoggerInContext
+	}
+	if pl, ok := parentLogger.(*logger.Logger); ok {
+		sdb.l = pl.Named("series")
+	}
+	var err error
+	sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
+	if err != nil {
+		return nil, err
+	}
+	segPath, err := mkdir(segTemplate, path, time.Now().Format(segFormat))
+	if err != nil {
+		return nil, err
+	}
+	seg, err := newSegment(ctx, segPath)
+	if err != nil {
+		return nil, err
+	}
+	{
+		sdb.Lock()
+		defer sdb.Unlock()
+		sdb.lst = append(sdb.lst, seg)
+	}
+	return sdb, nil
+}
+
 func hashEntity(entity Entity) []byte {
 	result := make(Entry, 0, len(entity)*8)
 	for _, entry := range entity {
@@ -195,6 +210,10 @@ func hash(entry []byte) []byte {
 	return convert.Uint64ToBytes(convert.Hash(entry))
 }
 
+func bytesConvSeriesID(data []byte) common.SeriesID {
+	return common.SeriesID(convert.BytesToUint64(data))
+}
+
 type SeriesList []Series
 
 func (a SeriesList) Len() int {
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 098648a..83b3008 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -141,7 +141,7 @@ func TestNewPath(t *testing.T) {
 	}
 }
 
-func Test_SeriesDatabase_Create(t *testing.T) {
+func Test_SeriesDatabase_Get(t *testing.T) {
 
 	tests := []struct {
 		name     string
@@ -152,7 +152,7 @@ func Test_SeriesDatabase_Create(t *testing.T) {
 			entities: []Entity{{
 				Entry("productpage"),
 				Entry("10.0.0.1"),
-				Entry(convert.Uint64ToBytes(0)),
+				convert.Uint64ToBytes(0),
 			}},
 		},
 		{
@@ -161,12 +161,12 @@ func Test_SeriesDatabase_Create(t *testing.T) {
 				{
 					Entry("productpage"),
 					Entry("10.0.0.1"),
-					Entry(convert.Uint64ToBytes(0)),
+					convert.Uint64ToBytes(0),
 				},
 				{
 					Entry("productpage"),
 					Entry("10.0.0.1"),
-					Entry(convert.Uint64ToBytes(0)),
+					convert.Uint64ToBytes(0),
 				},
 			},
 		},
@@ -183,7 +183,9 @@ func Test_SeriesDatabase_Create(t *testing.T) {
 			s, err := newSeriesDataBase(context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), dir)
 			tester.NoError(err)
 			for _, entity := range tt.entities {
-				tester.NoError(s.Create(entity))
+				series, err := s.Get(entity)
+				tester.NoError(err)
+				tester.Equal(hashEntity(entity), series.ID())
 			}
 		})
 	}
@@ -214,7 +216,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				convert.Uint64ToBytes(0),
 			}),
 			want: SeriesList{
-				newSeries(data[0].id),
+				newMockSeries(data[0].id),
 			},
 		},
 		{
@@ -225,8 +227,8 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				AnyEntry,
 			}),
 			want: SeriesList{
-				newSeries(data[1].id),
-				newSeries(data[2].id),
+				newMockSeries(data[1].id),
+				newMockSeries(data[2].id),
 			},
 		},
 		{
@@ -237,10 +239,10 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				AnyEntry,
 			}),
 			want: SeriesList{
-				newSeries(data[0].id),
-				newSeries(data[1].id),
-				newSeries(data[2].id),
-				newSeries(data[3].id),
+				newMockSeries(data[0].id),
+				newMockSeries(data[1].id),
+				newMockSeries(data[2].id),
+				newMockSeries(data[3].id),
 			},
 		},
 		{
@@ -251,9 +253,9 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				convert.Uint64ToBytes(0),
 			}),
 			want: SeriesList{
-				newSeries(data[0].id),
-				newSeries(data[1].id),
-				newSeries(data[3].id),
+				newMockSeries(data[0].id),
+				newMockSeries(data[1].id),
+				newMockSeries(data[3].id),
 			},
 		},
 		{
@@ -264,8 +266,8 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				convert.Uint64ToBytes(1),
 			}),
 			want: SeriesList{
-				newSeries(data[2].id),
-				newSeries(data[4].id),
+				newMockSeries(data[2].id),
+				newMockSeries(data[4].id),
 			},
 		},
 	}
@@ -278,9 +280,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				return
 			}
 			tester.NoError(err)
-			sort.Sort(tt.want)
-			sort.Sort(series)
-			tester.Equal(tt.want, series)
+			tester.Equal(transform(tt.want), transform(series))
 		})
 	}
 }
@@ -330,7 +330,21 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
 	}
 	for _, d := range data {
 		d.id = common.SeriesID(convert.BytesToUint64(hash(hashEntity(d.entity))))
-		t.NoError(db.Create(d.entity))
+		series, err := db.Get(d.entity)
+		t.NoError(err)
+		t.Equal(hashEntity(d.entity), series.ID())
 	}
 	return data
 }
+
+func newMockSeries(id common.SeriesID) *series {
+	return newSeries(id, nil)
+}
+
+func transform(list SeriesList) (seriesIDs []common.SeriesID) {
+	sort.Sort(list)
+	for _, s := range list {
+		seriesIDs = append(seriesIDs, s.ID())
+	}
+	return seriesIDs
+}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 050444e..94cb90a 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -31,7 +31,7 @@ type shard struct {
 }
 
 func (s *shard) Series() SeriesDatabase {
-	panic("implement me")
+	return s.seriesDatabase
 }
 
 func (s *shard) Index() IndexDatabase {
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 503b0e8..3d20a88 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -44,9 +44,12 @@ const (
 	dirPerm = 0700
 )
 
+var ErrInvalidShardID = errors.New("invalid shard id")
+
 type Database interface {
 	io.Closer
 	Shards() []Shard
+	Shard(id uint) (Shard, error)
 }
 
 type Shard interface {
@@ -71,6 +74,24 @@ type database struct {
 	sync.Mutex
 }
 
+func (d *database) Shards() []Shard {
+	return d.sLst
+}
+
+func (d *database) Shard(id uint) (Shard, error) {
+	if id >= uint(len(d.sLst)) {
+		return nil, ErrInvalidShardID
+	}
+	return d.sLst[id], nil
+}
+
+func (d *database) Close() error {
+	for _, s := range d.sLst {
+		_ = s.Close()
+	}
+	return nil
+}
+
 func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 	db := &database{
 		location: opts.Location,
@@ -98,17 +119,6 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 	return createDatabase(thisContext, db)
 }
 
-func (d *database) Close() error {
-	for _, s := range d.sLst {
-		_ = s.Close()
-	}
-	return nil
-}
-
-func (d *database) Shards() []Shard {
-	return d.sLst
-}
-
 func createDatabase(ctx context.Context, db *database) (Database, error) {
 	var err error
 	db.Lock()
diff --git a/go.mod b/go.mod
index 65674a5..55e7aa9 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
 	github.com/RoaringBitmap/roaring v0.9.1
 	github.com/cespare/xxhash v1.1.0
 	github.com/dgraph-io/badger/v3 v3.2011.1
+	github.com/dgraph-io/ristretto v0.1.0
 	github.com/golang/mock v1.5.0
 	github.com/golang/protobuf v1.5.2
 	github.com/google/go-cmp v0.5.6

[skywalking-banyandb] 02/02: Introduce indices generation

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5c0e83c2ad1d8afe138f017e1782bd6e1e9a8dc2
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sat Sep 4 01:38:11 2021 +0800

    Introduce indices generation
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/common/id.go                    |   1 +
 banyand/liaison/grpc/trace_test.go  |   2 +-
 banyand/series/trace/common_test.go |   4 +-
 banyand/series/trace/query_test.go  |   6 +-
 banyand/series/trace/write_test.go  |   4 +-
 banyand/stream/index.go             | 107 ++++++++++++++++++++++++++++++++++++
 banyand/stream/service.go           |  11 +++-
 banyand/stream/stream.go            |  78 ++++++++++++++++++--------
 banyand/stream/stream_write.go      | 104 +++++++++++++++++++++++++----------
 banyand/stream/stream_write_test.go |  15 ++++-
 banyand/tsdb/block.go               |  21 +++++++
 banyand/tsdb/indexdb.go             | 104 +++++++++++++++++++++++++++++++++--
 banyand/tsdb/segment.go             |  35 +++++++++++-
 banyand/tsdb/series.go              |  91 +++++++++++++++++++++---------
 banyand/tsdb/seriesdb.go            |  27 ++++-----
 banyand/tsdb/seriesdb_test.go       |   4 +-
 banyand/tsdb/shard.go               |  34 ++++++++++--
 banyand/tsdb/tsdb.go                |  18 +++---
 18 files changed, 537 insertions(+), 129 deletions(-)

diff --git a/api/common/id.go b/api/common/id.go
index e49fa6a..6eafb8a 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -19,3 +19,4 @@ package common
 
 type ChunkID uint64
 type SeriesID uint64
+type ShardID uint32
diff --git a/banyand/liaison/grpc/trace_test.go b/banyand/liaison/grpc/trace_test.go
index ab64906..1d002b0 100644
--- a/banyand/liaison/grpc/trace_test.go
+++ b/banyand/liaison/grpc/trace_test.go
@@ -266,7 +266,7 @@ func traceWrite(t *testing.T, tc caseData, conn *grpclib.ClientConn) {
 	ctx := context.Background()
 	stream, errorWrite := client.Write(ctx)
 	if errorWrite != nil {
-		t.Errorf("%v.Write(_) = _, %v", client, errorWrite)
+		t.Errorf("%v.write(_) = _, %v", client, errorWrite)
 	}
 	waitc := make(chan struct{})
 	go func() {
diff --git a/banyand/series/trace/common_test.go b/banyand/series/trace/common_test.go
index 9071bb4..c5f47d1 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -244,10 +244,10 @@ func setupTestData(t *testing.T, ts *traceSeries, seriesEntities []seriesEntity)
 			EntityValue: ev,
 		})
 		if err != nil {
-			t.Error("Write() got error")
+			t.Error("write() got error")
 		}
 		if got < 1 {
-			t.Error("Write() got empty chunkID")
+			t.Error("write() got empty chunkID")
 		}
 		results = append(results, idWithShard{
 			id:      got,
diff --git a/banyand/series/trace/query_test.go b/banyand/series/trace/query_test.go
index f8a1ed3..251c607 100644
--- a/banyand/series/trace/query_test.go
+++ b/banyand/series/trace/query_test.go
@@ -158,7 +158,7 @@ func Test_traceSeries_FetchEntity(t *testing.T) {
 				entities = append(entities, ee...)
 			}
 			if (err != nil) != tt.wantErr {
-				t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
+				t.Errorf("write() error = %v, wantErr %v", err, tt.wantErr)
 			}
 			sort.Sort(entities)
 			assert.Equal(t, len(tt.wantEntities), len(entities))
@@ -212,7 +212,7 @@ func Test_traceSeries_FetchTrace(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			traceData, err := ts.FetchTrace(tt.args.traceID, series.ScanOptions{DataBinary: true, Projection: []string{"trace_id"}})
 			if (err != nil) != tt.wantErr {
-				t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
+				t.Errorf("write() error = %v, wantErr %v", err, tt.wantErr)
 			}
 			var entities ByEntityID = traceData.Entities
 			assert.Equal(t, len(tt.wantEntities), len(entities))
@@ -306,7 +306,7 @@ func Test_traceSeries_ScanEntity(t *testing.T) {
 			var err error
 			entities, err = ts.ScanEntity(uint64(tt.args.start.UnixNano()), uint64(tt.args.end.UnixNano()), series.ScanOptions{DataBinary: true, Projection: []string{"trace_id"}})
 			if (err != nil) != tt.wantErr {
-				t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
+				t.Errorf("write() error = %v, wantErr %v", err, tt.wantErr)
 			}
 			assert.Equal(t, len(tt.wantEntities), len(entities))
 			sort.Sort(entities)
diff --git a/banyand/series/trace/write_test.go b/banyand/series/trace/write_test.go
index 41dbcb0..18e4956 100644
--- a/banyand/series/trace/write_test.go
+++ b/banyand/series/trace/write_test.go
@@ -182,11 +182,11 @@ func Test_traceSeries_Write(t *testing.T) {
 				EntityValue: ev,
 			})
 			if (err != nil) != tt.wantErr {
-				t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
+				t.Errorf("write() error = %v, wantErr %v", err, tt.wantErr)
 				return
 			}
 			if err == nil && got < 1 {
-				t.Error("Write() got empty chunkID")
+				t.Error("write() got empty chunkID")
 			}
 		})
 	}
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
new file mode 100644
index 0000000..1d9a463
--- /dev/null
+++ b/banyand/stream/index.go
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"io"
+
+	"go.uber.org/multierr"
+
+	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/partition"
+)
+
+type indexMessage struct {
+	localWriter tsdb.Writer
+	blockCloser io.Closer
+	value       *streamv2.ElementValue
+}
+
+func (s *stream) bootIndexGenerator() {
+	go func() {
+		for {
+			m, more := <-s.indexCh
+			if !more {
+				return
+			}
+			var err error
+			for _, ruleIndex := range s.indexRuleIndex {
+				rule := ruleIndex.rule
+				switch rule.GetLocation() {
+				case databasev2.IndexRule_LOCATION_SERIES:
+					err = multierr.Append(err, writeLocalIndex(m.localWriter, ruleIndex, m.value))
+				case databasev2.IndexRule_LOCATION_GLOBAL:
+					err = multierr.Append(err, s.writeGlobalIndex(ruleIndex, m.localWriter.ItemID(), m.value))
+				}
+			}
+			err = multierr.Append(err, m.blockCloser.Close())
+			if err != nil {
+				s.l.Error().Err(err).Msg("encounter some errors when generating indices")
+			}
+		}
+	}()
+}
+
+//TODO: should listen to pipeline in a distributed cluster
+func (s *stream) writeGlobalIndex(ruleIndex indexRule, ref tsdb.ItemID, value *streamv2.ElementValue) error {
+	val, err := getIndexValue(ruleIndex, value)
+	if err != nil {
+		return err
+	}
+	indexShardID, err := partition.ShardID(val, s.schema.ShardNum)
+	if err != nil {
+		return err
+	}
+	shard, err := s.db.Shard(indexShardID)
+	if err != nil {
+		return err
+	}
+	builder := shard.Index().IndexWriterBuilder()
+	indexWriter, err := builder.
+		GlobalItemID(ref).
+		Time(value.GetTimestamp().AsTime()).
+		Build()
+	if err != nil {
+		return err
+	}
+	rule := ruleIndex.rule
+	switch rule.GetType() {
+	case databasev2.IndexRule_TYPE_INVERTED:
+		return indexWriter.WriteInvertedIndex(rule.Metadata.Name, val)
+	case databasev2.IndexRule_TYPE_TREE:
+		return indexWriter.WriteLSMIndex(rule.Metadata.Name, val)
+	}
+	return err
+}
+
+func writeLocalIndex(writer tsdb.Writer, ruleIndex indexRule, value *streamv2.ElementValue) (err error) {
+	val, err := getIndexValue(ruleIndex, value)
+	if err != nil {
+		return err
+	}
+	rule := ruleIndex.rule
+	switch rule.GetType() {
+	case databasev2.IndexRule_TYPE_INVERTED:
+		return writer.WriteInvertedIndex(rule.Metadata.Name, val)
+	case databasev2.IndexRule_TYPE_TREE:
+		return writer.WriteLSMIndex(rule.Metadata.Name, val)
+	}
+	return err
+}
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index a187f29..d65978e 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -65,7 +65,7 @@ func (s *service) Name() string {
 }
 
 func (s *service) PreRun() error {
-	schemas, err := s.metadata.Stream().List(context.Background(), schema.ListOpt{})
+	schemas, err := s.metadata.Stream().List(context.TODO(), schema.ListOpt{})
 	if err != nil {
 		return err
 	}
@@ -73,7 +73,14 @@ func (s *service) PreRun() error {
 	s.schemaMap = make(map[string]*stream, len(schemas))
 	s.l = logger.GetLogger(s.Name())
 	for _, sa := range schemas {
-		sm, errTS := openStream(s.root, sa, s.l)
+		iRules, errIndexRules := s.metadata.IndexRules(context.TODO(), sa.Metadata)
+		if errIndexRules != nil {
+			return errIndexRules
+		}
+		sm, errTS := openStream(s.root, streamSpec{
+			schema:     sa,
+			indexRules: iRules,
+		}, s.l)
 		if errTS != nil {
 			return errTS
 		}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 88da951..338e1af 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -26,19 +26,30 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+type tagIndex struct {
+	family int
+	tag    int
+}
+
+type indexRule struct {
+	rule       *databasev2.IndexRule
+	tagIndices []tagIndex
+}
 type stream struct {
-	name        string
-	group       string
-	l           *logger.Logger
-	schema      *databasev2.Stream
-	db          tsdb.Database
-	entityIndex []struct {
-		family int
-		tag    int
-	}
+	name           string
+	group          string
+	l              *logger.Logger
+	schema         *databasev2.Stream
+	db             tsdb.Database
+	entityIndex    []tagIndex
+	indexRules     []*databasev2.IndexRule
+	indexRuleIndex []indexRule
+
+	indexCh chan indexMessage
 }
 
 func (s *stream) Close() error {
+	close(s.indexCh)
 	return s.db.Close()
 }
 
@@ -47,37 +58,58 @@ func (s *stream) parseSchema() {
 	meta := sm.GetMetadata()
 	s.name, s.group = meta.GetName(), meta.GetGroup()
 	for _, tagInEntity := range sm.Entity.GetTagNames() {
-	nextEntityTag:
-		for fi, family := range sm.GetTagFamilies() {
-			for ti, tag := range family.Tags {
-				if tagInEntity == tag.GetName() {
-					s.entityIndex = append(s.entityIndex, struct {
-						family int
-						tag    int
-					}{family: fi, tag: ti})
-					break nextEntityTag
-				}
+		fIndex, tIndex, tag := s.findTagByName(tagInEntity)
+		if tag != nil {
+			s.entityIndex = append(s.entityIndex, tagIndex{family: fIndex, tag: tIndex})
+		}
+	}
+	for _, rule := range s.indexRules {
+		tagIndices := make([]tagIndex, 0, len(rule.GetTags()))
+		for _, tagInIndex := range rule.GetTags() {
+			fIndex, tIndex, tag := s.findTagByName(tagInIndex)
+			if tag != nil {
+				tagIndices = append(tagIndices, tagIndex{family: fIndex, tag: tIndex})
 			}
 		}
+		s.indexRuleIndex = append(s.indexRuleIndex, indexRule{rule: rule, tagIndices: tagIndices})
 	}
 }
 
-func openStream(root string, schema *databasev2.Stream, l *logger.Logger) (*stream, error) {
+func (s *stream) findTagByName(tagName string) (int, int, *databasev2.TagSpec) {
+	for fi, family := range s.schema.GetTagFamilies() {
+		for ti, tag := range family.Tags {
+			if tagName == tag.GetName() {
+				return fi, ti, tag
+			}
+		}
+	}
+	return 0, 0, nil
+}
+
+type streamSpec struct {
+	schema     *databasev2.Stream
+	indexRules []*databasev2.IndexRule
+}
+
+func openStream(root string, spec streamSpec, l *logger.Logger) (*stream, error) {
 	sm := &stream{
-		schema: schema,
-		l:      l,
+		schema:     spec.schema,
+		indexRules: spec.indexRules,
+		l:          l,
+		indexCh:    make(chan indexMessage),
 	}
 	sm.parseSchema()
 	db, err := tsdb.OpenDatabase(
 		context.WithValue(context.Background(), logger.ContextKey, l),
 		tsdb.DatabaseOpts{
 			Location: root,
-			ShardNum: uint(schema.GetShardNum()),
+			ShardNum: sm.schema.GetShardNum(),
 		})
 	if err != nil {
 		return nil, err
 	}
 	sm.db = db
+	sm.bootIndexGenerator()
 	return sm, nil
 }
 
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index c16e75b..7678ab8 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -35,7 +35,7 @@ var (
 	ErrUnsupportedTagTypeAsEntry = errors.New("the tag type can not be as an entry in an entity")
 )
 
-func (s *stream) Write(shardID uint, value *streamv2.ElementValue) error {
+func (s *stream) write(shardID uint, value *streamv2.ElementValue) error {
 	sm := s.schema
 	fLen := len(value.GetTagFamilies())
 	if fLen < 1 {
@@ -62,48 +62,85 @@ func (s *stream) Write(shardID uint, value *streamv2.ElementValue) error {
 		Duration: 0,
 	})
 	if err != nil {
+		if wp != nil {
+			_ = wp.Close()
+		}
 		return err
 	}
-	defer func() {
-		_ = wp.Close()
-	}()
-	builder := wp.WriterBuilder().Time(t)
-	for fi, family := range value.GetTagFamilies() {
-		familySpec := sm.GetTagFamilies()[fi]
-		if len(family.GetTags()) > len(familySpec.GetTags()) {
-			return errors.Wrap(ErrMalformedElement, "tag number is more than expected")
-		}
-		for ti, tag := range family.GetTags() {
-			tagSpec := familySpec.GetTags()[ti]
-			tType, isNull := tagValueTypeConv(tag)
-			if isNull {
-				continue
+	writeFn := func() (tsdb.Writer, error) {
+		builder := wp.WriterBuilder().Time(t)
+		for fi, family := range value.GetTagFamilies() {
+			familySpec := sm.GetTagFamilies()[fi]
+			if len(family.GetTags()) > len(familySpec.GetTags()) {
+				return nil, errors.Wrap(ErrMalformedElement, "tag number is more than expected")
 			}
-			if tType != tagSpec.GetType() {
-				return errors.Wrapf(ErrMalformedElement, "tag %s type is unexpected", tagSpec.GetName())
+			for ti, tag := range family.GetTags() {
+				tagSpec := familySpec.GetTags()[ti]
+				tType, isNull := tagValueTypeConv(tag)
+				if isNull {
+					continue
+				}
+				if tType != tagSpec.GetType() {
+					return nil, errors.Wrapf(ErrMalformedElement, "tag %s type is unexpected", tagSpec.GetName())
+				}
 			}
+			bb, errMarshal := proto.Marshal(family)
+			if errMarshal != nil {
+				return nil, errMarshal
+			}
+			builder.Family(sm.GetTagFamilies()[fi].GetName(), bb)
 		}
-		bb, errMarshal := proto.Marshal(family)
-		if errMarshal != nil {
-			return errMarshal
+		writer, errWrite := builder.Build()
+		if errWrite != nil {
+			return nil, errWrite
 		}
-		builder.Family(sm.GetTagFamilies()[fi].GetName(), bb)
+		_, errWrite = writer.Write()
+		return writer, errWrite
 	}
-	writer, err := builder.Build()
+	writer, err := writeFn()
 	if err != nil {
+		_ = wp.Close()
 		return err
 	}
-	_, err = writer.Write()
+	m := indexMessage{
+		localWriter: writer,
+		value:       value,
+		blockCloser: wp,
+	}
+	go func(m indexMessage) {
+		defer func() {
+			if recover() != nil {
+				_ = m.blockCloser.Close()
+			}
+		}()
+		s.indexCh <- m
+	}(m)
 	return err
 }
 
+func getIndexValue(ruleIndex indexRule, value *streamv2.ElementValue) (val []byte, err error) {
+	val = make([]byte, 0, len(ruleIndex.tagIndices))
+	for _, tIndex := range ruleIndex.tagIndices {
+		tag, err := getTagByOffset(value, tIndex.family, tIndex.tag)
+		if err != nil {
+			return nil, errors.WithMessagef(err, "index rule:%v", ruleIndex.rule.Metadata)
+		}
+		v, err := proto.Marshal(tag)
+		if err != nil {
+			return nil, err
+		}
+		val = append(val, v...)
+	}
+	return val, nil
+}
+
 func (s *stream) buildEntity(value *streamv2.ElementValue) (entity tsdb.Entity, err error) {
 	for _, index := range s.entityIndex {
-		family := value.GetTagFamilies()[index.family]
-		if index.tag >= len(family.GetTags()) {
-			return nil, errors.Wrap(ErrMalformedElement, "the tag which composite the entity doesn't exist ")
+		tag, err := getTagByOffset(value, index.family, index.tag)
+		if err != nil {
+			return nil, err
 		}
-		entry, err := tagConvEntry(family.GetTags()[index.tag])
+		entry, err := tagConvEntry(tag)
 		if err != nil {
 			return nil, err
 		}
@@ -112,6 +149,17 @@ func (s *stream) buildEntity(value *streamv2.ElementValue) (entity tsdb.Entity,
 	return entity, nil
 }
 
+func getTagByOffset(value *streamv2.ElementValue, fIndex, tIndex int) (*modelv2.Tag, error) {
+	if fIndex >= len(value.TagFamilies) {
+		return nil, errors.Wrap(ErrMalformedElement, "tag family offset is invalid")
+	}
+	family := value.GetTagFamilies()[fIndex]
+	if tIndex >= len(family.GetTags()) {
+		return nil, errors.Wrap(ErrMalformedElement, "tag offset is invalid")
+	}
+	return family.GetTags()[tIndex], nil
+}
+
 func tagConvEntry(tag *modelv2.Tag) (tsdb.Entry, error) {
 	switch tag.GetValueType().(type) {
 	case *modelv2.Tag_Str:
@@ -144,7 +192,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
 	}
 	sm := writeEvent.WriteRequest.GetMetadata()
 	id := formatStreamID(sm.GetName(), sm.GetGroup())
-	err := w.schemaMap[id].Write(writeEvent.ShardID, writeEvent.WriteRequest.GetElement())
+	err := w.schemaMap[id].write(writeEvent.ShardID, writeEvent.WriteRequest.GetElement())
 	if err != nil {
 		w.l.Debug().Err(err)
 	}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index dac9281..f02ff0a 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -29,6 +29,7 @@ import (
 	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test"
@@ -195,7 +196,7 @@ func Test_Stream_Write(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			err := s.Write(tt.args.shardID, tt.args.ele)
+			err := s.write(tt.args.shardID, tt.args.ele)
 			if tt.wantErr {
 				tester.Error(err)
 				return
@@ -214,12 +215,20 @@ func setup(t *assert.Assertions) (*stream, func()) {
 	tempDir, deferFunc := test.Space(t)
 	streamRepo, err := schema.NewStream()
 	t.NoError(err)
-	streamSpec, err := streamRepo.Get(context.TODO(), &commonv2.Metadata{
+	sa, err := streamRepo.Get(context.TODO(), &commonv2.Metadata{
 		Name:  "sw",
 		Group: "default",
 	})
 	t.NoError(err)
-	s, err := openStream(tempDir, streamSpec, logger.GetLogger("test"))
+	mService, err := metadata.NewService(context.TODO())
+	t.NoError(err)
+	iRules, err := mService.IndexRules(context.TODO(), sa.Metadata)
+	t.NoError(err)
+	sSpec := streamSpec{
+		schema:     sa,
+		indexRules: iRules,
+	}
+	s, err := openStream(tempDir, sSpec, logger.GetLogger("test"))
 	t.NoError(err)
 	return s, func() {
 		_ = s.Close()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 6505582..ad2483a 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -38,11 +38,15 @@ type block struct {
 	closableLst []io.Closer
 	endTime     time.Time
 	startTime   time.Time
+	segID       uint16
+	blockID     uint16
 
 	//revertedIndex kv.Store
 }
 
 type blockOpts struct {
+	segID         uint16
+	blockID       uint16
 	path          string
 	compressLevel int
 	valueSize     int
@@ -50,6 +54,8 @@ type blockOpts struct {
 
 func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	b = &block{
+		segID:     opts.segID,
+		blockID:   opts.blockID,
 		path:      opts.path,
 		ref:       z.NewCloser(1),
 		startTime: time.Now(),
@@ -98,6 +104,9 @@ type blockDelegate interface {
 	io.Closer
 	contains(ts time.Time) bool
 	write(key []byte, val []byte, ts time.Time) error
+	writeLSMIndex(key []byte, val []byte) error
+	writeInvertedIndex(key []byte, val []byte) error
+	identity() (segID uint16, blockID uint16)
 }
 
 var _ blockDelegate = (*bDelegate)(nil)
@@ -106,10 +115,22 @@ type bDelegate struct {
 	delegate *block
 }
 
+func (d *bDelegate) identity() (segID uint16, blockID uint16) {
+	return d.delegate.segID, d.delegate.blockID
+}
+
 func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
 	return d.delegate.store.Put(key, val, uint64(ts.UnixNano()))
 }
 
+func (d *bDelegate) writeLSMIndex(key []byte, val []byte) error {
+	return d.delegate.treeIndex.Put(key, val)
+}
+
+func (d *bDelegate) writeInvertedIndex(key []byte, val []byte) error {
+	return d.delegate.treeIndex.Put(key, val)
+}
+
 func (d *bDelegate) contains(ts time.Time) bool {
 	greaterAndEqualStart := d.delegate.startTime.Equal(ts) || d.delegate.startTime.Before(ts)
 	if d.delegate.endTime.IsZero() {
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index 98e60e9..0b3fb80 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -17,10 +17,106 @@
 
 package tsdb
 
-import databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+import (
+	"bytes"
+	"context"
+	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+)
 
 type IndexDatabase interface {
-	CreateSeries(rule *databasev2.IndexRule) error
-	CreateGlobal(rule *databasev2.IndexRule) error
-	WriteGlobal() error
+	IndexWriterBuilder() IndexWriterBuilder
+}
+
+type IndexWriter interface {
+	WriteLSMIndex(name string, val []byte) error
+	WriteInvertedIndex(name string, val []byte) error
+}
+
+type IndexWriterBuilder interface {
+	Time(ts time.Time) IndexWriterBuilder
+	GlobalItemID(itemID ItemID) IndexWriterBuilder
+	Build() (IndexWriter, error)
+}
+
+var _ IndexDatabase = (*indexDB)(nil)
+
+type indexDB struct {
+	shardID common.ShardID
+	lst     []*segment
+}
+
+func (i *indexDB) IndexWriterBuilder() IndexWriterBuilder {
+	return newIndexWriterBuilder(i.lst)
+}
+
+func newIndexDatabase(_ context.Context, id common.ShardID, lst []*segment) (IndexDatabase, error) {
+	return &indexDB{
+		shardID: id,
+		lst:     lst,
+	}, nil
+}
+
+var _ IndexWriterBuilder = (*indexWriterBuilder)(nil)
+
+type indexWriterBuilder struct {
+	segments     []*segment
+	ts           time.Time
+	seg          *segment
+	globalItemID *ItemID
+}
+
+func (i *indexWriterBuilder) Time(ts time.Time) IndexWriterBuilder {
+	i.ts = ts
+	for _, s := range i.segments {
+		if s.contains(ts) {
+			i.seg = s
+			break
+		}
+	}
+	return i
+}
+
+func (i *indexWriterBuilder) GlobalItemID(itemID ItemID) IndexWriterBuilder {
+	i.globalItemID = &itemID
+	return i
+}
+
+func (i *indexWriterBuilder) Build() (IndexWriter, error) {
+	if i.seg == nil {
+		return nil, errors.WithStack(ErrNoTime)
+	}
+	if i.globalItemID == nil {
+		return nil, errors.WithStack(ErrNoVal)
+	}
+	return &indexWriter{
+		seg:    i.seg,
+		ts:     i.ts,
+		itemID: i.globalItemID,
+	}, nil
+}
+
+func newIndexWriterBuilder(segments []*segment) IndexWriterBuilder {
+	return &indexWriterBuilder{
+		segments: segments,
+	}
+}
+
+var _ IndexWriter = (*indexWriter)(nil)
+
+type indexWriter struct {
+	seg    *segment
+	ts     time.Time
+	itemID *ItemID
+}
+
+func (i *indexWriter) WriteLSMIndex(name string, val []byte) error {
+	return i.seg.globalIndex.Put(bytes.Join([][]byte{[]byte(name), val}, nil), i.itemID.Marshal())
+}
+
+func (i *indexWriter) WriteInvertedIndex(name string, val []byte) error {
+	return i.seg.globalIndex.Put(bytes.Join([][]byte{[]byte(name), val}, nil), i.itemID.Marshal())
 }
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 80af39f..b2dad50 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -21,25 +21,54 @@ import (
 	"context"
 	"sync"
 	"time"
+
+	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type segment struct {
 	path string
 
-	lst []*block
+	lst         []*block
+	globalIndex kv.Store
 	sync.Mutex
+	l         *logger.Logger
+	startTime time.Time
+	endTime   time.Time
+}
+
+func (s *segment) contains(ts time.Time) bool {
+	greaterAndEqualStart := s.startTime.Equal(ts) || s.startTime.Before(ts)
+	if s.endTime.IsZero() {
+		return greaterAndEqualStart
+	}
+	return greaterAndEqualStart && s.endTime.After(ts)
 }
 
 func newSegment(ctx context.Context, path string) (s *segment, err error) {
 	s = &segment{
-		path: path,
+		path:      path,
+		startTime: time.Now(),
+	}
+	parentLogger := ctx.Value(logger.ContextKey)
+	if parentLogger != nil {
+		if pl, ok := parentLogger.(*logger.Logger); ok {
+			s.l = pl.Named("segment")
+		}
+	}
+	indexPath, err := mkdir(globalIndexTemplate, path)
+	if err != nil {
+		return nil, err
+	}
+	if s.globalIndex, err = kv.OpenStore(0, indexPath, kv.StoreWithLogger(s.l)); err != nil {
+		return nil, err
 	}
 	blockPath, err := mkdir(blockTemplate, path, time.Now().Format(blockFormat))
 	if err != nil {
 		return nil, err
 	}
 	var b *block
-	if b, err = newBlock(ctx, blockOpts{
+	if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, s.l), blockOpts{
 		path: blockPath,
 	}); err != nil {
 		return nil, err
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index ee8ce57..bdbb7d4 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -30,7 +30,10 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 )
 
-var ErrEmptySeriesSpan = errors.New("there is no data in such time range")
+var (
+	ErrEmptySeriesSpan = errors.New("there is no data in such time range")
+	ErrItemIDMalformed = errors.New("serialized item id is malformed")
+)
 
 type Iterator interface {
 	Next() bool
@@ -51,10 +54,34 @@ type ConditionValue struct {
 type Condition map[string][]ConditionValue
 
 type ItemID struct {
-	//shardID int
-	//segID   []byte
-	//blockID []byte
-	id []byte
+	shardID common.ShardID
+	segID   uint16
+	blockID uint16
+	id      []byte
+}
+
+func (i *ItemID) Marshal() []byte {
+	return bytes.Join([][]byte{
+		convert.Uint32ToBytes(uint32(i.shardID)),
+		convert.Uint16ToBytes(i.segID),
+		convert.Uint16ToBytes(i.blockID),
+		i.id,
+	}, nil)
+}
+
+func (i *ItemID) UnMarshal(data []byte) error {
+	if len(data) <= 32+16+16 {
+		return ErrItemIDMalformed
+	}
+	var offset int
+	i.shardID = common.ShardID(convert.BytesToUint32(data[offset : offset+4]))
+	offset += 4
+	i.segID = convert.BytesToUint16(data[offset : offset+2])
+	offset += 2
+	i.blockID = convert.BytesToUint16(data[offset : offset+2])
+	offset += 2
+	i.id = data[offset:]
+	return nil
 }
 
 type TimeRange struct {
@@ -83,9 +110,9 @@ type WriterBuilder interface {
 }
 
 type Writer interface {
+	IndexWriter
 	Write() (ItemID, error)
-	WriteLSMIndex(name string, val []byte) error
-	WriteInvertedIndex(name string, val []byte) error
+	ItemID() ItemID
 }
 
 type SeekerBuilder interface {
@@ -104,12 +131,14 @@ var _ Series = (*series)(nil)
 type series struct {
 	id      common.SeriesID
 	blockDB blockDatabase
+	shardID common.ShardID
 }
 
 func newSeries(id common.SeriesID, blockDB blockDatabase) *series {
 	return &series{
 		id:      id,
 		blockDB: blockDB,
+		shardID: blockDB.shardID(),
 	}
 }
 
@@ -122,7 +151,7 @@ func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
 	if len(blocks) < 1 {
 		return nil, ErrEmptySeriesSpan
 	}
-	return newSeriesSpan(blocks, s.id), nil
+	return newSeriesSpan(blocks, s.id, s.shardID), nil
 }
 
 func (s *series) Get(id ItemID) (Item, error) {
@@ -134,6 +163,7 @@ var _ SeriesSpan = (*seriesSpan)(nil)
 type seriesSpan struct {
 	blocks   []blockDelegate
 	seriesID common.SeriesID
+	shardID  common.ShardID
 }
 
 func (s *seriesSpan) Close() (err error) {
@@ -155,10 +185,11 @@ func (s *seriesSpan) SeekerBuilder() SeekerBuilder {
 	panic("implement me")
 }
 
-func newSeriesSpan(blocks []blockDelegate, id common.SeriesID) *seriesSpan {
+func newSeriesSpan(blocks []blockDelegate, id common.SeriesID, shardID common.ShardID) *seriesSpan {
 	return &seriesSpan{
 		blocks:   blocks,
 		seriesID: id,
+		shardID:  shardID,
 	}
 }
 
@@ -207,19 +238,23 @@ var ErrNoVal = errors.New("no value specified")
 
 func (w *writerBuilder) Build() (Writer, error) {
 	if w.block == nil {
-		return nil, ErrNoTime
+		return nil, errors.WithStack(ErrNoTime)
 	}
 	if len(w.values) < 1 {
-		return nil, ErrNoVal
+		return nil, errors.WithStack(ErrNoVal)
 	}
-	wt := &writer{
-		block:    w.block,
-		ts:       w.ts,
-		seriesID: w.seriesIDBytes,
-		itemID:   bytes.Join([][]byte{w.seriesIDBytes, convert.Int64ToBytes(w.ts.UnixNano())}, nil),
-		columns:  w.values,
-	}
-	return wt, nil
+	segID, blockID := w.block.identity()
+	return &writer{
+		block: w.block,
+		ts:    w.ts,
+		itemID: &ItemID{
+			shardID: w.series.shardID,
+			segID:   segID,
+			blockID: blockID,
+			id:      bytes.Join([][]byte{w.seriesIDBytes, convert.Int64ToBytes(w.ts.UnixNano())}, nil),
+		},
+		columns: w.values,
+	}, nil
 }
 
 func newWriterBuilder(seriesSpan *seriesSpan) WriterBuilder {
@@ -232,22 +267,25 @@ func newWriterBuilder(seriesSpan *seriesSpan) WriterBuilder {
 var _ Writer = (*writer)(nil)
 
 type writer struct {
-	block    blockDelegate
-	ts       time.Time
-	seriesID []byte
-	columns  []struct {
+	block   blockDelegate
+	ts      time.Time
+	columns []struct {
 		family []byte
 		val    []byte
 	}
-	itemID []byte
+	itemID *ItemID
+}
+
+func (w *writer) ItemID() ItemID {
+	return *w.itemID
 }
 
 func (w *writer) WriteLSMIndex(name string, val []byte) error {
-	panic("implement me")
+	return w.block.writeLSMIndex(bytes.Join([][]byte{[]byte(name), val}, nil), w.itemID.id)
 }
 
 func (w *writer) WriteInvertedIndex(name string, val []byte) error {
-	panic("implement me")
+	return w.block.writeInvertedIndex(bytes.Join([][]byte{[]byte(name), val}, nil), w.itemID.id)
 }
 
 func (w *writer) Write() (id ItemID, err error) {
@@ -257,6 +295,5 @@ func (w *writer) Write() (id ItemID, err error) {
 			return id, err
 		}
 	}
-	id.id = w.itemID
-	return id, nil
+	return *w.itemID, nil
 }
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index aa9ec22..8c0e9fb 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -23,7 +23,6 @@ import (
 	"io"
 	"math"
 	"sync"
-	"time"
 
 	"go.uber.org/multierr"
 
@@ -85,6 +84,7 @@ type SeriesDatabase interface {
 }
 
 type blockDatabase interface {
+	shardID() common.ShardID
 	span(timeRange TimeRange) []blockDelegate
 }
 
@@ -97,6 +97,11 @@ type seriesDB struct {
 
 	lst            []*segment
 	seriesMetadata kv.Store
+	sID            common.ShardID
+}
+
+func (s *seriesDB) shardID() common.ShardID {
+	return s.sID
 }
 
 func (s *seriesDB) Get(entity Entity) (Series, error) {
@@ -168,8 +173,11 @@ func (s *seriesDB) Close() error {
 	return s.seriesMetadata.Close()
 }
 
-func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
-	sdb := &seriesDB{}
+func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string, segLst []*segment) (SeriesDatabase, error) {
+	sdb := &seriesDB{
+		sID: shardID,
+		lst: segLst,
+	}
 	parentLogger := ctx.Value(logger.ContextKey)
 	if parentLogger == nil {
 		return nil, logger.ErrNoLoggerInContext
@@ -182,19 +190,6 @@ func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error)
 	if err != nil {
 		return nil, err
 	}
-	segPath, err := mkdir(segTemplate, path, time.Now().Format(segFormat))
-	if err != nil {
-		return nil, err
-	}
-	seg, err := newSegment(ctx, segPath)
-	if err != nil {
-		return nil, err
-	}
-	{
-		sdb.Lock()
-		defer sdb.Unlock()
-		sdb.lst = append(sdb.lst, seg)
-	}
 	return sdb, nil
 }
 
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 83b3008..3968564 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -180,7 +180,7 @@ func Test_SeriesDatabase_Get(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			dir, deferFunc := test.Space(tester)
 			defer deferFunc()
-			s, err := newSeriesDataBase(context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), dir)
+			s, err := newSeriesDataBase(context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), 0, dir, nil)
 			tester.NoError(err)
 			for _, entity := range tt.entities {
 				series, err := s.Get(entity)
@@ -199,7 +199,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
 	}))
 	dir, deferFunc := test.Space(tester)
 	defer deferFunc()
-	s, err := newSeriesDataBase(context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), dir)
+	s, err := newSeriesDataBase(context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), 0, dir, nil)
 	tester.NoError(err)
 	data := setUpEntities(tester, s)
 	tests := []struct {
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 94cb90a..3b2ed4e 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -19,15 +19,22 @@ package tsdb
 
 import (
 	"context"
+	"sync"
+	"time"
+
+	"github.com/apache/skywalking-banyandb/api/common"
 )
 
 var _ Shard = (*shard)(nil)
 
 type shard struct {
-	id int
+	sync.Mutex
+	id common.ShardID
 
 	location       string
 	seriesDatabase SeriesDatabase
+	indexDatabase  IndexDatabase
+	lst            []*segment
 }
 
 func (s *shard) Series() SeriesDatabase {
@@ -35,24 +42,41 @@ func (s *shard) Series() SeriesDatabase {
 }
 
 func (s *shard) Index() IndexDatabase {
-	panic("implement me")
+	return s.indexDatabase
 }
 
-func newShard(ctx context.Context, id int, location string) (*shard, error) {
+func newShard(ctx context.Context, id common.ShardID, location string) (*shard, error) {
 	s := &shard{
 		id:       id,
 		location: location,
 	}
+	segPath, err := mkdir(segTemplate, location, time.Now().Format(segFormat))
+	if err != nil {
+		return nil, err
+	}
+	seg, err := newSegment(ctx, segPath)
+	if err != nil {
+		return nil, err
+	}
+	{
+		s.Lock()
+		defer s.Unlock()
+		s.lst = append(s.lst, seg)
+	}
 	seriesPath, err := mkdir(seriesTemplate, s.location)
 	if err != nil {
 		return nil, err
 	}
-	sdb, err := newSeriesDataBase(ctx, seriesPath)
+	sdb, err := newSeriesDataBase(ctx, s.id, seriesPath, s.lst)
 	if err != nil {
 		return nil, err
 	}
 	s.seriesDatabase = sdb
-
+	idb, err := newIndexDatabase(ctx, s.id, s.lst)
+	if err != nil {
+		return nil, err
+	}
+	s.indexDatabase = idb
 	return s, nil
 }
 
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 3d20a88..66ba915 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -29,14 +29,16 @@ import (
 	"github.com/pkg/errors"
 	"go.uber.org/multierr"
 
+	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 const (
-	shardTemplate  = "%s/shard-%d"
-	seriesTemplate = "%s/series"
-	segTemplate    = "%s/seg-%s"
-	blockTemplate  = "%s/block-%s"
+	shardTemplate       = "%s/shard-%d"
+	seriesTemplate      = "%s/series"
+	segTemplate         = "%s/seg-%s"
+	blockTemplate       = "%s/block-%s"
+	globalIndexTemplate = "%s/index"
 
 	segFormat   = "20060102"
 	blockFormat = "1504"
@@ -62,13 +64,13 @@ var _ Database = (*database)(nil)
 
 type DatabaseOpts struct {
 	Location string
-	ShardNum uint
+	ShardNum uint32
 }
 
 type database struct {
 	logger   *logger.Logger
 	location string
-	shardNum uint
+	shardNum uint32
 
 	sLst []Shard
 	sync.Mutex
@@ -123,13 +125,13 @@ func createDatabase(ctx context.Context, db *database) (Database, error) {
 	var err error
 	db.Lock()
 	defer db.Unlock()
-	for i := 0; i < int(db.shardNum); i++ {
+	for i := uint32(0); i < db.shardNum; i++ {
 		shardLocation, errInternal := mkdir(shardTemplate, db.location, i)
 		if errInternal != nil {
 			err = multierr.Append(err, errInternal)
 			continue
 		}
-		so, errNewShard := newShard(ctx, i, shardLocation)
+		so, errNewShard := newShard(ctx, common.ShardID(i), shardLocation)
 		if errNewShard != nil {
 			err = multierr.Append(err, errNewShard)
 			continue