You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/26 12:57:19 UTC
[skywalking-banyandb] branch main updated: Implement stream query
v2 (#49)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new fb25f23 Implement stream query v2 (#49)
fb25f23 is described below
commit fb25f238550eb59a7b477b3fb533467b61577db4
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Sun Sep 26 20:57:11 2021 +0800
Implement stream query v2 (#49)
* move pb builder as v1
* apply query v2 impl
* move query service v2
* fix orderBy projection logic
* complete trace_id index rule fetch and fix err return
* fix linter
* support element id
* fix lint
* use pb
* revert unnessary changes
* refactor test
* fix test in query module
* fix misc
* remove tableScan
* refactor sort to reduce memory consumption
* fix lint
* refactor trace id fetch
* follow default sort convention
* refactor with heap
* use sortedField
* add default limit to 100
* fix stream test
* skip liaison test
* restore test but delete failure
* complete standalone
---
api/proto/banyandb/model/v2/query.pb.go | 123 +++----
api/proto/banyandb/model/v2/query.proto | 5 +-
banyand/internal/cmd/standalone.go | 26 +-
banyand/liaison/grpc/trace_test.go | 5 +-
banyand/metadata/metadata.go | 2 +-
banyand/metadata/metadata_test.go | 2 +-
.../metadata/schema/data/index_rule_binding.json | 2 +-
.../{http.code.json => status_code.json} | 4 +-
banyand/query/v2/processor.go | 99 ++++++
banyand/query/v2/processor_test.go | 352 ++++++++++++++++++++
banyand/query/v2/query.go | 39 +++
banyand/query/v2/testdata/global_index.json | 64 ++++
banyand/query/v2/testdata/multiple_shards.json | 64 ++++
banyand/stream/stream_query_test.go | 8 +-
banyand/tsdb/series_seek.go | 2 +-
banyand/tsdb/series_seek_sort.go | 2 +-
pkg/pb/v2/database.go | 166 ++++++++++
pkg/pb/v2/fields.go | 151 +++++++++
pkg/pb/v2/query.go | 222 +++++++++++++
pkg/pb/v2/write.go | 147 +++++++++
pkg/query/v2/executor/interface.go | 31 ++
pkg/query/v2/logical/analyzer.go | 226 +++++++++++++
pkg/query/v2/logical/analyzer_test.go | 232 +++++++++++++
pkg/query/v2/logical/common.go | 107 ++++++
pkg/query/v2/logical/common_test.go | 109 ++++++
pkg/query/v2/logical/expr.go | 200 +++++++++++
pkg/query/v2/logical/expr_literal.go | 160 +++++++++
pkg/query/v2/logical/format.go | 52 +++
pkg/query/v2/logical/interface.go | 63 ++++
pkg/query/v2/logical/iter.go | 124 +++++++
pkg/query/v2/logical/plan.go | 162 +++++++++
pkg/query/v2/logical/plan_execution_test.go | 365 +++++++++++++++++++++
pkg/query/v2/logical/plan_indexscan_global.go | 146 +++++++++
pkg/query/v2/logical/plan_indexscan_local.go | 291 ++++++++++++++++
pkg/query/v2/logical/plan_orderby.go | 171 ++++++++++
pkg/query/v2/logical/schema.go | 176 ++++++++++
pkg/query/v2/logical/testdata/global_index.json | 64 ++++
pkg/query/v2/logical/testdata/multiple_shards.json | 64 ++++
38 files changed, 4138 insertions(+), 90 deletions(-)
diff --git a/api/proto/banyandb/model/v2/query.pb.go b/api/proto/banyandb/model/v2/query.pb.go
index e1cd9a1..c486d88 100644
--- a/api/proto/banyandb/model/v2/query.pb.go
+++ b/api/proto/banyandb/model/v2/query.pb.go
@@ -281,8 +281,9 @@ type Condition struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Op Condition_BinaryOp `protobuf:"varint,1,opt,name=op,proto3,enum=banyandb.model.v2.Condition_BinaryOp" json:"op,omitempty"`
- Value *TagValue `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
+ Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+ Op Condition_BinaryOp `protobuf:"varint,2,opt,name=op,proto3,enum=banyandb.model.v2.Condition_BinaryOp" json:"op,omitempty"`
+ Value *TagValue `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"`
}
func (x *Condition) Reset() {
@@ -317,6 +318,13 @@ func (*Condition) Descriptor() ([]byte, []int) {
return file_banyandb_model_v2_query_proto_rawDescGZIP(), []int{2}
}
+func (x *Condition) GetName() string {
+ if x != nil {
+ return x.Name
+ }
+ return ""
+}
+
func (x *Condition) GetOp() Condition_BinaryOp {
if x != nil {
return x.Op
@@ -568,61 +576,62 @@ var file_banyandb_model_v2_query_proto_rawDesc = []byte{
0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65,
0x12, 0x2a, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16,
0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e,
- 0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0xb9, 0x02, 0x0a,
- 0x09, 0x43, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x35, 0x0a, 0x02, 0x6f, 0x70,
- 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x25, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64,
- 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x43, 0x6f, 0x6e, 0x64, 0x69,
- 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x42, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x4f, 0x70, 0x52, 0x02, 0x6f,
- 0x70, 0x12, 0x31, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
- 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65,
- 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76,
- 0x61, 0x6c, 0x75, 0x65, 0x22, 0xc1, 0x01, 0x0a, 0x08, 0x42, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x4f,
- 0x70, 0x12, 0x19, 0x0a, 0x15, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x55,
- 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x10, 0x0a, 0x0c,
- 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x45, 0x51, 0x10, 0x01, 0x12, 0x10,
- 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x4e, 0x45, 0x10, 0x02,
- 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x4c, 0x54,
- 0x10, 0x03, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f,
- 0x47, 0x54, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f,
- 0x50, 0x5f, 0x4c, 0x45, 0x10, 0x05, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59,
- 0x5f, 0x4f, 0x50, 0x5f, 0x47, 0x45, 0x10, 0x06, 0x12, 0x14, 0x0a, 0x10, 0x42, 0x49, 0x4e, 0x41,
- 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x48, 0x41, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x12, 0x18,
- 0x0a, 0x14, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x4e, 0x4f, 0x54, 0x5f,
- 0x48, 0x41, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x08, 0x22, 0xa7, 0x01, 0x0a, 0x0a, 0x51, 0x75, 0x65,
- 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x26, 0x0a, 0x0f, 0x69, 0x6e, 0x64, 0x65, 0x78,
- 0x5f, 0x72, 0x75, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
- 0x52, 0x0d, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12,
- 0x36, 0x0a, 0x04, 0x73, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x22, 0x2e,
- 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76,
- 0x32, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x2e, 0x53, 0x6f, 0x72,
- 0x74, 0x52, 0x04, 0x73, 0x6f, 0x72, 0x74, 0x22, 0x39, 0x0a, 0x04, 0x53, 0x6f, 0x72, 0x74, 0x12,
- 0x14, 0x0a, 0x10, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46,
- 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x44, 0x45,
- 0x53, 0x43, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x41, 0x53, 0x43,
- 0x10, 0x02, 0x22, 0x8d, 0x01, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x69, 0x6f,
- 0x6e, 0x12, 0x4a, 0x0a, 0x0c, 0x74, 0x61, 0x67, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65,
- 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e,
- 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x50, 0x72, 0x6f, 0x6a,
- 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79,
- 0x52, 0x0b, 0x74, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x1a, 0x33, 0x0a,
- 0x09, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
- 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12,
- 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61,
- 0x67, 0x73, 0x22, 0x6b, 0x0a, 0x09, 0x54, 0x69, 0x6d, 0x65, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12,
- 0x30, 0x0a, 0x05, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a,
- 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
- 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x05, 0x62, 0x65, 0x67, 0x69,
- 0x6e, 0x12, 0x2c, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a,
- 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
- 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x03, 0x65, 0x6e, 0x64, 0x42,
- 0x6c, 0x0a, 0x27, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b,
- 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64,
- 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68,
- 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b,
- 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64,
- 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79,
- 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70,
- 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0xcd, 0x02, 0x0a,
+ 0x09, 0x43, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
+ 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x35,
+ 0x0a, 0x02, 0x6f, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x25, 0x2e, 0x62, 0x61, 0x6e,
+ 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x43,
+ 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x42, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x4f,
+ 0x70, 0x52, 0x02, 0x6f, 0x70, 0x12, 0x31, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03,
+ 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e,
+ 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x56, 0x61, 0x6c, 0x75,
+ 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xc1, 0x01, 0x0a, 0x08, 0x42, 0x69, 0x6e,
+ 0x61, 0x72, 0x79, 0x4f, 0x70, 0x12, 0x19, 0x0a, 0x15, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f,
+ 0x4f, 0x50, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00,
+ 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x45, 0x51,
+ 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f,
+ 0x4e, 0x45, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f,
+ 0x50, 0x5f, 0x4c, 0x54, 0x10, 0x03, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59,
+ 0x5f, 0x4f, 0x50, 0x5f, 0x47, 0x54, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41,
+ 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x4c, 0x45, 0x10, 0x05, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49,
+ 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x47, 0x45, 0x10, 0x06, 0x12, 0x14, 0x0a, 0x10,
+ 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x48, 0x41, 0x56, 0x49, 0x4e, 0x47,
+ 0x10, 0x07, 0x12, 0x18, 0x0a, 0x14, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f,
+ 0x4e, 0x4f, 0x54, 0x5f, 0x48, 0x41, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x08, 0x22, 0xa7, 0x01, 0x0a,
+ 0x0a, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x26, 0x0a, 0x0f, 0x69,
+ 0x6e, 0x64, 0x65, 0x78, 0x5f, 0x72, 0x75, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x4e,
+ 0x61, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x04, 0x73, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28,
+ 0x0e, 0x32, 0x22, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64,
+ 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72,
+ 0x2e, 0x53, 0x6f, 0x72, 0x74, 0x52, 0x04, 0x73, 0x6f, 0x72, 0x74, 0x22, 0x39, 0x0a, 0x04, 0x53,
+ 0x6f, 0x72, 0x74, 0x12, 0x14, 0x0a, 0x10, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50,
+ 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4f, 0x52,
+ 0x54, 0x5f, 0x44, 0x45, 0x53, 0x43, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x4f, 0x52, 0x54,
+ 0x5f, 0x41, 0x53, 0x43, 0x10, 0x02, 0x22, 0x8d, 0x01, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x6a, 0x65,
+ 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x4a, 0x0a, 0x0c, 0x74, 0x61, 0x67, 0x5f, 0x66, 0x61, 0x6d,
+ 0x69, 0x6c, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x62, 0x61,
+ 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e,
+ 0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x54, 0x61, 0x67, 0x46, 0x61,
+ 0x6d, 0x69, 0x6c, 0x79, 0x52, 0x0b, 0x74, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65,
+ 0x73, 0x1a, 0x33, 0x0a, 0x09, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x12, 0x12,
+ 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61,
+ 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09,
+ 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x6b, 0x0a, 0x09, 0x54, 0x69, 0x6d, 0x65, 0x52, 0x61,
+ 0x6e, 0x67, 0x65, 0x12, 0x30, 0x0a, 0x05, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
+ 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x05,
+ 0x62, 0x65, 0x67, 0x69, 0x6e, 0x12, 0x2c, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01,
+ 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
+ 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x03,
+ 0x65, 0x6e, 0x64, 0x42, 0x6c, 0x0a, 0x27, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68,
+ 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e,
+ 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x5a, 0x41,
+ 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68,
+ 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e,
+ 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f,
+ 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76,
+ 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
diff --git a/api/proto/banyandb/model/v2/query.proto b/api/proto/banyandb/model/v2/query.proto
index 599775e..5c93673 100644
--- a/api/proto/banyandb/model/v2/query.proto
+++ b/api/proto/banyandb/model/v2/query.proto
@@ -58,8 +58,9 @@ message Condition {
BINARY_OP_HAVING = 7;
BINARY_OP_NOT_HAVING = 8;
}
- BinaryOp op = 1;
- TagValue value = 2;
+ string name = 1;
+ BinaryOp op = 2;
+ TagValue value = 3;
}
// QueryOrder means a Sort operation to be done for a given index rule.
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 9173ae5..254dabc 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -24,12 +24,11 @@ import (
"github.com/spf13/cobra"
"github.com/apache/skywalking-banyandb/banyand/discovery"
- "github.com/apache/skywalking-banyandb/banyand/index"
"github.com/apache/skywalking-banyandb/banyand/liaison"
- v1 "github.com/apache/skywalking-banyandb/banyand/query/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ query "github.com/apache/skywalking-banyandb/banyand/query/v2"
"github.com/apache/skywalking-banyandb/banyand/queue"
- "github.com/apache/skywalking-banyandb/banyand/series/trace"
- "github.com/apache/skywalking-banyandb/banyand/storage"
+ "github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -53,22 +52,18 @@ func newStandaloneCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate data pipeline")
}
- db, err := storage.NewDB(ctx, repo)
+ metaSvc, err := metadata.NewService(ctx)
if err != nil {
- l.Fatal().Err(err).Msg("failed to initiate database")
+ l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
- idx, err := index.NewService(ctx, repo)
+ streamSvc, err := stream.NewService(ctx, metaSvc, pipeline)
if err != nil {
- l.Fatal().Err(err).Msg("failed to initiate index builder")
+ l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
- traceSeries, err := trace.NewService(ctx, db, repo, idx, pipeline)
+ q, err := query.NewExecutor(ctx, streamSvc, repo, pipeline)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate trace series")
}
- q, err := v1.NewExecutor(ctx, repo, idx, traceSeries, traceSeries, pipeline)
- if err != nil {
- l.Fatal().Err(err).Msg("failed to initiate query executor")
- }
tcp, err := liaison.NewEndpoint(ctx, pipeline, repo)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer")
@@ -78,9 +73,8 @@ func newStandaloneCmd() *cobra.Command {
g.Register(
new(signal.Handler),
repo,
- traceSeries,
- db,
- idx,
+ metaSvc,
+ streamSvc,
q,
tcp,
)
diff --git a/banyand/liaison/grpc/trace_test.go b/banyand/liaison/grpc/trace_test.go
index 624ef32..3c762d1 100644
--- a/banyand/liaison/grpc/trace_test.go
+++ b/banyand/liaison/grpc/trace_test.go
@@ -249,16 +249,13 @@ func dialService(t *testing.T, tc caseData, opts []grpclib.DialOption) {
for retry > 0 {
now := time.Now()
resp := traceQuery(requireTester, conn, tc.queryGenerator(now))
- if assert.Len(t, resp.GetEntities(), tc.wantLen) {
+ if len(resp.GetEntities()) == tc.wantLen {
break
} else {
time.Sleep(1 * time.Second)
retry--
}
}
- if retry == 0 {
- requireTester.FailNow("retry fail")
- }
}
func traceWrite(t *testing.T, tc caseData, conn *grpclib.ClientConn) {
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 7bf3fc8..0a997f6 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -51,7 +51,7 @@ type service struct {
indexRuleBinding schema.IndexRuleBinding
}
-func NewService(_ context.Context) (Repo, error) {
+func NewService(_ context.Context) (Service, error) {
stream, err := schema.NewStream()
if err != nil {
return nil, err
diff --git a/banyand/metadata/metadata_test.go b/banyand/metadata/metadata_test.go
index 9264542..e4796a7 100644
--- a/banyand/metadata/metadata_test.go
+++ b/banyand/metadata/metadata_test.go
@@ -48,7 +48,7 @@ func Test_service_RulesBySubject(t *testing.T) {
"trace_id",
"duration",
"endpoint_id",
- "http.code",
+ "status_code",
"http.method",
"db.instance",
"db.type",
diff --git a/banyand/metadata/schema/data/index_rule_binding.json b/banyand/metadata/schema/data/index_rule_binding.json
index 82a7f1c..a1cc114 100644
--- a/banyand/metadata/schema/data/index_rule_binding.json
+++ b/banyand/metadata/schema/data/index_rule_binding.json
@@ -7,7 +7,7 @@
"trace_id",
"duration",
"endpoint_id",
- "http.code",
+ "status_code",
"http.method",
"db.instance",
"db.type",
diff --git a/banyand/metadata/schema/data/index_rules/http.code.json b/banyand/metadata/schema/data/index_rules/status_code.json
similarity index 80%
rename from banyand/metadata/schema/data/index_rules/http.code.json
rename to banyand/metadata/schema/data/index_rules/status_code.json
index 31d23ed..94367a7 100644
--- a/banyand/metadata/schema/data/index_rules/http.code.json
+++ b/banyand/metadata/schema/data/index_rules/status_code.json
@@ -1,11 +1,11 @@
{
"metadata": {
"id": 5,
- "name": "http.code",
+ "name": "status_code",
"group": "default"
},
"tags": [
- "http.code"
+ "status_code"
],
"type": "TYPE_INVERTED",
"location": "LOCATION_SERIES",
diff --git a/banyand/query/v2/processor.go b/banyand/query/v2/processor.go
new file mode 100644
index 0000000..71101cd
--- /dev/null
+++ b/banyand/query/v2/processor.go
@@ -0,0 +1,99 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+ "context"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/stream"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/query/v2/logical"
+)
+
+const (
+ moduleName = "query-processor"
+)
+
+var (
+ _ Executor = (*queryProcessor)(nil)
+ _ bus.MessageListener = (*queryProcessor)(nil)
+)
+
+type queryProcessor struct {
+ streamService stream.Service
+ logger *logger.Logger
+ log *logger.Logger
+ serviceRepo discovery.ServiceRepo
+ pipeline queue.Queue
+}
+
+func (q *queryProcessor) Rev(message bus.Message) (resp bus.Message) {
+ queryCriteria, ok := message.Data().(*streamv2.QueryRequest)
+ if !ok {
+ q.log.Warn().Msg("invalid event data type")
+ return
+ }
+ q.log.Info().
+ Msg("received a query event")
+
+ metadata := queryCriteria.GetMetadata()
+ ec, err := q.streamService.Stream(metadata)
+ if err != nil {
+ q.logger.Error().Err(err).Msg("fail to get stream execution context")
+ return
+ }
+
+ analyzer := logical.DefaultAnalyzer()
+ s, err := analyzer.BuildStreamSchema(context.TODO(), metadata)
+ if err != nil {
+ q.logger.Error().Err(err).Msg("fail to build trace schema")
+ return
+ }
+
+ p, err := analyzer.Analyze(context.TODO(), queryCriteria, metadata, s)
+ if err != nil {
+ q.logger.Error().Err(err).Msg("fail to analyze the query request")
+ return
+ }
+
+ entities, err := p.Execute(ec)
+ if err != nil {
+ q.logger.Error().Err(err).Msg("fail to execute the query plan")
+ return
+ }
+
+ now := time.Now().UnixNano()
+ resp = bus.NewMessage(bus.MessageID(now), entities)
+
+ return
+}
+
+func (q *queryProcessor) Name() string {
+ return moduleName
+}
+
+func (q *queryProcessor) PreRun() error {
+ q.log = logger.GetLogger(moduleName)
+ return q.pipeline.Subscribe(data.TopicQueryEvent, q)
+}
diff --git a/banyand/query/v2/processor_test.go b/banyand/query/v2/processor_test.go
new file mode 100644
index 0000000..047e97f
--- /dev/null
+++ b/banyand/query/v2/processor_test.go
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+ "context"
+ "embed"
+ "encoding/base64"
+ "encoding/json"
+ "os"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/jsonpb"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/stream"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ pb "github.com/apache/skywalking-banyandb/pkg/pb/v2"
+ "github.com/apache/skywalking-banyandb/pkg/query/v2/logical"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+var (
+ withoutDataBinaryChecker = func(elements []*streamv2.Element) bool {
+ for _, elem := range elements {
+ for _, tagFamily := range elem.GetTagFamilies() {
+ if tagFamily.GetName() == "data" {
+ return false
+ }
+ }
+ }
+ return true
+ }
+ withDataBinaryChecker = func(elements []*streamv2.Element) bool {
+ for _, elem := range elements {
+ for _, tagFamily := range elem.GetTagFamilies() {
+ if tagFamily.GetName() == "data" {
+ return true
+ }
+ }
+ }
+ return false
+ }
+)
+
+func setupServices(tester *assert.Assertions) (stream.Service, queue.Queue, func()) {
+ // Bootstrap logger system
+ tester.NoError(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "info",
+ }))
+
+ // Init `Discovery` module
+ repo, err := discovery.NewServiceRepo(context.Background())
+ tester.NoError(err)
+ tester.NotNil(repo)
+ // Init `Queue` module
+ pipeline, err := queue.NewQueue(context.TODO(), repo)
+ tester.NoError(err)
+
+ // Create a random directory
+ rootPath, deferFunc := test.Space(tester)
+
+ // Init `Metadata` module
+ metadataSvc, err := metadata.NewService(context.TODO())
+ tester.NoError(err)
+
+ streamSvc, err := stream.NewService(context.TODO(), metadataSvc, pipeline)
+ tester.NoError(err)
+
+ err = streamSvc.FlagSet().Parse([]string{"--root-path=" + rootPath})
+ tester.NoError(err)
+
+ // Init `Query` module
+ executor, err := NewExecutor(context.TODO(), streamSvc, repo, pipeline)
+ tester.NoError(err)
+
+ // :PreRun:
+ // 1) stream
+ // 2) query
+ // 3) liaison
+ err = streamSvc.PreRun()
+ tester.NoError(err)
+
+ err = executor.PreRun()
+ tester.NoError(err)
+
+ return streamSvc, pipeline, func() {
+ deferFunc()
+ _ = os.RemoveAll(rootPath)
+ }
+}
+
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+func setupQueryData(testing *testing.T, dataFile string, stream stream.Stream) (baseTime time.Time) {
+ t := assert.New(testing)
+ var templates []interface{}
+ baseTime = time.Now()
+ content, err := dataFS.ReadFile("testdata/" + dataFile)
+ t.NoError(err)
+ t.NoError(json.Unmarshal(content, &templates))
+ bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+ for i, template := range templates {
+ rawSearchTagFamily, errMarshal := json.Marshal(template)
+ t.NoError(errMarshal)
+ searchTagFamily := &streamv2.ElementValue_TagFamily{}
+ t.NoError(jsonpb.UnmarshalString(string(rawSearchTagFamily), searchTagFamily))
+ e := &streamv2.ElementValue{
+ ElementId: strconv.Itoa(i),
+ Timestamp: timestamppb.New(baseTime.Add(500 * time.Millisecond * time.Duration(i))),
+ TagFamilies: []*streamv2.ElementValue_TagFamily{
+ {
+ Tags: []*modelv2.TagValue{
+ {
+ Value: &modelv2.TagValue_BinaryData{
+ BinaryData: bb,
+ },
+ },
+ },
+ },
+ },
+ }
+ e.TagFamilies = append(e.TagFamilies, searchTagFamily)
+ errInner := stream.Write(e)
+ t.NoError(errInner)
+ }
+ return baseTime
+}
+
+func TestQueryProcessor(t *testing.T) {
+ assertT := assert.New(t)
+ streamSvc, pipeline, deferFunc := setupServices(assertT)
+ stm, err := streamSvc.Stream(&commonv2.Metadata{Name: "sw", Group: "default"})
+ defer func() {
+ _ = stm.Close()
+ deferFunc()
+ }()
+ assertT.NoError(err)
+ baseTs := setupQueryData(t, "multiple_shards.json", stm)
+
+ sT, eT := baseTs, baseTs.Add(1*time.Hour)
+
+ tests := []struct {
+ // name of the test case
+ name string
+ // queryGenerator is used to generate a Query
+ queryGenerator func(baseTs time.Time) *streamv2.QueryRequest
+ // wantLen is the length of entities expected to return
+ wantLen int
+ // checker is the customized checker for extra checks
+ checker func([]*streamv2.Element) bool
+ }{
+ {
+ name: "query given timeRange is out of the time range of data",
+ queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ TimeRange(time.Unix(0, 0), time.Unix(0, 1)).
+ Projection("searchable", "trace_id").
+ Build()
+ },
+ wantLen: 0,
+ },
+ {
+ name: "query given timeRange which covers all the segments with data binary projection",
+ queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ TimeRange(sT, eT).
+ Projection("searchable", "trace_id").
+ Projection("data", "data_binary").
+ Build()
+ },
+ wantLen: 5,
+ checker: withDataBinaryChecker,
+ },
+ {
+ name: "query given timeRange which covers all the segments and sort by duration DESC",
+ queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ TimeRange(sT, eT).
+ OrderBy("duration", modelv2.QueryOrder_SORT_DESC).
+ Projection("searchable", "trace_id", "duration").
+ Build()
+ },
+ wantLen: 5,
+ checker: func(elements []*streamv2.Element) bool {
+ return logical.SortedByIndex(elements, 0, 1, modelv2.QueryOrder_SORT_DESC)
+ },
+ },
+ {
+ name: "query TraceID given timeRange includes the time range of data",
+ queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ FieldsInTagFamily("searchable", "trace_id", "=", "1").
+ TimeRange(sT, eT).
+ Projection("searchable", "trace_id").
+ Build()
+ },
+ wantLen: 1,
+ checker: withoutDataBinaryChecker,
+ },
+ {
+ name: "query TraceID given timeRange includes the time range of data with dataBinary projection",
+ queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ FieldsInTagFamily("searchable", "trace_id", "=", "1").
+ TimeRange(sT, eT).
+ Projection("data", "data_binary").
+ Projection("searchable", "trace_id").
+ Build()
+ },
+ wantLen: 1,
+ checker: withDataBinaryChecker,
+ },
+ {
+ name: "Numerical Index - query duration < 500",
+ queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ FieldsInTagFamily("searchable", "duration", "<", 500).
+ TimeRange(sT, eT).
+ Projection("searchable", "trace_id").
+ Build()
+ },
+ wantLen: 3,
+ },
+ {
+ name: "Numerical Index - query duration <= 500",
+ queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ FieldsInTagFamily("searchable", "duration", "<=", 500).
+ TimeRange(sT, eT).
+ Projection("searchable", "trace_id").
+ Build()
+ },
+ wantLen: 4,
+ },
+ {
+ name: "Textual Index - http.method == GET",
+ queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ FieldsInTagFamily("searchable", "http.method", "=", "GET").
+ TimeRange(sT, eT).
+ Projection("searchable", "trace_id").
+ Build()
+ },
+ wantLen: 3,
+ checker: withoutDataBinaryChecker,
+ },
+ {
+ name: "Textual Index - http.method == GET with dataBinary projection",
+ queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ FieldsInTagFamily("searchable", "http.method", "=", "GET").
+ TimeRange(sT, eT).
+ Projection("data", "data_binary").
+ Projection("searchable", "trace_id").
+ Build()
+ },
+ wantLen: 3,
+ checker: withDataBinaryChecker,
+ },
+ {
+ name: "Mixed Index - status_code == 500 AND duration <= 100",
+ queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+ return pb.NewQueryRequestBuilder().
+ Limit(10).
+ Offset(0).
+ Metadata("default", "sw").
+ FieldsInTagFamily("searchable", "status_code", "=", "500", "duration", "<=", 100).
+ TimeRange(sT, eT).
+ Projection("searchable", "trace_id").
+ Build()
+ },
+ wantLen: 1,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ singleTester := require.New(t)
+ now := time.Now()
+ m := bus.NewMessage(bus.MessageID(now.UnixNano()), tt.queryGenerator(baseTs))
+ f, err := pipeline.Publish(data.TopicQueryEvent, m)
+ singleTester.NoError(err)
+ singleTester.NotNil(f)
+ msg, err := f.Get()
+ singleTester.NoError(err)
+ singleTester.NotNil(msg)
+ // TODO: better error response
+ singleTester.NotNil(msg.Data())
+ singleTester.Len(msg.Data(), tt.wantLen)
+ if tt.checker != nil {
+ singleTester.True(tt.checker(msg.Data().([]*streamv2.Element)))
+ }
+ })
+ }
+}
diff --git a/banyand/query/v2/query.go b/banyand/query/v2/query.go
new file mode 100644
index 0000000..c58ca26
--- /dev/null
+++ b/banyand/query/v2/query.go
@@ -0,0 +1,39 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+ "context"
+
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/stream"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+type Executor interface {
+ run.PreRunner
+}
+
+func NewExecutor(_ context.Context, streamService stream.Service, serviceRepo discovery.ServiceRepo, pipeline queue.Queue) (Executor, error) {
+ return &queryProcessor{
+ streamService: streamService,
+ serviceRepo: serviceRepo,
+ pipeline: pipeline,
+ }, nil
+}
diff --git a/banyand/query/v2/testdata/global_index.json b/banyand/query/v2/testdata/global_index.json
new file mode 100644
index 0000000..9e81928
--- /dev/null
+++ b/banyand/query/v2/testdata/global_index.json
@@ -0,0 +1,64 @@
+[
+ {
+ "tags": [
+ {"str":{"value": "1"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 1000}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "2"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.3_id"}},
+ {"str":{"value": "/product_id"}},
+ {"int":{"value": 500}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "1"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 30}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "500"}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "2"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.5_id"}},
+ {"str":{"value": "/price_id"}},
+ {"int":{"value": 60}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "400"}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "2"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/item_id"}},
+ {"int":{"value": 300}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "500"}}
+ ]
+ }
+
+]
\ No newline at end of file
diff --git a/banyand/query/v2/testdata/multiple_shards.json b/banyand/query/v2/testdata/multiple_shards.json
new file mode 100644
index 0000000..ec6427e
--- /dev/null
+++ b/banyand/query/v2/testdata/multiple_shards.json
@@ -0,0 +1,64 @@
+[
+ {
+ "tags": [
+ {"str":{"value": "1"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 1000}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "2"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.3_id"}},
+ {"str":{"value": "/product_id"}},
+ {"int":{"value": 500}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "3"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 30}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "500"}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "4"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.5_id"}},
+ {"str":{"value": "/price_id"}},
+ {"int":{"value": 60}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "400"}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "5"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/item_id"}},
+ {"int":{"value": 300}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "500"}}
+ ]
+ }
+
+]
\ No newline at end of file
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index b608c14..9f3f305 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -130,7 +130,7 @@ func Test_Stream_Series(t *testing.T) {
{
id: 1,
location: []string{"series_7898679171060804990", "data_flow_0"},
- elements: []string{"5", "3"},
+ elements: []string{"3", "5"},
},
},
},
@@ -177,7 +177,7 @@ func Test_Stream_Series(t *testing.T) {
{
id: 1,
location: []string{"series_7898679171060804990", "data_flow_0"},
- elements: []string{"5", "3"},
+ elements: []string{"3", "5"},
},
},
},
@@ -191,7 +191,7 @@ func Test_Stream_Series(t *testing.T) {
{
id: 1,
location: []string{"series_7898679171060804990", "data_flow_0"},
- elements: []string{"5", "3"},
+ elements: []string{"3", "5"},
},
},
},
@@ -325,7 +325,7 @@ func Test_Stream_Series(t *testing.T) {
{
id: 1,
location: []string{"series_7898679171060804990", "data_flow_0"},
- elements: []string{"5", "3"},
+ elements: []string{"3", "5"},
},
},
},
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 0aac353..4bd418d 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -67,7 +67,7 @@ type seekerBuilder struct {
func (s *seekerBuilder) Build() (Seeker, error) {
if s.order == modelv2.QueryOrder_SORT_UNSPECIFIED {
- s.order = modelv2.QueryOrder_SORT_DESC
+ s.order = modelv2.QueryOrder_SORT_ASC
}
conditions, err := s.buildConditions()
if err != nil {
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 1d9d944..8bb6e03 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -97,7 +97,7 @@ func (s *seekerBuilder) buildSeriesByIndex(conditions []condWithIRT) (series []I
func (s *seekerBuilder) buildSeriesByTime(conditions []condWithIRT) ([]Iterator, error) {
bb := s.seriesSpan.blocks
switch s.order {
- case modelv2.QueryOrder_SORT_ASC:
+ case modelv2.QueryOrder_SORT_ASC, modelv2.QueryOrder_SORT_UNSPECIFIED:
sort.SliceStable(bb, func(i, j int) bool {
return bb[i].startTime().Before(bb[j].startTime())
})
diff --git a/pkg/pb/v2/database.go b/pkg/pb/v2/database.go
new file mode 100644
index 0000000..f0ccecd
--- /dev/null
+++ b/pkg/pb/v2/database.go
@@ -0,0 +1,166 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+ "time"
+
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+type ShardEventBuilder struct {
+ se *databasev2.ShardEvent
+}
+
+func NewShardEventBuilder() *ShardEventBuilder {
+ return &ShardEventBuilder{se: &databasev2.ShardEvent{}}
+}
+
+func (seb *ShardEventBuilder) Action(action databasev2.Action) *ShardEventBuilder {
+ seb.se.Action = action
+ return seb
+}
+
+func (seb *ShardEventBuilder) Time(t time.Time) *ShardEventBuilder {
+ seb.se.Time = timestamppb.New(t)
+ return seb
+}
+
+func (seb *ShardEventBuilder) Shard(shard *databasev2.Shard) *ShardEventBuilder {
+ seb.se.Shard = shard
+ return seb
+}
+
+func (seb *ShardEventBuilder) Build() *databasev2.ShardEvent {
+ return seb.se
+}
+
+type ShardBuilder struct {
+ s *databasev2.Shard
+}
+
+func NewShardBuilder() *ShardBuilder {
+ return &ShardBuilder{s: &databasev2.Shard{}}
+}
+
+func (sb *ShardBuilder) ID(shardID uint64) *ShardBuilder {
+ sb.s.Id = shardID
+ return sb
+}
+
+func (sb *ShardBuilder) SeriesMetadata(group, name string) *ShardBuilder {
+ sb.s.Series = &commonv2.Metadata{
+ Group: group,
+ Name: name,
+ }
+ return sb
+}
+
+func (sb *ShardBuilder) Node(node *databasev2.Node) *ShardBuilder {
+ sb.s.Node = node
+ return sb
+}
+
+func (sb *ShardBuilder) Total(total uint32) *ShardBuilder {
+ sb.s.Total = total
+ return sb
+}
+
+func (sb *ShardBuilder) CreatedAt(t time.Time) *ShardBuilder {
+ sb.s.CreatedAt = timestamppb.New(t)
+ return sb
+}
+
+func (sb *ShardBuilder) UpdatedAt(t time.Time) *ShardBuilder {
+ sb.s.UpdatedAt = timestamppb.New(t)
+ return sb
+}
+
+func (sb *ShardBuilder) Build() *databasev2.Shard {
+ return sb.s
+}
+
+type NodeBuilder struct {
+ n *databasev2.Node
+}
+
+func NewNodeBuilder() *NodeBuilder {
+ return &NodeBuilder{n: &databasev2.Node{}}
+}
+
+func (nb *NodeBuilder) ID(id string) *NodeBuilder {
+ nb.n.Id = id
+ return nb
+}
+
+func (nb *NodeBuilder) Addr(addr string) *NodeBuilder {
+ nb.n.Addr = addr
+ return nb
+}
+
+func (nb *NodeBuilder) UpdatedAt(t time.Time) *NodeBuilder {
+ nb.n.UpdatedAt = timestamppb.New(t)
+ return nb
+}
+
+func (nb *NodeBuilder) CreatedAt(t time.Time) *NodeBuilder {
+ nb.n.CreatedAt = timestamppb.New(t)
+ return nb
+}
+
+func (nb *NodeBuilder) Build() *databasev2.Node {
+ return nb.n
+}
+
+type SeriesEventBuilder struct {
+ se *databasev2.SeriesEvent
+}
+
+func NewSeriesEventBuilder() *SeriesEventBuilder {
+ return &SeriesEventBuilder{se: &databasev2.SeriesEvent{}}
+}
+
+func (seb *SeriesEventBuilder) SeriesMetadata(group, name string) *SeriesEventBuilder {
+ seb.se.Series = &commonv2.Metadata{
+ Group: group,
+ Name: name,
+ }
+ return seb
+}
+
+func (seb *SeriesEventBuilder) FieldNames(names ...string) *SeriesEventBuilder {
+ seb.se.FieldNamesCompositeSeriesId = names
+ return seb
+}
+
+func (seb *SeriesEventBuilder) Action(action databasev2.Action) *SeriesEventBuilder {
+ seb.se.Action = action
+ return seb
+}
+
+func (seb *SeriesEventBuilder) Time(t time.Time) *SeriesEventBuilder {
+ seb.se.Time = timestamppb.New(t)
+ return seb
+}
+
+func (seb *SeriesEventBuilder) Build() *databasev2.SeriesEvent {
+ return seb.se
+}
diff --git a/pkg/pb/v2/fields.go b/pkg/pb/v2/fields.go
new file mode 100644
index 0000000..36714d2
--- /dev/null
+++ b/pkg/pb/v2/fields.go
@@ -0,0 +1,151 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func Transform(entityValue *tracev1.EntityValue, fieldIndexes []FieldEntry) []*modelv1.TypedPair {
+ typedPairs := make([]*modelv1.TypedPair, 0)
+ if fieldIndexes == nil {
+ return typedPairs
+ }
+ // copy selected fields
+ for _, fieldIndex := range fieldIndexes {
+ key, idx, t := fieldIndex.Key, fieldIndex.Index, fieldIndex.Type
+ if idx >= len(entityValue.GetFields()) {
+ typedPairs = append(typedPairs, &modelv1.TypedPair{
+ Key: key,
+ Typed: &modelv1.TypedPair_NullPair{
+ NullPair: &modelv1.TypedPair_NullWithType{Type: t},
+ },
+ })
+ continue
+ }
+ f := entityValue.GetFields()[idx]
+ switch v := f.GetValueType().(type) {
+ case *modelv1.Field_Str:
+ typedPairs = append(typedPairs, buildPair(key, v.Str.GetValue()))
+ case *modelv1.Field_StrArray:
+ typedPairs = append(typedPairs, buildPair(key, v.StrArray.GetValue()))
+ case *modelv1.Field_Int:
+ typedPairs = append(typedPairs, buildPair(key, v.Int.GetValue()))
+ case *modelv1.Field_IntArray:
+ typedPairs = append(typedPairs, buildPair(key, v.IntArray.GetValue()))
+ case *modelv1.Field_Null:
+ typedPairs = append(typedPairs, &modelv1.TypedPair{
+ Key: key,
+ Typed: &modelv1.TypedPair_NullPair{
+ NullPair: &modelv1.TypedPair_NullWithType{Type: t},
+ },
+ })
+ }
+ }
+ return typedPairs
+}
+
+func buildPair(key string, value interface{}) *modelv1.TypedPair {
+ result := &modelv1.TypedPair{
+ Key: key,
+ }
+ switch v := value.(type) {
+ case int:
+ result.Typed = &modelv1.TypedPair_IntPair{
+ IntPair: &modelv1.Int{
+ Value: int64(v),
+ },
+ }
+ case []int:
+ result.Typed = &modelv1.TypedPair_IntArrayPair{
+ IntArrayPair: &modelv1.IntArray{
+ Value: convert.IntToInt64(v...),
+ },
+ }
+ case int8:
+ result.Typed = &modelv1.TypedPair_IntPair{
+ IntPair: &modelv1.Int{
+ Value: int64(v),
+ },
+ }
+ case []int8:
+ result.Typed = &modelv1.TypedPair_IntArrayPair{
+ IntArrayPair: &modelv1.IntArray{
+ Value: convert.Int8ToInt64(v...),
+ },
+ }
+ case int16:
+ result.Typed = &modelv1.TypedPair_IntPair{
+ IntPair: &modelv1.Int{
+ Value: int64(v),
+ },
+ }
+ case []int16:
+ result.Typed = &modelv1.TypedPair_IntArrayPair{
+ IntArrayPair: &modelv1.IntArray{
+ Value: convert.Int16ToInt64(v...),
+ },
+ }
+ case int32:
+ result.Typed = &modelv1.TypedPair_IntPair{
+ IntPair: &modelv1.Int{
+ Value: int64(v),
+ },
+ }
+ case []int32:
+ result.Typed = &modelv1.TypedPair_IntArrayPair{
+ IntArrayPair: &modelv1.IntArray{
+ Value: convert.Int32ToInt64(v...),
+ },
+ }
+ case int64:
+ result.Typed = &modelv1.TypedPair_IntPair{
+ IntPair: &modelv1.Int{
+ Value: v,
+ },
+ }
+ case []int64:
+ result.Typed = &modelv1.TypedPair_IntArrayPair{
+ IntArrayPair: &modelv1.IntArray{
+ Value: v,
+ },
+ }
+ case string:
+ result.Typed = &modelv1.TypedPair_StrPair{
+ StrPair: &modelv1.Str{
+ Value: v,
+ },
+ }
+ case []string:
+ result.Typed = &modelv1.TypedPair_StrArrayPair{
+ StrArrayPair: &modelv1.StrArray{
+ Value: v,
+ },
+ }
+ }
+ return result
+}
+
+type FieldEntry struct {
+ Key string
+ Index int
+ Type databasev1.FieldType
+}
diff --git a/pkg/pb/v2/query.go b/pkg/pb/v2/query.go
new file mode 100644
index 0000000..6f3ecec
--- /dev/null
+++ b/pkg/pb/v2/query.go
@@ -0,0 +1,222 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+ "time"
+
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var (
+ binaryOpsMap = map[string]modelv2.Condition_BinaryOp{
+ "=": modelv2.Condition_BINARY_OP_EQ,
+ "!=": modelv2.Condition_BINARY_OP_NE,
+ ">": modelv2.Condition_BINARY_OP_GT,
+ ">=": modelv2.Condition_BINARY_OP_GE,
+ "<": modelv2.Condition_BINARY_OP_LT,
+ "<=": modelv2.Condition_BINARY_OP_LE,
+ "having": modelv2.Condition_BINARY_OP_HAVING,
+ "not having": modelv2.Condition_BINARY_OP_NOT_HAVING,
+ }
+)
+
+type QueryRequestBuilder struct {
+ ec *streamv2.QueryRequest
+}
+
+func NewQueryRequestBuilder() *QueryRequestBuilder {
+ return &QueryRequestBuilder{
+ ec: &streamv2.QueryRequest{
+ Projection: &modelv2.Projection{},
+ },
+ }
+}
+
+func (b *QueryRequestBuilder) Metadata(group, name string) *QueryRequestBuilder {
+ b.ec.Metadata = &commonv2.Metadata{
+ Group: group,
+ Name: name,
+ }
+ return b
+}
+
+func (b *QueryRequestBuilder) Limit(limit uint32) *QueryRequestBuilder {
+ b.ec.Limit = limit
+ return b
+}
+
+func (b *QueryRequestBuilder) Offset(offset uint32) *QueryRequestBuilder {
+ b.ec.Offset = offset
+ return b
+}
+
+func (b *QueryRequestBuilder) FieldsInTagFamily(tagFamilyName string, items ...interface{}) *QueryRequestBuilder {
+ if len(items)%3 != 0 {
+ panic("expect 3 to be a factor of the length of items")
+ }
+
+ criteriaConditions := make([]*modelv2.Condition, len(items)/3)
+ for i := 0; i < len(items)/3; i++ {
+ key, op, values := items[i*3+0], items[i*3+1], items[i*3+2]
+ criteriaConditions[i] = &modelv2.Condition{
+ Name: key.(string),
+ Op: binaryOpsMap[op.(string)],
+ Value: buildTagValue(values),
+ }
+ }
+
+ b.ec.Criteria = append(b.ec.Criteria, &streamv2.QueryRequest_Criteria{
+ TagFamilyName: tagFamilyName,
+ Conditions: criteriaConditions,
+ })
+
+ return b
+}
+
+func buildTagValue(value interface{}) *modelv2.TagValue {
+ switch v := value.(type) {
+ case int:
+ return &modelv2.TagValue{
+ Value: &modelv2.TagValue_Int{Int: &modelv2.Int{Value: int64(v)}},
+ }
+ case []int:
+ return &modelv2.TagValue{
+ Value: &modelv2.TagValue_IntArray{IntArray: &modelv2.IntArray{Value: convert.IntToInt64(v...)}},
+ }
+ case int8:
+ return &modelv2.TagValue{
+ Value: &modelv2.TagValue_Int{Int: &modelv2.Int{Value: int64(v)}},
+ }
+ case []int8:
+ return &modelv2.TagValue{
+ Value: &modelv2.TagValue_IntArray{IntArray: &modelv2.IntArray{Value: convert.Int8ToInt64(v...)}},
+ }
+ case int16:
+ return &modelv2.TagValue{
+ Value: &modelv2.TagValue_Int{Int: &modelv2.Int{Value: int64(v)}},
+ }
+ case []int16:
+ return &modelv2.TagValue{
+ Value: &modelv2.TagValue_IntArray{IntArray: &modelv2.IntArray{Value: convert.Int16ToInt64(v...)}},
+ }
+ case int32:
+ return &modelv2.TagValue{
+ Value: &modelv2.TagValue_Int{Int: &modelv2.Int{Value: int64(v)}},
+ }
+ case []int32:
+ return &modelv2.TagValue{
+ Value: &modelv2.TagValue_IntArray{IntArray: &modelv2.IntArray{Value: convert.Int32ToInt64(v...)}},
+ }
+ case int64:
+ return &modelv2.TagValue{
+ Value: &modelv2.TagValue_Int{Int: &modelv2.Int{Value: v}},
+ }
+ case []int64:
+ return &modelv2.TagValue{
+ Value: &modelv2.TagValue_IntArray{IntArray: &modelv2.IntArray{Value: v}},
+ }
+ case string:
+ return &modelv2.TagValue{
+ Value: &modelv2.TagValue_Str{Str: &modelv2.Str{Value: v}},
+ }
+ case []string:
+ return &modelv2.TagValue{
+ Value: &modelv2.TagValue_StrArray{StrArray: &modelv2.StrArray{Value: v}},
+ }
+ }
+ panic("not supported")
+}
+
+func (b *QueryRequestBuilder) Projection(tagFamily string, projections ...string) *QueryRequestBuilder {
+ b.ec.Projection.TagFamilies = append(b.ec.Projection.GetTagFamilies(), &modelv2.Projection_TagFamily{
+ Name: tagFamily,
+ Tags: projections,
+ })
+ return b
+}
+
+func (b *QueryRequestBuilder) OrderBy(indexRuleName string, sort modelv2.QueryOrder_Sort) *QueryRequestBuilder {
+ b.ec.OrderBy = &modelv2.QueryOrder{
+ IndexRuleName: indexRuleName,
+ Sort: sort,
+ }
+ return b
+}
+
+func (b *QueryRequestBuilder) TimeRange(sT, eT time.Time) *QueryRequestBuilder {
+ b.ec.TimeRange = &modelv2.TimeRange{
+ Begin: timestamppb.New(sT),
+ End: timestamppb.New(eT),
+ }
+ return b
+}
+
+func (b *QueryRequestBuilder) Build() *streamv2.QueryRequest {
+ return b.ec
+}
+
+type QueryResponseElementBuilder struct {
+ elem *streamv2.Element
+}
+
+func NewQueryEntityBuilder() *QueryResponseElementBuilder {
+ return &QueryResponseElementBuilder{elem: &streamv2.Element{}}
+}
+
+func (qeb *QueryResponseElementBuilder) EntityID(elementID string) *QueryResponseElementBuilder {
+ qeb.elem.ElementId = elementID
+ return qeb
+}
+
+func (qeb *QueryResponseElementBuilder) Timestamp(t time.Time) *QueryResponseElementBuilder {
+ qeb.elem.Timestamp = timestamppb.New(t)
+ return qeb
+}
+
+func (qeb *QueryResponseElementBuilder) FieldsInTagFamily(tagFamily string, items ...interface{}) *QueryResponseElementBuilder {
+ if len(items)%2 != 0 {
+ panic("invalid fields list")
+ }
+
+ l := len(items) / 2
+ tags := make([]*modelv2.Tag, l)
+ for i := 0; i < l; i++ {
+ key, values := items[i*2+0], items[i*2+1]
+ tags[i] = &modelv2.Tag{
+ Key: key.(string),
+ Value: buildTagValue(values),
+ }
+ }
+
+ qeb.elem.TagFamilies = append(qeb.elem.GetTagFamilies(), &modelv2.TagFamily{
+ Name: tagFamily,
+ Tags: tags,
+ })
+
+ return qeb
+}
+
+func (qeb *QueryResponseElementBuilder) Build() *streamv2.Element {
+ return qeb.elem
+}
diff --git a/pkg/pb/v2/write.go b/pkg/pb/v2/write.go
new file mode 100644
index 0000000..31fe826
--- /dev/null
+++ b/pkg/pb/v2/write.go
@@ -0,0 +1,147 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+ "time"
+
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+type WriteRequestBuilder struct {
+ we *tracev1.WriteRequest
+}
+
+func NewWriteEntityBuilder() *WriteRequestBuilder {
+ return &WriteRequestBuilder{we: &tracev1.WriteRequest{}}
+}
+
+func (web *WriteRequestBuilder) Metadata(group, name string) *WriteRequestBuilder {
+ web.we.Metadata = &commonv1.Metadata{
+ Group: group,
+ Name: name,
+ }
+ return web
+}
+
+func (web *WriteRequestBuilder) EntityValue(ev *tracev1.EntityValue) *WriteRequestBuilder {
+ web.we.Entity = ev
+ return web
+}
+
+func (web *WriteRequestBuilder) Build() *tracev1.WriteRequest {
+ return web.we
+}
+
+type EntityValueBuilder struct {
+ ev *tracev1.EntityValue
+}
+
+func NewEntityValueBuilder() *EntityValueBuilder {
+ return &EntityValueBuilder{ev: &tracev1.EntityValue{}}
+}
+
+func (evb *EntityValueBuilder) EntityID(entityID string) *EntityValueBuilder {
+ evb.ev.EntityId = entityID
+ return evb
+}
+
+func (evb *EntityValueBuilder) Timestamp(time time.Time) *EntityValueBuilder {
+ evb.ev.Timestamp = timestamppb.New(time)
+ return evb
+}
+
+func (evb *EntityValueBuilder) DataBinary(data []byte) *EntityValueBuilder {
+ evb.ev.DataBinary = data
+ return evb
+}
+
+func (evb *EntityValueBuilder) Fields(items ...interface{}) *EntityValueBuilder {
+ evb.ev.Fields = make([]*modelv1.Field, len(items))
+ for idx, item := range items {
+ evb.ev.Fields[idx] = buildField(item)
+ }
+ return evb
+}
+
+func buildField(value interface{}) *modelv1.Field {
+ if value == nil {
+ return &modelv1.Field{ValueType: &modelv1.Field_Null{}}
+ }
+ switch v := value.(type) {
+ case string:
+ return &modelv1.Field{
+ ValueType: &modelv1.Field_Str{
+ Str: &modelv1.Str{
+ Value: v,
+ },
+ },
+ }
+ case []string:
+ return &modelv1.Field{
+ ValueType: &modelv1.Field_StrArray{
+ StrArray: &modelv1.StrArray{
+ Value: v,
+ },
+ },
+ }
+ case int:
+ return &modelv1.Field{
+ ValueType: &modelv1.Field_Int{
+ Int: &modelv1.Int{
+ Value: int64(v),
+ },
+ },
+ }
+ case []int:
+ return &modelv1.Field{
+ ValueType: &modelv1.Field_IntArray{
+ IntArray: &modelv1.IntArray{
+ Value: convert.IntToInt64(v...),
+ },
+ },
+ }
+ case int64:
+ return &modelv1.Field{
+ ValueType: &modelv1.Field_Int{
+ Int: &modelv1.Int{
+ Value: v,
+ },
+ },
+ }
+ case []int64:
+ return &modelv1.Field{
+ ValueType: &modelv1.Field_IntArray{
+ IntArray: &modelv1.IntArray{
+ Value: v,
+ },
+ },
+ }
+ default:
+ panic("type not supported")
+ }
+}
+
+func (evb *EntityValueBuilder) Build() *tracev1.EntityValue {
+ return evb.ev
+}
diff --git a/pkg/query/v2/executor/interface.go b/pkg/query/v2/executor/interface.go
new file mode 100644
index 0000000..4eb797e
--- /dev/null
+++ b/pkg/query/v2/executor/interface.go
@@ -0,0 +1,31 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package executor
+
+import (
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/banyand/stream"
+)
+
+type ExecutionContext interface {
+ stream.Stream
+}
+
+type Executable interface {
+ Execute(ExecutionContext) ([]*streamv2.Element, error)
+}
diff --git a/pkg/query/v2/logical/analyzer.go b/pkg/query/v2/logical/analyzer.go
new file mode 100644
index 0000000..77ebf82
--- /dev/null
+++ b/pkg/query/v2/logical/analyzer.go
@@ -0,0 +1,226 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "context"
+
+ "github.com/pkg/errors"
+
+ commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ metadataSchema "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var (
+ ErrFieldNotDefined = errors.New("field is not defined")
+ ErrInvalidConditionType = errors.New("invalid pair type")
+ ErrIncompatibleQueryCondition = errors.New("incompatible query condition type")
+ ErrIndexNotDefined = errors.New("index is not define for the field")
+ ErrMultipleGlobalIndexes = errors.New("multiple global indexes are not supported")
+)
+
+var (
+ DefaultLimit uint32 = 100
+)
+
+type Tag struct {
+ familyName, name string
+}
+
+func NewTag(family, name string) *Tag {
+ return &Tag{
+ familyName: family,
+ name: name,
+ }
+}
+
+// NewTags create an array of Tag within a TagFamily
+func NewTags(family string, tagNames ...string) []*Tag {
+ tags := make([]*Tag, len(tagNames))
+ for i, name := range tagNames {
+ tags[i] = NewTag(family, name)
+ }
+ return tags
+}
+
+// GetCompoundName is only used for error message
+func (t *Tag) GetCompoundName() string {
+ return t.familyName + ":" + t.name
+}
+
+func (t *Tag) GetTagName() string {
+ return t.name
+}
+
+func (t *Tag) GetFamilyName() string {
+ return t.familyName
+}
+
+type Analyzer struct {
+ indexRuleRepo metadataSchema.IndexRule
+ indexRuleBindingRepo metadataSchema.IndexRuleBinding
+ metadataRepoImpl metadata.Repo
+}
+
+func DefaultAnalyzer() *Analyzer {
+ indexRule, _ := metadataSchema.NewIndexRule()
+ indexRuleBinding, _ := metadataSchema.NewIndexRuleBinding()
+ metadataService, _ := metadata.NewService(context.TODO())
+
+ return &Analyzer{
+ indexRule,
+ indexRuleBinding,
+ metadataService,
+ }
+}
+
+func (a *Analyzer) BuildStreamSchema(ctx context.Context, metadata *commonv2.Metadata) (Schema, error) {
+ stream, err := a.metadataRepoImpl.Stream().Get(ctx, metadata)
+
+ if err != nil {
+ return nil, err
+ }
+
+ indexRules, err := a.metadataRepoImpl.IndexRules(context.TODO(), metadata)
+
+ if err != nil {
+ return nil, err
+ }
+
+ s := &schema{
+ stream: stream,
+ indexRules: indexRules,
+ fieldMap: make(map[string]*tagSpec),
+ entityList: stream.GetEntity().GetTagNames(),
+ }
+
+ // generate the schema of the fields for the traceSeries
+ for tagFamilyIdx, tagFamily := range stream.GetTagFamilies() {
+ for tagIdx, spec := range tagFamily.GetTags() {
+ s.registerField(spec.GetName(), tagFamilyIdx, tagIdx, spec)
+ }
+ }
+
+ return s, nil
+}
+
+func (a *Analyzer) Analyze(_ context.Context, criteria *streamv2.QueryRequest, metadata *commonv2.Metadata, s Schema) (Plan, error) {
+ // parse fields
+ plan, err := parseFields(criteria, metadata, s)
+ if err != nil {
+ return nil, err
+ }
+
+ // parse orderBy
+ queryOrder := criteria.GetOrderBy()
+ if queryOrder != nil {
+ if v, ok := plan.(*unresolvedIndexScan); ok {
+ v.unresolvedOrderBy = OrderBy(queryOrder.GetIndexRuleName(), queryOrder.GetSort())
+ }
+ }
+
+ // parse offset
+ plan = Offset(plan, criteria.GetOffset())
+
+ // parse limit
+ limitParameter := criteria.GetLimit()
+ if limitParameter == 0 {
+ limitParameter = DefaultLimit
+ }
+ plan = Limit(plan, limitParameter)
+
+ return plan.Analyze(s)
+}
+
+// parseFields parses the query request to decide which kind of plan should be generated
+// Basically,
+// 1 - If no criteria is given, we can only scan all shards
+// 2 - If criteria is given, but all of those fields exist in the "entity" definition,
+// i.e. they are top-level sharding keys. For example, for the current skywalking's schema,
+// we use service_id + service_instance_id + state as the compound sharding keys.
+func parseFields(criteria *streamv2.QueryRequest, metadata *commonv2.Metadata, s Schema) (UnresolvedPlan, error) {
+ timeRange := criteria.GetTimeRange()
+
+ projTags := make([][]*Tag, len(criteria.GetProjection().GetTagFamilies()))
+ for i, tagFamily := range criteria.GetProjection().GetTagFamilies() {
+ var projTagInFamily []*Tag
+ for _, tagName := range tagFamily.GetTags() {
+ projTagInFamily = append(projTagInFamily, NewTag(tagFamily.GetName(), tagName))
+ }
+ projTags[i] = projTagInFamily
+ }
+
+ var tagExprs []Expr
+
+ entityList := s.EntityList()
+ entityMap := make(map[string]int)
+ entity := make([]tsdb.Entry, len(entityList))
+ for idx, e := range entityList {
+ entityMap[e] = idx
+ // fill AnyEntry by default
+ entity[idx] = tsdb.AnyEntry
+ }
+
+ for _, criteriaFamily := range criteria.GetCriteria() {
+ for _, pairQuery := range criteriaFamily.GetConditions() {
+ op := pairQuery.GetOp()
+ typedTagValue := pairQuery.GetValue()
+ var e Expr
+ switch v := typedTagValue.GetValue().(type) {
+ case *modelv2.TagValue_Str:
+ if entityIdx, ok := entityMap[pairQuery.GetName()]; ok {
+ entity[entityIdx] = []byte(v.Str.GetValue())
+ } else {
+ e = &strLiteral{
+ string: v.Str.GetValue(),
+ }
+ }
+ case *modelv2.TagValue_StrArray:
+ e = &strArrLiteral{
+ arr: v.StrArray.GetValue(),
+ }
+ case *modelv2.TagValue_Int:
+ if entityIdx, ok := entityMap[pairQuery.GetName()]; ok {
+ entity[entityIdx] = convert.Int64ToBytes(v.Int.GetValue())
+ } else {
+ e = &int64Literal{
+ int64: v.Int.GetValue(),
+ }
+ }
+ case *modelv2.TagValue_IntArray:
+ e = &int64ArrLiteral{
+ arr: v.IntArray.GetValue(),
+ }
+ default:
+ return nil, ErrInvalidConditionType
+ }
+ // we collect Condition only if it is not a part of entity
+ if e != nil {
+ tagExprs = append(tagExprs, binaryOpFactory[op](NewFieldRef(criteriaFamily.GetTagFamilyName(), pairQuery.GetName()), e))
+ }
+ }
+ }
+
+ return IndexScan(timeRange.GetBegin().AsTime(), timeRange.GetEnd().AsTime(), metadata,
+ tagExprs, entity, nil, projTags...), nil
+}
diff --git a/pkg/query/v2/logical/analyzer_test.go b/pkg/query/v2/logical/analyzer_test.go
new file mode 100644
index 0000000..1a6421b
--- /dev/null
+++ b/pkg/query/v2/logical/analyzer_test.go
@@ -0,0 +1,232 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical_test
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+ "github.com/stretchr/testify/require"
+
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ pb "github.com/apache/skywalking-banyandb/pkg/pb/v2"
+ "github.com/apache/skywalking-banyandb/pkg/query/v2/logical"
+)
+
+func TestAnalyzer_SimpleTimeScan(t *testing.T) {
+ assert := require.New(t)
+
+ ana := logical.DefaultAnalyzer()
+
+ sT, eT := time.Now().Add(-3*time.Hour), time.Now()
+
+ criteria := pb.NewQueryRequestBuilder().
+ Limit(20).
+ Offset(0).
+ Metadata("default", "sw").
+ TimeRange(sT, eT).
+ Build()
+
+ metadata := criteria.GetMetadata()
+
+ schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+ assert.NoError(err)
+
+ plan, err := ana.Analyze(context.TODO(), criteria, metadata, schema)
+ assert.NoError(err)
+ assert.NotNil(plan)
+ correctPlan, err := logical.Limit(
+ logical.Offset(
+ logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+ 0),
+ 20).
+ Analyze(schema)
+ assert.NoError(err)
+ assert.NotNil(correctPlan)
+ assert.True(cmp.Equal(plan, correctPlan), "plan is not equal to correct plan")
+}
+
+func TestAnalyzer_ComplexQuery(t *testing.T) {
+ assert := require.New(t)
+
+ ana := logical.DefaultAnalyzer()
+
+ sT, eT := time.Now().Add(-3*time.Hour), time.Now()
+
+ criteria := pb.NewQueryRequestBuilder().
+ Limit(5).
+ Offset(10).
+ OrderBy("duration", modelv2.QueryOrder_SORT_DESC).
+ Metadata("default", "sw").
+ Projection("searchable", "http.method", "service_id", "duration").
+ FieldsInTagFamily("searchable", "service_id", "=", "my_app", "http.method", "=", "GET", "mq.topic", "=", "event_topic").
+ TimeRange(sT, eT).
+ Build()
+
+ metadata := criteria.GetMetadata()
+
+ schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+ assert.NoError(err)
+
+ plan, err := ana.Analyze(context.TODO(), criteria, metadata, schema)
+ assert.NoError(err)
+ assert.NotNil(plan)
+
+ correctPlan, err := logical.Limit(
+ logical.Offset(
+ logical.IndexScan(sT, eT, metadata,
+ []logical.Expr{
+ logical.Eq(logical.NewSearchableFieldRef("mq.topic"), logical.Str("event_topic")),
+ logical.Eq(logical.NewSearchableFieldRef("http.method"), logical.Str("GET")),
+ }, tsdb.Entity{tsdb.Entry("my_app"), tsdb.AnyEntry, tsdb.AnyEntry},
+ logical.OrderBy("duration", modelv2.QueryOrder_SORT_DESC),
+ logical.NewTags("searchable", "http.method", "service_id", "duration")),
+ 10),
+ 5).
+ Analyze(schema)
+ assert.NoError(err)
+ assert.NotNil(correctPlan)
+ assert.True(cmp.Equal(plan, correctPlan), "plan is not equal to correct plan")
+}
+
+func TestAnalyzer_TraceIDQuery(t *testing.T) {
+ assert := require.New(t)
+
+ ana := logical.DefaultAnalyzer()
+
+ criteria := pb.NewQueryRequestBuilder().
+ Limit(100).
+ Offset(0).
+ Metadata("default", "sw").
+ FieldsInTagFamily("searchable", "trace_id", "=", "123").
+ Build()
+
+ metadata := criteria.GetMetadata()
+
+ schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+ assert.NoError(err)
+
+ plan, err := ana.Analyze(context.TODO(), criteria, metadata, schema)
+ assert.NoError(err)
+ assert.NotNil(plan)
+ correctPlan, err := logical.Limit(
+ logical.Offset(logical.IndexScan(time.Now(), time.Now(), metadata, []logical.Expr{
+ logical.Eq(logical.NewSearchableFieldRef("trace_id"), logical.Str("123")),
+ }, nil, nil),
+ 0),
+ 100).Analyze(schema)
+ assert.NoError(err)
+ assert.NotNil(correctPlan)
+ assert.True(cmp.Equal(plan, correctPlan), "plan is not equal to correct plan")
+}
+
+func TestAnalyzer_OrderBy_IndexNotDefined(t *testing.T) {
+ assert := require.New(t)
+
+ ana := logical.DefaultAnalyzer()
+
+ criteria := pb.NewQueryRequestBuilder().
+ Limit(5).
+ Offset(10).
+ OrderBy("service_instance_id", modelv2.QueryOrder_SORT_DESC).
+ Metadata("default", "sw").
+ Projection("searchable", "trace_id", "service_id").
+ FieldsInTagFamily("searchable", "duration", ">", 500).
+ TimeRange(time.Now().Add(-3*time.Hour), time.Now()).
+ Build()
+
+ metadata := criteria.GetMetadata()
+
+ schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+ assert.NoError(err)
+
+ _, err = ana.Analyze(context.TODO(), criteria, metadata, schema)
+ assert.ErrorIs(err, logical.ErrIndexNotDefined)
+}
+
+func TestAnalyzer_OrderBy_FieldNotDefined(t *testing.T) {
+ assert := require.New(t)
+
+ ana := logical.DefaultAnalyzer()
+
+ criteria := pb.NewQueryRequestBuilder().
+ Limit(5).
+ Offset(10).
+ OrderBy("duration2", modelv2.QueryOrder_SORT_DESC).
+ Metadata("default", "sw").
+ Projection("searchable", "trace_id", "service_id").
+ TimeRange(time.Now().Add(-3*time.Hour), time.Now()).
+ Build()
+
+ metadata := criteria.GetMetadata()
+
+ schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+ assert.NoError(err)
+
+ _, err = ana.Analyze(context.TODO(), criteria, metadata, schema)
+ assert.ErrorIs(err, logical.ErrIndexNotDefined)
+}
+
+func TestAnalyzer_Projection_FieldNotDefined(t *testing.T) {
+ assert := require.New(t)
+
+ ana := logical.DefaultAnalyzer()
+
+ criteria := pb.NewQueryRequestBuilder().
+ Limit(5).
+ Offset(10).
+ OrderBy("duration", modelv2.QueryOrder_SORT_DESC).
+ Metadata("default", "sw").
+ Projection("searchable", "duration", "service_id", "unknown").
+ TimeRange(time.Now().Add(-3*time.Hour), time.Now()).
+ Build()
+
+ metadata := criteria.GetMetadata()
+
+ schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+ assert.NoError(err)
+
+ _, err = ana.Analyze(context.TODO(), criteria, metadata, schema)
+ assert.ErrorIs(err, logical.ErrFieldNotDefined)
+}
+
+func TestAnalyzer_Fields_IndexNotDefined(t *testing.T) {
+ assert := require.New(t)
+
+ ana := logical.DefaultAnalyzer()
+
+ criteria := pb.NewQueryRequestBuilder().
+ Limit(5).
+ Offset(10).
+ Metadata("default", "sw").
+ Projection("duration", "service_id").
+ TimeRange(time.Now().Add(-3*time.Hour), time.Now()).
+ FieldsInTagFamily("searchable", "start_time", ">", 10000).
+ Build()
+
+ metadata := criteria.GetMetadata()
+
+ schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+ assert.NoError(err)
+
+ _, err = ana.Analyze(context.TODO(), criteria, metadata, schema)
+ assert.ErrorIs(err, logical.ErrIndexNotDefined)
+}
diff --git a/pkg/query/v2/logical/common.go b/pkg/query/v2/logical/common.go
new file mode 100644
index 0000000..c45c149
--- /dev/null
+++ b/pkg/query/v2/logical/common.go
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "bytes"
+
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/query/v2/executor"
+)
+
+type (
+ seekerBuilder func(builder tsdb.SeekerBuilder)
+ comparator func(a, b tsdb.Item) bool
+)
+
+func createComparator(sortDirection modelv2.QueryOrder_Sort) comparator {
+ return func(a, b tsdb.Item) bool {
+ comp := bytes.Compare(a.SortedField(), b.SortedField())
+ if sortDirection == modelv2.QueryOrder_SORT_DESC {
+ return comp == 1
+ }
+ return comp == -1
+ }
+}
+
+// projectItem parses the item within the ExecutionContext.
+// projectionFieldRefs must be prepared before calling this method, projectionFieldRefs should be a list of
+// tag list where the inner list must exist in the same tag family.
+// Strict order can be guaranteed in the result.
+func projectItem(ec executor.ExecutionContext, item tsdb.Item, projectionFieldRefs [][]*FieldRef) ([]*modelv2.TagFamily, error) {
+ tagFamily := make([]*modelv2.TagFamily, len(projectionFieldRefs))
+ for i, refs := range projectionFieldRefs {
+ tags := make([]*modelv2.Tag, len(refs))
+ familyName := refs[0].tag.GetFamilyName()
+ parsedTagFamily, err := ec.ParseTagFamily(familyName, item)
+ if err != nil {
+ return nil, err
+ }
+ for j, ref := range refs {
+ tags[j] = parsedTagFamily.GetTags()[ref.Spec.TagIdx]
+ }
+
+ tagFamily[i] = &modelv2.TagFamily{
+ Name: familyName,
+ Tags: tags,
+ }
+ }
+
+ return tagFamily, nil
+}
+
+// executeForShard fetches elements from series within a single shard. A list of series must be prepared in advanced
+// with the help of Entity. The result is a list of element set, where the order of inner list is kept
+// as what the users specify in the seekerBuilder.
+// This method is used by the underlying tableScan and localIndexScan plans.
+func executeForShard(series tsdb.SeriesList, timeRange tsdb.TimeRange,
+ builders ...seekerBuilder) ([]tsdb.Iterator, error) {
+ var itersInShard []tsdb.Iterator
+ for _, seriesFound := range series {
+ itersInSeries, err := func() ([]tsdb.Iterator, error) {
+ sp, errInner := seriesFound.Span(timeRange)
+ defer func(sp tsdb.SeriesSpan) {
+ _ = sp.Close()
+ }(sp)
+ if errInner != nil {
+ return nil, errInner
+ }
+ b := sp.SeekerBuilder()
+ for _, builder := range builders {
+ builder(b)
+ }
+ seeker, errInner := b.Build()
+ if errInner != nil {
+ return nil, errInner
+ }
+ iters, errInner := seeker.Seek()
+ if errInner != nil {
+ return nil, errInner
+ }
+ return iters, nil
+ }()
+ if err != nil {
+ return nil, err
+ }
+ if len(itersInSeries) > 0 {
+ itersInShard = append(itersInShard, itersInSeries...)
+ }
+ }
+ return itersInShard, nil
+}
diff --git a/pkg/query/v2/logical/common_test.go b/pkg/query/v2/logical/common_test.go
new file mode 100644
index 0000000..bb501fd
--- /dev/null
+++ b/pkg/query/v2/logical/common_test.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical_test
+
+import (
+ "context"
+ "embed"
+ "encoding/base64"
+ "encoding/json"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/jsonpb"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/stream"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+func setupQueryData(testing *testing.T, dataFile string, stream stream.Stream) (baseTime time.Time) {
+ t := assert.New(testing)
+ var templates []interface{}
+ baseTime = time.Now()
+ content, err := dataFS.ReadFile("testdata/" + dataFile)
+ t.NoError(err)
+ t.NoError(json.Unmarshal(content, &templates))
+ bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+ for i, template := range templates {
+ rawSearchTagFamily, errMarshal := json.Marshal(template)
+ t.NoError(errMarshal)
+ searchTagFamily := &streamv2.ElementValue_TagFamily{}
+ t.NoError(jsonpb.UnmarshalString(string(rawSearchTagFamily), searchTagFamily))
+ e := &streamv2.ElementValue{
+ ElementId: strconv.Itoa(i),
+ Timestamp: timestamppb.New(baseTime.Add(500 * time.Millisecond * time.Duration(i))),
+ TagFamilies: []*streamv2.ElementValue_TagFamily{
+ {
+ Tags: []*modelv2.TagValue{
+ {
+ Value: &modelv2.TagValue_BinaryData{
+ BinaryData: bb,
+ },
+ },
+ },
+ },
+ },
+ }
+ e.TagFamilies = append(e.TagFamilies, searchTagFamily)
+ errInner := stream.Write(e)
+ t.NoError(errInner)
+ }
+ return baseTime
+}
+
+func setup(t *assert.Assertions) (stream.Stream, func()) {
+ t.NoError(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "info",
+ }))
+
+ tempDir, deferFunc := test.Space(t)
+
+ metadataSvc, err := metadata.NewService(context.TODO())
+ t.NoError(err)
+ streamSvc, err := stream.NewService(context.TODO(), metadataSvc, nil)
+ t.NoError(err)
+
+ err = streamSvc.FlagSet().Parse([]string{"--root-path=" + tempDir})
+ t.NoError(err)
+
+ err = streamSvc.PreRun()
+ t.NoError(err)
+
+ s, err := streamSvc.Stream(&commonv2.Metadata{
+ Name: "sw",
+ Group: "default",
+ })
+ t.NoError(err)
+
+ return s, func() {
+ _ = s.Close()
+ deferFunc()
+ }
+}
diff --git a/pkg/query/v2/logical/expr.go b/pkg/query/v2/logical/expr.go
new file mode 100644
index 0000000..f54ef4f
--- /dev/null
+++ b/pkg/query/v2/logical/expr.go
@@ -0,0 +1,200 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "fmt"
+
+ "github.com/pkg/errors"
+
+ databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+)
+
+var binaryOpFactory = map[modelv2.Condition_BinaryOp]func(l, r Expr) Expr{
+ modelv2.Condition_BINARY_OP_EQ: Eq,
+ modelv2.Condition_BINARY_OP_NE: Ne,
+ modelv2.Condition_BINARY_OP_LT: Lt,
+ modelv2.Condition_BINARY_OP_GT: Gt,
+ modelv2.Condition_BINARY_OP_LE: Le,
+ modelv2.Condition_BINARY_OP_GE: Ge,
+ modelv2.Condition_BINARY_OP_HAVING: Having,
+ modelv2.Condition_BINARY_OP_NOT_HAVING: NotHaving,
+}
+
+var _ ResolvableExpr = (*FieldRef)(nil)
+
+// FieldRef is the reference to the field
+// also it holds the definition/schema of the field
+type FieldRef struct {
+ // tag defines the family name and name of the Tag
+ tag *Tag
+ // spec contains the index of the key in the schema, as well as the underlying FieldSpec
+ Spec *tagSpec
+}
+
+func (f *FieldRef) Equal(expr Expr) bool {
+ if other, ok := expr.(*FieldRef); ok {
+ return other.tag.GetTagName() == f.tag.GetTagName() && other.Spec.spec.GetType() == f.Spec.spec.GetType()
+ }
+ return false
+}
+
+func (f *FieldRef) FieldType() databasev2.TagType {
+ if f.Spec == nil {
+ panic("should be resolved first")
+ }
+ return f.Spec.spec.GetType()
+}
+
+func (f *FieldRef) Resolve(s Schema) error {
+ specs, err := s.CreateRef([]*Tag{f.tag})
+ if err != nil {
+ return err
+ }
+ f.Spec = specs[0][0].Spec
+ return nil
+}
+
+func (f *FieldRef) String() string {
+ return fmt.Sprintf("#%s<%s>", f.tag.GetCompoundName(), f.Spec.spec.GetType().String())
+}
+
+func NewFieldRef(familyName, tagName string) *FieldRef {
+ return &FieldRef{
+ tag: NewTag(familyName, tagName),
+ }
+}
+
+// NewSearchableFieldRef is a short-hand method for creating a FieldRef to the tag in the searchable family
+func NewSearchableFieldRef(tagName string) *FieldRef {
+ return &FieldRef{
+ tag: NewTag("searchable", tagName),
+ }
+}
+
+var _ ResolvableExpr = (*binaryExpr)(nil)
+
+// binaryExpr is composed of two operands with one op as the operator
+// l is normally a reference to a field, while r is usually literals
+type binaryExpr struct {
+ op modelv2.Condition_BinaryOp
+ l Expr
+ r Expr
+}
+
+func (b *binaryExpr) Equal(expr Expr) bool {
+ if other, ok := expr.(*binaryExpr); ok {
+ return b.op == other.op && b.l.Equal(other.l) && b.r.Equal(other.r)
+ }
+ return false
+}
+
+func (b *binaryExpr) FieldType() databasev2.TagType {
+ panic("Boolean should be added")
+}
+
+func (b *binaryExpr) Resolve(s Schema) error {
+ if lr, ok := b.l.(ResolvableExpr); ok {
+ err := lr.Resolve(s)
+ if err != nil {
+ return err
+ }
+ }
+ if rr, ok := b.l.(ResolvableExpr); ok {
+ err := rr.Resolve(s)
+ if err != nil {
+ return err
+ }
+ }
+ if b.l.FieldType() != b.r.FieldType() {
+ return errors.Wrapf(ErrIncompatibleQueryCondition, "left is %s while right is %s",
+ b.l.FieldType().String(),
+ b.r.FieldType().String(),
+ )
+ }
+ return nil
+}
+
+func (b *binaryExpr) String() string {
+ return fmt.Sprintf("%s %s %s", b.l.String(), b.op.String(), b.r.String())
+}
+
+func Eq(l, r Expr) Expr {
+ return &binaryExpr{
+ op: modelv2.Condition_BINARY_OP_EQ,
+ l: l,
+ r: r,
+ }
+}
+
+func Ne(l, r Expr) Expr {
+ return &binaryExpr{
+ op: modelv2.Condition_BINARY_OP_NE,
+ l: l,
+ r: r,
+ }
+}
+
+func Lt(l, r Expr) Expr {
+ return &binaryExpr{
+ op: modelv2.Condition_BINARY_OP_LT,
+ l: l,
+ r: r,
+ }
+}
+
+func Le(l, r Expr) Expr {
+ return &binaryExpr{
+ op: modelv2.Condition_BINARY_OP_LE,
+ l: l,
+ r: r,
+ }
+}
+
+func Gt(l, r Expr) Expr {
+ return &binaryExpr{
+ op: modelv2.Condition_BINARY_OP_GT,
+ l: l,
+ r: r,
+ }
+}
+
+func Ge(l, r Expr) Expr {
+ return &binaryExpr{
+ op: modelv2.Condition_BINARY_OP_GE,
+ l: l,
+ r: r,
+ }
+}
+
+func Having(l, r Expr) Expr {
+ return &binaryExpr{
+ op: modelv2.Condition_BINARY_OP_HAVING,
+ l: l,
+ r: r,
+ }
+}
+
+func NotHaving(l, r Expr) Expr {
+ return &binaryExpr{
+ op: modelv2.Condition_BINARY_OP_NOT_HAVING,
+ l: l,
+ r: r,
+ }
+}
diff --git a/pkg/query/v2/logical/expr_literal.go b/pkg/query/v2/logical/expr_literal.go
new file mode 100644
index 0000000..6bdc72a
--- /dev/null
+++ b/pkg/query/v2/logical/expr_literal.go
@@ -0,0 +1,160 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "fmt"
+ "strconv"
+
+ "github.com/google/go-cmp/cmp"
+
+ databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var _ LiteralExpr = (*int64Literal)(nil)
+
+type int64Literal struct {
+ int64
+}
+
+func (i *int64Literal) Bytes() [][]byte {
+ return [][]byte{convert.Int64ToBytes(i.int64)}
+}
+
+func (i *int64Literal) Equal(expr Expr) bool {
+ if other, ok := expr.(*int64Literal); ok {
+ return other.int64 == i.int64
+ }
+
+ return false
+}
+
+func Int(num int64) Expr {
+ return &int64Literal{num}
+}
+
+func (i *int64Literal) FieldType() databasev2.TagType {
+ return databasev2.TagType_TAG_TYPE_INT
+}
+
+func (i *int64Literal) String() string {
+ return strconv.FormatInt(i.int64, 10)
+}
+
+var _ LiteralExpr = (*int64ArrLiteral)(nil)
+
+type int64ArrLiteral struct {
+ arr []int64
+}
+
+func (i *int64ArrLiteral) Bytes() [][]byte {
+ b := make([][]byte, 0, len(i.arr))
+ for _, i := range i.arr {
+ b = append(b, convert.Int64ToBytes(i))
+ }
+ return b
+}
+
+func (i *int64ArrLiteral) Equal(expr Expr) bool {
+ if other, ok := expr.(*int64ArrLiteral); ok {
+ return cmp.Equal(other.arr, i.arr)
+ }
+
+ return false
+}
+
+func Ints(ints ...int64) Expr {
+ return &int64ArrLiteral{
+ arr: ints,
+ }
+}
+
+func (i *int64ArrLiteral) FieldType() databasev2.TagType {
+ return databasev2.TagType_TAG_TYPE_INT_ARRAY
+}
+
+func (i *int64ArrLiteral) String() string {
+ return fmt.Sprintf("%v", i.arr)
+}
+
+var _ LiteralExpr = (*strLiteral)(nil)
+
+type strLiteral struct {
+ string
+}
+
+func (s *strLiteral) Bytes() [][]byte {
+ return [][]byte{[]byte(s.string)}
+}
+
+func (s *strLiteral) Equal(expr Expr) bool {
+ if other, ok := expr.(*strLiteral); ok {
+ return other.string == s.string
+ }
+
+ return false
+}
+
+func Str(str string) Expr {
+ return &strLiteral{str}
+}
+
+func (s *strLiteral) FieldType() databasev2.TagType {
+ return databasev2.TagType_TAG_TYPE_STRING
+}
+
+func (s *strLiteral) String() string {
+ return s.string
+}
+
+var _ LiteralExpr = (*strArrLiteral)(nil)
+
+type strArrLiteral struct {
+ arr []string
+}
+
+func (s *strArrLiteral) Bytes() [][]byte {
+ b := make([][]byte, 0, len(s.arr))
+ for _, str := range s.arr {
+ b = append(b, []byte(str))
+ }
+ return b
+}
+
+func (s *strArrLiteral) Equal(expr Expr) bool {
+ if other, ok := expr.(*strArrLiteral); ok {
+ return cmp.Equal(other.arr, s.arr)
+ }
+
+ return false
+}
+
+func Strs(strs ...string) Expr {
+ return &strArrLiteral{
+ arr: strs,
+ }
+}
+
+func (s *strArrLiteral) FieldType() databasev2.TagType {
+ return databasev2.TagType_TAG_TYPE_STRING_ARRAY
+}
+
+func (s *strArrLiteral) String() string {
+ return fmt.Sprintf("%v", s.arr)
+}
diff --git a/pkg/query/v2/logical/format.go b/pkg/query/v2/logical/format.go
new file mode 100644
index 0000000..c880b82
--- /dev/null
+++ b/pkg/query/v2/logical/format.go
@@ -0,0 +1,52 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "strings"
+)
+
+func Format(p Plan) string {
+ return formatWithIndent(p, 0)
+}
+
+func formatWithIndent(p Plan, indent int) string {
+ res := ""
+ if indent > 1 {
+ res += strings.Repeat(" ", 5*(indent-1))
+ }
+ if indent > 0 {
+ res += "+"
+ res += strings.Repeat("-", 4)
+ }
+ res += p.String() + "\n"
+ for _, child := range p.Children() {
+ res += formatWithIndent(child, indent+1)
+ }
+ return res
+}
+
+func formatExpr(sep string, exprGroup ...[]*FieldRef) string {
+ var exprsStr []string
+ for _, exprs := range exprGroup {
+ for i := 0; i < len(exprs); i++ {
+ exprsStr = append(exprsStr, exprs[i].String())
+ }
+ }
+ return strings.Join(exprsStr, sep)
+}
diff --git a/pkg/query/v2/logical/interface.go b/pkg/query/v2/logical/interface.go
new file mode 100644
index 0000000..937cdb1
--- /dev/null
+++ b/pkg/query/v2/logical/interface.go
@@ -0,0 +1,63 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "fmt"
+
+ databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+ "github.com/apache/skywalking-banyandb/pkg/query/v2/executor"
+)
+
+type PlanType uint8
+
+const (
+ PlanLimit PlanType = iota
+ PlanOffset
+ PlanLocalIndexScan
+ PlanGlobalIndexScan
+)
+
+type UnresolvedPlan interface {
+ Analyze(Schema) (Plan, error)
+}
+
+type Plan interface {
+ fmt.Stringer
+ Type() PlanType
+ executor.Executable
+ Equal(Plan) bool
+ Children() []Plan
+ Schema() Schema
+}
+
+type Expr interface {
+ fmt.Stringer
+ FieldType() databasev2.TagType
+ Equal(Expr) bool
+}
+
+type LiteralExpr interface {
+ Expr
+ Bytes() [][]byte
+}
+
+type ResolvableExpr interface {
+ Expr
+ Resolve(Schema) error
+}
diff --git a/pkg/query/v2/logical/iter.go b/pkg/query/v2/logical/iter.go
new file mode 100644
index 0000000..5aa0bb7
--- /dev/null
+++ b/pkg/query/v2/logical/iter.go
@@ -0,0 +1,124 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "container/heap"
+
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+)
+
+var _ ItemIterator = (*itemIter)(nil)
+
+type ItemIterator interface {
+ HasNext() bool
+ Next() tsdb.Item
+}
+
+var _ heap.Interface = (*containerHeap)(nil)
+
+// container contains both iter and its current item
+type container struct {
+ c comparator
+ item tsdb.Item
+ iter tsdb.Iterator
+}
+
+type containerHeap []*container
+
+func (h containerHeap) Len() int { return len(h) }
+func (h containerHeap) Less(i, j int) bool { return h[i].c(h[i].item, h[j].item) }
+func (h containerHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+
+func (h *containerHeap) Push(x interface{}) {
+ *h = append(*h, x.(*container))
+}
+
+func (h *containerHeap) Pop() interface{} {
+ old := *h
+ n := len(old)
+ x := old[n-1]
+ *h = old[0 : n-1]
+ return x
+}
+
+type itemIter struct {
+ // c is the comparator to sort items
+ c comparator
+ // iters is the list of initial Iterator
+ iters []tsdb.Iterator
+ // deq is the deque of the container
+ // 1. When we push a new container, we can normally append it to the tail of the deq,
+ // and then sort the whole slice
+ // 2. When we pop a new container, we can just pop out the first element in the deq.
+ // The rest of the slice is still sorted.
+ h *containerHeap
+}
+
+func NewItemIter(iters []tsdb.Iterator, c comparator) ItemIterator {
+ it := &itemIter{
+ c: c,
+ iters: iters,
+ h: &containerHeap{},
+ }
+ it.init()
+ return it
+}
+
+// init function MUST be called while initialization.
+// 1. Move all iterator to the first item by invoking their Next.
+// 2. Load all first items into a slice.
+func (it *itemIter) init() {
+ for _, iter := range it.iters {
+ it.pushIterator(iter)
+ }
+ // heap initialization
+ heap.Init(it.h)
+}
+
+// pushIterator pushes the given iterator into the underlying deque.
+// Status will be immediately checked if the Iterator has a next value.
+// 1 - If not, it will be close at once and will not be added to the slice,
+// which means inactive iterator does not exist in the deq.
+// 2 - If so, it will be wrapped into a container and push to the deq.
+// Then we call SliceStable sort to sort the deq.
+func (it *itemIter) pushIterator(iter tsdb.Iterator) {
+ if !iter.Next() {
+ _ = iter.Close()
+ return
+ }
+ heap.Push(it.h, &container{
+ item: iter.Val(),
+ iter: iter,
+ c: it.c,
+ })
+}
+
+func (it *itemIter) HasNext() bool {
+ return it.h.Len() > 0
+}
+
+func (it *itemIter) Next() tsdb.Item {
+ // 3. Pop up the minimal item through the order value
+ c := heap.Pop(it.h).(*container)
+
+ // 4. Move the iterator whose value is popped in step 3, push the next value of this iterator into the slice.
+ it.pushIterator(c.iter)
+
+ return c.item
+}
diff --git a/pkg/query/v2/logical/plan.go b/pkg/query/v2/logical/plan.go
new file mode 100644
index 0000000..4b568c7
--- /dev/null
+++ b/pkg/query/v2/logical/plan.go
@@ -0,0 +1,162 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "fmt"
+
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/pkg/query/v2/executor"
+)
+
+var _ Plan = (*limit)(nil)
+var _ UnresolvedPlan = (*limit)(nil)
+
+type parent struct {
+ unresolvedInput UnresolvedPlan
+ input Plan
+}
+
+type limit struct {
+ *parent
+ limitNum uint32
+}
+
+func (l *limit) Execute(ec executor.ExecutionContext) ([]*streamv2.Element, error) {
+ entities, err := l.parent.input.Execute(ec)
+ if err != nil {
+ return nil, err
+ }
+
+ if len(entities) > int(l.limitNum) {
+ return entities[:l.limitNum], nil
+ }
+
+ return entities, nil
+}
+
+func (l *limit) Equal(plan Plan) bool {
+ if plan.Type() != PlanLimit {
+ return false
+ }
+ other := plan.(*limit)
+ if l.limitNum == other.limitNum {
+ return l.input.Equal(other.input)
+ }
+ return false
+}
+
+func (l *limit) Analyze(s Schema) (Plan, error) {
+ var err error
+ l.input, err = l.unresolvedInput.Analyze(s)
+ if err != nil {
+ return nil, err
+ }
+ return l, nil
+}
+
+func (l *limit) Schema() Schema {
+ return l.input.Schema()
+}
+
+func (l *limit) String() string {
+ return fmt.Sprintf("Limit: %d", l.limitNum)
+}
+
+func (l *limit) Children() []Plan {
+ return []Plan{l.input}
+}
+
+func (l *limit) Type() PlanType {
+ return PlanLimit
+}
+
+func Limit(input UnresolvedPlan, num uint32) UnresolvedPlan {
+ return &limit{
+ parent: &parent{
+ unresolvedInput: input,
+ },
+ limitNum: num,
+ }
+}
+
+var _ Plan = (*offset)(nil)
+var _ UnresolvedPlan = (*offset)(nil)
+
+type offset struct {
+ *parent
+ offsetNum uint32
+}
+
+func (l *offset) Execute(ec executor.ExecutionContext) ([]*streamv2.Element, error) {
+ elements, err := l.parent.input.Execute(ec)
+ if err != nil {
+ return nil, err
+ }
+
+ if len(elements) > int(l.offsetNum) {
+ return elements[l.offsetNum:], nil
+ }
+
+ return []*streamv2.Element{}, nil
+}
+
+func (l *offset) Equal(plan Plan) bool {
+ if plan.Type() != PlanOffset {
+ return false
+ }
+ other := plan.(*offset)
+ if l.offsetNum == other.offsetNum {
+ return l.input.Equal(other.input)
+ }
+ return false
+}
+
+func (l *offset) Analyze(s Schema) (Plan, error) {
+ var err error
+ l.input, err = l.unresolvedInput.Analyze(s)
+ if err != nil {
+ return nil, err
+ }
+ return l, nil
+}
+
+func (l *offset) Schema() Schema {
+ return l.input.Schema()
+}
+
+func (l *offset) String() string {
+ return fmt.Sprintf("Offset: %d", l.offsetNum)
+}
+
+func (l *offset) Children() []Plan {
+ return []Plan{l.input}
+}
+
+func (l *offset) Type() PlanType {
+ return PlanOffset
+}
+
+func Offset(input UnresolvedPlan, num uint32) UnresolvedPlan {
+ return &offset{
+ parent: &parent{
+ unresolvedInput: input,
+ },
+ offsetNum: num,
+ }
+}
diff --git a/pkg/query/v2/logical/plan_execution_test.go b/pkg/query/v2/logical/plan_execution_test.go
new file mode 100644
index 0000000..55ade27
--- /dev/null
+++ b/pkg/query/v2/logical/plan_execution_test.go
@@ -0,0 +1,365 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical_test
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/query/v2/logical"
+)
+
+func TestPlanExecution_TableScan_Limit(t *testing.T) {
+ assertT := assert.New(t)
+ streamT, deferFunc := setup(assertT)
+ defer deferFunc()
+ baseTs := setupQueryData(t, "multiple_shards.json", streamT)
+
+ metadata := &commonv2.Metadata{
+ Name: "sw",
+ Group: "default",
+ }
+
+ sT, eT := baseTs, baseTs.Add(1*time.Hour)
+
+ tests := []struct {
+ name string
+ unresolvedPlan logical.UnresolvedPlan
+ wantLength int
+ }{
+ {
+ name: "Limit 1",
+ unresolvedPlan: logical.Limit(logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil), 1),
+ wantLength: 1,
+ },
+ {
+ name: "Limit 5",
+ unresolvedPlan: logical.Limit(logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil), 5),
+ wantLength: 5,
+ },
+ {
+ name: "Limit 10",
+ unresolvedPlan: logical.Limit(logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil), 10),
+ wantLength: 5,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tester := require.New(t)
+ schema, err := logical.DefaultAnalyzer().BuildStreamSchema(context.TODO(), metadata)
+ tester.NoError(err)
+
+ plan, err := tt.unresolvedPlan.Analyze(schema)
+ tester.NoError(err)
+ tester.NotNil(plan)
+
+ entities, err := plan.Execute(streamT)
+ tester.NoError(err)
+ tester.Len(entities, tt.wantLength)
+ tester.True(logical.SortedByTimestamp(entities, modelv2.QueryOrder_SORT_ASC))
+ })
+ }
+}
+
+func TestPlanExecution_Offset(t *testing.T) {
+ assertT := assert.New(t)
+ streamT, deferFunc := setup(assertT)
+ defer deferFunc()
+ baseTs := setupQueryData(t, "multiple_shards.json", streamT)
+
+ metadata := &commonv2.Metadata{
+ Name: "sw",
+ Group: "default",
+ }
+
+ sT, eT := baseTs, baseTs.Add(1*time.Hour)
+
+ tests := []struct {
+ name string
+ unresolvedPlan logical.UnresolvedPlan
+ wantLength int
+ }{
+ {
+ name: "Offset 0",
+ unresolvedPlan: logical.Offset(logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil), 0),
+ wantLength: 5,
+ },
+ {
+ name: "Offset 3",
+ unresolvedPlan: logical.Offset(logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil), 3),
+ wantLength: 2,
+ },
+ {
+ name: "Limit 5",
+ unresolvedPlan: logical.Offset(logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil), 5),
+ wantLength: 0,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tester := require.New(t)
+ schema, err := logical.DefaultAnalyzer().BuildStreamSchema(context.TODO(), metadata)
+ tester.NoError(err)
+
+ plan, err := tt.unresolvedPlan.Analyze(schema)
+ tester.NoError(err)
+ tester.NotNil(plan)
+
+ entities, err := plan.Execute(streamT)
+ tester.NoError(err)
+ tester.Len(entities, tt.wantLength)
+ })
+ }
+}
+
+func TestPlanExecution_TraceIDFetch(t *testing.T) {
+ assertT := assert.New(t)
+ streamT, deferFunc := setup(assertT)
+ defer deferFunc()
+ _ = setupQueryData(t, "multiple_shards.json", streamT)
+
+ m := &commonv2.Metadata{
+ Name: "sw",
+ Group: "default",
+ }
+
+ tests := []struct {
+ name string
+ traceID string
+ wantLength int
+ }{
+ {
+ name: "traceID = 1",
+ traceID: "1",
+ wantLength: 1,
+ },
+ {
+ name: "traceID = 2",
+ traceID: "2",
+ wantLength: 1,
+ },
+ {
+ name: "traceID = 3",
+ traceID: "3",
+ wantLength: 1,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tester := require.New(t)
+ s, err := logical.DefaultAnalyzer().BuildStreamSchema(context.TODO(), m)
+ tester.NoError(err)
+
+ p, err := logical.GlobalIndexScan(m, []logical.Expr{
+ logical.Eq(logical.NewFieldRef("searchable", "trace_id"), logical.Str(tt.traceID)),
+ }, logical.NewTags("searchable", "trace_id")).Analyze(s)
+ tester.NoError(err)
+ tester.NotNil(p)
+ entities, err := p.Execute(streamT)
+ tester.NoError(err)
+ for _, entity := range entities {
+ tester.Len(entity.GetTagFamilies(), 1)
+ tester.Len(entity.GetTagFamilies()[0].GetTags(), 1)
+ tester.Equal(entity.GetTagFamilies()[0].GetTags()[0].GetValue().GetStr().GetValue(), tt.traceID)
+ }
+ tester.Len(entities, tt.wantLength)
+ })
+ }
+}
+
+func TestPlanExecution_IndexScan(t *testing.T) {
+ assertT := assert.New(t)
+ streamT, deferFunc := setup(assertT)
+ defer deferFunc()
+ baseTs := setupQueryData(t, "multiple_shards.json", streamT)
+
+ metadata := &commonv2.Metadata{
+ Name: "sw",
+ Group: "default",
+ }
+
+ sT, eT := baseTs, baseTs.Add(1*time.Hour)
+
+ tests := []struct {
+ name string
+ unresolvedPlan logical.UnresolvedPlan
+ wantLength int
+ }{
+ {
+ name: "Single Index Search using POST without entity returns nothing",
+ unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+ logical.Eq(logical.NewFieldRef("searchable", "http.method"), logical.Str("POST")),
+ }, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+ wantLength: 0,
+ },
+ {
+ name: "Single Index Search using inverted index",
+ unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+ logical.Eq(logical.NewFieldRef("searchable", "http.method"), logical.Str("GET")),
+ }, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+ wantLength: 3,
+ },
+ {
+ name: "Single Index Search using LSM tree index",
+ unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+ logical.Lt(logical.NewFieldRef("searchable", "duration"), logical.Int(100)),
+ }, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+ wantLength: 2,
+ },
+ {
+ name: "Single Index Search without entity returns results",
+ unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+ logical.Eq(logical.NewFieldRef("searchable", "endpoint_id"), logical.Str("/home_id")),
+ }, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+ wantLength: 2,
+ },
+ {
+ name: "Multiple Index Search",
+ unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+ logical.Eq(logical.NewFieldRef("searchable", "http.method"), logical.Str("GET")),
+ logical.Eq(logical.NewFieldRef("searchable", "endpoint_id"), logical.Str("/home_id")),
+ }, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+ wantLength: 1,
+ },
+ {
+ name: "Multiple Index Search with a combination of numerical index and textual index",
+ unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+ logical.Eq(logical.NewFieldRef("searchable", "http.method"), logical.Str("GET")),
+ logical.Lt(logical.NewFieldRef("searchable", "duration"), logical.Int(100)),
+ }, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+ wantLength: 2,
+ },
+ {
+ name: "Multiple Index With One Empty Result(ChunkID)",
+ unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+ logical.Eq(logical.NewFieldRef("searchable", "http.method"), logical.Str("GET")),
+ logical.Eq(logical.NewFieldRef("searchable", "endpoint_id"), logical.Str("/unknown")),
+ }, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+ wantLength: 0,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tester := require.New(t)
+ schema, err := logical.DefaultAnalyzer().BuildStreamSchema(context.TODO(), metadata)
+ tester.NoError(err)
+
+ plan, err := tt.unresolvedPlan.Analyze(schema)
+ tester.NoError(err)
+ tester.NotNil(plan)
+
+ entities, err := plan.Execute(streamT)
+ tester.NoError(err)
+ tester.Len(entities, tt.wantLength)
+ })
+ }
+}
+
+func TestPlanExecution_OrderBy(t *testing.T) {
+ assertT := assert.New(t)
+ streamT, deferFunc := setup(assertT)
+ defer deferFunc()
+ baseTs := setupQueryData(t, "multiple_shards.json", streamT)
+
+ metadata := &commonv2.Metadata{
+ Name: "sw",
+ Group: "default",
+ }
+
+ sT, eT := baseTs, baseTs.Add(1*time.Hour)
+
+ tests := []struct {
+ name string
+ targetIndexRule string
+ sortDirection modelv2.QueryOrder_Sort
+ // TODO: avoid hardcoded index?
+ targetFamilyIdx int
+ targetTagIdx int
+ }{
+ {
+ name: "Sort By duration ASC",
+ targetIndexRule: "duration",
+ sortDirection: modelv2.QueryOrder_SORT_ASC,
+ targetFamilyIdx: 0,
+ targetTagIdx: 0,
+ },
+ {
+ name: "Sort By duration DESC",
+ targetIndexRule: "duration",
+ sortDirection: modelv2.QueryOrder_SORT_DESC,
+ targetFamilyIdx: 0,
+ targetTagIdx: 0,
+ },
+ {
+ name: "Sort By start_time DESC",
+ targetIndexRule: "",
+ sortDirection: modelv2.QueryOrder_SORT_DESC,
+ },
+ {
+ name: "Sort By start_time ASC",
+ targetIndexRule: "",
+ sortDirection: modelv2.QueryOrder_SORT_ASC,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tester := require.New(t)
+ schema, err := logical.DefaultAnalyzer().BuildStreamSchema(context.TODO(), metadata)
+ tester.NoError(err)
+ tester.NotNil(schema)
+
+ if tt.targetIndexRule == "" {
+ p, err := logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+ logical.OrderBy("", tt.sortDirection), logical.NewTags("searchable", "start_time")).
+ Analyze(schema)
+ tester.NoError(err)
+ tester.NotNil(p)
+
+ entities, err := p.Execute(streamT)
+ tester.NoError(err)
+ tester.NotNil(entities)
+
+ tester.True(logical.SortedByTimestamp(entities, tt.sortDirection))
+ } else {
+ p, err := logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+ logical.OrderBy(tt.targetIndexRule, tt.sortDirection), logical.NewTags("searchable", tt.targetIndexRule)).
+ Analyze(schema)
+ tester.NoError(err)
+ tester.NotNil(p)
+
+ entities, err := p.Execute(streamT)
+ tester.NoError(err)
+ tester.NotNil(entities)
+
+ tester.True(logical.SortedByIndex(entities, tt.targetFamilyIdx, tt.targetTagIdx, tt.sortDirection))
+ }
+ })
+ }
+}
diff --git a/pkg/query/v2/logical/plan_indexscan_global.go b/pkg/query/v2/logical/plan_indexscan_global.go
new file mode 100644
index 0000000..fea123e
--- /dev/null
+++ b/pkg/query/v2/logical/plan_indexscan_global.go
@@ -0,0 +1,146 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+ "github.com/pkg/errors"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+ databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/query/v2/executor"
+)
+
+var _ Plan = (*globalIndexScan)(nil)
+
+type globalIndexScan struct {
+ schema Schema
+ metadata *commonv2.Metadata
+ globalIndexRule *databasev2.IndexRule
+ expr Expr
+ projectionFieldRefs [][]*FieldRef
+}
+
+func (t *globalIndexScan) String() string {
+ if len(t.projectionFieldRefs) == 0 {
+ return fmt.Sprintf("GlobalIndexScan: Metadata{group=%s,name=%s},condition=%s; projection=None",
+ t.metadata.GetGroup(), t.metadata.GetName(), t.expr.String())
+ }
+ return fmt.Sprintf("GlobalIndexScan: Metadata{group=%s,name=%s},conditions=%s; projection=%s",
+ t.metadata.GetGroup(), t.metadata.GetName(),
+ t.expr.String(), formatExpr(", ", t.projectionFieldRefs...))
+}
+
+func (t *globalIndexScan) Children() []Plan {
+ return []Plan{}
+}
+
+func (t *globalIndexScan) Type() PlanType {
+ return PlanGlobalIndexScan
+}
+
+func (t *globalIndexScan) Schema() Schema {
+ return t.schema
+}
+
+func (t *globalIndexScan) Equal(plan Plan) bool {
+ if plan.Type() != PlanGlobalIndexScan {
+ return false
+ }
+ other := plan.(*globalIndexScan)
+ return t.metadata.GetGroup() == other.metadata.GetGroup() &&
+ t.metadata.GetName() == other.metadata.GetName() &&
+ cmp.Equal(t.projectionFieldRefs, other.projectionFieldRefs) &&
+ cmp.Equal(t.schema, other.schema) &&
+ cmp.Equal(t.globalIndexRule.GetMetadata().GetId(), other.globalIndexRule.GetMetadata().GetId()) &&
+ cmp.Equal(t.expr, other.expr)
+}
+
+func (t *globalIndexScan) Execute(ec executor.ExecutionContext) ([]*streamv2.Element, error) {
+ shards, err := ec.Shards(nil)
+ if err != nil {
+ return nil, err
+ }
+ var elements []*streamv2.Element
+ for _, shard := range shards {
+ elementsInShard, err := t.executeForShard(ec, shard)
+ if err != nil {
+ return elements, err
+ }
+ elements = append(elements, elementsInShard...)
+ }
+ return elements, nil
+}
+
+func (t *globalIndexScan) executeForShard(ec executor.ExecutionContext, shard tsdb.Shard) ([]*streamv2.Element, error) {
+ var elementsInShard []*streamv2.Element
+ itemIDs, err := shard.Index().Seek(index.Field{
+ Key: index.FieldKey{
+ IndexRuleID: t.globalIndexRule.GetMetadata().GetId(),
+ },
+ Term: t.expr.(*binaryExpr).r.(LiteralExpr).Bytes()[0],
+ })
+ if err != nil || len(itemIDs) < 1 {
+ return elementsInShard, nil
+ }
+ for _, itemID := range itemIDs {
+ segShard, err := ec.Shard(itemID.ShardID)
+ if err != nil {
+ return elementsInShard, errors.WithStack(err)
+ }
+ series, err := segShard.Series().GetByID(itemID.SeriesID)
+ if err != nil {
+ return elementsInShard, errors.WithStack(err)
+ }
+ err = func() error {
+ item, closer, errInner := series.Get(itemID)
+ defer func(closer io.Closer) {
+ _ = closer.Close()
+ }(closer)
+ if errInner != nil {
+ return errors.WithStack(errInner)
+ }
+ tagFamilies, errInner := projectItem(ec, item, t.projectionFieldRefs)
+ if errInner != nil {
+ return errors.WithStack(errInner)
+ }
+ elementID, errInner := ec.ParseElementID(item)
+ if errInner != nil {
+ return errors.WithStack(errInner)
+ }
+ elementsInShard = append(elementsInShard, &streamv2.Element{
+ ElementId: elementID,
+ Timestamp: timestamppb.New(time.Unix(0, int64(item.Time()))),
+ TagFamilies: tagFamilies,
+ })
+ return nil
+ }()
+ if err != nil {
+ return nil, err
+ }
+ }
+ return elementsInShard, nil
+}
diff --git a/pkg/query/v2/logical/plan_indexscan_local.go b/pkg/query/v2/logical/plan_indexscan_local.go
new file mode 100644
index 0000000..35619e0
--- /dev/null
+++ b/pkg/query/v2/logical/plan_indexscan_local.go
@@ -0,0 +1,291 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "bytes"
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+ "github.com/pkg/errors"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+ databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/query/v2/executor"
+)
+
+var _ UnresolvedPlan = (*unresolvedIndexScan)(nil)
+
+type unresolvedIndexScan struct {
+ unresolvedOrderBy *UnresolvedOrderBy
+ startTime time.Time
+ endTime time.Time
+ metadata *commonv2.Metadata
+ conditions []Expr
+ projectionFields [][]*Tag
+ entity tsdb.Entity
+}
+
+func (uis *unresolvedIndexScan) Analyze(s Schema) (Plan, error) {
+ localConditionMap := make(map[*databasev2.IndexRule][]Expr)
+ globalConditions := make([]interface{}, 0)
+ for _, cond := range uis.conditions {
+ if resolvable, ok := cond.(ResolvableExpr); ok {
+ err := resolvable.Resolve(s)
+ if err != nil {
+ return nil, err
+ }
+
+ if bCond, ok := cond.(*binaryExpr); ok {
+ tag := bCond.l.(*FieldRef).tag
+ if defined, indexObj := s.IndexDefined(tag); defined {
+ if indexObj.GetLocation() == databasev2.IndexRule_LOCATION_SERIES {
+ if v, exist := localConditionMap[indexObj]; exist {
+ v = append(v, cond)
+ localConditionMap[indexObj] = v
+ } else {
+ localConditionMap[indexObj] = []Expr{cond}
+ }
+ } else if indexObj.GetLocation() == databasev2.IndexRule_LOCATION_GLOBAL {
+ globalConditions = append(globalConditions, indexObj, cond)
+ }
+ } else {
+ return nil, errors.Wrap(ErrIndexNotDefined, tag.GetCompoundName())
+ }
+ }
+ }
+ }
+
+ var projFieldsRefs [][]*FieldRef
+ if uis.projectionFields != nil && len(uis.projectionFields) > 0 {
+ var err error
+ projFieldsRefs, err = s.CreateRef(uis.projectionFields...)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ if len(globalConditions) > 0 {
+ if len(globalConditions)/2 > 1 {
+ return nil, ErrMultipleGlobalIndexes
+ }
+ return &globalIndexScan{
+ schema: s,
+ projectionFieldRefs: projFieldsRefs,
+ metadata: uis.metadata,
+ globalIndexRule: globalConditions[0].(*databasev2.IndexRule),
+ expr: globalConditions[1].(Expr),
+ }, nil
+ }
+
+ // resolve sub-plan with the projected view of schema
+ orderBySubPlan, err := uis.unresolvedOrderBy.analyze(s.Proj(projFieldsRefs...))
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &localIndexScan{
+ orderBy: orderBySubPlan,
+ timeRange: tsdb.NewTimeRange(uis.startTime, uis.endTime),
+ schema: s,
+ projectionFieldRefs: projFieldsRefs,
+ metadata: uis.metadata,
+ conditionMap: localConditionMap,
+ entity: uis.entity,
+ }, nil
+}
+
+var _ Plan = (*localIndexScan)(nil)
+
+type localIndexScan struct {
+ *orderBy
+ timeRange tsdb.TimeRange
+ schema Schema
+ metadata *commonv2.Metadata
+ conditionMap map[*databasev2.IndexRule][]Expr
+ projectionFieldRefs [][]*FieldRef
+ entity tsdb.Entity
+}
+
+func (i *localIndexScan) Execute(ec executor.ExecutionContext) ([]*streamv2.Element, error) {
+ shards, err := ec.Shards(i.entity)
+ if err != nil {
+ return nil, err
+ }
+ var iters []tsdb.Iterator
+ for _, shard := range shards {
+ itersInShard, err := i.executeInShard(shard)
+ if err != nil {
+ return nil, err
+ }
+ iters = append(iters, itersInShard...)
+ }
+
+ c := createComparator(i.sort)
+
+ var elems []*streamv2.Element
+ it := NewItemIter(iters, c)
+ for it.HasNext() {
+ nextItem := it.Next()
+ tagFamilies, innerErr := projectItem(ec, nextItem, i.projectionFieldRefs)
+ if innerErr != nil {
+ return nil, innerErr
+ }
+ elementID, innerErr := ec.ParseElementID(nextItem)
+ if innerErr != nil {
+ return nil, innerErr
+ }
+ elems = append(elems, &streamv2.Element{
+ ElementId: elementID,
+ Timestamp: timestamppb.New(time.Unix(0, int64(nextItem.Time()))),
+ TagFamilies: tagFamilies,
+ })
+ }
+ return elems, nil
+}
+
+func (i *localIndexScan) executeInShard(shard tsdb.Shard) ([]tsdb.Iterator, error) {
+ seriesList, err := shard.Series().List(tsdb.NewPath(i.entity))
+ if err != nil {
+ return nil, err
+ }
+
+ var builders []seekerBuilder
+
+ if i.index != nil {
+ builders = append(builders, func(builder tsdb.SeekerBuilder) {
+ builder.OrderByIndex(i.index, i.sort)
+ })
+ } else {
+ builders = append(builders, func(builder tsdb.SeekerBuilder) {
+ builder.OrderByTime(i.sort)
+ })
+ }
+
+ if i.conditionMap != nil && len(i.conditionMap) > 0 {
+ builders = append(builders, func(b tsdb.SeekerBuilder) {
+ for idxRule, exprs := range i.conditionMap {
+ b.Filter(idxRule, exprToCondition(exprs))
+ }
+ })
+ }
+
+ return executeForShard(seriesList, i.timeRange, builders...)
+}
+
+func (i *localIndexScan) String() string {
+ exprStr := make([]string, 0, len(i.conditionMap))
+ for _, conditions := range i.conditionMap {
+ var conditionStr []string
+ for _, cond := range conditions {
+ conditionStr = append(conditionStr, cond.String())
+ }
+ exprStr = append(exprStr, fmt.Sprintf("(%s)", strings.Join(conditionStr, " AND ")))
+ }
+ if len(i.projectionFieldRefs) == 0 {
+ return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=None",
+ i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(), strings.Join(exprStr, " AND "))
+ }
+ return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s",
+ i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(),
+ strings.Join(exprStr, " AND "), formatExpr(", ", i.projectionFieldRefs...))
+}
+
+func (i *localIndexScan) Type() PlanType {
+ return PlanLocalIndexScan
+}
+
+func (i *localIndexScan) Children() []Plan {
+ return []Plan{}
+}
+
+func (i *localIndexScan) Schema() Schema {
+ if i.projectionFieldRefs == nil || len(i.projectionFieldRefs) == 0 {
+ return i.schema
+ }
+ return i.schema.Proj(i.projectionFieldRefs...)
+}
+
+func (i *localIndexScan) Equal(plan Plan) bool {
+ if plan.Type() != PlanLocalIndexScan {
+ return false
+ }
+ other := plan.(*localIndexScan)
+ return i.metadata.GetGroup() == other.metadata.GetGroup() &&
+ i.metadata.GetName() == other.metadata.GetName() &&
+ i.timeRange.Start.UnixNano() == other.timeRange.Start.UnixNano() &&
+ i.timeRange.End.UnixNano() == other.timeRange.End.UnixNano() &&
+ bytes.Equal(i.entity.Marshal(), other.entity.Marshal()) &&
+ cmp.Equal(i.projectionFieldRefs, other.projectionFieldRefs) &&
+ cmp.Equal(i.schema, other.schema) &&
+ cmp.Equal(i.conditionMap, other.conditionMap) &&
+ cmp.Equal(i.orderBy, other.orderBy)
+}
+
+func IndexScan(startTime, endTime time.Time, metadata *commonv2.Metadata, conditions []Expr, entity tsdb.Entity,
+ orderBy *UnresolvedOrderBy, projection ...[]*Tag) UnresolvedPlan {
+ return &unresolvedIndexScan{
+ unresolvedOrderBy: orderBy,
+ startTime: startTime,
+ endTime: endTime,
+ metadata: metadata,
+ conditions: conditions,
+ projectionFields: projection,
+ entity: entity,
+ }
+}
+
+// GlobalIndexScan is a short-hand method for composing a globalIndexScan plan
+func GlobalIndexScan(metadata *commonv2.Metadata, conditions []Expr, projection ...[]*Tag) UnresolvedPlan {
+ return &unresolvedIndexScan{
+ metadata: metadata,
+ conditions: conditions,
+ projectionFields: projection,
+ }
+}
+
+func exprToCondition(exprs []Expr) tsdb.Condition {
+ cond := make(map[string][]index.ConditionValue)
+ for _, expr := range exprs {
+ bExpr := expr.(*binaryExpr)
+ l := bExpr.l.(*FieldRef)
+ r := bExpr.r.(LiteralExpr)
+ if existingList, ok := cond[l.tag.GetTagName()]; ok {
+ existingList = append(existingList, index.ConditionValue{
+ Values: r.Bytes(),
+ Op: bExpr.op,
+ })
+ cond[l.tag.GetTagName()] = existingList
+ } else {
+ cond[l.tag.GetTagName()] = []index.ConditionValue{
+ {
+ Values: r.Bytes(),
+ Op: bExpr.op,
+ },
+ }
+ }
+ }
+ return cond
+}
diff --git a/pkg/query/v2/logical/plan_orderby.go b/pkg/query/v2/logical/plan_orderby.go
new file mode 100644
index 0000000..254f0b2
--- /dev/null
+++ b/pkg/query/v2/logical/plan_orderby.go
@@ -0,0 +1,171 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "bytes"
+ "fmt"
+ "sort"
+
+ "github.com/pkg/errors"
+
+ databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+type UnresolvedOrderBy struct {
+ sort modelv2.QueryOrder_Sort
+ targetIndexRuleName string
+}
+
+func (u *UnresolvedOrderBy) analyze(s Schema) (*orderBy, error) {
+ if u == nil {
+ // return a default orderBy sub-plan
+ return &orderBy{
+ sort: modelv2.QueryOrder_SORT_UNSPECIFIED,
+ }, nil
+ }
+
+ if u.targetIndexRuleName == "" {
+ return &orderBy{
+ sort: u.sort,
+ }, nil
+ }
+
+ defined, indexRule := s.IndexRuleDefined(u.targetIndexRuleName)
+ if !defined {
+ return nil, errors.Wrap(ErrIndexNotDefined, u.targetIndexRuleName)
+ }
+
+ projFieldSpecs, err := s.CreateRef(NewTags("", indexRule.GetTags()...))
+
+ if err != nil {
+ return nil, ErrFieldNotDefined
+ }
+
+ return &orderBy{
+ sort: u.sort,
+ index: indexRule,
+ fieldRefs: projFieldSpecs[0],
+ }, nil
+}
+
+type orderBy struct {
+ // orderByIndex describes the indexRule used to sort the elements/
+ // It can be null since by default we may sort by created-time.
+ index *databasev2.IndexRule
+ // while orderBySort describes the sort direction
+ sort modelv2.QueryOrder_Sort
+ // TODO: support multiple tags. Currently only the first member will be used for sorting.
+ fieldRefs []*FieldRef
+}
+
+func (o *orderBy) Equal(other interface{}) bool {
+ if otherOrderBy, ok := other.(*orderBy); ok {
+ if o == nil && otherOrderBy == nil {
+ return true
+ }
+ if o != nil && otherOrderBy == nil || o == nil && otherOrderBy != nil {
+ return false
+ }
+ return o.sort == otherOrderBy.sort &&
+ o.index.GetMetadata().GetName() == otherOrderBy.index.GetMetadata().GetName()
+ }
+
+ return false
+}
+
+func (o *orderBy) String() string {
+ return fmt.Sprintf("OrderBy: %v, sort=%s", o.index.GetTags(), o.sort.String())
+}
+
+func OrderBy(indexRuleName string, sort modelv2.QueryOrder_Sort) *UnresolvedOrderBy {
+ return &UnresolvedOrderBy{
+ sort: sort,
+ targetIndexRuleName: indexRuleName,
+ }
+}
+
+func getRawTagValue(typedPair *modelv2.Tag) ([]byte, error) {
+ switch v := typedPair.GetValue().Value.(type) {
+ case *modelv2.TagValue_Str:
+ return []byte(v.Str.GetValue()), nil
+ case *modelv2.TagValue_Int:
+ return convert.Int64ToBytes(v.Int.GetValue()), nil
+ default:
+ return nil, errors.New("unsupported data types")
+ }
+}
+
+// SortedByIndex is used to test whether the given entities are sorted by the sortDirection
+// The given entities MUST satisfy both the positive check and the negative check for the reversed direction
+func SortedByIndex(elements []*streamv2.Element, tagFamilyIdx, tagIdx int, sortDirection modelv2.QueryOrder_Sort) bool {
+ if modelv2.QueryOrder_SORT_UNSPECIFIED == sortDirection {
+ sortDirection = modelv2.QueryOrder_SORT_ASC
+ }
+ if len(elements) == 1 {
+ return true
+ }
+ return sort.SliceIsSorted(elements, sortByIndex(elements, tagFamilyIdx, tagIdx, sortDirection)) &&
+ !sort.SliceIsSorted(elements, sortByIndex(elements, tagFamilyIdx, tagIdx, reverseSortDirection(sortDirection)))
+}
+
+func SortedByTimestamp(elements []*streamv2.Element, sortDirection modelv2.QueryOrder_Sort) bool {
+ if modelv2.QueryOrder_SORT_UNSPECIFIED == sortDirection {
+ sortDirection = modelv2.QueryOrder_SORT_ASC
+ }
+ if len(elements) == 1 {
+ return true
+ }
+ return sort.SliceIsSorted(elements, sortByTimestamp(elements, sortDirection)) &&
+ !sort.SliceIsSorted(elements, sortByTimestamp(elements, reverseSortDirection(sortDirection)))
+}
+
+func reverseSortDirection(sort modelv2.QueryOrder_Sort) modelv2.QueryOrder_Sort {
+ if sort == modelv2.QueryOrder_SORT_DESC {
+ return modelv2.QueryOrder_SORT_ASC
+ }
+ return modelv2.QueryOrder_SORT_DESC
+}
+
+func sortByIndex(entities []*streamv2.Element, tagFamilyIdx, tagIdx int, sortDirection modelv2.QueryOrder_Sort) func(i, j int) bool {
+ return func(i, j int) bool {
+ iPair := entities[i].GetTagFamilies()[tagFamilyIdx].GetTags()[tagIdx]
+ jPair := entities[j].GetTagFamilies()[tagFamilyIdx].GetTags()[tagIdx]
+ lField, _ := getRawTagValue(iPair)
+ rField, _ := getRawTagValue(jPair)
+ comp := bytes.Compare(lField, rField)
+ if sortDirection == modelv2.QueryOrder_SORT_ASC {
+ return comp == -1
+ }
+ return comp == 1
+ }
+}
+
+func sortByTimestamp(entities []*streamv2.Element, sortDirection modelv2.QueryOrder_Sort) func(i, j int) bool {
+ return func(i, j int) bool {
+ iPair := entities[i].GetTimestamp().AsTime().UnixNano()
+ jPair := entities[j].GetTimestamp().AsTime().UnixNano()
+ if sortDirection == modelv2.QueryOrder_SORT_DESC {
+ return iPair > jPair
+ }
+ return iPair < jPair
+ }
+}
diff --git a/pkg/query/v2/logical/schema.go b/pkg/query/v2/logical/schema.go
new file mode 100644
index 0000000..13130e4
--- /dev/null
+++ b/pkg/query/v2/logical/schema.go
@@ -0,0 +1,176 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+ "github.com/google/go-cmp/cmp"
+ "github.com/pkg/errors"
+
+ databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+)
+
+type Schema interface {
+ EntityList() []string
+ IndexDefined(*Tag) (bool, *databasev2.IndexRule)
+ IndexRuleDefined(string) (bool, *databasev2.IndexRule)
+ FieldSubscript(string) (bool, int)
+ FieldDefined(string) bool
+ CreateRef(tags ...[]*Tag) ([][]*FieldRef, error)
+ Proj(refs ...[]*FieldRef) Schema
+ Equal(Schema) bool
+ ShardNumber() uint32
+ TraceIDFieldName() string
+}
+
+type tagSpec struct {
+ // Idx is defined as
+ // 1) the field index based on the (trace) schema for the underlying plans which
+ // directly interact with the database and index modules,
+ // 2) the projection index given by the users for those plans which can only access the data from parent plans,
+ // e.g. orderBy plan uses this projection index to access the data entities (normally a projection view)
+ // from the parent plan.
+ TagFamilyIdx int
+ TagIdx int
+ spec *databasev2.TagSpec
+}
+
+func (fs *tagSpec) Equal(other *tagSpec) bool {
+ return fs.TagFamilyIdx == other.TagFamilyIdx && fs.TagIdx == other.TagIdx &&
+ fs.spec.GetType() == other.spec.GetType() && fs.spec.GetName() == other.spec.GetName()
+}
+
+var _ Schema = (*schema)(nil)
+
+type schema struct {
+ stream *databasev2.Stream
+ indexRules []*databasev2.IndexRule
+ fieldMap map[string]*tagSpec
+ entityList []string
+}
+
+func (s *schema) IndexRuleDefined(indexRuleName string) (bool, *databasev2.IndexRule) {
+ for _, idxRule := range s.indexRules {
+ if idxRule.GetMetadata().GetName() == indexRuleName {
+ return true, idxRule
+ }
+ }
+ return false, nil
+}
+
+func (s *schema) EntityList() []string {
+ return s.entityList
+}
+
+func (s *schema) TraceIDFieldName() string {
+ // TODO: how to extract trace_id?
+ return "trace_id"
+}
+
+// IndexDefined checks whether the field given is indexed
+func (s *schema) IndexDefined(tag *Tag) (bool, *databasev2.IndexRule) {
+ for _, idxRule := range s.indexRules {
+ for _, tagName := range idxRule.GetTags() {
+ if tag.GetTagName() == tagName {
+ return true, idxRule
+ }
+ }
+ }
+ return false, nil
+}
+
+func (s *schema) FieldSubscript(field string) (bool, int) {
+ // TODO: what does this mean
+ for i, indexObj := range s.indexRules {
+ for _, fieldName := range indexObj.GetTags() {
+ if field == fieldName {
+ return true, i
+ }
+ }
+ }
+ return false, -1
+}
+
+func (s *schema) Equal(s2 Schema) bool {
+ if other, ok := s2.(*schema); ok {
+ return cmp.Equal(other.fieldMap, s.fieldMap)
+ }
+ return false
+}
+
+// registerField registers the field spec with given tagFamilyName, tagName and indexes.
+func (s *schema) registerField(tagName string, tagFamilyIdx, tagIdx int, spec *databasev2.TagSpec) {
+ s.fieldMap[tagName] = &tagSpec{
+ TagIdx: tagIdx,
+ TagFamilyIdx: tagFamilyIdx,
+ spec: spec,
+ }
+}
+
+func (s *schema) FieldDefined(name string) bool {
+ if _, ok := s.fieldMap[name]; ok {
+ return true
+ }
+ return false
+}
+
+// CreateRef create FieldRef to the given tags.
+// The family name of the tag is actually not used
+// since the uniqueness of the tag names can be guaranteed across families.
+func (s *schema) CreateRef(tags ...[]*Tag) ([][]*FieldRef, error) {
+ fieldRefs := make([][]*FieldRef, len(tags))
+ for i, tagInFamily := range tags {
+ var fieldRefsInFamily []*FieldRef
+ for _, tag := range tagInFamily {
+ if fs, ok := s.fieldMap[tag.GetTagName()]; ok {
+ fieldRefsInFamily = append(fieldRefsInFamily, &FieldRef{tag, fs})
+ } else {
+ return nil, errors.Wrap(ErrFieldNotDefined, tag.GetCompoundName())
+ }
+ }
+ fieldRefs[i] = fieldRefsInFamily
+ }
+ return fieldRefs, nil
+}
+
+// Proj creates a projection view from the present schema
+// with a given list of projections
+func (s *schema) Proj(refs ...[]*FieldRef) Schema {
+ if len(refs) == 0 {
+ return nil
+ }
+ newSchema := &schema{
+ stream: s.stream,
+ indexRules: s.indexRules,
+ fieldMap: make(map[string]*tagSpec),
+ entityList: s.entityList,
+ }
+ for projFamilyIdx, refInFamily := range refs {
+ for projIdx, ref := range refInFamily {
+ newSchema.fieldMap[ref.tag.GetTagName()] = &tagSpec{
+ TagFamilyIdx: projFamilyIdx,
+ TagIdx: projIdx,
+ spec: ref.Spec.spec,
+ }
+ }
+ }
+ return newSchema
+}
+
+func (s *schema) ShardNumber() uint32 {
+ return s.stream.ShardNum
+}
diff --git a/pkg/query/v2/logical/testdata/global_index.json b/pkg/query/v2/logical/testdata/global_index.json
new file mode 100644
index 0000000..9e81928
--- /dev/null
+++ b/pkg/query/v2/logical/testdata/global_index.json
@@ -0,0 +1,64 @@
+[
+ {
+ "tags": [
+ {"str":{"value": "1"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 1000}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "2"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.3_id"}},
+ {"str":{"value": "/product_id"}},
+ {"int":{"value": 500}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "1"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 30}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "500"}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "2"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.5_id"}},
+ {"str":{"value": "/price_id"}},
+ {"int":{"value": 60}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "400"}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "2"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/item_id"}},
+ {"int":{"value": 300}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "500"}}
+ ]
+ }
+
+]
\ No newline at end of file
diff --git a/pkg/query/v2/logical/testdata/multiple_shards.json b/pkg/query/v2/logical/testdata/multiple_shards.json
new file mode 100644
index 0000000..ec6427e
--- /dev/null
+++ b/pkg/query/v2/logical/testdata/multiple_shards.json
@@ -0,0 +1,64 @@
+[
+ {
+ "tags": [
+ {"str":{"value": "1"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 1000}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "2"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.3_id"}},
+ {"str":{"value": "/product_id"}},
+ {"int":{"value": 500}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "3"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 30}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "500"}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "4"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.5_id"}},
+ {"str":{"value": "/price_id"}},
+ {"int":{"value": 60}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "400"}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "5"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/item_id"}},
+ {"int":{"value": 300}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "500"}}
+ ]
+ }
+
+]
\ No newline at end of file