You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/03 17:40:53 UTC

[skywalking-banyandb] 01/02: Add stream moduel

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit bbf89b1146ae04c11bf8d0946b10d8cf6f16ade0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Sep 2 00:23:29 2021 +0800

    Add stream moduel
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 .../model/v2/write.proto => data/stream.go}        |  36 ++-
 api/proto/banyandb/database/v2/schema.pb.go        |  37 +--
 api/proto/banyandb/database/v2/schema.proto        |   1 +
 api/proto/banyandb/model/v2/write.pb.go            |  38 ++-
 api/proto/banyandb/model/v2/write.proto            |   1 +
 api/proto/banyandb/stream/v2/write.pb.go           | 169 +++++++-----
 api/proto/banyandb/stream/v2/write.proto           |  14 +-
 banyand/metadata/metadata.go                       |  21 +-
 banyand/metadata/schema/data/stream.json           |   2 +-
 banyand/stream/service.go                          | 102 ++++++++
 banyand/stream/stream.go                           | 104 ++++++++
 banyand/stream/stream_write.go                     | 152 +++++++++++
 banyand/stream/stream_write_test.go                | 282 +++++++++++++++++++++
 banyand/stream/testdata/shard0.json                |  18 ++
 banyand/tsdb/block.go                              |  57 ++++-
 banyand/tsdb/series.go                             | 168 +++++++++++-
 banyand/tsdb/seriesdb.go                           | 113 +++++----
 banyand/tsdb/seriesdb_test.go                      |  56 ++--
 banyand/tsdb/shard.go                              |   2 +-
 banyand/tsdb/tsdb.go                               |  32 ++-
 go.mod                                             |   1 +
 21 files changed, 1205 insertions(+), 201 deletions(-)

diff --git a/api/proto/banyandb/model/v2/write.proto b/api/data/stream.go
similarity index 56%
copy from api/proto/banyandb/model/v2/write.proto
copy to api/data/stream.go
index 836423a..65fc2ef 100644
--- a/api/proto/banyandb/model/v2/write.proto
+++ b/api/data/stream.go
@@ -15,22 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-syntax = "proto3";
+package data
 
-option java_package = "org.apache.skywalking.banyandb.model.v2";
-option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2";
+import (
+	"github.com/apache/skywalking-banyandb/api/common"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+)
 
-package banyandb.model.v2;
+var StreamKindVersion = common.KindVersion{Version: "v2", Kind: "data-stream"}
 
-import "google/protobuf/struct.proto";
-import "banyandb/model/v2/common.proto";
+var StreamWriteEventKindVersion = common.KindVersion{
+	Version: "v2",
+	Kind:    "stream-write",
+}
+
+var TopicStreamWriteEvent = bus.UniTopic(StreamWriteEventKindVersion.String())
+
+type Stream struct {
+	common.KindVersion
+	Entities []Entity
+}
 
