You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/03 17:40:53 UTC
[skywalking-banyandb] 01/02: Add stream moduel
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit bbf89b1146ae04c11bf8d0946b10d8cf6f16ade0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Sep 2 00:23:29 2021 +0800
Add stream moduel
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
.../model/v2/write.proto => data/stream.go} | 36 ++-
api/proto/banyandb/database/v2/schema.pb.go | 37 +--
api/proto/banyandb/database/v2/schema.proto | 1 +
api/proto/banyandb/model/v2/write.pb.go | 38 ++-
api/proto/banyandb/model/v2/write.proto | 1 +
api/proto/banyandb/stream/v2/write.pb.go | 169 +++++++-----
api/proto/banyandb/stream/v2/write.proto | 14 +-
banyand/metadata/metadata.go | 21 +-
banyand/metadata/schema/data/stream.json | 2 +-
banyand/stream/service.go | 102 ++++++++
banyand/stream/stream.go | 104 ++++++++
banyand/stream/stream_write.go | 152 +++++++++++
banyand/stream/stream_write_test.go | 282 +++++++++++++++++++++
banyand/stream/testdata/shard0.json | 18 ++
banyand/tsdb/block.go | 57 ++++-
banyand/tsdb/series.go | 168 +++++++++++-
banyand/tsdb/seriesdb.go | 113 +++++----
banyand/tsdb/seriesdb_test.go | 56 ++--
banyand/tsdb/shard.go | 2 +-
banyand/tsdb/tsdb.go | 32 ++-
go.mod | 1 +
21 files changed, 1205 insertions(+), 201 deletions(-)
diff --git a/api/proto/banyandb/model/v2/write.proto b/api/data/stream.go
similarity index 56%
copy from api/proto/banyandb/model/v2/write.proto
copy to api/data/stream.go
index 836423a..65fc2ef 100644
--- a/api/proto/banyandb/model/v2/write.proto
+++ b/api/data/stream.go
@@ -15,22 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-syntax = "proto3";
+package data
-option java_package = "org.apache.skywalking.banyandb.model.v2";
-option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2";
+import (
+ "github.com/apache/skywalking-banyandb/api/common"
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+)
-package banyandb.model.v2;
+var StreamKindVersion = common.KindVersion{Version: "v2", Kind: "data-stream"}
-import "google/protobuf/struct.proto";
-import "banyandb/model/v2/common.proto";
+var StreamWriteEventKindVersion = common.KindVersion{
+ Version: "v2",
+ Kind: "stream-write",
+}
+
+var TopicStreamWriteEvent = bus.UniTopic(StreamWriteEventKindVersion.String())
+
+type Stream struct {
+ common.KindVersion
+ Entities []Entity
+}
-message Tag {
- oneof value_type {
- google.protobuf.NullValue null = 1;
- Str str = 2;
- StrArray str_array = 3;
- Int int = 4;
- IntArray int_array = 5;
- }
+type StreamWriteData struct {
+ ShardID uint
+ SeriesID uint64
+ WriteRequest *streamv2.WriteRequest
}
diff --git a/api/proto/banyandb/database/v2/schema.pb.go b/api/proto/banyandb/database/v2/schema.pb.go
index 5f8168e..f2fed85 100644
--- a/api/proto/banyandb/database/v2/schema.pb.go
+++ b/api/proto/banyandb/database/v2/schema.pb.go
@@ -49,6 +49,7 @@ const (
TagType_TAG_TYPE_INT TagType = 2
TagType_TAG_TYPE_STRING_ARRAY TagType = 3
TagType_TAG_TYPE_INT_ARRAY TagType = 4
+ TagType_TAG_TYPE_DATA_BINARY TagType = 5
)
// Enum value maps for TagType.
@@ -59,6 +60,7 @@ var (
2: "TAG_TYPE_INT",
3: "TAG_TYPE_STRING_ARRAY",
4: "TAG_TYPE_INT_ARRAY",
+ 5: "TAG_TYPE_DATA_BINARY",
}
TagType_value = map[string]int32{
"TAG_TYPE_UNSPECIFIED": 0,
@@ -66,6 +68,7 @@ var (
"TAG_TYPE_INT": 2,
"TAG_TYPE_STRING_ARRAY": 3,
"TAG_TYPE_INT_ARRAY": 4,
+ "TAG_TYPE_DATA_BINARY": 5,
}
)
@@ -912,22 +915,24 @@ var file_banyandb_database_v2_schema_proto_rawDesc = []byte{
0x64, 0x5f, 0x61, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d,
0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41,
- 0x74, 0x2a, 0x7d, 0x0a, 0x07, 0x54, 0x61, 0x67, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14,
- 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49,
- 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59,
- 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x54,
- 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x10, 0x02, 0x12, 0x19, 0x0a,
- 0x15, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47,
- 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x03, 0x12, 0x16, 0x0a, 0x12, 0x54, 0x41, 0x47, 0x5f,
- 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x04,
- 0x42, 0x72, 0x0a, 0x2a, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73,
- 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e,
- 0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x5a, 0x44,
- 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68,
- 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e,
- 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f,
- 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73,
- 0x65, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x74, 0x2a, 0x97, 0x01, 0x0a, 0x07, 0x54, 0x61, 0x67, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
+ 0x14, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43,
+ 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x41, 0x47, 0x5f, 0x54,
+ 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c,
+ 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x10, 0x02, 0x12, 0x19,
+ 0x0a, 0x15, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e,
+ 0x47, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x03, 0x12, 0x16, 0x0a, 0x12, 0x54, 0x41, 0x47,
+ 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10,
+ 0x04, 0x12, 0x18, 0x0a, 0x14, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x44, 0x41,
+ 0x54, 0x41, 0x5f, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x10, 0x05, 0x42, 0x72, 0x0a, 0x2a, 0x6f,
+ 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c,
+ 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61,
+ 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x5a, 0x44, 0x67, 0x69, 0x74, 0x68, 0x75,
+ 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79,
+ 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
+ 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61,
+ 0x6e, 0x64, 0x62, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2f, 0x76, 0x32, 0x62,
+ 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
diff --git a/api/proto/banyandb/database/v2/schema.proto b/api/proto/banyandb/database/v2/schema.proto
index c3745ff..cb358aa 100644
--- a/api/proto/banyandb/database/v2/schema.proto
+++ b/api/proto/banyandb/database/v2/schema.proto
@@ -44,6 +44,7 @@ enum TagType {
TAG_TYPE_INT = 2;
TAG_TYPE_STRING_ARRAY = 3;
TAG_TYPE_INT_ARRAY = 4;
+ TAG_TYPE_DATA_BINARY = 5;
}
message TagFamily {
diff --git a/api/proto/banyandb/model/v2/write.pb.go b/api/proto/banyandb/model/v2/write.pb.go
index 0d36e62..1640f11 100644
--- a/api/proto/banyandb/model/v2/write.pb.go
+++ b/api/proto/banyandb/model/v2/write.pb.go
@@ -50,6 +50,7 @@ type Tag struct {
// *Tag_StrArray
// *Tag_Int
// *Tag_IntArray
+ // *Tag_BinaryData
ValueType isTag_ValueType `protobuf_oneof:"value_type"`
}
@@ -127,6 +128,13 @@ func (x *Tag) GetIntArray() *IntArray {
return nil
}
+func (x *Tag) GetBinaryData() []byte {
+ if x, ok := x.GetValueType().(*Tag_BinaryData); ok {
+ return x.BinaryData
+ }
+ return nil
+}
+
type isTag_ValueType interface {
isTag_ValueType()
}
@@ -151,6 +159,10 @@ type Tag_IntArray struct {
IntArray *IntArray `protobuf:"bytes,5,opt,name=int_array,json=intArray,proto3,oneof"`
}
+type Tag_BinaryData struct {
+ BinaryData []byte `protobuf:"bytes,6,opt,name=binary_data,json=binaryData,proto3,oneof"`
+}
+
func (*Tag_Null) isTag_ValueType() {}
func (*Tag_Str) isTag_ValueType() {}
@@ -161,6 +173,8 @@ func (*Tag_Int) isTag_ValueType() {}
func (*Tag_IntArray) isTag_ValueType() {}
+func (*Tag_BinaryData) isTag_ValueType() {}
+
var File_banyandb_model_v2_write_proto protoreflect.FileDescriptor
var file_banyandb_model_v2_write_proto_rawDesc = []byte{
@@ -171,7 +185,7 @@ var file_banyandb_model_v2_write_proto_rawDesc = []byte{
0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x1a, 0x1e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c,
0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
- 0x22, 0x95, 0x02, 0x0a, 0x03, 0x54, 0x61, 0x67, 0x12, 0x30, 0x0a, 0x04, 0x6e, 0x75, 0x6c, 0x6c,
+ 0x22, 0xb8, 0x02, 0x0a, 0x03, 0x54, 0x61, 0x67, 0x12, 0x30, 0x0a, 0x04, 0x6e, 0x75, 0x6c, 0x6c,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x4e, 0x75, 0x6c, 0x6c, 0x56, 0x61, 0x6c,
0x75, 0x65, 0x48, 0x00, 0x52, 0x04, 0x6e, 0x75, 0x6c, 0x6c, 0x12, 0x2a, 0x0a, 0x03, 0x73, 0x74,
@@ -187,15 +201,18 @@ var file_banyandb_model_v2_write_proto_rawDesc = []byte{
0x0a, 0x09, 0x69, 0x6e, 0x74, 0x5f, 0x61, 0x72, 0x72, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64,
0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x49, 0x6e, 0x74, 0x41, 0x72, 0x72, 0x61, 0x79, 0x48, 0x00,
- 0x52, 0x08, 0x69, 0x6e, 0x74, 0x41, 0x72, 0x72, 0x61, 0x79, 0x42, 0x0c, 0x0a, 0x0a, 0x76, 0x61,
- 0x6c, 0x75, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x42, 0x6c, 0x0a, 0x27, 0x6f, 0x72, 0x67, 0x2e,
- 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e,
- 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c,
- 0x2e, 0x76, 0x32, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
- 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e,
- 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70,
- 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f,
- 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x52, 0x08, 0x69, 0x6e, 0x74, 0x41, 0x72, 0x72, 0x61, 0x79, 0x12, 0x21, 0x0a, 0x0b, 0x62, 0x69,
+ 0x6e, 0x61, 0x72, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x48,
+ 0x00, 0x52, 0x0a, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x44, 0x61, 0x74, 0x61, 0x42, 0x0c, 0x0a,
+ 0x0a, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x42, 0x6c, 0x0a, 0x27, 0x6f,
+ 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c,
+ 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f,
+ 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
+ 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c,
+ 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70,
+ 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
+ 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+ 0x33,
}
var (
@@ -258,6 +275,7 @@ func file_banyandb_model_v2_write_proto_init() {
(*Tag_StrArray)(nil),
(*Tag_Int)(nil),
(*Tag_IntArray)(nil),
+ (*Tag_BinaryData)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
diff --git a/api/proto/banyandb/model/v2/write.proto b/api/proto/banyandb/model/v2/write.proto
index 836423a..4636bd3 100644
--- a/api/proto/banyandb/model/v2/write.proto
+++ b/api/proto/banyandb/model/v2/write.proto
@@ -32,5 +32,6 @@ message Tag {
StrArray str_array = 3;
Int int = 4;
IntArray int_array = 5;
+ bytes binary_data = 6;
}
}
diff --git a/api/proto/banyandb/stream/v2/write.pb.go b/api/proto/banyandb/stream/v2/write.pb.go
index 664078b..ecc7655 100644
--- a/api/proto/banyandb/stream/v2/write.pb.go
+++ b/api/proto/banyandb/stream/v2/write.pb.go
@@ -31,8 +31,8 @@ import (
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
- v21 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
- v2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ v2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+ v21 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
)
const (
@@ -53,13 +53,8 @@ type ElementValue struct {
// 1) either the start time of a Span/Segment,
// 2) or the timestamp of a log
Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
- // binary representation of segments, including tags, spans...
- DataBinary []byte `protobuf:"bytes,3,opt,name=data_binary,json=dataBinary,proto3" json:"data_binary,omitempty"`
- // support all of indexed tags in the tags.
- // Tag only has value, as the value of value_type match with the key
- // by the index rules and index rule bindings of Metadata group.
- // indexed tags of multiple entities are compression in the tags.
- Tags []*v2.Tag `protobuf:"bytes,4,rep,name=tags,proto3" json:"tags,omitempty"`
+ // the order of tag_families' items match the stream schema
+ TagFamilies []*ElementValue_TagFamily `protobuf:"bytes,3,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
}
func (x *ElementValue) Reset() {
@@ -108,16 +103,9 @@ func (x *ElementValue) GetTimestamp() *timestamppb.Timestamp {
return nil
}
-func (x *ElementValue) GetDataBinary() []byte {
+func (x *ElementValue) GetTagFamilies() []*ElementValue_TagFamily {
if x != nil {
- return x.DataBinary
- }
- return nil
-}
-
-func (x *ElementValue) GetTags() []*v2.Tag {
- if x != nil {
- return x.Tags
+ return x.TagFamilies
}
return nil
}
@@ -128,7 +116,7 @@ type WriteRequest struct {
unknownFields protoimpl.UnknownFields
// the metadata is only required in the first write.
- Metadata *v21.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
+ Metadata *v2.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
// the element is required.
Element *ElementValue `protobuf:"bytes,2,opt,name=element,proto3" json:"element,omitempty"`
}
@@ -165,7 +153,7 @@ func (*WriteRequest) Descriptor() ([]byte, []int) {
return file_banyandb_stream_v2_write_proto_rawDescGZIP(), []int{1}
}
-func (x *WriteRequest) GetMetadata() *v21.Metadata {
+func (x *WriteRequest) GetMetadata() *v2.Metadata {
if x != nil {
return x.Metadata
}
@@ -217,6 +205,53 @@ func (*WriteResponse) Descriptor() ([]byte, []int) {
return file_banyandb_stream_v2_write_proto_rawDescGZIP(), []int{2}
}
+type ElementValue_TagFamily struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Tags []*v21.Tag `protobuf:"bytes,1,rep,name=tags,proto3" json:"tags,omitempty"`
+}
+
+func (x *ElementValue_TagFamily) Reset() {
+ *x = ElementValue_TagFamily{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_banyandb_stream_v2_write_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *ElementValue_TagFamily) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ElementValue_TagFamily) ProtoMessage() {}
+
+func (x *ElementValue_TagFamily) ProtoReflect() protoreflect.Message {
+ mi := &file_banyandb_stream_v2_write_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use ElementValue_TagFamily.ProtoReflect.Descriptor instead.
+func (*ElementValue_TagFamily) Descriptor() ([]byte, []int) {
+ return file_banyandb_stream_v2_write_proto_rawDescGZIP(), []int{0, 0}
+}
+
+func (x *ElementValue_TagFamily) GetTags() []*v21.Tag {
+ if x != nil {
+ return x.Tags
+ }
+ return nil
+}
+
var File_banyandb_stream_v2_write_proto protoreflect.FileDescriptor
var file_banyandb_stream_v2_write_proto_rawDesc = []byte{
@@ -229,35 +264,39 @@ var file_banyandb_stream_v2_write_proto_rawDesc = []byte{
0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x2f, 0x77, 0x72, 0x69, 0x74, 0x65, 0x2e,
- 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb4, 0x01, 0x0a, 0x0c, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xef, 0x01, 0x0a, 0x0c, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x65, 0x6c, 0x65, 0x6d,
0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12,
- 0x1f, 0x0a, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x18, 0x03,
- 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x42, 0x69, 0x6e, 0x61, 0x72, 0x79,
- 0x12, 0x2a, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16,
- 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e,
- 0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x84, 0x01, 0x0a,
- 0x0c, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a,
- 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
- 0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f,
- 0x6e, 0x2e, 0x76, 0x32, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d,
- 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x3a, 0x0a, 0x07, 0x65, 0x6c, 0x65, 0x6d, 0x65,
- 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61,
- 0x6e, 0x64, 0x62, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6c,
- 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x07, 0x65, 0x6c, 0x65, 0x6d,
- 0x65, 0x6e, 0x74, 0x22, 0x0f, 0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70,
- 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x6e, 0x0a, 0x28, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
- 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61,
- 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32,
- 0x5a, 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61,
- 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62,
- 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74,
- 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61,
- 0x6d, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x4d, 0x0a, 0x0c, 0x74, 0x61, 0x67, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x18,
+ 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
+ 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6c, 0x65, 0x6d, 0x65,
+ 0x6e, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x2e, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c,
+ 0x79, 0x52, 0x0b, 0x74, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x1a, 0x37,
+ 0x0a, 0x09, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x12, 0x2a, 0x0a, 0x04, 0x74,
+ 0x61, 0x67, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x62, 0x61, 0x6e, 0x79,
+ 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61,
+ 0x67, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x84, 0x01, 0x0a, 0x0c, 0x57, 0x72, 0x69, 0x74,
+ 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61,
+ 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e,
+ 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x2e,
+ 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61,
+ 0x74, 0x61, 0x12, 0x3a, 0x0a, 0x07, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20,
+ 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x73,
+ 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74,
+ 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x07, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x0f,
+ 0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42,
+ 0x6e, 0x0a, 0x28, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b,
+ 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64,
+ 0x62, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x32, 0x5a, 0x42, 0x67, 0x69, 0x74,
+ 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73,
+ 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e,
+ 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e,
+ 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x76, 0x32, 0x62,
+ 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -272,25 +311,27 @@ func file_banyandb_stream_v2_write_proto_rawDescGZIP() []byte {
return file_banyandb_stream_v2_write_proto_rawDescData
}
-var file_banyandb_stream_v2_write_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
+var file_banyandb_stream_v2_write_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_banyandb_stream_v2_write_proto_goTypes = []interface{}{
- (*ElementValue)(nil), // 0: banyandb.stream.v2.ElementValue
- (*WriteRequest)(nil), // 1: banyandb.stream.v2.WriteRequest
- (*WriteResponse)(nil), // 2: banyandb.stream.v2.WriteResponse
- (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp
- (*v2.Tag)(nil), // 4: banyandb.model.v2.Tag
- (*v21.Metadata)(nil), // 5: banyandb.common.v2.Metadata
+ (*ElementValue)(nil), // 0: banyandb.stream.v2.ElementValue
+ (*WriteRequest)(nil), // 1: banyandb.stream.v2.WriteRequest
+ (*WriteResponse)(nil), // 2: banyandb.stream.v2.WriteResponse
+ (*ElementValue_TagFamily)(nil), // 3: banyandb.stream.v2.ElementValue.TagFamily
+ (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp
+ (*v2.Metadata)(nil), // 5: banyandb.common.v2.Metadata
+ (*v21.Tag)(nil), // 6: banyandb.model.v2.Tag
}
var file_banyandb_stream_v2_write_proto_depIdxs = []int32{
- 3, // 0: banyandb.stream.v2.ElementValue.timestamp:type_name -> google.protobuf.Timestamp
- 4, // 1: banyandb.stream.v2.ElementValue.tags:type_name -> banyandb.model.v2.Tag
+ 4, // 0: banyandb.stream.v2.ElementValue.timestamp:type_name -> google.protobuf.Timestamp
+ 3, // 1: banyandb.stream.v2.ElementValue.tag_families:type_name -> banyandb.stream.v2.ElementValue.TagFamily
5, // 2: banyandb.stream.v2.WriteRequest.metadata:type_name -> banyandb.common.v2.Metadata
0, // 3: banyandb.stream.v2.WriteRequest.element:type_name -> banyandb.stream.v2.ElementValue
- 4, // [4:4] is the sub-list for method output_type
- 4, // [4:4] is the sub-list for method input_type
- 4, // [4:4] is the sub-list for extension type_name
- 4, // [4:4] is the sub-list for extension extendee
- 0, // [0:4] is the sub-list for field type_name
+ 6, // 4: banyandb.stream.v2.ElementValue.TagFamily.tags:type_name -> banyandb.model.v2.Tag
+ 5, // [5:5] is the sub-list for method output_type
+ 5, // [5:5] is the sub-list for method input_type
+ 5, // [5:5] is the sub-list for extension type_name
+ 5, // [5:5] is the sub-list for extension extendee
+ 0, // [0:5] is the sub-list for field type_name
}
func init() { file_banyandb_stream_v2_write_proto_init() }
@@ -335,6 +376,18 @@ func file_banyandb_stream_v2_write_proto_init() {
return nil
}
}
+ file_banyandb_stream_v2_write_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*ElementValue_TagFamily); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -342,7 +395,7 @@ func file_banyandb_stream_v2_write_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_banyandb_stream_v2_write_proto_rawDesc,
NumEnums: 0,
- NumMessages: 3,
+ NumMessages: 4,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/api/proto/banyandb/stream/v2/write.proto b/api/proto/banyandb/stream/v2/write.proto
index 59b0eef..39ef649 100644
--- a/api/proto/banyandb/stream/v2/write.proto
+++ b/api/proto/banyandb/stream/v2/write.proto
@@ -26,6 +26,8 @@ import "google/protobuf/timestamp.proto";
import "banyandb/common/v2/common.proto";
import "banyandb/model/v2/write.proto";
+
+
message ElementValue {
// element_id could be span_id of a Span or segment_id of a Segment in the context of stream
string element_id = 1;
@@ -33,13 +35,11 @@ message ElementValue {
// 1) either the start time of a Span/Segment,
// 2) or the timestamp of a log
google.protobuf.Timestamp timestamp = 2;
- // binary representation of segments, including tags, spans...
- bytes data_binary = 3;
- // support all of indexed tags in the tags.
- // Tag only has value, as the value of value_type match with the key
- // by the index rules and index rule bindings of Metadata group.
- // indexed tags of multiple entities are compression in the tags.
- repeated model.v2.Tag tags = 4;
+ message TagFamily {
+ repeated model.v2.Tag tags = 1;
+ }
+ // the order of tag_families' items match the stream schema
+ repeated TagFamily tag_families = 3;
}
message WriteRequest {
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 57e1fd9..7bf3fc8 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -35,8 +35,13 @@ type IndexFilter interface {
IndexRules(ctx context.Context, subject *commonv2.Metadata) ([]*databasev2.IndexRule, error)
}
-type Metadata interface {
+type Repo interface {
IndexFilter
+ Stream() schema.Stream
+}
+
+type Service interface {
+ Repo
run.Unit
}
@@ -46,11 +51,7 @@ type service struct {
indexRuleBinding schema.IndexRuleBinding
}
-func (s *service) Name() string {
- return "metadata"
-}
-
-func NewService(_ context.Context) (Metadata, error) {
+func NewService(_ context.Context) (Repo, error) {
stream, err := schema.NewStream()
if err != nil {
return nil, err
@@ -70,6 +71,14 @@ func NewService(_ context.Context) (Metadata, error) {
}, nil
}
+func (s *service) Stream() schema.Stream {
+ return s.stream
+}
+
+func (s *service) Name() string {
+ return "metadata"
+}
+
func (s *service) IndexRules(ctx context.Context, subject *commonv2.Metadata) ([]*databasev2.IndexRule, error) {
bindings, err := s.indexRuleBinding.List(ctx, schema.ListOpt{Group: subject.Group})
if err != nil {
diff --git a/banyand/metadata/schema/data/stream.json b/banyand/metadata/schema/data/stream.json
index c34ec65..41b3b52 100644
--- a/banyand/metadata/schema/data/stream.json
+++ b/banyand/metadata/schema/data/stream.json
@@ -9,7 +9,7 @@
"tags": [
{
"name": "data_binary",
- "type": "TAG_TYPE_UNSPECIFIED"
+ "type": "TAG_TYPE_DATA_BINARY"
}
]
},
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
new file mode 100644
index 0000000..a187f29
--- /dev/null
+++ b/banyand/stream/service.go
@@ -0,0 +1,102 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+ "context"
+
+ "github.com/pkg/errors"
+
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+var ErrEmptyRootPath = errors.New("root path is empty")
+
+type Service interface {
+ run.PreRunner
+ run.Config
+ run.Service
+}
+
+var _ Service = (*service)(nil)
+
+type service struct {
+ schemaMap map[string]*stream
+ writeListener *writeCallback
+ l *logger.Logger
+ metadata metadata.Repo
+ root string
+}
+
+func (s *service) FlagSet() *run.FlagSet {
+ flagS := run.NewFlagSet("storage")
+ flagS.StringVar(&s.root, "root-path", "/tmp", "the root path of database")
+ return flagS
+}
+
+func (s *service) Validate() error {
+ if s.root == "" {
+ return ErrEmptyRootPath
+ }
+ return nil
+}
+
+func (s *service) Name() string {
+ return "stream"
+}
+
+func (s *service) PreRun() error {
+ schemas, err := s.metadata.Stream().List(context.Background(), schema.ListOpt{})
+ if err != nil {
+ return err
+ }
+
+ s.schemaMap = make(map[string]*stream, len(schemas))
+ s.l = logger.GetLogger(s.Name())
+ for _, sa := range schemas {
+ sm, errTS := openStream(s.root, sa, s.l)
+ if errTS != nil {
+ return errTS
+ }
+ id := formatStreamID(sm.name, sm.group)
+ s.schemaMap[id] = sm
+ s.writeListener.schemaMap[id] = sm
+ s.l.Info().Str("id", id).Msg("initialize stream")
+ }
+ s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+ return err
+}
+
+func (s *service) Serve() error {
+ panic("implement me")
+}
+
+func (s *service) GracefulStop() {
+ panic("implement me")
+}
+
+//NewService returns a new service
+func NewService(_ context.Context, metadata metadata.Repo, pipeline queue.Queue) (Service, error) {
+ return &service{
+ metadata: metadata,
+ }, nil
+}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
new file mode 100644
index 0000000..88da951
--- /dev/null
+++ b/banyand/stream/stream.go
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+ "context"
+
+ databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+type stream struct {
+ name string
+ group string
+ l *logger.Logger
+ schema *databasev2.Stream
+ db tsdb.Database
+ entityIndex []struct {
+ family int
+ tag int
+ }
+}
+
+func (s *stream) Close() error {
+ return s.db.Close()
+}
+
+func (s *stream) parseSchema() {
+ sm := s.schema
+ meta := sm.GetMetadata()
+ s.name, s.group = meta.GetName(), meta.GetGroup()
+ for _, tagInEntity := range sm.Entity.GetTagNames() {
+ nextEntityTag:
+ for fi, family := range sm.GetTagFamilies() {
+ for ti, tag := range family.Tags {
+ if tagInEntity == tag.GetName() {
+ s.entityIndex = append(s.entityIndex, struct {
+ family int
+ tag int
+ }{family: fi, tag: ti})
+ break nextEntityTag
+ }
+ }
+ }
+ }
+}
+
+func openStream(root string, schema *databasev2.Stream, l *logger.Logger) (*stream, error) {
+ sm := &stream{
+ schema: schema,
+ l: l,
+ }
+ sm.parseSchema()
+ db, err := tsdb.OpenDatabase(
+ context.WithValue(context.Background(), logger.ContextKey, l),
+ tsdb.DatabaseOpts{
+ Location: root,
+ ShardNum: uint(schema.GetShardNum()),
+ })
+ if err != nil {
+ return nil, err
+ }
+ sm.db = db
+ return sm, nil
+}
+
+func formatStreamID(name, group string) string {
+ return name + ":" + group
+}
+
+func tagValueTypeConv(tag *modelv2.Tag) (tagType databasev2.TagType, isNull bool) {
+ switch tag.GetValueType().(type) {
+ case *modelv2.Tag_Int:
+ return databasev2.TagType_TAG_TYPE_INT, false
+ case *modelv2.Tag_Str:
+ return databasev2.TagType_TAG_TYPE_STRING, false
+ case *modelv2.Tag_IntArray:
+ return databasev2.TagType_TAG_TYPE_INT_ARRAY, false
+ case *modelv2.Tag_StrArray:
+ return databasev2.TagType_TAG_TYPE_STRING_ARRAY, false
+ case *modelv2.Tag_BinaryData:
+ return databasev2.TagType_TAG_TYPE_DATA_BINARY, false
+ case *modelv2.Tag_Null:
+ return databasev2.TagType_TAG_TYPE_UNSPECIFIED, true
+ }
+ return databasev2.TagType_TAG_TYPE_UNSPECIFIED, false
+}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
new file mode 100644
index 0000000..c16e75b
--- /dev/null
+++ b/banyand/stream/stream_write.go
@@ -0,0 +1,152 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+ "github.com/pkg/errors"
+ "google.golang.org/protobuf/proto"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+ ErrMalformedElement = errors.New("element is malformed")
+ ErrUnsupportedTagTypeAsEntry = errors.New("the tag type can not be as an entry in an entity")
+)
+
+func (s *stream) Write(shardID uint, value *streamv2.ElementValue) error {
+ sm := s.schema
+ fLen := len(value.GetTagFamilies())
+ if fLen < 1 {
+ return errors.Wrap(ErrMalformedElement, "no tag family")
+ }
+ if fLen > len(sm.TagFamilies) {
+ return errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
+ }
+ shard, err := s.db.Shard(shardID)
+ if err != nil {
+ return err
+ }
+ entity, err := s.buildEntity(value)
+ if err != nil {
+ return err
+ }
+ series, err := shard.Series().Get(entity)
+ if err != nil {
+ return err
+ }
+ t := value.GetTimestamp().AsTime()
+ wp, err := series.Span(tsdb.TimeRange{
+ Start: t,
+ Duration: 0,
+ })
+ if err != nil {
+ return err
+ }
+ defer func() {
+ _ = wp.Close()
+ }()
+ builder := wp.WriterBuilder().Time(t)
+ for fi, family := range value.GetTagFamilies() {
+ familySpec := sm.GetTagFamilies()[fi]
+ if len(family.GetTags()) > len(familySpec.GetTags()) {
+ return errors.Wrap(ErrMalformedElement, "tag number is more than expected")
+ }
+ for ti, tag := range family.GetTags() {
+ tagSpec := familySpec.GetTags()[ti]
+ tType, isNull := tagValueTypeConv(tag)
+ if isNull {
+ continue
+ }
+ if tType != tagSpec.GetType() {
+ return errors.Wrapf(ErrMalformedElement, "tag %s type is unexpected", tagSpec.GetName())
+ }
+ }
+ bb, errMarshal := proto.Marshal(family)
+ if errMarshal != nil {
+ return errMarshal
+ }
+ builder.Family(sm.GetTagFamilies()[fi].GetName(), bb)
+ }
+ writer, err := builder.Build()
+ if err != nil {
+ return err
+ }
+ _, err = writer.Write()
+ return err
+}
+
+func (s *stream) buildEntity(value *streamv2.ElementValue) (entity tsdb.Entity, err error) {
+ for _, index := range s.entityIndex {
+ family := value.GetTagFamilies()[index.family]
+ if index.tag >= len(family.GetTags()) {
+ return nil, errors.Wrap(ErrMalformedElement, "the tag which composite the entity doesn't exist ")
+ }
+ entry, err := tagConvEntry(family.GetTags()[index.tag])
+ if err != nil {
+ return nil, err
+ }
+ entity = append(entity, entry)
+ }
+ return entity, nil
+}
+
+func tagConvEntry(tag *modelv2.Tag) (tsdb.Entry, error) {
+ switch tag.GetValueType().(type) {
+ case *modelv2.Tag_Str:
+ return tsdb.Entry(tag.GetStr().GetValue()), nil
+ case *modelv2.Tag_Int:
+ return convert.Int64ToBytes(tag.GetInt().GetValue()), nil
+ default:
+ return nil, ErrUnsupportedTagTypeAsEntry
+ }
+}
+
+type writeCallback struct {
+ l *logger.Logger
+ schemaMap map[string]*stream
+}
+
+func setUpWriteCallback(l *logger.Logger, schemaMap map[string]*stream) *writeCallback {
+ wcb := &writeCallback{
+ l: l,
+ schemaMap: schemaMap,
+ }
+ return wcb
+}
+
+func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
+ writeEvent, ok := message.Data().(data.StreamWriteData)
+ if !ok {
+ w.l.Warn().Msg("invalid event data type")
+ return
+ }
+ sm := writeEvent.WriteRequest.GetMetadata()
+ id := formatStreamID(sm.GetName(), sm.GetGroup())
+ err := w.schemaMap[id].Write(writeEvent.ShardID, writeEvent.WriteRequest.GetElement())
+ if err != nil {
+ w.l.Debug().Err(err)
+ }
+ return
+}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
new file mode 100644
index 0000000..dac9281
--- /dev/null
+++ b/banyand/stream/stream_write_test.go
@@ -0,0 +1,282 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+ "context"
+ "encoding/base64"
+ "math"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func Test_Stream_Write(t *testing.T) {
+ tester := assert.New(t)
+ s, deferFunc := setup(tester)
+ defer deferFunc()
+
+ type args struct {
+ shardID uint
+ ele *streamv2.ElementValue
+ }
+ tests := []struct {
+ name string
+ args args
+ wantErr bool
+ }{
+ {
+ name: "golden path",
+ args: args{
+ shardID: 0,
+ ele: getEle(
+ "trace_id-xxfff.111323",
+ 0,
+ "webapp_id",
+ "10.0.0.1_id",
+ "/home_id",
+ 300,
+ 1622933202000000000,
+ ),
+ },
+ },
+ {
+ name: "minimal",
+ args: args{
+ shardID: 1,
+ ele: getEle(
+ nil,
+ 1,
+ "webapp_id",
+ "10.0.0.1_id",
+ ),
+ },
+ },
+ {
+ name: "http",
+ args: args{
+ shardID: 0,
+ ele: getEle(
+ "trace_id-xxfff.111323",
+ 0,
+ "webapp_id",
+ "10.0.0.1_id",
+ "/home_id",
+ 300,
+ 1622933202000000000,
+ "GET",
+ "200",
+ ),
+ },
+ },
+ {
+ name: "database",
+ args: args{
+ shardID: 0,
+ ele: getEle(
+ "trace_id-xxfff.111323",
+ 0,
+ "webapp_id",
+ "10.0.0.1_id",
+ "/home_id",
+ 300,
+ 1622933202000000000,
+ nil,
+ nil,
+ "MySQL",
+ "10.1.1.2",
+ ),
+ },
+ },
+ {
+ name: "mq",
+ args: args{
+ shardID: 0,
+ ele: getEle(
+ "trace_id-xxfff.111323",
+ 1,
+ "webapp_id",
+ "10.0.0.1_id",
+ "/home_id",
+ 300,
+ 1622933202000000000,
+ nil,
+ nil,
+ nil,
+ nil,
+ "test_topic",
+ "10.0.0.1",
+ "broker",
+ ),
+ },
+ },
+ {
+ name: "invalid trace id",
+ args: args{
+ shardID: 1,
+ ele: getEle(
+ 1212323,
+ 1,
+ "webapp_id",
+ "10.0.0.1_id",
+ ),
+ },
+ wantErr: true,
+ },
+ {
+ name: "empty input",
+ args: args{},
+ wantErr: true,
+ },
+ {
+ name: "invalid shard id",
+ args: args{
+ shardID: math.MaxUint64,
+ ele: getEle(
+ "trace_id-xxfff.111323",
+ 0,
+ "webapp_id",
+ "10.0.0.1_id",
+ "/home_id",
+ 300,
+ 1622933202000000000,
+ ),
+ },
+ wantErr: true,
+ },
+ {
+ name: "unknown tags",
+ args: args{
+ shardID: 0,
+ ele: getEle(
+ "trace_id-xxfff.111323",
+ 1,
+ "webapp_id",
+ "10.0.0.1_id",
+ "/home_id",
+ 300,
+ 1622933202000000000,
+ nil,
+ nil,
+ nil,
+ nil,
+ "test_topic",
+ "10.0.0.1",
+ "broker",
+ "unknown",
+ ),
+ },
+ wantErr: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ err := s.Write(tt.args.shardID, tt.args.ele)
+ if tt.wantErr {
+ tester.Error(err)
+ return
+ }
+ tester.NoError(err)
+ })
+ }
+
+}
+
+func setup(t *assert.Assertions) (*stream, func()) {
+ t.NoError(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "info",
+ }))
+ tempDir, deferFunc := test.Space(t)
+ streamRepo, err := schema.NewStream()
+ t.NoError(err)
+ streamSpec, err := streamRepo.Get(context.TODO(), &commonv2.Metadata{
+ Name: "sw",
+ Group: "default",
+ })
+ t.NoError(err)
+ s, err := openStream(tempDir, streamSpec, logger.GetLogger("test"))
+ t.NoError(err)
+ return s, func() {
+ _ = s.Close()
+ deferFunc()
+ }
+}
+
+func getEle(tags ...interface{}) *streamv2.ElementValue {
+ searchableTags := make([]*modelv2.Tag, 0)
+ for _, tag := range tags {
+ searchableTags = append(searchableTags, getTag(tag))
+ }
+ bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+ e := &streamv2.ElementValue{
+ ElementId: "1231.dfd.123123ssf",
+ Timestamp: timestamppb.Now(),
+ TagFamilies: []*streamv2.ElementValue_TagFamily{
+ {
+ Tags: []*modelv2.Tag{
+ {
+ ValueType: &modelv2.Tag_BinaryData{
+ BinaryData: bb,
+ },
+ },
+ },
+ },
+ {
+ Tags: searchableTags,
+ },
+ },
+ }
+ return e
+}
+
+func getTag(tag interface{}) *modelv2.Tag {
+ if tag == nil {
+ return &modelv2.Tag{
+ ValueType: &modelv2.Tag_Null{},
+ }
+ }
+ switch t := tag.(type) {
+ case int:
+ return &modelv2.Tag{
+ ValueType: &modelv2.Tag_Int{
+ Int: &modelv2.Int{
+ Value: int64(t),
+ },
+ },
+ }
+ case string:
+ return &modelv2.Tag{
+ ValueType: &modelv2.Tag_Str{
+ Str: &modelv2.Str{
+ Value: t,
+ },
+ },
+ }
+ }
+ return nil
+}
diff --git a/banyand/stream/testdata/shard0.json b/banyand/stream/testdata/shard0.json
new file mode 100644
index 0000000..28de2f3
--- /dev/null
+++ b/banyand/stream/testdata/shard0.json
@@ -0,0 +1,18 @@
+[
+ {
+ "element_id": "1",
+ "timestamp": "2021-04-15T01:30:15.01Z",
+ "tag_families": [
+ {
+ "tags": [
+ {"binary_data": "YWJjMTIzIT8kKiYoKSctPUB+"}
+ ]
+ },
+ {
+ "tags": [
+ {"str": ""}
+ ]
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index dcab966..6505582 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -20,6 +20,9 @@ package tsdb
import (
"context"
"io"
+ "time"
+
+ "github.com/dgraph-io/ristretto/z"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -28,10 +31,14 @@ import (
type block struct {
path string
l *logger.Logger
+ ref *z.Closer
store kv.TimeSeriesStore
treeIndex kv.Store
closableLst []io.Closer
+ endTime time.Time
+ startTime time.Time
+
//revertedIndex kv.Store
}
@@ -43,7 +50,9 @@ type blockOpts struct {
func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
b = &block{
- path: opts.path,
+ path: opts.path,
+ ref: z.NewCloser(1),
+ startTime: time.Now(),
}
parentLogger := ctx.Value(logger.ContextKey)
if parentLogger != nil {
@@ -62,8 +71,54 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
return b, nil
}
+func (b *block) delegate() blockDelegate {
+ b.incRef()
+ return &bDelegate{
+ delegate: b,
+ }
+}
+
+func (b *block) dscRef() {
+ b.ref.Done()
+}
+
+func (b *block) incRef() {
+ b.ref.AddRunning(1)
+}
+
func (b *block) close() {
+ b.dscRef()
+ b.ref.SignalAndWait()
for _, closer := range b.closableLst {
_ = closer.Close()
}
}
+
+type blockDelegate interface {
+ io.Closer
+ contains(ts time.Time) bool
+ write(key []byte, val []byte, ts time.Time) error
+}
+
+var _ blockDelegate = (*bDelegate)(nil)
+
+type bDelegate struct {
+ delegate *block
+}
+
+func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
+ return d.delegate.store.Put(key, val, uint64(ts.UnixNano()))
+}
+
+func (d *bDelegate) contains(ts time.Time) bool {
+ greaterAndEqualStart := d.delegate.startTime.Equal(ts) || d.delegate.startTime.Before(ts)
+ if d.delegate.endTime.IsZero() {
+ return greaterAndEqualStart
+ }
+ return greaterAndEqualStart && d.delegate.endTime.After(ts)
+}
+
+func (d *bDelegate) Close() error {
+ d.delegate.dscRef()
+ return nil
+}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 810aae0..ee8ce57 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -18,12 +18,20 @@
package tsdb
import (
+ "bytes"
+ "io"
"time"
+ "github.com/pkg/errors"
+ "go.uber.org/multierr"
+
"github.com/apache/skywalking-banyandb/api/common"
modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
)
+var ErrEmptySeriesSpan = errors.New("there is no data in such time range")
+
type Iterator interface {
Next() bool
Val() Item
@@ -43,6 +51,10 @@ type ConditionValue struct {
type Condition map[string][]ConditionValue
type ItemID struct {
+ //shardID int
+ //segID []byte
+ //blockID []byte
+ id []byte
}
type TimeRange struct {
@@ -57,21 +69,23 @@ type Series interface {
}
type SeriesSpan interface {
+ io.Closer
WriterBuilder() WriterBuilder
Iterator() Iterator
SeekerBuilder() SeekerBuilder
}
type WriterBuilder interface {
- Family(name string) WriterBuilder
+ Family(name string, val []byte) WriterBuilder
Time(ts time.Time) WriterBuilder
Val(val []byte) WriterBuilder
- OrderBy(order modelv2.QueryOrder_Sort) WriterBuilder
- Build() Writer
+ Build() (Writer, error)
}
type Writer interface {
- Write() ItemID
+ Write() (ItemID, error)
+ WriteLSMIndex(name string, val []byte) error
+ WriteInvertedIndex(name string, val []byte) error
}
type SeekerBuilder interface {
@@ -88,12 +102,14 @@ type Seeker interface {
var _ Series = (*series)(nil)
type series struct {
- id common.SeriesID
+ id common.SeriesID
+ blockDB blockDatabase
}
-func newSeries(id common.SeriesID) *series {
+func newSeries(id common.SeriesID, blockDB blockDatabase) *series {
return &series{
- id: id,
+ id: id,
+ blockDB: blockDB,
}
}
@@ -102,9 +118,145 @@ func (s *series) ID() common.SeriesID {
}
func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
- panic("implement me")
+ blocks := s.blockDB.span(timeRange)
+ if len(blocks) < 1 {
+ return nil, ErrEmptySeriesSpan
+ }
+ return newSeriesSpan(blocks, s.id), nil
}
func (s *series) Get(id ItemID) (Item, error) {
+ panic("not implemented")
+}
+
+var _ SeriesSpan = (*seriesSpan)(nil)
+
+type seriesSpan struct {
+ blocks []blockDelegate
+ seriesID common.SeriesID
+}
+
+func (s *seriesSpan) Close() (err error) {
+ for _, delegate := range s.blocks {
+ err = multierr.Append(err, delegate.Close())
+ }
+ return err
+}
+
+func (s *seriesSpan) WriterBuilder() WriterBuilder {
+ return newWriterBuilder(s)
+}
+
+func (s *seriesSpan) Iterator() Iterator {
panic("implement me")
}
+
+func (s *seriesSpan) SeekerBuilder() SeekerBuilder {
+ panic("implement me")
+}
+
+func newSeriesSpan(blocks []blockDelegate, id common.SeriesID) *seriesSpan {
+ return &seriesSpan{
+ blocks: blocks,
+ seriesID: id,
+ }
+}
+
+var _ WriterBuilder = (*writerBuilder)(nil)
+
+type writerBuilder struct {
+ series *seriesSpan
+ block blockDelegate
+ values []struct {
+ family []byte
+ val []byte
+ }
+ ts time.Time
+ seriesIDBytes []byte
+}
+
+func (w *writerBuilder) Family(name string, val []byte) WriterBuilder {
+ w.values = append(w.values, struct {
+ family []byte
+ val []byte
+ }{family: bytes.Join([][]byte{w.seriesIDBytes, hash([]byte(name))}, nil), val: val})
+ return w
+}
+
+func (w *writerBuilder) Time(ts time.Time) WriterBuilder {
+ w.ts = ts
+ for _, b := range w.series.blocks {
+ if b.contains(ts) {
+ w.block = b
+ break
+ }
+ }
+ return w
+}
+
+func (w *writerBuilder) Val(val []byte) WriterBuilder {
+ w.values = append(w.values, struct {
+ family []byte
+ val []byte
+ }{val: val})
+ return w
+}
+
+var ErrNoTime = errors.New("no time specified")
+var ErrNoVal = errors.New("no value specified")
+
+func (w *writerBuilder) Build() (Writer, error) {
+ if w.block == nil {
+ return nil, ErrNoTime
+ }
+ if len(w.values) < 1 {
+ return nil, ErrNoVal
+ }
+ wt := &writer{
+ block: w.block,
+ ts: w.ts,
+ seriesID: w.seriesIDBytes,
+ itemID: bytes.Join([][]byte{w.seriesIDBytes, convert.Int64ToBytes(w.ts.UnixNano())}, nil),
+ columns: w.values,
+ }
+ return wt, nil
+}
+
+func newWriterBuilder(seriesSpan *seriesSpan) WriterBuilder {
+ return &writerBuilder{
+ series: seriesSpan,
+ seriesIDBytes: convert.Uint64ToBytes(uint64(seriesSpan.seriesID)),
+ }
+}
+
+var _ Writer = (*writer)(nil)
+
+type writer struct {
+ block blockDelegate
+ ts time.Time
+ seriesID []byte
+ columns []struct {
+ family []byte
+ val []byte
+ }
+ itemID []byte
+}
+
+func (w *writer) WriteLSMIndex(name string, val []byte) error {
+ panic("implement me")
+}
+
+func (w *writer) WriteInvertedIndex(name string, val []byte) error {
+ panic("implement me")
+}
+
+func (w *writer) Write() (id ItemID, err error) {
+ for _, c := range w.columns {
+ err = w.block.write(c.family, c.val, w.ts)
+ if err != nil {
+ return id, err
+ }
+ }
+ id.id = w.itemID
+ return id, nil
+}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index b38c30f..aa9ec22 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -80,11 +80,16 @@ func NewPath(entries []Entry) Path {
type SeriesDatabase interface {
io.Closer
- Create(entity Entity) error
+ Get(entity Entity) (Series, error)
List(path Path) (SeriesList, error)
}
+type blockDatabase interface {
+ span(timeRange TimeRange) []blockDelegate
+}
+
var _ SeriesDatabase = (*seriesDB)(nil)
+var _ blockDatabase = (*seriesDB)(nil)
type seriesDB struct {
sync.Mutex
@@ -94,59 +99,23 @@ type seriesDB struct {
seriesMetadata kv.Store
}
-func (s *seriesDB) Close() error {
- for _, seg := range s.lst {
- seg.close()
- }
- return s.seriesMetadata.Close()
-}
-
-func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
- sdb := &seriesDB{}
- parentLogger := ctx.Value(logger.ContextKey)
- if parentLogger == nil {
- return nil, logger.ErrNoLoggerInContext
- }
- if pl, ok := parentLogger.(*logger.Logger); ok {
- sdb.l = pl.Named("series")
- }
- var err error
- sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
- if err != nil {
- return nil, err
- }
- segPath, err := mkdir(segTemplate, path, time.Now().Format(segFormat))
- if err != nil {
- return nil, err
- }
- seg, err := newSegment(ctx, segPath)
- if err != nil {
- return nil, err
- }
- {
- sdb.Lock()
- defer sdb.Unlock()
- sdb.lst = append(sdb.lst, seg)
- }
- return sdb, nil
-}
-
-func (s *seriesDB) Create(entity Entity) error {
+func (s *seriesDB) Get(entity Entity) (Series, error) {
key := hashEntity(entity)
- _, err := s.seriesMetadata.Get(key)
+ seriesID, err := s.seriesMetadata.Get(key)
if err != nil && err != kv.ErrKeyNotFound {
- return err
+ return nil, err
}
if err == nil {
- return nil
+ return newSeries(bytesConvSeriesID(seriesID), s), nil
}
s.Lock()
defer s.Unlock()
- err = s.seriesMetadata.Put(key, hash(key))
+ seriesID = hash(key)
+ err = s.seriesMetadata.Put(key, seriesID)
if err != nil {
- return err
+ return nil, err
}
- return nil
+ return newSeries(bytesConvSeriesID(seriesID), s), nil
}
func (s *seriesDB) List(path Path) (SeriesList, error) {
@@ -156,7 +125,7 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
return nil, err
}
if err == nil {
- return []Series{newSeries(common.SeriesID(convert.BytesToUint64(id)))}, nil
+ return []Series{newSeries(bytesConvSeriesID(id), s)}, nil
}
return nil, nil
}
@@ -173,7 +142,7 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
err = multierr.Append(err, errGetVal)
return nil
}
- result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id))))
+ result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id)), s))
}
return nil
})
@@ -183,6 +152,52 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
return result, err
}
+func (s *seriesDB) span(_ TimeRange) []blockDelegate {
+ //TODO: return correct blocks
+ result := make([]blockDelegate, 0, len(s.lst[0].lst))
+ for _, b := range s.lst[0].lst {
+ result = append(result, b.delegate())
+ }
+ return result
+}
+
+func (s *seriesDB) Close() error {
+ for _, seg := range s.lst {
+ seg.close()
+ }
+ return s.seriesMetadata.Close()
+}
+
+func newSeriesDataBase(ctx context.Context, path string) (SeriesDatabase, error) {
+ sdb := &seriesDB{}
+ parentLogger := ctx.Value(logger.ContextKey)
+ if parentLogger == nil {
+ return nil, logger.ErrNoLoggerInContext
+ }
+ if pl, ok := parentLogger.(*logger.Logger); ok {
+ sdb.l = pl.Named("series")
+ }
+ var err error
+ sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
+ if err != nil {
+ return nil, err
+ }
+ segPath, err := mkdir(segTemplate, path, time.Now().Format(segFormat))
+ if err != nil {
+ return nil, err
+ }
+ seg, err := newSegment(ctx, segPath)
+ if err != nil {
+ return nil, err
+ }
+ {
+ sdb.Lock()
+ defer sdb.Unlock()
+ sdb.lst = append(sdb.lst, seg)
+ }
+ return sdb, nil
+}
+
func hashEntity(entity Entity) []byte {
result := make(Entry, 0, len(entity)*8)
for _, entry := range entity {
@@ -195,6 +210,10 @@ func hash(entry []byte) []byte {
return convert.Uint64ToBytes(convert.Hash(entry))
}
+func bytesConvSeriesID(data []byte) common.SeriesID {
+ return common.SeriesID(convert.BytesToUint64(data))
+}
+
type SeriesList []Series
func (a SeriesList) Len() int {
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 098648a..83b3008 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -141,7 +141,7 @@ func TestNewPath(t *testing.T) {
}
}
-func Test_SeriesDatabase_Create(t *testing.T) {
+func Test_SeriesDatabase_Get(t *testing.T) {
tests := []struct {
name string
@@ -152,7 +152,7 @@ func Test_SeriesDatabase_Create(t *testing.T) {
entities: []Entity{{
Entry("productpage"),
Entry("10.0.0.1"),
- Entry(convert.Uint64ToBytes(0)),
+ convert.Uint64ToBytes(0),
}},
},
{
@@ -161,12 +161,12 @@ func Test_SeriesDatabase_Create(t *testing.T) {
{
Entry("productpage"),
Entry("10.0.0.1"),
- Entry(convert.Uint64ToBytes(0)),
+ convert.Uint64ToBytes(0),
},
{
Entry("productpage"),
Entry("10.0.0.1"),
- Entry(convert.Uint64ToBytes(0)),
+ convert.Uint64ToBytes(0),
},
},
},
@@ -183,7 +183,9 @@ func Test_SeriesDatabase_Create(t *testing.T) {
s, err := newSeriesDataBase(context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), dir)
tester.NoError(err)
for _, entity := range tt.entities {
- tester.NoError(s.Create(entity))
+ series, err := s.Get(entity)
+ tester.NoError(err)
+ tester.Equal(hashEntity(entity), series.ID())
}
})
}
@@ -214,7 +216,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
convert.Uint64ToBytes(0),
}),
want: SeriesList{
- newSeries(data[0].id),
+ newMockSeries(data[0].id),
},
},
{
@@ -225,8 +227,8 @@ func Test_SeriesDatabase_List(t *testing.T) {
AnyEntry,
}),
want: SeriesList{
- newSeries(data[1].id),
- newSeries(data[2].id),
+ newMockSeries(data[1].id),
+ newMockSeries(data[2].id),
},
},
{
@@ -237,10 +239,10 @@ func Test_SeriesDatabase_List(t *testing.T) {
AnyEntry,
}),
want: SeriesList{
- newSeries(data[0].id),
- newSeries(data[1].id),
- newSeries(data[2].id),
- newSeries(data[3].id),
+ newMockSeries(data[0].id),
+ newMockSeries(data[1].id),
+ newMockSeries(data[2].id),
+ newMockSeries(data[3].id),
},
},
{
@@ -251,9 +253,9 @@ func Test_SeriesDatabase_List(t *testing.T) {
convert.Uint64ToBytes(0),
}),
want: SeriesList{
- newSeries(data[0].id),
- newSeries(data[1].id),
- newSeries(data[3].id),
+ newMockSeries(data[0].id),
+ newMockSeries(data[1].id),
+ newMockSeries(data[3].id),
},
},
{
@@ -264,8 +266,8 @@ func Test_SeriesDatabase_List(t *testing.T) {
convert.Uint64ToBytes(1),
}),
want: SeriesList{
- newSeries(data[2].id),
- newSeries(data[4].id),
+ newMockSeries(data[2].id),
+ newMockSeries(data[4].id),
},
},
}
@@ -278,9 +280,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
return
}
tester.NoError(err)
- sort.Sort(tt.want)
- sort.Sort(series)
- tester.Equal(tt.want, series)
+ tester.Equal(transform(tt.want), transform(series))
})
}
}
@@ -330,7 +330,21 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
}
for _, d := range data {
d.id = common.SeriesID(convert.BytesToUint64(hash(hashEntity(d.entity))))
- t.NoError(db.Create(d.entity))
+ series, err := db.Get(d.entity)
+ t.NoError(err)
+ t.Equal(hashEntity(d.entity), series.ID())
}
return data
}
+
+func newMockSeries(id common.SeriesID) *series {
+ return newSeries(id, nil)
+}
+
+func transform(list SeriesList) (seriesIDs []common.SeriesID) {
+ sort.Sort(list)
+ for _, s := range list {
+ seriesIDs = append(seriesIDs, s.ID())
+ }
+ return seriesIDs
+}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 050444e..94cb90a 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -31,7 +31,7 @@ type shard struct {
}
func (s *shard) Series() SeriesDatabase {
- panic("implement me")
+ return s.seriesDatabase
}
func (s *shard) Index() IndexDatabase {
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 503b0e8..3d20a88 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -44,9 +44,12 @@ const (
dirPerm = 0700
)
+var ErrInvalidShardID = errors.New("invalid shard id")
+
type Database interface {
io.Closer
Shards() []Shard
+ Shard(id uint) (Shard, error)
}
type Shard interface {
@@ -71,6 +74,24 @@ type database struct {
sync.Mutex
}
+func (d *database) Shards() []Shard {
+ return d.sLst
+}
+
+func (d *database) Shard(id uint) (Shard, error) {
+ if id >= uint(len(d.sLst)) {
+ return nil, ErrInvalidShardID
+ }
+ return d.sLst[id], nil
+}
+
+func (d *database) Close() error {
+ for _, s := range d.sLst {
+ _ = s.Close()
+ }
+ return nil
+}
+
func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
db := &database{
location: opts.Location,
@@ -98,17 +119,6 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
return createDatabase(thisContext, db)
}
-func (d *database) Close() error {
- for _, s := range d.sLst {
- _ = s.Close()
- }
- return nil
-}
-
-func (d *database) Shards() []Shard {
- return d.sLst
-}
-
func createDatabase(ctx context.Context, db *database) (Database, error) {
var err error
db.Lock()
diff --git a/go.mod b/go.mod
index 65674a5..55e7aa9 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
github.com/RoaringBitmap/roaring v0.9.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/badger/v3 v3.2011.1
+ github.com/dgraph-io/ristretto v0.1.0
github.com/golang/mock v1.5.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6