-message Tag {
-    oneof value_type {
-        google.protobuf.NullValue null = 1;
-        Str str = 2;
-        StrArray str_array = 3;
-        Int int = 4;
-        IntArray int_array = 5;
-    }
+type StreamWriteData struct {
+	ShardID      uint
+	SeriesID     uint64
+	WriteRequest *streamv2.WriteRequest
 }
diff --git a/api/proto/banyandb/database/v2/schema.pb.go b/api/proto/banyandb/database/v2/schema.pb.go
index 5f8168e..f2fed85 100644
--- a/api/proto/banyandb/database/v2/schema.pb.go
+++ b/api/proto/banyandb/database/v2/schema.pb.go
@@ -49,6 +49,7 @@ const (
 	TagType_TAG_TYPE_INT          TagType = 2
 	TagType_TAG_TYPE_STRING_ARRAY TagType = 3
 	TagType_TAG_TYPE_INT_ARRAY    TagType = 4
+	TagType_TAG_TYPE_DATA_BINARY  TagType = 5
 )
 
 // Enum value maps for TagType.
@@ -59,6 +60,7 @@ var (
 		2: "TAG_TYPE_INT",
 		3: "TAG_TYPE_STRING_ARRAY",
 		4: "TAG_TYPE_INT_ARRAY",
+		5: "TAG_TYPE_DATA_BINARY",
 	}
 	TagType_value = map[string]int32{
 		"TAG_TYPE_UNSPECIFIED":  0,
@@ -66,6 +68,7 @@ var (
 		"TAG_TYPE_INT":          2,
 		"TAG_TYPE_STRING_ARRAY": 3,
 		"TAG_TYPE_INT_ARRAY":    4,
+		"TAG_TYPE_DATA_BINARY":  5,
 	}
 )
 
@@ -912,22 +915,24 @@ var file_banyandb_database_v2_schema_proto_rawDesc = []byte{
 	0x64, 0x5f, 0x61, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f,
 	0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d,
 	0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41,
-	0x74, 0x2a, 0x7d, 0x0a, 0x07, 0x54, 0x61, 0x67, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14,
-	0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49,
-	0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59,
-	0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x54,
-	0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x10, 0x02, 0x12, 0x19, 0x0a,
-	0x15, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47,
-	0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x03, 0x12, 0x16, 0x0a, 0x12, 0x54, 0x41, 0x47, 0x5f,
-	0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x04,
-	0x42, 0x72, 0x0a, 0x2a, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73,
-	0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e,
-	0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x5a, 0x44,
-	0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68,
-	0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e,
-	0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f,
-	0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73,
-	0x65, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x74, 0x2a, 0x97, 0x01, 0x0a, 0x07, 0x54, 0x61, 0x67, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
+	0x14, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43,
+	0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x41, 0x47, 0x5f, 0x54,
+	0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c,
+	0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x10, 0x02, 0x12, 0x19,
+	0x0a, 0x15, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e,
+	0x47, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x03, 0x12, 0x16, 0x0a, 0x12, 0x54, 0x41, 0x47,
+	0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10,
+	0x04, 0x12, 0x18, 0x0a, 0x14, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x44, 0x41,
+	0x54, 0x41, 0x5f, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x10, 0x05, 0x42, 0x72, 0x0a, 0x2a, 0x6f,
+	0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c,
+	0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61,
+	0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x5a, 0x44, 0x67, 0x69, 0x74, 0x68, 0x75,
+	0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79,
+	0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
+	0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61,
+	0x6e, 0x64, 0x62, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2f, 0x76, 0x32, 0x62,
+	0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
diff --git a/api/proto/banyandb/database/v2/schema.proto b/api/proto/banyandb/database/v2/schema.proto
index c3745ff..cb358aa 100644
--- a/api/proto/banyandb/database/v2/schema.proto
+++ b/api/proto/banyandb/database/v2/schema.proto
@@ -44,6 +44,7 @@ enum TagType {
     TAG_TYPE_INT = 2;
     TAG_TYPE_STRING_ARRAY = 3;
     TAG_TYPE_INT_ARRAY = 4;
+    TAG_TYPE_DATA_BINARY = 5;
 }
 
 message TagFamily {
diff --git a/api/proto/banyandb/model/v2/write.pb.go b/api/proto/banyandb/model/v2/write.pb.go
index 0d36e62..1640f11 100644
--- a/api/proto/banyandb/model/v2/write.pb.go
+++ b/api/proto/banyandb/model/v2/write.pb.go
@@ -50,6 +50,7 @@ type Tag struct {
 	//	*Tag_StrArray
 	//	*Tag_Int
 	//	*Tag_IntArray
+	//	*Tag_BinaryData
 	ValueType isTag_ValueType `protobuf_oneof:"value_type"`
 }
 
@@ -127,6 +128,13 @@ func (x *Tag) GetIntArray() *IntArray {
 	return nil
 }
 
+func (x *Tag) GetBinaryData() []byte {
+	if x, ok := x.GetValueType().(*Tag_BinaryData); ok {
+		return x.BinaryData
+	}
+	return nil
+}
+
 type isTag_ValueType interface {
 	isTag_ValueType()
 }
@@ -151,6 +159,10 @@ type Tag_IntArray struct {
 	IntArray *IntArray `protobuf:"bytes,5,opt,name=int_array,json=intArray,proto3,oneof"`
 }
 
+type Tag_BinaryData struct {
+	BinaryData []byte `protobuf:"bytes,6,opt,name=binary_data,json=binaryData,proto3,oneof"`
+}
+
 func (*Tag_Null) isTag_ValueType() {}
 
 func (*Tag_Str) isTag_ValueType() {}
@@ -161,6 +173,8 @@ func (*Tag_Int) isTag_ValueType() {}
 
 func (*Tag_IntArray) isTag_ValueType() {}
 
+func (*Tag_BinaryData) isTag_ValueType() {}
+
 var File_banyandb_model_v2_write_proto protoreflect.FileDescriptor
 
 var file_banyandb_model_v2_write_proto_rawDesc = []byte{
@@ -171,7 +185,7 @@ var file_banyandb_model_v2_write_proto_rawDesc = []byte{
 	0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
 	0x1a, 0x1e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c,
 	0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
-	0x22, 0x95, 0x02, 0x0a, 0x03, 0x54, 0x61, 0x67, 0x12, 0x30, 0x0a, 0x04, 0x6e, 0x75, 0x6c, 0x6c,
+	0x22, 0xb8, 0x02, 0x0a, 0x03, 0x54, 0x61, 0x67, 0x12, 0x30, 0x0a, 0x04, 0x6e, 0x75, 0x6c, 0x6c,
 	0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
 	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x4e, 0x75, 0x6c, 0x6c, 0x56, 0x61, 0x6c,
 	0x75, 0x65, 0x48, 0x00, 0x52, 0x04, 0x6e, 0x75, 0x6c, 0x6c, 0x12, 0x2a, 0x0a, 0x03, 0x73, 0x74,
@@ -187,15 +201,18 @@ var file_banyandb_model_v2_write_proto_rawDesc = []byte{
 	0x0a, 0x09, 0x69, 0x6e, 0x74, 0x5f, 0x61, 0x72, 0x72, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28,
 	0x0b, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64,
 	0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x49, 0x6e, 0x74, 0x41, 0x72, 0x72, 0x61, 0x79, 0x48, 0x00,
-	0x52, 0x08, 0x69, 0x6e, 0x74, 0x41, 0x72, 0x72, 0x61, 0x79, 0x42, 0x0c, 0x0a, 0x0a, 0x76, 0x61,
-	0x6c, 0x75, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x42, 0x6c, 0x0a, 0x27, 0x6f, 0x72, 0x67, 0x2e,
-	0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e,
-	0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c,
-	0x2e, 0x76, 0x32, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
-	0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e,
-	0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70,
-	0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f,
-	0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x52, 0x08, 0x69, 0x6e, 0x74, 0x41, 0x72, 0x72, 0x61, 0x79, 0x12, 0x21, 0x0a, 0x0b, 0x62, 0x69,
+	0x6e, 0x61, 0x72, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x48,
+	0x00, 0x52, 0x0a, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x44, 0x61, 0x74, 0x61, 0x42, 0x0c, 0x0a,
+	0x0a, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x42, 0x6c, 0x0a, 0x27, 0x6f,
+	0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c,
+	0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f,
+	0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
+	0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c,
+	0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70,
+	0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
+	0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+	0x33,
 }
 
 var (
@@ -258,6 +275,7 @@ func file_banyandb_model_v2_write_proto_init() {
 		(*Tag_StrArray)(nil),
 		(*Tag_Int)(nil),
 		(*Tag_IntArray)(nil),
+		(*Tag_BinaryData)(nil),
 	}
 	type x struct{}
 	out := protoimpl.TypeBuilder{
diff --git a/api/proto/banyandb/model/v2/write.proto b/api/proto/banyandb/model/v2/write.proto
index 836423a..4636bd3 100644
--- a/api/proto/banyandb/model/v2/write.proto
+++ b/api/proto/banyandb/model/v2/write.proto
@@ -32,5 +32,6 @@ message Tag {
         StrArray str_array = 3;
         Int int = 4;
         IntArray int_array = 5;
+        bytes binary_data = 6;
     }
 }
diff --git a/api/proto/banyandb/stream/v2/write.pb.go b/api/proto/banyandb/stream/v2/write.pb.go
index 664078b..ecc7655 100644
--- a/api/proto/banyandb/stream/v2/write.pb.go
+++ b/api/proto/banyandb/stream/v2/write.pb.go
@@ -31,8 +31,8 @@ import (
 	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
 	timestamppb "google.golang.org/protobuf/types/known/timestamppb"
 
-	v21 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
-	v2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	v2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+	v21 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 )
 
 const (
@@ -53,13 +53,8 @@ type ElementValue struct {
 	// 1) either the start time of a Span/Segment,
 	// 2) or the timestamp of a log
 	Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
-	// binary representation of segments, including tags, spans...
-	DataBinary []byte `protobuf:"bytes,3,opt,name=data_binary,json=dataBinary,proto3" json:"data_binary,omitempty"`
-	// support all of indexed tags in the tags.
-	// Tag only has value, as the value of value_type match with the key
-	// by the index rules and index rule bindings of Metadata group.
-	// indexed tags of multiple entities are compression in the tags.
-	Tags []*v2.Tag `protobuf:"bytes,4,rep,name=tags,proto3" json:"tags,omitempty"`
+	// the order of tag_families' items match the stream schema
+	TagFamilies []*ElementValue_TagFamily `protobuf:"bytes,3,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
 }
 
 func (x *ElementValue) Reset() {
@@ -108,16 +103,9 @@ func (x *ElementValue) GetTimestamp() *timestamppb.Timestamp {
 	return nil
 }
 
-func (x *ElementValue) GetDataBinary() []byte {
+func (x *ElementValue) GetTagFamilies() []*ElementValue_TagFamily {
 	if x != nil {
-		return x.DataBinary
-	}
-	return nil
-}
-
-func (x *ElementValue) GetTags() []*v2.Tag {
-	if x != nil {
-		return x.Tags
+		return x.TagFamilies
 	}
 	return nil
 }
@@ -128,7 +116,7 @@ type WriteRequest struct {
 	unknownFields protoimpl.UnknownFields
 
 	// the metadata is only required in the first write.
-	Metadata *v21.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
+	Metadata *v2.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
 	// the element is required.
 	Element *ElementValue `protobuf:"bytes,2,opt,name=element,proto3" json:"element,omitempty"`
 }
@@ -165,7 +153,7 @@ func (*WriteRequest) Descriptor() ([]byte, []int) {
 	return file_banyandb_stream_v2_write_proto_rawDescGZIP(), []int{1}
 }
 
-func (x *WriteRequest) GetMetadata() *v21.Metadata {
+func (x *WriteRequest) GetMetadata() *v2.Metadata {
 	if x != nil {
 		return x.Metadata
 	}
@@ -217,6 +205,53 @@ func (*WriteResponse) Descriptor() ([]byte, []int) {
 	return file_banyandb_stream_v2_write_proto_rawDescGZIP(), []int{2}
 }
 
+type ElementValue_TagFamily struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Tags []*v21.Tag `protobuf:"bytes,1,rep,name=tags,proto3" json:"tags,omitempty"`
+}
+
+func (x *ElementValue_TagFamily) Reset() {
+	*x = ElementValue_TagFamily{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_banyandb_stream_v2_write_proto_msgTypes[3]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *ElementValue_TagFamily) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ElementValue_TagFamily) ProtoMessage() {}
+
+func (x *ElementValue_TagFamily) ProtoReflect() protoreflect.Message {
+	mi := &file_banyandb_stream_v2_write_proto_msgTypes[3]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use ElementValue_TagFamily.ProtoReflect.Descriptor instead.
+func (*ElementValue_TagFamily) Descriptor() ([]byte, []int) {
+	return file_banyandb_stream_v2_write_proto_rawDescGZIP(), []int{0, 0}
+}
+
+func (x *ElementValue_TagFamily) GetTags() []*v21.Tag {
+	if x != nil {
+		return x.Tags
+	}
+	return nil
+}
+
 var File_banyandb_stream_v2_write_proto protoreflect.FileDescriptor
 
 var file_banyandb_stream_v2_write_proto_rawDesc = []byte{
@@ -229,35 +264,39 @@ var file_banyandb_stream_v2_write_proto_rawDesc = []byte{
 	0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e,
 	0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
 	0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x2f, 0x77, 0x72, 0x69, 0x74, 0x65, 0x2e,
-	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb4, 0x01, 0x0a, 0x0c, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
+	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xef, 0x01, 0x0a, 0x0c, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
 	0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
 	0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6c, 0x65, 0x6d,
 	0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
 	0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
 	0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
 	0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12,
-	0x1f, 0x0a, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x18, 0x03,
-	0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x42, 0x69, 0x6e, 0x61, 0x72, 0x79,
-	0x12, 0x2a, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16,
-	0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e,
-	0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x84, 0x01, 0x0a,
-	0x0c, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a,
-	0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
-	0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f,
-	0x6e, 0x2e, 0x76, 0x32, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d,
-	0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x3a, 0x0a, 0x07, 0x65, 0x6c, 0x65, 0x6d, 0x65,
-	0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61,
-	0x6e, 0x64, 0x62, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6c,
-	0x65, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x07, 0x65, 0x6c, 0x65, 0x6d,
-	0x65, 0x6e, 0x74, 0x22, 0x0f, 0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70,
-	0x6f, 0x6e, 0x73, 0x65, 0x42, 0x6e, 0x0a, 0x28, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
-	0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61,
-	0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32,
-	0x5a, 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61,
-	0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62,
-	0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74,
-	0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61,
-	0x6d, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x4d, 0x0a, 0x0c, 0x74, 0x61, 0x67, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x18,
+	0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
+	0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6c, 0x65, 0x6d, 0x65,
+	0x6e, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x2e, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c,
+	0x79, 0x52, 0x0b, 0x74, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x1a, 0x37,
+	0x0a, 0x09, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x12, 0x2a, 0x0a, 0x04, 0x74,
+	0x61, 0x67, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x62, 0x61, 0x6e, 0x79,
+	0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61,
+	0x67, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x84, 0x01, 0x0a, 0x0c, 0x57, 0x72, 0x69, 0x74,
+	0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61,
+	0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e,
+	0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x2e,
+	0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61,
+	0x74, 0x61, 0x12, 0x3a, 0x0a, 0x07, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20,
+	0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x73,
+	0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74,
+	0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x07, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x0f,
+	0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42,
+	0x6e, 0x0a, 0x28, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b,
+	0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64,
+	0x62, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x5a, 0x42, 0x67, 0x69, 0x74,
+	0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73,
+	0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e,
+	0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e,
+	0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x76, 0x32, 0x62,
+	0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
@@ -272,25 +311,27 @@ func file_banyandb_stream_v2_write_proto_rawDescGZIP() []byte {
 	return file_banyandb_stream_v2_write_proto_rawDescData
 }
 
-var file_banyandb_stream_v2_write_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
+var file_banyandb_stream_v2_write_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
 var file_banyandb_stream_v2_write_proto_goTypes = []interface{}{
-	(*ElementValue)(nil),          // 0: banyandb.stream.v2.ElementValue
-	(*WriteRequest)(nil),          // 1: banyandb.stream.v2.WriteRequest
-	(*WriteResponse)(nil),         // 2: banyandb.stream.v2.WriteResponse
-	(*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp
-	(*v2.Tag)(nil),                // 4: banyandb.model.v2.Tag
-	(*v21.Metadata)(nil),          // 5: banyandb.common.v2.Metadata
+	(*ElementValue)(nil),           // 0: banyandb.stream.v2.ElementValue
+	(*WriteRequest)(nil),           // 1: banyandb.stream.v2.WriteRequest
+	(*WriteResponse)(nil),          // 2: banyandb.stream.v2.WriteResponse
+	(*ElementValue_TagFamily)(nil), // 3: banyandb.stream.v2.ElementValue.TagFamily
+	(*timestamppb.Timestamp)(nil),  // 4: google.protobuf.Timestamp
+	(*v2.Metadata)(nil),            // 5: banyandb.common.v2.Metadata
+	(*v21.Tag)(nil),                // 6: banyandb.model.v2.Tag
 }
 var file_banyandb_stream_v2_write_proto_depIdxs = []int32{
-	3, // 0: banyandb.stream.v2.ElementValue.timestamp:type_name -> google.protobuf.Timestamp
-	4, // 1: banyandb.stream.v2.ElementValue.tags:type_name -> banyandb.model.v2.Tag
+	4, // 0: banyandb.stream.v2.ElementValue.timestamp:type_name -> google.protobuf.Timestamp
+	3, // 1: banyandb.stream.v2.ElementValue.tag_families:type_name -> banyandb.stream.v2.ElementValue.TagFamily
 	5, // 2: banyandb.stream.v2.WriteRequest.metadata:type_name -> banyandb.common.v2.Metadata
 	0, // 3: banyandb.stream.v2.WriteRequest.element:type_name -> banyandb.stream.v2.ElementValue
-	4, // [4:4] is the sub-list for method output_type
-	4, // [4:4] is the sub-list for method input_type
-	4, // [4:4] is the sub-list for extension type_name
-	4, // [4:4] is the sub-list for extension extendee
-	0, // [0:4] is the sub-list for field type_name
+	6, // 4: banyandb.stream.v2.ElementValue.TagFamily.tags:type_name -> banyandb.model.v2.Tag
+	5, // [5:5] is the sub-list for method output_type
+	5, // [5:5] is the sub-list for method input_type
+	5, // [5:5] is the sub-list for extension type_name
+	5, // [5:5] is the sub-list for extension extendee
+	0, // [0:5] is the sub-list for field type_name
 }
 
 func init() { file_banyandb_stream_v2_write_proto_init() }
@@ -335,6 +376,18 @@ func file_banyandb_stream_v2_write_proto_init() {
 				return nil
 			}
 		}
+		file_banyandb_stream_v2_write_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*ElementValue_TagFamily); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
 	}
 	type x struct{}
 	out := protoimpl.TypeBuilder{
@@ -342,7 +395,7 @@ func file_banyandb_stream_v2_write_proto_init() {
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 			RawDescriptor: file_banyandb_stream_v2_write_proto_rawDesc,
 			NumEnums:      0,
-			NumMessages:   3,
+			NumMessages:   4,
 			NumExtensions: 0,
 			NumServices:   0,
 		},
diff --git a/api/proto/banyandb/stream/v2/write.proto b/api/proto/banyandb/stream/v2/write.proto
index 59b0eef..39ef649 100644
--- a/api/proto/banyandb/stream/v2/write.proto
+++ b/api/proto/banyandb/stream/v2/write.proto
@@ -26,6 +26,8 @@ import "google/protobuf/timestamp.proto";
 import "banyandb/common/v2/common.proto";
 import "banyandb/model/v2/write.proto";
 
+
+
 message ElementValue {
   // element_id could be span_id of a Span or segment_id of a Segment in the context of stream
   string element_id = 1;
@@ -33,13 +35,11 @@ message ElementValue {
   // 1) either the start time of a Span/Segment,
   // 2) or the timestamp of a log
   google.protobuf.Timestamp timestamp = 2;
-  // binary representation of segments, including tags, spans...
-  bytes data_binary = 3;
-  // support all of indexed tags in the tags.
-  // Tag only has value, as the value of value_type match with the key
-  // by the index rules and index rule bindings of Metadata group.
-  // indexed tags of multiple entities are compression in the tags.
-  repeated model.v2.Tag tags = 4;
+  message TagFamily {
+    repeated model.v2.Tag tags = 1;
+  }
+  // the order of tag_families' items match the stream schema
+  repeated TagFamily tag_families = 3;
 }
 
 message WriteRequest {
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 57e1fd9..7bf3fc8 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -35,8 +35,13 @@ type IndexFilter interface {
 	IndexRules(ctx context.Context, subject *commonv2.Metadata) ([]*databasev2.IndexRule, error)
 }
 
-type Metadata interface {
+type Repo interface {
 	IndexFilter
+	Stream() schema.Stream
+}
+
+type Service interface {
+	Repo
 	run.Unit
 }
 
@@ -46,11 +51,7 @@ type service struct {
 	indexRuleBinding schema.IndexRuleBinding
 }
 
-func (s *service) Name() string {
-	return "metadata"
-}
-
-func NewService(_ context.Context) (Metadata, error) {
+func NewService(_ context.Context) (Repo, error) {
 	stream, err := schema.NewStream()
 	if err != nil {
 		return nil, err
@@ -70,6 +71,14 @@ func NewService(_ context.Context) (Metadata, error) {
 	}, nil
 }
 
+func (s *service) Stream() schema.Stream {
+	return s.stream
+}
+
+func (s *service) Name() string {
+	return "metadata"
+}
+
 func (s *service) IndexRules(ctx context.Context, subject *commonv2.Metadata) ([]*databasev2.IndexRule, error) {
 	bindings, err := s.indexRuleBinding.List(ctx, schema.ListOpt{Group: subject.Group})
 	if err != nil {
diff --git a/banyand/metadata/schema/data/stream.json b/banyand/metadata/schema/data/stream.json
index c34ec65..41b3b52 100644
--- a/banyand/metadata/schema/data/stream.json
+++ b/banyand/metadata/schema/data/stream.json
@@ -9,7 +9,7 @@
       "tags": [
         {
           "name": "data_binary",
-          "type": "TAG_TYPE_UNSPECIFIED"
+          "type": "TAG_TYPE_DATA_BINARY"
         }
       ]
     },
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
new file mode 100644
index 0000000..a187f29
--- /dev/null
+++ b/banyand/stream/service.go
@@ -0,0 +1,102 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"context"
+
+	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+var ErrEmptyRootPath = errors.New("root path is empty")
+
+type Service interface {
+	run.PreRunner
+	run.Config
+	run.Service
+}
+
+var _ Service = (*service)(nil)
+
+type service struct {
+	schemaMap     map[string]*stream
+	writeListener *writeCallback
+	l             *logger.Logger
+	metadata      metadata.Repo
+	root          string
+}
+
+func (s *service) FlagSet() *run.FlagSet {
+	flagS := run.NewFlagSet("storage")
+	flagS.StringVar(&s.root, "root-path", "/tmp", "the root path of database")
+	return flagS
+}
+
+func (s *service) Validate() error {
+	if s.root == "" {
+		return ErrEmptyRootPath
+	}
+	return nil
+}
+
+func (s *service) Name() string {
+	return "stream"
+}
+
+func (s *service) PreRun() error {
+	schemas, err := s.metadata.Stream().List(context.Background(), schema.ListOpt{})
+	if err != nil {
+		return err
+	}
+
+	s.schemaMap = make(map[string]*stream, len(schemas))
+	s.l = logger.GetLogger(s.Name())
+	for _, sa := range schemas {
+		sm, errTS := openStream(s.root, sa, s.l)
+		if errTS != nil {
+			return errTS
+		}
+		id := formatStreamID(sm.name, sm.group)
+		s.schemaMap[id] = sm
+		s.writeListener.schemaMap[id] = sm
+		s.l.Info().Str("id", id).Msg("initialize stream")
+	}
+	s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+	return err
+}
+
+func (s *service) Serve() error {
+	panic("implement me")
+}
+
+func (s *service) GracefulStop() {
+	panic("implement me")
+}
+
+//NewService returns a new service
+func NewService(_ context.Context, metadata metadata.Repo, pipeline queue.Queue) (Service, error) {
+	return &service{
+		metadata: metadata,
+	}, nil
+}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
new file mode 100644
index 0000000..88da951
--- /dev/null
+++ b/banyand/stream/stream.go
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"context"
+
+	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+type stream struct {
+	name        string
+	group       string
+	l           *logger.Logger
+	schema      *databasev2.Stream
+	db          tsdb.Database
+	entityIndex []struct {
+		family int
+		tag    int
+	}
+}
+
+func (s *stream) Close() error {
+	return s.db.Close()
+}
+
+func (s *stream) parseSchema() {
+	sm := s.schema
+	meta := sm.GetMetadata()
+	s.name, s.group = meta.GetName(), meta.GetGroup()
+	for _, tagInEntity := range sm.Entity.GetTagNames() {
+	nextEntityTag:
+		for fi, family := range sm.GetTagFamilies() {
+			for ti, tag := range family.Tags {
+				if tagInEntity == tag.GetName() {
+					s.entityIndex = append(s.entityIndex, struct {
+						family int
+						tag    int
+					}{family: fi, tag: ti})
+					break nextEntityTag
+				}
+			}
+		}
+	}
+}
+
+func openStream(root string, schema *databasev2.Stream, l *logger.Logger) (*stream, error) {
+	sm := &stream{
+		schema: schema,
+		l:      l,
+	}
+	sm.parseSchema()
+	db, err := tsdb.OpenDatabase(
+		context.WithValue(context.Background(), logger.ContextKey, l),
+		tsdb.DatabaseOpts{
+			Location: root,
+			ShardNum: uint(schema.GetShardNum()),
+		})
+	if err != nil {
+		return nil, err
+	}
+	sm.db = db
+	return sm, nil
+}
+
+func formatStreamID(name, group string) string {
+	return name + ":" + group
+}
+
+func tagValueTypeConv(tag *modelv2.Tag) (tagType databasev2.TagType, isNull bool) {
+	switch tag.GetValueType().(type) {
+	case *modelv2.Tag_Int:
+		return databasev2.TagType_TAG_TYPE_INT, false
+	case *modelv2.Tag_Str:
+		return databasev2.TagType_TAG_TYPE_STRING, false
+	case *modelv2.Tag_IntArray:
+		return databasev2.TagType_TAG_TYPE_INT_ARRAY, false
+	case *modelv2.Tag_StrArray:
+		return databasev2.TagType_TAG_TYPE_STRING_ARRAY, false
+	case *modelv2.Tag_BinaryData:
+		return databasev2.TagType_TAG_TYPE_DATA_BINARY, false
+	case *modelv2.Tag_Null:
+		return databasev2.TagType_TAG_TYPE_UNSPECIFIED, true
+	}
+	return databasev2.TagType_TAG_TYPE_UNSPECIFIED, false
+}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
new file mode 100644
index 0000000..c16e75b
--- /dev/null
+++ b/banyand/stream/stream_write.go
@@ -0,0 +1,152 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"github.com/pkg/errors"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-banyandb/api/data"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+	ErrMalformedElement          = errors.New("element is malformed")
+	ErrUnsupportedTagTypeAsEntry = errors.New("the tag type can not be as an entry in an entity")
+)
+
+func (s *stream) Write(shardID uint, value *streamv2.ElementValue) error {
+	sm := s.schema
+	fLen := len(value.GetTagFamilies())
+	if fLen < 1 {
+		return errors.Wrap(ErrMalformedElement, "no tag family")
+	}
+	if fLen > len(sm.TagFamilies) {
+		return errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
+	}
+	shard, err := s.db.Shard(shardID)
+	if err != nil {
+		return err
+	}
+	entity, err := s.buildEntity(value)
+	if err != nil {
+		return err
+	}
+	series, err := shard.Series().Get(entity)
+	if err != nil {
+		return err
+	}
+	t := value.GetTimestamp().AsTime()
+	wp, err := series.Span(tsdb.TimeRange{
+		Start:    t,
+		Duration: 0,
+	})
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_ = wp.Close()
+	}()
+	builder := wp.WriterBuilder().Time(t)
+	for fi, family := range value.GetTagFamilies() {
+		familySpec := sm.GetTagFamilies()[fi]
+		if len(family.GetTags()) > len(familySpec.GetTags()) {
+			return errors.Wrap(ErrMalformedElement, "tag number is more than expected")
+		}
+		for ti, tag := range family.GetTags() {
+			tagSpec := familySpec.GetTags()[ti]
+			tType, isNull := tagValueTypeConv(tag)
+			if isNull {
+				continue
+			}
+			if tType != tagSpec.GetType() {
+				return errors.Wrapf(ErrMalformedElement, "tag %s type is unexpected", tagSpec.GetName())
+			}
+		}
+		bb, errMarshal := proto.Marshal(family)
+		if errMarshal != nil {
+			return errMarshal
+		}
+		builder.Family(sm.GetTagFamilies()[fi].GetName(), bb)
+	}
+	writer, err := builder.Build()
+	if err != nil {
+		return err
+	}
+	_, err = writer.Write()
+	return err
+}
+
+func (s *stream) buildEntity(value *streamv2.ElementValue) (entity tsdb.Entity, err error) {
+	for _, index := range s.entityIndex {
+		family := value.GetTagFamilies()[index.family]
+		if index.tag >= len(family.GetTags()) {
+			return nil, errors.Wrap(ErrMalformedElement, "the tag which composite the entity doesn't exist ")
+		}
+		entry, err := tagConvEntry(family.GetTags()[index.tag])
+		if err != nil {
+			return nil, err
+		}
+		entity = append(entity, entry)
+	}
+	return entity, nil
+}
+
+func tagConvEntry(tag *modelv2.Tag) (tsdb.Entry, error) {
+	switch tag.GetValueType().(type) {
+	case *modelv2.Tag_Str:
+		return tsdb.Entry(tag.GetStr().GetValue()), nil
+	case *modelv2.Tag_Int:
+		return convert.Int64ToBytes(tag.GetInt().GetValue()), nil
+	default:
+		return nil, ErrUnsupportedTagTypeAsEntry
+	}
+}
+
+type writeCallback struct {
+	l         *logger.Logger
+	schemaMap map[string]*stream
+}
+
+func setUpWriteCallback(l *logger.Logger, schemaMap map[string]*stream) *writeCallback {
+	wcb := &writeCallback{
+		l:         l,
+		schemaMap: schemaMap,
+	}
+	return wcb
+}
+
+func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
+	writeEvent, ok := message.Data().(data.StreamWriteData)
+	if !ok {
+		w.l.Warn().Msg("invalid event data type")
+		return
+	}
+	sm := writeEvent.WriteRequest.GetMetadata()
+	id := formatStreamID(sm.GetName(), sm.GetGroup())
+	err := w.schemaMap[id].Write(writeEvent.ShardID, writeEvent.WriteRequest.GetElement())
+	if err != nil {
+		w.l.Debug().Err(err)
+	}
+	return
+}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
new file mode 100644
index 0000000..dac9281
--- /dev/null
+++ b/banyand/stream/stream_write_test.go
@@ -0,0 +1,282 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"context"
+	"encoding/base64"
+	"math"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func Test_Stream_Write(t *testing.T) {
+	tester := assert.New(t)
+	s, deferFunc := setup(tester)
+	defer deferFunc()
+
+	type args struct {
+		shardID uint
+		ele     *streamv2.ElementValue
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "golden path",
+			args: args{
+				shardID: 0,
+				ele: getEle(
+					"trace_id-xxfff.111323",
+					0,
+					"webapp_id",
+					"10.0.0.1_id",
+					"/home_id",
+					300,
+					1622933202000000000,
+				),
+			},
+		},
+		{
+			name: "minimal",
+			args: args{
+				shardID: 1,
+				ele: getEle(
+					nil,
+					1,
+					"webapp_id",
+					"10.0.0.1_id",
+				),
+			},
+		},
+		{
+			name: "http",
+			args: args{
+				shardID: 0,
+				ele: getEle(
+					"trace_id-xxfff.111323",
+					0,
+					"webapp_id",
+					"10.0.0.1_id",
+					"/home_id",
+					300,
+					1622933202000000000,
+					"GET",
+					"200",
+				),
+			},
+		},
+		{
+			name: "database",
+			args: args{
+				shardID: 0,
+				ele: getEle(
+					"trace_id-xxfff.111323",
+					0,
+					"webapp_id",
+					"10.0.0.1_id",
+					"/home_id",
+					300,
+					1622933202000000000,
+					nil,
+					nil,
+					"MySQL",
+					"10.1.1.2",
+				),
+			},
+		},
+		{
+			name: "mq",
+			args: args{
+				shardID: 0,
+				ele: getEle(
+					"trace_id-xxfff.111323",
+					1,
+					"webapp_id",
+					"10.0.0.1_id",
+					"/home_id",
+					300,
+					1622933202000000000,
+					nil,
+					nil,
+					nil,
+					nil,
+					"test_topic",
+					"10.0.0.1",
+					"broker",
+				),
+			},
+		},
+		{
+			name: "invalid trace id",
+			args: args{
+				shardID: 1,
+				ele: getEle(
+					1212323,
+					1,
+					"webapp_id",
+					"10.0.0.1_id",
+				),
+			},
+			wantErr: true,
+		},
+		{
+			name:    "empty input",
+			args:    args{},
+			wantErr: true,
+		},
+		{
+			name: "invalid shard id",
+			args: args{
+				shardID: math.MaxUint64,
+				ele: getEle(
+					"trace_id-xxfff.111323",
+					0,
+					"webapp_id",
+					"10.0.0.1_id",
+					"/home_id",
+					300,
+					1622933202000000000,
+				),
+			},
+			wantErr: true,
+		},
+		{
+			name: "unknown tags",
+			args: args{
+				shardID: 0,
+				ele: getEle(
+					"trace_id-xxfff.111323",
+					1,
+					"webapp_id",
+					"10.0.0.1_id",
+					"/home_id",
+					300,
+					1622933202000000000,
+					nil,
+					nil,
+					nil,
+					nil,
+					"test_topic",
+					"10.0.0.1",
+					"broker",
+					"unknown",
+				),
+			},
+			wantErr: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := s.Write(tt.args.shardID, tt.args.ele)
+			if tt.wantErr {
+				tester.Error(err)
+				return
+			}
+			tester.NoError(err)
+		})
+	}
+
+}
+
+func setup(t *assert.Assertions) (*stream, func()) {
+	t.NoError(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "info",
+	}))
+	tempDir, deferFunc := test.Space(t)
+	streamRepo, err := schema.NewStream()
+	t.NoError(err)
+	streamSpec, err := streamRepo.Get(context.TODO(), &commonv2.Metadata{
+		Name:  "sw",
+		Group: "default",
+	})
+	t.NoError(err)
+	s, err := openStream(tempDir, streamSpec, logger.GetLogger("test"))
+	t.NoError(err)
+	return s, func() {
+		_ = s.Close()
+		deferFunc()
+	}
+}
+
+func getEle(tags ...interface{}) *streamv2.ElementValue {
+	searchableTags := make([]*modelv2.Tag, 0)
+	for _, tag := range tags {
+		searchableTags = append(searchableTags, getTag(tag))
+	}
+	bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+	e := &streamv2.ElementValue{
+		ElementId: "1231.dfd.123123ssf",
+		Timestamp: timestamppb.Now(),
+		TagFamilies: []*streamv2.ElementValue_TagFamily{
+			{
+				Tags: []*modelv2.Tag{
+					{
+						ValueType: &modelv2.Tag_BinaryData{
+							BinaryData: bb,
+						},
+					},
+				},
+			},
+			{
+				Tags: searchableTags,
+			},
+		},
+	}
+	return e
+}
+
+func getTag(tag interface{}) *modelv2.Tag {
+	if tag == nil {
+		return &modelv2.Tag{
+			ValueType: &modelv2.Tag_Null{},
+		}
+	}
+	switch t := tag.(type) {
+	case int:
+		return &modelv2.Tag{
+			ValueType: &modelv2.Tag_Int{
+				Int: &modelv2.Int{
+					Value: int64(t),
+				},
+			},
+		}
+	case string:
+		return &modelv2.Tag{
+			ValueType: &modelv2.Tag_Str{
+				Str: &modelv2.Str{
+					Value: t,
+				},
+			},
+		}
+	}
+	return nil
+}
diff --git a/banyand/stream/testdata/shard0.json b/banyand/stream/testdata/shard0.json
new file mode 100644
index 0000000..28de2f3
--- /dev/null
+++ b/banyand/stream/testdata/shard0.json
@@ -0,0 +1,18 @@
+[
+  {
+    "element_id": "1",
+    "timestamp": "2021-04-15T01:30:15.01Z",
+    "tag_families": [
+      {
+        "tags": [
+          {"binary_data": "YWJjMTIzIT8kKiYoKSctPUB+"}
+        ]
+      },
+      {
+        "tags": [
+          {"str": ""}
+        ]
+      }
+    ]
+  }
+]
\ No newline at end of file
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index dcab966..6505582 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -20,6 +20,9 @@ package tsdb
 import (
 	"context"
 	"io"
+	"time"
+
+	"github.com/dgraph-io/ristretto/z"
 
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -28,10 +31,14 @@ import (
 type block struct {
 	path string
 	l    *logger.Logger
+	ref  *z.Closer
 
 	store       kv.TimeSeriesStore
 	treeIndex   kv.Store
 	closableLst []io.Closer
+	endTime     time.Time
+	startTime   time.Time
+
 	//revertedIndex kv.Store
 }
 
@@ -43,7 +50,9 @@ type blockOpts struct {
 
 func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	b = &block{
-		path: opts.path,
+		path:      opts.path,
+		ref:       z.NewCloser(1),
+		startTime: time.Now(),
 	}
 	parentLogger := ctx.Value(logger.ContextKey)
 	if parentLogger != nil {
@@ -62,8 +71,54 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	return b, nil
 }
 
+func (b *block) delegate() blockDelegate {
+	b.incRef()
+	return &bDelegate{
+		delegate: b,
+	}
+}
+
+func (b *block) dscRef() {
+	b.ref.Done()
+}
+
+func (b *block) incRef() {
+	b.ref.AddRunning(1)
+}
+
 func (b *block) close() {
+	b.dscRef()
+	b.ref.SignalAndWait()
 	for _, closer := range b.closableLst {
 		_ = closer.Close()
 	}
 }
+
+type blockDelegate interface {
+	io.Closer
+	contains(ts time.Time) bool
+	write(key []byte, val []byte, ts time.Time) error
+}
+
+var _ blockDelegate = (*bDelegate)(nil)
+
+type bDelegate struct {
+	delegate *block
+}
+
+func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
+	return d.delegate.store.Put(key, val, uint64(ts.UnixNano()))
+}
+
+func (d *bDelegate) contains(ts time.Time) bool {
+	greaterAndEqualStart := d.delegate.startTime.Equal(ts) || d.delegate.startTime.Before(ts)
+	if d.delegate.endTime.IsZero() {
+		return greaterAndEqualStart
+	}
+	return greaterAndEqualStart && d.delegate.endTime.After(ts)
+}
+
+func (d *bDelegate) Close() error {
+	d.delegate.dscRef()
+	return nil
+}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 810aae0..ee8ce57 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -18,12 +18,20 @@
 package tsdb
 
 import (
+	"bytes"
+	"io"
 	"time"
 
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
 	"github.com/apache/skywalking-banyandb/api/common"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 )
 
+var ErrEmptySeriesSpan = errors.New("there is no data in such time range")
+
 type Iterator interface {
 	Next() bool
 	Val() Item
@@ -43,6 +51,10 @@ type ConditionValue struct {
 type Condition map[string][]ConditionValue
 
 type ItemID struct {
+	//shardID int
+	//segID   []byte
+	//blockID []byte
+	id []byte
 }
 
 type TimeRange struct {
@@ -57,21 +69,23 @@ type Series interface {
 }
 
 type SeriesSpan interface {
+	io.Closer
 	WriterBuilder() WriterBuilder
 	Iterator() Iterator
 	SeekerBuilder() SeekerBuilder
 }
 
 type WriterBuilder interface {
-	Family(name string) WriterBuilder
+	Family(name string, val []byte) WriterBuilder
 	Time(ts time.Time) WriterBuilder
 	Val(val []byte) WriterBuilder
-	OrderBy(order modelv2.QueryOrder_Sort) WriterBuilder
-	Build() Writer
+	Build() (Writer, error)
 }
 
 type Writer interface {
-	Write() ItemID
+	Write() (ItemID, error)
+	WriteLSMIndex(name string, val []byte) error
+	WriteInvertedIndex(name string, val []byte) error
 }
 
 type SeekerBuilder interface {
@@ -88,12 +102,14 @@ type Seeker interface {
 var _ Series = (*series)(nil)
 
 type series struct {
-	id common.SeriesID
+	id      common.SeriesID
+	blockDB blockDatabase
 }
 
-func newSeries(id common.SeriesID) *series {
+func newSeries(id common.SeriesID, blockDB blockDatabase) *series {
 	return &series{
-		id: id,
+		id:      id,
+		blockDB: blockDB,
 	}
 }
 
@@ -102,9 +118,145 @@ func (s *series) ID() common.SeriesID {
 }
 
 func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
-	panic("implement me")
+	blocks := s.blockDB.span(timeRange)
+	if len(blocks) < 1 {
+		return nil, ErrEmptySeriesSpan
+	}
+	return newSeriesSpan(blocks, s.id), nil
 }
 
 func (s *series) Get(id ItemID) (Item, error) {
+	panic("not implemented")
+}
+
+var _ SeriesSpan = (*seriesSpan)(nil)
+
+type seriesSpan struct {
+	blocks   []blockDelegate
+	seriesID common.SeriesID
+}
+
+func (s *seriesSpan) Close() (err error) {
+	for _, delegate := range s.blocks {
+		err = multierr.Append(err, delegate.Close())
+	}
+	return err
+}
+
+func (s *seriesSpan) WriterBuilder() WriterBuilder {
+	return newWriterBuilder(s)
+}
+
+func (s *seriesSpan) Iterator() Iterator {
 	panic("implement me")
 }
+
+func (s *seriesSpan) SeekerBuilder() SeekerBuilder {
+	panic("implement me")
+}
+
+func newSeriesSpan(blocks []blockDelegate, id common.SeriesID) *seriesSpan {
+	return &seriesSpan{
+		blocks:   blocks,
+		seriesID: id,
+	}
+}
+
+var _ WriterBuilder = (*writerBuilder)(nil)
+
+type writerBuilder struct {
+	series *seriesSpan
+	block  blockDelegate
+	values []struct {
+		family []byte
+		val    []byte
+	}
+	ts            time.Time
+	seriesIDBytes []byte
+}
+
+func (w *writerBuilder) Family(name string, val []byte) WriterBuilder {
+	w.values = append(w.values, struct {
+		family []byte
+		val    []byte
+	}{family: bytes.Join([][]byte{w.seriesIDBytes, hash([]byte(name))}, nil), val: val})
+	return w
+}
+
+func (w *writerBuilder) Time(ts time.Time) WriterBuilder {
+	w.ts = ts
+	for _, b := range w.series.blocks {
+		if b.contains(ts) {
+			w.block = b
+			break
+		}
+	}
+	return w
+}
+
+func (w *writerBuilder) Val(val []byte) WriterBuilder {
+	w.values = append(w.values, struct {
+		family []byte
+		val    []byte
+	}{val: val})
+	return w
+}
+
+var ErrNoTime = errors.New("no time specified")
+var ErrNoVal = errors.New("no value specified")
+
+func (w *writerBuilder) Build() (Writer, error) {
+	if w.block == nil {
+		return nil, ErrNoTime
+	}
+	if len(w.values) < 1 {
+		return nil, ErrNoVal
+	}
+	wt := &writer{
+		block:    w.block,
+		ts:       w.ts,
+		seriesID: w.seriesIDBytes,
+		itemID:   bytes.Join([][]byte{w.seriesIDBytes, convert.Int64ToBytes(w.ts.UnixNano())}, nil),
+		columns:  w.values,
+	}
+	return wt, nil
+}
+
+func newWriterBuilder(seriesSpan *seriesSpan) WriterBuilder {
+	return &writerBuilder{
+		series:        seriesSpan,
+		seriesIDBytes: convert.Uint64ToBytes(uint64(seriesSpan.seriesID)),
+	}
+}
+
+var _ Writer = (*writer)(nil)
+
+type writer struct {
+	block    blockDelegate
+	ts       time.Time
+	seriesID []byte
+	columns  []struct {
+		family []byte
+		val    []byte
+	}
+	itemID []byte
+}
+
+func (w *writer) WriteLSMIndex(name string, val []byte) error {
+	panic("implement me")
+}
+
+func (w *writer) WriteInvertedIndex(name string, val []byte) error {
+	panic("implement me")
+}
+
+func (w *writer) Write() (id ItemID, err error) {
+	for _, c := range w.columns {
+		err = w.block.write(c.family, c.val, w.ts)
+		if err != nil {
+			return id, err
+		}
+	}
+	id.id = w.itemID
+	return id, nil
+}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index b38c30f..aa9ec22 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -80,11 +80,16 @@ func NewPath(entries []Entry) Path {
 
 type SeriesDatabase interface {
 	io.Closer
-	Create(entity Entity) error
+	Get(entity Entity) (Series, error)
 	List(path Path) (SeriesList, error)
 }
 
+type blockDatabase interface {
+	span(timeRange TimeRange) []blockDelegate
+}
+
 var _ SeriesDatabase = (*seriesDB)(nil)
+var _ blockDatabase = (*seriesDB)(nil)
 
 type seriesDB struct {
 	sync.Mutex
@@ -94,59 +99,23 @@ type seriesDB struct {
 	seriesMetadata kv.Store
 }
 
-func (s *seriesDB) Close() error {
-	for _, seg := range s.lst {
-		seg.close()
-	}
-	return s.seriesMetadata.Close()
-}
-
-func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
-	sdb := &seriesDB{}
-	parentLogger := ctx.Value(logger.ContextKey)
-	if parentLogger == nil {
-		return nil, logger.ErrNoLoggerInContext
-	}
-	if pl, ok := parentLogger.(*logger.Logger); ok {
-		sdb.l = pl.Named("series")
-	}
-	var err error
-	sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
-	if err != nil {
-		return nil, err
-	}
-	segPath, err := mkdir(segTemplate, path, time.Now().Format(segFormat))
-	if err != nil {
-		return nil, err
-	}
-	seg, err := newSegment(ctx, segPath)
-	if err != nil {
-		return nil, err
-	}
-	{
-		sdb.Lock()
-		defer sdb.Unlock()
-		sdb.lst = append(sdb.lst, seg)
-	}
-	return sdb, nil
-}
-
-func (s *seriesDB) Create(entity Entity) error {
+func (s *seriesDB) Get(entity Entity) (Series, error) {
 	key := hashEntity(entity)
-	_, err := s.seriesMetadata.Get(key)
+	seriesID, err := s.seriesMetadata.Get(key)
 	if err != nil && err != kv.ErrKeyNotFound {
-		return err
+		return nil, err
 	}
 	if err == nil {
-		return nil
+		return newSeries(bytesConvSeriesID(seriesID), s), nil
 	}
 	s.Lock()
 	defer s.Unlock()
-	err = s.seriesMetadata.Put(key, hash(key))
+	seriesID = hash(key)
+	err = s.seriesMetadata.Put(key, seriesID)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	return nil
+	return newSeries(bytesConvSeriesID(seriesID), s), nil
 }
 
 func (s *seriesDB) List(path Path) (SeriesList, error) {
@@ -156,7 +125,7 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
 			return nil, err
 		}
 		if err == nil {
-			return []Series{newSeries(common.SeriesID(convert.BytesToUint64(id)))}, nil
+			return []Series{newSeries(bytesConvSeriesID(id), s)}, nil
 		}
 		return nil, nil
 	}
@@ -173,7 +142,7 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
 				err = multierr.Append(err, errGetVal)
 				return nil
 			}
-			result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id))))
+			result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id)), s))
 		}
 		return nil
 	})
@@ -183,6 +152,52 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
 	return result, err
 }
 
+func (s *seriesDB) span(_ TimeRange) []blockDelegate {
+	//TODO: return correct blocks
+	result := make([]blockDelegate, 0, len(s.lst[0].lst))
+	for _, b := range s.lst[0].lst {
+		result = append(result, b.delegate())
+	}
+	return result
+}
+
+func (s *seriesDB) Close() error {
+	for _, seg := range s.lst {
+		seg.close()
+	}
+	return s.seriesMetadata.Close()
+}
+
+func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
+	sdb := &seriesDB{}
+	parentLogger := ctx.Value(logger.ContextKey)
+	if parentLogger == nil {
+		return nil, logger.ErrNoLoggerInContext
+	}
+	if pl, ok := parentLogger.(*logger.Logger); ok {
+		sdb.l = pl.Named("series")
+	}
+	var err error
+	sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
+	if err != nil {
+		return nil, err
+	}
+	segPath, err := mkdir(segTemplate, path, time.Now().Format(segFormat))
+	if err != nil {
+		return nil, err
+	}
+	seg, err := newSegment(ctx, segPath)
+	if err != nil {
+		return nil, err
+	}
+	{
+		sdb.Lock()
+		defer sdb.Unlock()
+		sdb.lst = append(sdb.lst, seg)
+	}
+	return sdb, nil
+}
+
 func hashEntity(entity Entity) []byte {
 	result := make(Entry, 0, len(entity)*8)
 	for _, entry := range entity {
@@ -195,6 +210,10 @@ func hash(entry []byte) []byte {
 	return convert.Uint64ToBytes(convert.Hash(entry))
 }
 
+func bytesConvSeriesID(data []byte) common.SeriesID {
+	return common.SeriesID(convert.BytesToUint64(data))
+}
+
 type SeriesList []Series
 
 func (a SeriesList) Len() int {
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 098648a..83b3008 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -141,7 +141,7 @@ func TestNewPath(t *testing.T) {
 	}
 }
 
-func Test_SeriesDatabase_Create(t *testing.T) {
+func Test_SeriesDatabase_Get(t *testing.T) {
 
 	tests := []struct {
 		name     string
@@ -152,7 +152,7 @@ func Test_SeriesDatabase_Create(t *testing.T) {
 			entities: []Entity{{
 				Entry("productpage"),
 				Entry("10.0.0.1"),
-				Entry(convert.Uint64ToBytes(0)),
+				convert.Uint64ToBytes(0),
 			}},
 		},
 		{
@@ -161,12 +161,12 @@ func Test_SeriesDatabase_Create(t *testing.T) {
 				{
 					Entry("productpage"),
 					Entry("10.0.0.1"),
-					Entry(convert.Uint64ToBytes(0)),
+					convert.Uint64ToBytes(0),
 				},
 				{
 					Entry("productpage"),
 					Entry("10.0.0.1"),
-					Entry(convert.Uint64ToBytes(0)),
+					convert.Uint64ToBytes(0),
 				},
 			},
 		},
@@ -183,7 +183,9 @@ func Test_SeriesDatabase_Create(t *testing.T) {
 			s, err := newSeriesDataBase(context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), dir)
 			tester.NoError(err)
 			for _, entity := range tt.entities {
-				tester.NoError(s.Create(entity))
+				series, err := s.Get(entity)
+				tester.NoError(err)
+				tester.Equal(hashEntity(entity), series.ID())
 			}
 		})
 	}
@@ -214,7 +216,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				convert.Uint64ToBytes(0),
 			}),
 			want: SeriesList{
-				newSeries(data[0].id),
+				newMockSeries(data[0].id),
 			},
 		},
 		{
@@ -225,8 +227,8 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				AnyEntry,
 			}),
 			want: SeriesList{
-				newSeries(data[1].id),
-				newSeries(data[2].id),
+				newMockSeries(data[1].id),
+				newMockSeries(data[2].id),
 			},
 		},
 		{
@@ -237,10 +239,10 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				AnyEntry,
 			}),
 			want: SeriesList{
-				newSeries(data[0].id),
-				newSeries(data[1].id),
-				newSeries(data[2].id),
-				newSeries(data[3].id),
+				newMockSeries(data[0].id),
+				newMockSeries(data[1].id),
+				newMockSeries(data[2].id),
+				newMockSeries(data[3].id),
 			},
 		},
 		{
@@ -251,9 +253,9 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				convert.Uint64ToBytes(0),
 			}),
 			want: SeriesList{
-				newSeries(data[0].id),
-				newSeries(data[1].id),
-				newSeries(data[3].id),
+				newMockSeries(data[0].id),
+				newMockSeries(data[1].id),
+				newMockSeries(data[3].id),
 			},
 		},
 		{
@@ -264,8 +266,8 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				convert.Uint64ToBytes(1),
 			}),
 			want: SeriesList{
-				newSeries(data[2].id),
-				newSeries(data[4].id),
+				newMockSeries(data[2].id),
+				newMockSeries(data[4].id),
 			},
 		},
 	}
@@ -278,9 +280,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				return
 			}
 			tester.NoError(err)
-			sort.Sort(tt.want)
-			sort.Sort(series)
-			tester.Equal(tt.want, series)
+			tester.Equal(transform(tt.want), transform(series))
 		})
 	}
 }
@@ -330,7 +330,21 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
 	}
 	for _, d := range data {
 		d.id = common.SeriesID(convert.BytesToUint64(hash(hashEntity(d.entity))))
-		t.NoError(db.Create(d.entity))
+		series, err := db.Get(d.entity)
+		t.NoError(err)
+		t.Equal(hashEntity(d.entity), series.ID())
 	}
 	return data
 }
+
+func newMockSeries(id common.SeriesID) *series {
+	return newSeries(id, nil)
+}
+
+func transform(list SeriesList) (seriesIDs []common.SeriesID) {
+	sort.Sort(list)
+	for _, s := range list {
+		seriesIDs = append(seriesIDs, s.ID())
+	}
+	return seriesIDs
+}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 050444e..94cb90a 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -31,7 +31,7 @@ type shard struct {
 }
 
 func (s *shard) Series() SeriesDatabase {
-	panic("implement me")
+	return s.seriesDatabase
 }
 
 func (s *shard) Index() IndexDatabase {
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 503b0e8..3d20a88 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -44,9 +44,12 @@ const (
 	dirPerm = 0700
 )
 
+var ErrInvalidShardID = errors.New("invalid shard id")
+
 type Database interface {
 	io.Closer
 	Shards() []Shard
+	Shard(id uint) (Shard, error)
 }
 
 type Shard interface {
@@ -71,6 +74,24 @@ type database struct {
 	sync.Mutex
 }
 
+func (d *database) Shards() []Shard {
+	return d.sLst
+}
+
+func (d *database) Shard(id uint) (Shard, error) {
+	if id >= uint(len(d.sLst)) {
+		return nil, ErrInvalidShardID
+	}
+	return d.sLst[id], nil
+}
+
+func (d *database) Close() error {
+	for _, s := range d.sLst {
+		_ = s.Close()
+	}
+	return nil
+}
+
 func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 	db := &database{
 		location: opts.Location,
@@ -98,17 +119,6 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 	return createDatabase(thisContext, db)
 }
 
-func (d *database) Close() error {
-	for _, s := range d.sLst {
-		_ = s.Close()
-	}
-	return nil
-}
-
-func (d *database) Shards() []Shard {
-	return d.sLst
-}
-
 func createDatabase(ctx context.Context, db *database) (Database, error) {
 	var err error
 	db.Lock()
diff --git a/go.mod b/go.mod
index 65674a5..55e7aa9 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
 	github.com/RoaringBitmap/roaring v0.9.1
 	github.com/cespare/xxhash v1.1.0
 	github.com/dgraph-io/badger/v3 v3.2011.1
+	github.com/dgraph-io/ristretto v0.1.0
 	github.com/golang/mock v1.5.0
 	github.com/golang/protobuf v1.5.2
 	github.com/google/go-cmp v0.5.6