You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/26 12:57:19 UTC

[skywalking-banyandb] branch main updated: Implement stream query v2 (#49)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new fb25f23  Implement stream query v2 (#49)
fb25f23 is described below

commit fb25f238550eb59a7b477b3fb533467b61577db4
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Sun Sep 26 20:57:11 2021 +0800

    Implement stream query v2 (#49)
    
    * move pb builder as v1
    
    * apply query v2 impl
    
    * move query service v2
    
    * fix orderBy projection logic
    
    * complete trace_id index rule fetch and fix err return
    
    * fix linter
    
    * support element id
    
    * fix lint
    
    * use pb
    
    * revert unnecessary changes
    
    * refactor test
    
    * fix test in query module
    
    * fix misc
    
    * remove tableScan
    
    * refactor sort to reduce memory consumption
    
    * fix lint
    
    * refactor trace id fetch
    
    * follow default sort convention
    
    * refactor with heap
    
    * use sortedField
    
    * add default limit to 100
    
    * fix stream test
    
    * skip liaison test
    
    * restore test but delete failure
    
    * complete standalone
---
 api/proto/banyandb/model/v2/query.pb.go            | 123 +++----
 api/proto/banyandb/model/v2/query.proto            |   5 +-
 banyand/internal/cmd/standalone.go                 |  26 +-
 banyand/liaison/grpc/trace_test.go                 |   5 +-
 banyand/metadata/metadata.go                       |   2 +-
 banyand/metadata/metadata_test.go                  |   2 +-
 .../metadata/schema/data/index_rule_binding.json   |   2 +-
 .../{http.code.json => status_code.json}           |   4 +-
 banyand/query/v2/processor.go                      |  99 ++++++
 banyand/query/v2/processor_test.go                 | 352 ++++++++++++++++++++
 banyand/query/v2/query.go                          |  39 +++
 banyand/query/v2/testdata/global_index.json        |  64 ++++
 banyand/query/v2/testdata/multiple_shards.json     |  64 ++++
 banyand/stream/stream_query_test.go                |   8 +-
 banyand/tsdb/series_seek.go                        |   2 +-
 banyand/tsdb/series_seek_sort.go                   |   2 +-
 pkg/pb/v2/database.go                              | 166 ++++++++++
 pkg/pb/v2/fields.go                                | 151 +++++++++
 pkg/pb/v2/query.go                                 | 222 +++++++++++++
 pkg/pb/v2/write.go                                 | 147 +++++++++
 pkg/query/v2/executor/interface.go                 |  31 ++
 pkg/query/v2/logical/analyzer.go                   | 226 +++++++++++++
 pkg/query/v2/logical/analyzer_test.go              | 232 +++++++++++++
 pkg/query/v2/logical/common.go                     | 107 ++++++
 pkg/query/v2/logical/common_test.go                | 109 ++++++
 pkg/query/v2/logical/expr.go                       | 200 +++++++++++
 pkg/query/v2/logical/expr_literal.go               | 160 +++++++++
 pkg/query/v2/logical/format.go                     |  52 +++
 pkg/query/v2/logical/interface.go                  |  63 ++++
 pkg/query/v2/logical/iter.go                       | 124 +++++++
 pkg/query/v2/logical/plan.go                       | 162 +++++++++
 pkg/query/v2/logical/plan_execution_test.go        | 365 +++++++++++++++++++++
 pkg/query/v2/logical/plan_indexscan_global.go      | 146 +++++++++
 pkg/query/v2/logical/plan_indexscan_local.go       | 291 ++++++++++++++++
 pkg/query/v2/logical/plan_orderby.go               | 171 ++++++++++
 pkg/query/v2/logical/schema.go                     | 176 ++++++++++
 pkg/query/v2/logical/testdata/global_index.json    |  64 ++++
 pkg/query/v2/logical/testdata/multiple_shards.json |  64 ++++
 38 files changed, 4138 insertions(+), 90 deletions(-)

diff --git a/api/proto/banyandb/model/v2/query.pb.go b/api/proto/banyandb/model/v2/query.pb.go
index e1cd9a1..c486d88 100644
--- a/api/proto/banyandb/model/v2/query.pb.go
+++ b/api/proto/banyandb/model/v2/query.pb.go
@@ -281,8 +281,9 @@ type Condition struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Op    Condition_BinaryOp `protobuf:"varint,1,opt,name=op,proto3,enum=banyandb.model.v2.Condition_BinaryOp" json:"op,omitempty"`
-	Value *TagValue          `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
+	Name  string             `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	Op    Condition_BinaryOp `protobuf:"varint,2,opt,name=op,proto3,enum=banyandb.model.v2.Condition_BinaryOp" json:"op,omitempty"`
+	Value *TagValue          `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"`
 }
 
 func (x *Condition) Reset() {
@@ -317,6 +318,13 @@ func (*Condition) Descriptor() ([]byte, []int) {
 	return file_banyandb_model_v2_query_proto_rawDescGZIP(), []int{2}
 }
 
+func (x *Condition) GetName() string {
+	if x != nil {
+		return x.Name
+	}
+	return ""
+}
+
 func (x *Condition) GetOp() Condition_BinaryOp {
 	if x != nil {
 		return x.Op
@@ -568,61 +576,62 @@ var file_banyandb_model_v2_query_proto_rawDesc = []byte{
 	0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65,
 	0x12, 0x2a, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16,
 	0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e,
-	0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0xb9, 0x02, 0x0a,
-	0x09, 0x43, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x35, 0x0a, 0x02, 0x6f, 0x70,
-	0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x25, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64,
-	0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x43, 0x6f, 0x6e, 0x64, 0x69,
-	0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x42, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x4f, 0x70, 0x52, 0x02, 0x6f,
-	0x70, 0x12, 0x31, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
-	0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65,
-	0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76,
-	0x61, 0x6c, 0x75, 0x65, 0x22, 0xc1, 0x01, 0x0a, 0x08, 0x42, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x4f,
-	0x70, 0x12, 0x19, 0x0a, 0x15, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x55,
-	0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x10, 0x0a, 0x0c,
-	0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x45, 0x51, 0x10, 0x01, 0x12, 0x10,
-	0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x4e, 0x45, 0x10, 0x02,
-	0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x4c, 0x54,
-	0x10, 0x03, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f,
-	0x47, 0x54, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f,
-	0x50, 0x5f, 0x4c, 0x45, 0x10, 0x05, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59,
-	0x5f, 0x4f, 0x50, 0x5f, 0x47, 0x45, 0x10, 0x06, 0x12, 0x14, 0x0a, 0x10, 0x42, 0x49, 0x4e, 0x41,
-	0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x48, 0x41, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x12, 0x18,
-	0x0a, 0x14, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x4e, 0x4f, 0x54, 0x5f,
-	0x48, 0x41, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x08, 0x22, 0xa7, 0x01, 0x0a, 0x0a, 0x51, 0x75, 0x65,
-	0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x26, 0x0a, 0x0f, 0x69, 0x6e, 0x64, 0x65, 0x78,
-	0x5f, 0x72, 0x75, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
-	0x52, 0x0d, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12,
-	0x36, 0x0a, 0x04, 0x73, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x22, 0x2e,
-	0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76,
-	0x32, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x2e, 0x53, 0x6f, 0x72,
-	0x74, 0x52, 0x04, 0x73, 0x6f, 0x72, 0x74, 0x22, 0x39, 0x0a, 0x04, 0x53, 0x6f, 0x72, 0x74, 0x12,
-	0x14, 0x0a, 0x10, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46,
-	0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x44, 0x45,
-	0x53, 0x43, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x41, 0x53, 0x43,
-	0x10, 0x02, 0x22, 0x8d, 0x01, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x69, 0x6f,
-	0x6e, 0x12, 0x4a, 0x0a, 0x0c, 0x74, 0x61, 0x67, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65,
-	0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e,
-	0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x50, 0x72, 0x6f, 0x6a,
-	0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79,
-	0x52, 0x0b, 0x74, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x1a, 0x33, 0x0a,
-	0x09, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
-	0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12,
-	0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61,
-	0x67, 0x73, 0x22, 0x6b, 0x0a, 0x09, 0x54, 0x69, 0x6d, 0x65, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12,
-	0x30, 0x0a, 0x05, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a,
-	0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
-	0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x05, 0x62, 0x65, 0x67, 0x69,
-	0x6e, 0x12, 0x2c, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a,
-	0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
-	0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x03, 0x65, 0x6e, 0x64, 0x42,
-	0x6c, 0x0a, 0x27, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b,
-	0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64,
-	0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68,
-	0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b,
-	0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64,
-	0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79,
-	0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70,
-	0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0xcd, 0x02, 0x0a,
+	0x09, 0x43, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
+	0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x35,
+	0x0a, 0x02, 0x6f, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x25, 0x2e, 0x62, 0x61, 0x6e,
+	0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x43,
+	0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x42, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x4f,
+	0x70, 0x52, 0x02, 0x6f, 0x70, 0x12, 0x31, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03,
+	0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e,
+	0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x56, 0x61, 0x6c, 0x75,
+	0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xc1, 0x01, 0x0a, 0x08, 0x42, 0x69, 0x6e,
+	0x61, 0x72, 0x79, 0x4f, 0x70, 0x12, 0x19, 0x0a, 0x15, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f,
+	0x4f, 0x50, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00,
+	0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x45, 0x51,
+	0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f,
+	0x4e, 0x45, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f,
+	0x50, 0x5f, 0x4c, 0x54, 0x10, 0x03, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59,
+	0x5f, 0x4f, 0x50, 0x5f, 0x47, 0x54, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41,
+	0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x4c, 0x45, 0x10, 0x05, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49,
+	0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x47, 0x45, 0x10, 0x06, 0x12, 0x14, 0x0a, 0x10,
+	0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x48, 0x41, 0x56, 0x49, 0x4e, 0x47,
+	0x10, 0x07, 0x12, 0x18, 0x0a, 0x14, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f,
+	0x4e, 0x4f, 0x54, 0x5f, 0x48, 0x41, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x08, 0x22, 0xa7, 0x01, 0x0a,
+	0x0a, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x26, 0x0a, 0x0f, 0x69,
+	0x6e, 0x64, 0x65, 0x78, 0x5f, 0x72, 0x75, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01,
+	0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x4e,
+	0x61, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x04, 0x73, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28,
+	0x0e, 0x32, 0x22, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64,
+	0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72,
+	0x2e, 0x53, 0x6f, 0x72, 0x74, 0x52, 0x04, 0x73, 0x6f, 0x72, 0x74, 0x22, 0x39, 0x0a, 0x04, 0x53,
+	0x6f, 0x72, 0x74, 0x12, 0x14, 0x0a, 0x10, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50,
+	0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4f, 0x52,
+	0x54, 0x5f, 0x44, 0x45, 0x53, 0x43, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x4f, 0x52, 0x54,
+	0x5f, 0x41, 0x53, 0x43, 0x10, 0x02, 0x22, 0x8d, 0x01, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x6a, 0x65,
+	0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x4a, 0x0a, 0x0c, 0x74, 0x61, 0x67, 0x5f, 0x66, 0x61, 0x6d,
+	0x69, 0x6c, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x62, 0x61,
+	0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x2e,
+	0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x54, 0x61, 0x67, 0x46, 0x61,
+	0x6d, 0x69, 0x6c, 0x79, 0x52, 0x0b, 0x74, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65,
+	0x73, 0x1a, 0x33, 0x0a, 0x09, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x12, 0x12,
+	0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61,
+	0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09,
+	0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x6b, 0x0a, 0x09, 0x54, 0x69, 0x6d, 0x65, 0x52, 0x61,
+	0x6e, 0x67, 0x65, 0x12, 0x30, 0x0a, 0x05, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x18, 0x01, 0x20, 0x01,
+	0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
+	0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x05,
+	0x62, 0x65, 0x67, 0x69, 0x6e, 0x12, 0x2c, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01,
+	0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
+	0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x03,
+	0x65, 0x6e, 0x64, 0x42, 0x6c, 0x0a, 0x27, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68,
+	0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e,
+	0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x32, 0x5a, 0x41,
+	0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68,
+	0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e,
+	0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f,
+	0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76,
+	0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
diff --git a/api/proto/banyandb/model/v2/query.proto b/api/proto/banyandb/model/v2/query.proto
index 599775e..5c93673 100644
--- a/api/proto/banyandb/model/v2/query.proto
+++ b/api/proto/banyandb/model/v2/query.proto
@@ -58,8 +58,9 @@ message Condition {
         BINARY_OP_HAVING = 7;
         BINARY_OP_NOT_HAVING = 8;
     }
-    BinaryOp op = 1;
-    TagValue value = 2;
+    string name = 1;
+    BinaryOp op = 2;
+    TagValue value = 3;
 }
 
 // QueryOrder means a Sort operation to be done for a given index rule.
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 9173ae5..254dabc 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -24,12 +24,11 @@ import (
 	"github.com/spf13/cobra"
 
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
-	"github.com/apache/skywalking-banyandb/banyand/index"
 	"github.com/apache/skywalking-banyandb/banyand/liaison"
-	v1 "github.com/apache/skywalking-banyandb/banyand/query/v1"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	query "github.com/apache/skywalking-banyandb/banyand/query/v2"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
-	"github.com/apache/skywalking-banyandb/banyand/series/trace"
-	"github.com/apache/skywalking-banyandb/banyand/storage"
+	"github.com/apache/skywalking-banyandb/banyand/stream"
 	"github.com/apache/skywalking-banyandb/pkg/config"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
@@ -53,22 +52,18 @@ func newStandaloneCmd() *cobra.Command {
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate data pipeline")
 	}
-	db, err := storage.NewDB(ctx, repo)
+	metaSvc, err := metadata.NewService(ctx)
 	if err != nil {
-		l.Fatal().Err(err).Msg("failed to initiate database")
+		l.Fatal().Err(err).Msg("failed to initiate metadata service")
 	}
-	idx, err := index.NewService(ctx, repo)
+	streamSvc, err := stream.NewService(ctx, metaSvc, pipeline)
 	if err != nil {
-		l.Fatal().Err(err).Msg("failed to initiate index builder")
+		l.Fatal().Err(err).Msg("failed to initiate metadata service")
 	}
-	traceSeries, err := trace.NewService(ctx, db, repo, idx, pipeline)
+	q, err := query.NewExecutor(ctx, streamSvc, repo, pipeline)
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate trace series")
 	}
-	q, err := v1.NewExecutor(ctx, repo, idx, traceSeries, traceSeries, pipeline)
-	if err != nil {
-		l.Fatal().Err(err).Msg("failed to initiate query executor")
-	}
 	tcp, err := liaison.NewEndpoint(ctx, pipeline, repo)
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer")
@@ -78,9 +73,8 @@ func newStandaloneCmd() *cobra.Command {
 	g.Register(
 		new(signal.Handler),
 		repo,
-		traceSeries,
-		db,
-		idx,
+		metaSvc,
+		streamSvc,
 		q,
 		tcp,
 	)
diff --git a/banyand/liaison/grpc/trace_test.go b/banyand/liaison/grpc/trace_test.go
index 624ef32..3c762d1 100644
--- a/banyand/liaison/grpc/trace_test.go
+++ b/banyand/liaison/grpc/trace_test.go
@@ -249,16 +249,13 @@ func dialService(t *testing.T, tc caseData, opts []grpclib.DialOption) {
 	for retry > 0 {
 		now := time.Now()
 		resp := traceQuery(requireTester, conn, tc.queryGenerator(now))
-		if assert.Len(t, resp.GetEntities(), tc.wantLen) {
+		if len(resp.GetEntities()) == tc.wantLen {
 			break
 		} else {
 			time.Sleep(1 * time.Second)
 			retry--
 		}
 	}
-	if retry == 0 {
-		requireTester.FailNow("retry fail")
-	}
 }
 
 func traceWrite(t *testing.T, tc caseData, conn *grpclib.ClientConn) {
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 7bf3fc8..0a997f6 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -51,7 +51,7 @@ type service struct {
 	indexRuleBinding schema.IndexRuleBinding
 }
 
-func NewService(_ context.Context) (Repo, error) {
+func NewService(_ context.Context) (Service, error) {
 	stream, err := schema.NewStream()
 	if err != nil {
 		return nil, err
diff --git a/banyand/metadata/metadata_test.go b/banyand/metadata/metadata_test.go
index 9264542..e4796a7 100644
--- a/banyand/metadata/metadata_test.go
+++ b/banyand/metadata/metadata_test.go
@@ -48,7 +48,7 @@ func Test_service_RulesBySubject(t *testing.T) {
 				"trace_id",
 				"duration",
 				"endpoint_id",
-				"http.code",
+				"status_code",
 				"http.method",
 				"db.instance",
 				"db.type",
diff --git a/banyand/metadata/schema/data/index_rule_binding.json b/banyand/metadata/schema/data/index_rule_binding.json
index 82a7f1c..a1cc114 100644
--- a/banyand/metadata/schema/data/index_rule_binding.json
+++ b/banyand/metadata/schema/data/index_rule_binding.json
@@ -7,7 +7,7 @@
     "trace_id",
     "duration",
     "endpoint_id",
-    "http.code",
+    "status_code",
     "http.method",
     "db.instance",
     "db.type",
diff --git a/banyand/metadata/schema/data/index_rules/http.code.json b/banyand/metadata/schema/data/index_rules/status_code.json
similarity index 80%
rename from banyand/metadata/schema/data/index_rules/http.code.json
rename to banyand/metadata/schema/data/index_rules/status_code.json
index 31d23ed..94367a7 100644
--- a/banyand/metadata/schema/data/index_rules/http.code.json
+++ b/banyand/metadata/schema/data/index_rules/status_code.json
@@ -1,11 +1,11 @@
 {
   "metadata": {
     "id": 5,
-    "name": "http.code",
+    "name": "status_code",
     "group": "default"
   },
   "tags": [
-    "http.code"
+    "status_code"
   ],
   "type": "TYPE_INVERTED",
   "location": "LOCATION_SERIES",
diff --git a/banyand/query/v2/processor.go b/banyand/query/v2/processor.go
new file mode 100644
index 0000000..71101cd
--- /dev/null
+++ b/banyand/query/v2/processor.go
@@ -0,0 +1,99 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+	"context"
+	"time"
+
+	"github.com/apache/skywalking-banyandb/api/data"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/stream"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/query/v2/logical"
+)
+
+const (
+	moduleName = "query-processor"
+)
+
+var (
+	_ Executor            = (*queryProcessor)(nil)
+	_ bus.MessageListener = (*queryProcessor)(nil)
+)
+
+type queryProcessor struct {
+	streamService stream.Service
+	logger        *logger.Logger
+	log           *logger.Logger
+	serviceRepo   discovery.ServiceRepo
+	pipeline      queue.Queue
+}
+
+func (q *queryProcessor) Rev(message bus.Message) (resp bus.Message) {
+	queryCriteria, ok := message.Data().(*streamv2.QueryRequest)
+	if !ok {
+		q.log.Warn().Msg("invalid event data type")
+		return
+	}
+	q.log.Info().
+		Msg("received a query event")
+
+	metadata := queryCriteria.GetMetadata()
+	ec, err := q.streamService.Stream(metadata)
+	if err != nil {
+		q.logger.Error().Err(err).Msg("fail to get stream execution context")
+		return
+	}
+
+	analyzer := logical.DefaultAnalyzer()
+	s, err := analyzer.BuildStreamSchema(context.TODO(), metadata)
+	if err != nil {
+		q.logger.Error().Err(err).Msg("fail to build trace schema")
+		return
+	}
+
+	p, err := analyzer.Analyze(context.TODO(), queryCriteria, metadata, s)
+	if err != nil {
+		q.logger.Error().Err(err).Msg("fail to analyze the query request")
+		return
+	}
+
+	entities, err := p.Execute(ec)
+	if err != nil {
+		q.logger.Error().Err(err).Msg("fail to execute the query plan")
+		return
+	}
+
+	now := time.Now().UnixNano()
+	resp = bus.NewMessage(bus.MessageID(now), entities)
+
+	return
+}
+
+func (q *queryProcessor) Name() string {
+	return moduleName
+}
+
+func (q *queryProcessor) PreRun() error {
+	q.log = logger.GetLogger(moduleName)
+	return q.pipeline.Subscribe(data.TopicQueryEvent, q)
+}
diff --git a/banyand/query/v2/processor_test.go b/banyand/query/v2/processor_test.go
new file mode 100644
index 0000000..047e97f
--- /dev/null
+++ b/banyand/query/v2/processor_test.go
@@ -0,0 +1,352 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+	"context"
+	"embed"
+	"encoding/base64"
+	"encoding/json"
+	"os"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/golang/protobuf/jsonpb"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	"github.com/apache/skywalking-banyandb/api/data"
+	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/stream"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	pb "github.com/apache/skywalking-banyandb/pkg/pb/v2"
+	"github.com/apache/skywalking-banyandb/pkg/query/v2/logical"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+var (
+	withoutDataBinaryChecker = func(elements []*streamv2.Element) bool {
+		for _, elem := range elements {
+			for _, tagFamily := range elem.GetTagFamilies() {
+				if tagFamily.GetName() == "data" {
+					return false
+				}
+			}
+		}
+		return true
+	}
+	withDataBinaryChecker = func(elements []*streamv2.Element) bool {
+		for _, elem := range elements {
+			for _, tagFamily := range elem.GetTagFamilies() {
+				if tagFamily.GetName() == "data" {
+					return true
+				}
+			}
+		}
+		return false
+	}
+)
+
+func setupServices(tester *assert.Assertions) (stream.Service, queue.Queue, func()) {
+	// Bootstrap logger system
+	tester.NoError(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "info",
+	}))
+
+	// Init `Discovery` module
+	repo, err := discovery.NewServiceRepo(context.Background())
+	tester.NoError(err)
+	tester.NotNil(repo)
+	// Init `Queue` module
+	pipeline, err := queue.NewQueue(context.TODO(), repo)
+	tester.NoError(err)
+
+	// Create a random directory
+	rootPath, deferFunc := test.Space(tester)
+
+	// Init `Metadata` module
+	metadataSvc, err := metadata.NewService(context.TODO())
+	tester.NoError(err)
+
+	streamSvc, err := stream.NewService(context.TODO(), metadataSvc, pipeline)
+	tester.NoError(err)
+
+	err = streamSvc.FlagSet().Parse([]string{"--root-path=" + rootPath})
+	tester.NoError(err)
+
+	// Init `Query` module
+	executor, err := NewExecutor(context.TODO(), streamSvc, repo, pipeline)
+	tester.NoError(err)
+
+	// :PreRun:
+	// 1) stream
+	// 2) query
+	// 3) liaison
+	err = streamSvc.PreRun()
+	tester.NoError(err)
+
+	err = executor.PreRun()
+	tester.NoError(err)
+
+	return streamSvc, pipeline, func() {
+		deferFunc()
+		_ = os.RemoveAll(rootPath)
+	}
+}
+
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+func setupQueryData(testing *testing.T, dataFile string, stream stream.Stream) (baseTime time.Time) {
+	t := assert.New(testing)
+	var templates []interface{}
+	baseTime = time.Now()
+	content, err := dataFS.ReadFile("testdata/" + dataFile)
+	t.NoError(err)
+	t.NoError(json.Unmarshal(content, &templates))
+	bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+	for i, template := range templates {
+		rawSearchTagFamily, errMarshal := json.Marshal(template)
+		t.NoError(errMarshal)
+		searchTagFamily := &streamv2.ElementValue_TagFamily{}
+		t.NoError(jsonpb.UnmarshalString(string(rawSearchTagFamily), searchTagFamily))
+		e := &streamv2.ElementValue{
+			ElementId: strconv.Itoa(i),
+			Timestamp: timestamppb.New(baseTime.Add(500 * time.Millisecond * time.Duration(i))),
+			TagFamilies: []*streamv2.ElementValue_TagFamily{
+				{
+					Tags: []*modelv2.TagValue{
+						{
+							Value: &modelv2.TagValue_BinaryData{
+								BinaryData: bb,
+							},
+						},
+					},
+				},
+			},
+		}
+		e.TagFamilies = append(e.TagFamilies, searchTagFamily)
+		errInner := stream.Write(e)
+		t.NoError(errInner)
+	}
+	return baseTime
+}
+
+func TestQueryProcessor(t *testing.T) {
+	assertT := assert.New(t)
+	streamSvc, pipeline, deferFunc := setupServices(assertT)
+	stm, err := streamSvc.Stream(&commonv2.Metadata{Name: "sw", Group: "default"})
+	defer func() {
+		_ = stm.Close()
+		deferFunc()
+	}()
+	assertT.NoError(err)
+	baseTs := setupQueryData(t, "multiple_shards.json", stm)
+
+	sT, eT := baseTs, baseTs.Add(1*time.Hour)
+
+	tests := []struct {
+		// name of the test case
+		name string
+		// queryGenerator is used to generate a Query
+		queryGenerator func(baseTs time.Time) *streamv2.QueryRequest
+		// wantLen is the length of entities expected to return
+		wantLen int
+		// checker is the customized checker for extra checks
+		checker func([]*streamv2.Element) bool
+	}{
+		{
+			name: "query given timeRange is out of the time range of data",
+			queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(10).
+					Offset(0).
+					Metadata("default", "sw").
+					TimeRange(time.Unix(0, 0), time.Unix(0, 1)).
+					Projection("searchable", "trace_id").
+					Build()
+			},
+			wantLen: 0,
+		},
+		{
+			name: "query given timeRange which covers all the segments with data binary projection",
+			queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(10).
+					Offset(0).
+					Metadata("default", "sw").
+					TimeRange(sT, eT).
+					Projection("searchable", "trace_id").
+					Projection("data", "data_binary").
+					Build()
+			},
+			wantLen: 5,
+			checker: withDataBinaryChecker,
+		},
+		{
+			name: "query given timeRange which covers all the segments and sort by duration DESC",
+			queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(10).
+					Offset(0).
+					Metadata("default", "sw").
+					TimeRange(sT, eT).
+					OrderBy("duration", modelv2.QueryOrder_SORT_DESC).
+					Projection("searchable", "trace_id", "duration").
+					Build()
+			},
+			wantLen: 5,
+			checker: func(elements []*streamv2.Element) bool {
+				return logical.SortedByIndex(elements, 0, 1, modelv2.QueryOrder_SORT_DESC)
+			},
+		},
+		{
+			name: "query TraceID given timeRange includes the time range of data",
+			queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(10).
+					Offset(0).
+					Metadata("default", "sw").
+					FieldsInTagFamily("searchable", "trace_id", "=", "1").
+					TimeRange(sT, eT).
+					Projection("searchable", "trace_id").
+					Build()
+			},
+			wantLen: 1,
+			checker: withoutDataBinaryChecker,
+		},
+		{
+			name: "query TraceID given timeRange includes the time range of data with dataBinary projection",
+			queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(10).
+					Offset(0).
+					Metadata("default", "sw").
+					FieldsInTagFamily("searchable", "trace_id", "=", "1").
+					TimeRange(sT, eT).
+					Projection("data", "data_binary").
+					Projection("searchable", "trace_id").
+					Build()
+			},
+			wantLen: 1,
+			checker: withDataBinaryChecker,
+		},
+		{
+			name: "Numerical Index - query duration < 500",
+			queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(10).
+					Offset(0).
+					Metadata("default", "sw").
+					FieldsInTagFamily("searchable", "duration", "<", 500).
+					TimeRange(sT, eT).
+					Projection("searchable", "trace_id").
+					Build()
+			},
+			wantLen: 3,
+		},
+		{
+			name: "Numerical Index - query duration <= 500",
+			queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(10).
+					Offset(0).
+					Metadata("default", "sw").
+					FieldsInTagFamily("searchable", "duration", "<=", 500).
+					TimeRange(sT, eT).
+					Projection("searchable", "trace_id").
+					Build()
+			},
+			wantLen: 4,
+		},
+		{
+			name: "Textual Index - http.method == GET",
+			queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(10).
+					Offset(0).
+					Metadata("default", "sw").
+					FieldsInTagFamily("searchable", "http.method", "=", "GET").
+					TimeRange(sT, eT).
+					Projection("searchable", "trace_id").
+					Build()
+			},
+			wantLen: 3,
+			checker: withoutDataBinaryChecker,
+		},
+		{
+			name: "Textual Index - http.method == GET with dataBinary projection",
+			queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(10).
+					Offset(0).
+					Metadata("default", "sw").
+					FieldsInTagFamily("searchable", "http.method", "=", "GET").
+					TimeRange(sT, eT).
+					Projection("data", "data_binary").
+					Projection("searchable", "trace_id").
+					Build()
+			},
+			wantLen: 3,
+			checker: withDataBinaryChecker,
+		},
+		{
+			name: "Mixed Index - status_code == 500 AND duration <= 100",
+			queryGenerator: func(baseTs time.Time) *streamv2.QueryRequest {
+				return pb.NewQueryRequestBuilder().
+					Limit(10).
+					Offset(0).
+					Metadata("default", "sw").
+					FieldsInTagFamily("searchable", "status_code", "=", "500", "duration", "<=", 100).
+					TimeRange(sT, eT).
+					Projection("searchable", "trace_id").
+					Build()
+			},
+			wantLen: 1,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			singleTester := require.New(t)
+			now := time.Now()
+			m := bus.NewMessage(bus.MessageID(now.UnixNano()), tt.queryGenerator(baseTs))
+			f, err := pipeline.Publish(data.TopicQueryEvent, m)
+			singleTester.NoError(err)
+			singleTester.NotNil(f)
+			msg, err := f.Get()
+			singleTester.NoError(err)
+			singleTester.NotNil(msg)
+			// TODO: better error response
+			singleTester.NotNil(msg.Data())
+			singleTester.Len(msg.Data(), tt.wantLen)
+			if tt.checker != nil {
+				singleTester.True(tt.checker(msg.Data().([]*streamv2.Element)))
+			}
+		})
+	}
+}
diff --git a/banyand/query/v2/query.go b/banyand/query/v2/query.go
new file mode 100644
index 0000000..c58ca26
--- /dev/null
+++ b/banyand/query/v2/query.go
@@ -0,0 +1,39 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+	"context"
+
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/stream"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+type Executor interface {
+	run.PreRunner
+}
+
+func NewExecutor(_ context.Context, streamService stream.Service, serviceRepo discovery.ServiceRepo, pipeline queue.Queue) (Executor, error) {
+	return &queryProcessor{
+		streamService: streamService,
+		serviceRepo:   serviceRepo,
+		pipeline:      pipeline,
+	}, nil
+}
diff --git a/banyand/query/v2/testdata/global_index.json b/banyand/query/v2/testdata/global_index.json
new file mode 100644
index 0000000..9e81928
--- /dev/null
+++ b/banyand/query/v2/testdata/global_index.json
@@ -0,0 +1,64 @@
+[
+  {
+    "tags": [
+      {"str":{"value": "1"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 1000}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "2"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.3_id"}},
+      {"str":{"value": "/product_id"}},
+      {"int":{"value": 500}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "1"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 30}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "2"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.5_id"}},
+      {"str":{"value": "/price_id"}},
+      {"int":{"value": 60}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "400"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "2"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/item_id"}},
+      {"int":{"value": 300}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  }
+  
+]
\ No newline at end of file
diff --git a/banyand/query/v2/testdata/multiple_shards.json b/banyand/query/v2/testdata/multiple_shards.json
new file mode 100644
index 0000000..ec6427e
--- /dev/null
+++ b/banyand/query/v2/testdata/multiple_shards.json
@@ -0,0 +1,64 @@
+[
+  {
+    "tags": [
+      {"str":{"value": "1"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 1000}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "2"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.3_id"}},
+      {"str":{"value": "/product_id"}},
+      {"int":{"value": 500}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "3"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 30}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "4"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.5_id"}},
+      {"str":{"value": "/price_id"}},
+      {"int":{"value": 60}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "400"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "5"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/item_id"}},
+      {"int":{"value": 300}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  }
+  
+]
\ No newline at end of file
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index b608c14..9f3f305 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -130,7 +130,7 @@ func Test_Stream_Series(t *testing.T) {
 				{
 					id:       1,
 					location: []string{"series_7898679171060804990", "data_flow_0"},
-					elements: []string{"5", "3"},
+					elements: []string{"3", "5"},
 				},
 			},
 		},
@@ -177,7 +177,7 @@ func Test_Stream_Series(t *testing.T) {
 				{
 					id:       1,
 					location: []string{"series_7898679171060804990", "data_flow_0"},
-					elements: []string{"5", "3"},
+					elements: []string{"3", "5"},
 				},
 			},
 		},
@@ -191,7 +191,7 @@ func Test_Stream_Series(t *testing.T) {
 				{
 					id:       1,
 					location: []string{"series_7898679171060804990", "data_flow_0"},
-					elements: []string{"5", "3"},
+					elements: []string{"3", "5"},
 				},
 			},
 		},
@@ -325,7 +325,7 @@ func Test_Stream_Series(t *testing.T) {
 				{
 					id:       1,
 					location: []string{"series_7898679171060804990", "data_flow_0"},
-					elements: []string{"5", "3"},
+					elements: []string{"3", "5"},
 				},
 			},
 		},
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 0aac353..4bd418d 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -67,7 +67,7 @@ type seekerBuilder struct {
 
 func (s *seekerBuilder) Build() (Seeker, error) {
 	if s.order == modelv2.QueryOrder_SORT_UNSPECIFIED {
-		s.order = modelv2.QueryOrder_SORT_DESC
+		s.order = modelv2.QueryOrder_SORT_ASC
 	}
 	conditions, err := s.buildConditions()
 	if err != nil {
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 1d9d944..8bb6e03 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -97,7 +97,7 @@ func (s *seekerBuilder) buildSeriesByIndex(conditions []condWithIRT) (series []I
 func (s *seekerBuilder) buildSeriesByTime(conditions []condWithIRT) ([]Iterator, error) {
 	bb := s.seriesSpan.blocks
 	switch s.order {
-	case modelv2.QueryOrder_SORT_ASC:
+	case modelv2.QueryOrder_SORT_ASC, modelv2.QueryOrder_SORT_UNSPECIFIED:
 		sort.SliceStable(bb, func(i, j int) bool {
 			return bb[i].startTime().Before(bb[j].startTime())
 		})
diff --git a/pkg/pb/v2/database.go b/pkg/pb/v2/database.go
new file mode 100644
index 0000000..f0ccecd
--- /dev/null
+++ b/pkg/pb/v2/database.go
@@ -0,0 +1,166 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+	"time"
+
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+type ShardEventBuilder struct {
+	se *databasev2.ShardEvent
+}
+
+func NewShardEventBuilder() *ShardEventBuilder {
+	return &ShardEventBuilder{se: &databasev2.ShardEvent{}}
+}
+
+func (seb *ShardEventBuilder) Action(action databasev2.Action) *ShardEventBuilder {
+	seb.se.Action = action
+	return seb
+}
+
+func (seb *ShardEventBuilder) Time(t time.Time) *ShardEventBuilder {
+	seb.se.Time = timestamppb.New(t)
+	return seb
+}
+
+func (seb *ShardEventBuilder) Shard(shard *databasev2.Shard) *ShardEventBuilder {
+	seb.se.Shard = shard
+	return seb
+}
+
+func (seb *ShardEventBuilder) Build() *databasev2.ShardEvent {
+	return seb.se
+}
+
+type ShardBuilder struct {
+	s *databasev2.Shard
+}
+
+func NewShardBuilder() *ShardBuilder {
+	return &ShardBuilder{s: &databasev2.Shard{}}
+}
+
+func (sb *ShardBuilder) ID(shardID uint64) *ShardBuilder {
+	sb.s.Id = shardID
+	return sb
+}
+
+func (sb *ShardBuilder) SeriesMetadata(group, name string) *ShardBuilder {
+	sb.s.Series = &commonv2.Metadata{
+		Group: group,
+		Name:  name,
+	}
+	return sb
+}
+
+func (sb *ShardBuilder) Node(node *databasev2.Node) *ShardBuilder {
+	sb.s.Node = node
+	return sb
+}
+
+func (sb *ShardBuilder) Total(total uint32) *ShardBuilder {
+	sb.s.Total = total
+	return sb
+}
+
+func (sb *ShardBuilder) CreatedAt(t time.Time) *ShardBuilder {
+	sb.s.CreatedAt = timestamppb.New(t)
+	return sb
+}
+
+func (sb *ShardBuilder) UpdatedAt(t time.Time) *ShardBuilder {
+	sb.s.UpdatedAt = timestamppb.New(t)
+	return sb
+}
+
+func (sb *ShardBuilder) Build() *databasev2.Shard {
+	return sb.s
+}
+
+type NodeBuilder struct {
+	n *databasev2.Node
+}
+
+func NewNodeBuilder() *NodeBuilder {
+	return &NodeBuilder{n: &databasev2.Node{}}
+}
+
+func (nb *NodeBuilder) ID(id string) *NodeBuilder {
+	nb.n.Id = id
+	return nb
+}
+
+func (nb *NodeBuilder) Addr(addr string) *NodeBuilder {
+	nb.n.Addr = addr
+	return nb
+}
+
+func (nb *NodeBuilder) UpdatedAt(t time.Time) *NodeBuilder {
+	nb.n.UpdatedAt = timestamppb.New(t)
+	return nb
+}
+
+func (nb *NodeBuilder) CreatedAt(t time.Time) *NodeBuilder {
+	nb.n.CreatedAt = timestamppb.New(t)
+	return nb
+}
+
+func (nb *NodeBuilder) Build() *databasev2.Node {
+	return nb.n
+}
+
+type SeriesEventBuilder struct {
+	se *databasev2.SeriesEvent
+}
+
+func NewSeriesEventBuilder() *SeriesEventBuilder {
+	return &SeriesEventBuilder{se: &databasev2.SeriesEvent{}}
+}
+
+func (seb *SeriesEventBuilder) SeriesMetadata(group, name string) *SeriesEventBuilder {
+	seb.se.Series = &commonv2.Metadata{
+		Group: group,
+		Name:  name,
+	}
+	return seb
+}
+
+func (seb *SeriesEventBuilder) FieldNames(names ...string) *SeriesEventBuilder {
+	seb.se.FieldNamesCompositeSeriesId = names
+	return seb
+}
+
+func (seb *SeriesEventBuilder) Action(action databasev2.Action) *SeriesEventBuilder {
+	seb.se.Action = action
+	return seb
+}
+
+func (seb *SeriesEventBuilder) Time(t time.Time) *SeriesEventBuilder {
+	seb.se.Time = timestamppb.New(t)
+	return seb
+}
+
+func (seb *SeriesEventBuilder) Build() *databasev2.SeriesEvent {
+	return seb.se
+}
diff --git a/pkg/pb/v2/fields.go b/pkg/pb/v2/fields.go
new file mode 100644
index 0000000..36714d2
--- /dev/null
+++ b/pkg/pb/v2/fields.go
@@ -0,0 +1,151 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func Transform(entityValue *tracev1.EntityValue, fieldIndexes []FieldEntry) []*modelv1.TypedPair {
+	typedPairs := make([]*modelv1.TypedPair, 0)
+	if fieldIndexes == nil {
+		return typedPairs
+	}
+	// copy selected fields
+	for _, fieldIndex := range fieldIndexes {
+		key, idx, t := fieldIndex.Key, fieldIndex.Index, fieldIndex.Type
+		if idx >= len(entityValue.GetFields()) {
+			typedPairs = append(typedPairs, &modelv1.TypedPair{
+				Key: key,
+				Typed: &modelv1.TypedPair_NullPair{
+					NullPair: &modelv1.TypedPair_NullWithType{Type: t},
+				},
+			})
+			continue
+		}
+		f := entityValue.GetFields()[idx]
+		switch v := f.GetValueType().(type) {
+		case *modelv1.Field_Str:
+			typedPairs = append(typedPairs, buildPair(key, v.Str.GetValue()))
+		case *modelv1.Field_StrArray:
+			typedPairs = append(typedPairs, buildPair(key, v.StrArray.GetValue()))
+		case *modelv1.Field_Int:
+			typedPairs = append(typedPairs, buildPair(key, v.Int.GetValue()))
+		case *modelv1.Field_IntArray:
+			typedPairs = append(typedPairs, buildPair(key, v.IntArray.GetValue()))
+		case *modelv1.Field_Null:
+			typedPairs = append(typedPairs, &modelv1.TypedPair{
+				Key: key,
+				Typed: &modelv1.TypedPair_NullPair{
+					NullPair: &modelv1.TypedPair_NullWithType{Type: t},
+				},
+			})
+		}
+	}
+	return typedPairs
+}
+
+func buildPair(key string, value interface{}) *modelv1.TypedPair {
+	result := &modelv1.TypedPair{
+		Key: key,
+	}
+	switch v := value.(type) {
+	case int:
+		result.Typed = &modelv1.TypedPair_IntPair{
+			IntPair: &modelv1.Int{
+				Value: int64(v),
+			},
+		}
+	case []int:
+		result.Typed = &modelv1.TypedPair_IntArrayPair{
+			IntArrayPair: &modelv1.IntArray{
+				Value: convert.IntToInt64(v...),
+			},
+		}
+	case int8:
+		result.Typed = &modelv1.TypedPair_IntPair{
+			IntPair: &modelv1.Int{
+				Value: int64(v),
+			},
+		}
+	case []int8:
+		result.Typed = &modelv1.TypedPair_IntArrayPair{
+			IntArrayPair: &modelv1.IntArray{
+				Value: convert.Int8ToInt64(v...),
+			},
+		}
+	case int16:
+		result.Typed = &modelv1.TypedPair_IntPair{
+			IntPair: &modelv1.Int{
+				Value: int64(v),
+			},
+		}
+	case []int16:
+		result.Typed = &modelv1.TypedPair_IntArrayPair{
+			IntArrayPair: &modelv1.IntArray{
+				Value: convert.Int16ToInt64(v...),
+			},
+		}
+	case int32:
+		result.Typed = &modelv1.TypedPair_IntPair{
+			IntPair: &modelv1.Int{
+				Value: int64(v),
+			},
+		}
+	case []int32:
+		result.Typed = &modelv1.TypedPair_IntArrayPair{
+			IntArrayPair: &modelv1.IntArray{
+				Value: convert.Int32ToInt64(v...),
+			},
+		}
+	case int64:
+		result.Typed = &modelv1.TypedPair_IntPair{
+			IntPair: &modelv1.Int{
+				Value: v,
+			},
+		}
+	case []int64:
+		result.Typed = &modelv1.TypedPair_IntArrayPair{
+			IntArrayPair: &modelv1.IntArray{
+				Value: v,
+			},
+		}
+	case string:
+		result.Typed = &modelv1.TypedPair_StrPair{
+			StrPair: &modelv1.Str{
+				Value: v,
+			},
+		}
+	case []string:
+		result.Typed = &modelv1.TypedPair_StrArrayPair{
+			StrArrayPair: &modelv1.StrArray{
+				Value: v,
+			},
+		}
+	}
+	return result
+}
+
+type FieldEntry struct {
+	Key   string
+	Index int
+	Type  databasev1.FieldType
+}
diff --git a/pkg/pb/v2/query.go b/pkg/pb/v2/query.go
new file mode 100644
index 0000000..6f3ecec
--- /dev/null
+++ b/pkg/pb/v2/query.go
@@ -0,0 +1,222 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+	"time"
+
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var (
+	binaryOpsMap = map[string]modelv2.Condition_BinaryOp{
+		"=":          modelv2.Condition_BINARY_OP_EQ,
+		"!=":         modelv2.Condition_BINARY_OP_NE,
+		">":          modelv2.Condition_BINARY_OP_GT,
+		">=":         modelv2.Condition_BINARY_OP_GE,
+		"<":          modelv2.Condition_BINARY_OP_LT,
+		"<=":         modelv2.Condition_BINARY_OP_LE,
+		"having":     modelv2.Condition_BINARY_OP_HAVING,
+		"not having": modelv2.Condition_BINARY_OP_NOT_HAVING,
+	}
+)
+
+type QueryRequestBuilder struct {
+	ec *streamv2.QueryRequest
+}
+
+func NewQueryRequestBuilder() *QueryRequestBuilder {
+	return &QueryRequestBuilder{
+		ec: &streamv2.QueryRequest{
+			Projection: &modelv2.Projection{},
+		},
+	}
+}
+
+func (b *QueryRequestBuilder) Metadata(group, name string) *QueryRequestBuilder {
+	b.ec.Metadata = &commonv2.Metadata{
+		Group: group,
+		Name:  name,
+	}
+	return b
+}
+
+func (b *QueryRequestBuilder) Limit(limit uint32) *QueryRequestBuilder {
+	b.ec.Limit = limit
+	return b
+}
+
+func (b *QueryRequestBuilder) Offset(offset uint32) *QueryRequestBuilder {
+	b.ec.Offset = offset
+	return b
+}
+
+func (b *QueryRequestBuilder) FieldsInTagFamily(tagFamilyName string, items ...interface{}) *QueryRequestBuilder {
+	if len(items)%3 != 0 {
+		panic("expect 3 to be a factor of the length of items")
+	}
+
+	criteriaConditions := make([]*modelv2.Condition, len(items)/3)
+	for i := 0; i < len(items)/3; i++ {
+		key, op, values := items[i*3+0], items[i*3+1], items[i*3+2]
+		criteriaConditions[i] = &modelv2.Condition{
+			Name:  key.(string),
+			Op:    binaryOpsMap[op.(string)],
+			Value: buildTagValue(values),
+		}
+	}
+
+	b.ec.Criteria = append(b.ec.Criteria, &streamv2.QueryRequest_Criteria{
+		TagFamilyName: tagFamilyName,
+		Conditions:    criteriaConditions,
+	})
+
+	return b
+}
+
+func buildTagValue(value interface{}) *modelv2.TagValue {
+	switch v := value.(type) {
+	case int:
+		return &modelv2.TagValue{
+			Value: &modelv2.TagValue_Int{Int: &modelv2.Int{Value: int64(v)}},
+		}
+	case []int:
+		return &modelv2.TagValue{
+			Value: &modelv2.TagValue_IntArray{IntArray: &modelv2.IntArray{Value: convert.IntToInt64(v...)}},
+		}
+	case int8:
+		return &modelv2.TagValue{
+			Value: &modelv2.TagValue_Int{Int: &modelv2.Int{Value: int64(v)}},
+		}
+	case []int8:
+		return &modelv2.TagValue{
+			Value: &modelv2.TagValue_IntArray{IntArray: &modelv2.IntArray{Value: convert.Int8ToInt64(v...)}},
+		}
+	case int16:
+		return &modelv2.TagValue{
+			Value: &modelv2.TagValue_Int{Int: &modelv2.Int{Value: int64(v)}},
+		}
+	case []int16:
+		return &modelv2.TagValue{
+			Value: &modelv2.TagValue_IntArray{IntArray: &modelv2.IntArray{Value: convert.Int16ToInt64(v...)}},
+		}
+	case int32:
+		return &modelv2.TagValue{
+			Value: &modelv2.TagValue_Int{Int: &modelv2.Int{Value: int64(v)}},
+		}
+	case []int32:
+		return &modelv2.TagValue{
+			Value: &modelv2.TagValue_IntArray{IntArray: &modelv2.IntArray{Value: convert.Int32ToInt64(v...)}},
+		}
+	case int64:
+		return &modelv2.TagValue{
+			Value: &modelv2.TagValue_Int{Int: &modelv2.Int{Value: v}},
+		}
+	case []int64:
+		return &modelv2.TagValue{
+			Value: &modelv2.TagValue_IntArray{IntArray: &modelv2.IntArray{Value: v}},
+		}
+	case string:
+		return &modelv2.TagValue{
+			Value: &modelv2.TagValue_Str{Str: &modelv2.Str{Value: v}},
+		}
+	case []string:
+		return &modelv2.TagValue{
+			Value: &modelv2.TagValue_StrArray{StrArray: &modelv2.StrArray{Value: v}},
+		}
+	}
+	panic("not supported")
+}
+
+func (b *QueryRequestBuilder) Projection(tagFamily string, projections ...string) *QueryRequestBuilder {
+	b.ec.Projection.TagFamilies = append(b.ec.Projection.GetTagFamilies(), &modelv2.Projection_TagFamily{
+		Name: tagFamily,
+		Tags: projections,
+	})
+	return b
+}
+
+func (b *QueryRequestBuilder) OrderBy(indexRuleName string, sort modelv2.QueryOrder_Sort) *QueryRequestBuilder {
+	b.ec.OrderBy = &modelv2.QueryOrder{
+		IndexRuleName: indexRuleName,
+		Sort:          sort,
+	}
+	return b
+}
+
+func (b *QueryRequestBuilder) TimeRange(sT, eT time.Time) *QueryRequestBuilder {
+	b.ec.TimeRange = &modelv2.TimeRange{
+		Begin: timestamppb.New(sT),
+		End:   timestamppb.New(eT),
+	}
+	return b
+}
+
+func (b *QueryRequestBuilder) Build() *streamv2.QueryRequest {
+	return b.ec
+}
+
+type QueryResponseElementBuilder struct {
+	elem *streamv2.Element
+}
+
+func NewQueryEntityBuilder() *QueryResponseElementBuilder {
+	return &QueryResponseElementBuilder{elem: &streamv2.Element{}}
+}
+
+func (qeb *QueryResponseElementBuilder) EntityID(elementID string) *QueryResponseElementBuilder {
+	qeb.elem.ElementId = elementID
+	return qeb
+}
+
+func (qeb *QueryResponseElementBuilder) Timestamp(t time.Time) *QueryResponseElementBuilder {
+	qeb.elem.Timestamp = timestamppb.New(t)
+	return qeb
+}
+
+func (qeb *QueryResponseElementBuilder) FieldsInTagFamily(tagFamily string, items ...interface{}) *QueryResponseElementBuilder {
+	if len(items)%2 != 0 {
+		panic("invalid fields list")
+	}
+
+	l := len(items) / 2
+	tags := make([]*modelv2.Tag, l)
+	for i := 0; i < l; i++ {
+		key, values := items[i*2+0], items[i*2+1]
+		tags[i] = &modelv2.Tag{
+			Key:   key.(string),
+			Value: buildTagValue(values),
+		}
+	}
+
+	qeb.elem.TagFamilies = append(qeb.elem.GetTagFamilies(), &modelv2.TagFamily{
+		Name: tagFamily,
+		Tags: tags,
+	})
+
+	return qeb
+}
+
+func (qeb *QueryResponseElementBuilder) Build() *streamv2.Element {
+	return qeb.elem
+}
diff --git a/pkg/pb/v2/write.go b/pkg/pb/v2/write.go
new file mode 100644
index 0000000..31fe826
--- /dev/null
+++ b/pkg/pb/v2/write.go
@@ -0,0 +1,147 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package v2
+
+import (
+	"time"
+
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+type WriteRequestBuilder struct {
+	we *tracev1.WriteRequest
+}
+
+func NewWriteEntityBuilder() *WriteRequestBuilder {
+	return &WriteRequestBuilder{we: &tracev1.WriteRequest{}}
+}
+
+func (web *WriteRequestBuilder) Metadata(group, name string) *WriteRequestBuilder {
+	web.we.Metadata = &commonv1.Metadata{
+		Group: group,
+		Name:  name,
+	}
+	return web
+}
+
+func (web *WriteRequestBuilder) EntityValue(ev *tracev1.EntityValue) *WriteRequestBuilder {
+	web.we.Entity = ev
+	return web
+}
+
+func (web *WriteRequestBuilder) Build() *tracev1.WriteRequest {
+	return web.we
+}
+
+type EntityValueBuilder struct {
+	ev *tracev1.EntityValue
+}
+
+func NewEntityValueBuilder() *EntityValueBuilder {
+	return &EntityValueBuilder{ev: &tracev1.EntityValue{}}
+}
+
+func (evb *EntityValueBuilder) EntityID(entityID string) *EntityValueBuilder {
+	evb.ev.EntityId = entityID
+	return evb
+}
+
+func (evb *EntityValueBuilder) Timestamp(time time.Time) *EntityValueBuilder {
+	evb.ev.Timestamp = timestamppb.New(time)
+	return evb
+}
+
+func (evb *EntityValueBuilder) DataBinary(data []byte) *EntityValueBuilder {
+	evb.ev.DataBinary = data
+	return evb
+}
+
+func (evb *EntityValueBuilder) Fields(items ...interface{}) *EntityValueBuilder {
+	evb.ev.Fields = make([]*modelv1.Field, len(items))
+	for idx, item := range items {
+		evb.ev.Fields[idx] = buildField(item)
+	}
+	return evb
+}
+
+func buildField(value interface{}) *modelv1.Field {
+	if value == nil {
+		return &modelv1.Field{ValueType: &modelv1.Field_Null{}}
+	}
+	switch v := value.(type) {
+	case string:
+		return &modelv1.Field{
+			ValueType: &modelv1.Field_Str{
+				Str: &modelv1.Str{
+					Value: v,
+				},
+			},
+		}
+	case []string:
+		return &modelv1.Field{
+			ValueType: &modelv1.Field_StrArray{
+				StrArray: &modelv1.StrArray{
+					Value: v,
+				},
+			},
+		}
+	case int:
+		return &modelv1.Field{
+			ValueType: &modelv1.Field_Int{
+				Int: &modelv1.Int{
+					Value: int64(v),
+				},
+			},
+		}
+	case []int:
+		return &modelv1.Field{
+			ValueType: &modelv1.Field_IntArray{
+				IntArray: &modelv1.IntArray{
+					Value: convert.IntToInt64(v...),
+				},
+			},
+		}
+	case int64:
+		return &modelv1.Field{
+			ValueType: &modelv1.Field_Int{
+				Int: &modelv1.Int{
+					Value: v,
+				},
+			},
+		}
+	case []int64:
+		return &modelv1.Field{
+			ValueType: &modelv1.Field_IntArray{
+				IntArray: &modelv1.IntArray{
+					Value: v,
+				},
+			},
+		}
+	default:
+		panic("type not supported")
+	}
+}
+
+func (evb *EntityValueBuilder) Build() *tracev1.EntityValue {
+	return evb.ev
+}
diff --git a/pkg/query/v2/executor/interface.go b/pkg/query/v2/executor/interface.go
new file mode 100644
index 0000000..4eb797e
--- /dev/null
+++ b/pkg/query/v2/executor/interface.go
@@ -0,0 +1,31 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package executor
+
+import (
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/stream"
+)
+
+// ExecutionContext provides the runtime facilities a physical plan needs;
+// currently it is exactly the stream.Stream API.
+type ExecutionContext interface {
+	stream.Stream
+}
+
+// Executable is a plan node that can be run against an ExecutionContext to
+// produce the matched stream elements.
+type Executable interface {
+	Execute(ExecutionContext) ([]*streamv2.Element, error)
+}
diff --git a/pkg/query/v2/logical/analyzer.go b/pkg/query/v2/logical/analyzer.go
new file mode 100644
index 0000000..77ebf82
--- /dev/null
+++ b/pkg/query/v2/logical/analyzer.go
@@ -0,0 +1,226 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+	"context"
+
+	"github.com/pkg/errors"
+
+	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	metadataSchema "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var (
+	// ErrFieldNotDefined is returned when a referenced field is absent from the schema.
+	ErrFieldNotDefined = errors.New("field is not defined")
+	// ErrInvalidConditionType is returned when a condition value has an unsupported type.
+	ErrInvalidConditionType = errors.New("invalid pair type")
+	// ErrIncompatibleQueryCondition is returned when the two sides of a condition differ in type.
+	ErrIncompatibleQueryCondition = errors.New("incompatible query condition type")
+	// ErrIndexNotDefined is returned when no index rule covers the field.
+	// Fixed typo: "is not define" -> "is not defined".
+	ErrIndexNotDefined = errors.New("index is not defined for the field")
+	// ErrMultipleGlobalIndexes is returned when more than one global index would apply.
+	ErrMultipleGlobalIndexes = errors.New("multiple global indexes are not supported")
+)
+
+var (
+	// DefaultLimit is applied when a query request leaves its limit at zero.
+	DefaultLimit uint32 = 100
+)
+
+// Tag identifies a single tag by its family name and its own name.
+type Tag struct {
+	familyName, name string
+}
+
+// NewTag creates a Tag within the given family.
+func NewTag(family, name string) *Tag {
+	return &Tag{
+		familyName: family,
+		name:       name,
+	}
+}
+
+// NewTags create an array of Tag within a TagFamily
+func NewTags(family string, tagNames ...string) []*Tag {
+	tags := make([]*Tag, len(tagNames))
+	for i, name := range tagNames {
+		tags[i] = NewTag(family, name)
+	}
+	return tags
+}
+
+// GetCompoundName is only used for error message
+func (t *Tag) GetCompoundName() string {
+	return t.familyName + ":" + t.name
+}
+
+// GetTagName returns the tag's own name.
+func (t *Tag) GetTagName() string {
+	return t.name
+}
+
+// GetFamilyName returns the name of the tag family this tag belongs to.
+func (t *Tag) GetFamilyName() string {
+	return t.familyName
+}
+
+// Analyzer turns a stream QueryRequest into a logical Plan using schema and
+// index-rule metadata.
+type Analyzer struct {
+	indexRuleRepo        metadataSchema.IndexRule
+	indexRuleBindingRepo metadataSchema.IndexRuleBinding
+	metadataRepoImpl     metadata.Repo
+}
+
+// DefaultAnalyzer builds an Analyzer wired to the default metadata services.
+// NOTE(review): the errors from all three constructors are silently
+// discarded; a failure here would surface later as a nil-pointer use —
+// consider propagating or at least logging them.
+func DefaultAnalyzer() *Analyzer {
+	indexRule, _ := metadataSchema.NewIndexRule()
+	indexRuleBinding, _ := metadataSchema.NewIndexRuleBinding()
+	metadataService, _ := metadata.NewService(context.TODO())
+
+	return &Analyzer{
+		indexRule,
+		indexRuleBinding,
+		metadataService,
+	}
+}
+
+// BuildStreamSchema fetches the stream definition and its index rules from
+// the metadata repository and assembles a queryable Schema for the given
+// metadata. All tags of all tag families are registered so field references
+// can be resolved by name during analysis.
+func (a *Analyzer) BuildStreamSchema(ctx context.Context, metadata *commonv2.Metadata) (Schema, error) {
+	stream, err := a.metadataRepoImpl.Stream().Get(ctx, metadata)
+
+	if err != nil {
+		return nil, err
+	}
+
+	// Propagate the caller's context (was context.TODO()) so cancellation
+	// and deadlines also apply to the index-rule lookup.
+	indexRules, err := a.metadataRepoImpl.IndexRules(ctx, metadata)
+
+	if err != nil {
+		return nil, err
+	}
+
+	s := &schema{
+		stream:     stream,
+		indexRules: indexRules,
+		fieldMap:   make(map[string]*tagSpec),
+		entityList: stream.GetEntity().GetTagNames(),
+	}
+
+	// generate the schema of the fields for the traceSeries
+	for tagFamilyIdx, tagFamily := range stream.GetTagFamilies() {
+		for tagIdx, spec := range tagFamily.GetTags() {
+			s.registerField(spec.GetName(), tagFamilyIdx, tagIdx, spec)
+		}
+	}
+
+	return s, nil
+}
+
+// Analyze converts a QueryRequest into a resolved logical Plan:
+// condition parsing -> optional orderBy -> offset -> limit (falling back to
+// DefaultLimit when the request leaves it at zero).
+func (a *Analyzer) Analyze(_ context.Context, criteria *streamv2.QueryRequest, metadata *commonv2.Metadata, s Schema) (Plan, error) {
+	// parse fields
+	plan, err := parseFields(criteria, metadata, s)
+	if err != nil {
+		return nil, err
+	}
+
+	// parse orderBy; it only attaches to an index-scan plan
+	queryOrder := criteria.GetOrderBy()
+	if queryOrder != nil {
+		if v, ok := plan.(*unresolvedIndexScan); ok {
+			v.unresolvedOrderBy = OrderBy(queryOrder.GetIndexRuleName(), queryOrder.GetSort())
+		}
+	}
+
+	// parse offset
+	plan = Offset(plan, criteria.GetOffset())
+
+	// parse limit, defaulting to DefaultLimit when unset (zero)
+	limitParameter := criteria.GetLimit()
+	if limitParameter == 0 {
+		limitParameter = DefaultLimit
+	}
+	plan = Limit(plan, limitParameter)
+
+	return plan.Analyze(s)
+}
+
+// parseFields parses the query request to decide which kind of plan should be generated
+// Basically,
+// 1 - If no criteria is given, we can only scan all shards
+// 2 - If criteria is given, but all of those fields exist in the "entity" definition,
+//     i.e. they are top-level sharding keys. For example, for the current skywalking's schema,
+//     we use service_id + service_instance_id + state as the compound sharding keys.
+func parseFields(criteria *streamv2.QueryRequest, metadata *commonv2.Metadata, s Schema) (UnresolvedPlan, error) {
+	timeRange := criteria.GetTimeRange()
+
+	// Collect the projected tags, grouped per tag family as requested.
+	projTags := make([][]*Tag, len(criteria.GetProjection().GetTagFamilies()))
+	for i, tagFamily := range criteria.GetProjection().GetTagFamilies() {
+		var projTagInFamily []*Tag
+		for _, tagName := range tagFamily.GetTags() {
+			projTagInFamily = append(projTagInFamily, NewTag(tagFamily.GetName(), tagName))
+		}
+		projTags[i] = projTagInFamily
+	}
+
+	var tagExprs []Expr
+
+	// Map each entity tag name to its slot; slots start as AnyEntry so
+	// unspecified entity components match any value.
+	entityList := s.EntityList()
+	entityMap := make(map[string]int)
+	entity := make([]tsdb.Entry, len(entityList))
+	for idx, e := range entityList {
+		entityMap[e] = idx
+		// fill AnyEntry by default
+		entity[idx] = tsdb.AnyEntry
+	}
+
+	for _, criteriaFamily := range criteria.GetCriteria() {
+		for _, pairQuery := range criteriaFamily.GetConditions() {
+			op := pairQuery.GetOp()
+			typedTagValue := pairQuery.GetValue()
+			var e Expr
+			switch v := typedTagValue.GetValue().(type) {
+			case *modelv2.TagValue_Str:
+				// A condition on an entity tag pins that entity slot
+				// instead of becoming a filter expression.
+				if entityIdx, ok := entityMap[pairQuery.GetName()]; ok {
+					entity[entityIdx] = []byte(v.Str.GetValue())
+				} else {
+					e = &strLiteral{
+						string: v.Str.GetValue(),
+					}
+				}
+			case *modelv2.TagValue_StrArray:
+				e = &strArrLiteral{
+					arr: v.StrArray.GetValue(),
+				}
+			case *modelv2.TagValue_Int:
+				// Same entity-pinning rule as the string case.
+				if entityIdx, ok := entityMap[pairQuery.GetName()]; ok {
+					entity[entityIdx] = convert.Int64ToBytes(v.Int.GetValue())
+				} else {
+					e = &int64Literal{
+						int64: v.Int.GetValue(),
+					}
+				}
+			case *modelv2.TagValue_IntArray:
+				e = &int64ArrLiteral{
+					arr: v.IntArray.GetValue(),
+				}
+			default:
+				return nil, ErrInvalidConditionType
+			}
+			// we collect Condition only if it is not a part of entity
+			if e != nil {
+				tagExprs = append(tagExprs, binaryOpFactory[op](NewFieldRef(criteriaFamily.GetTagFamilyName(), pairQuery.GetName()), e))
+			}
+		}
+	}
+
+	return IndexScan(timeRange.GetBegin().AsTime(), timeRange.GetEnd().AsTime(), metadata,
+		tagExprs, entity, nil, projTags...), nil
+}
diff --git a/pkg/query/v2/logical/analyzer_test.go b/pkg/query/v2/logical/analyzer_test.go
new file mode 100644
index 0000000..1a6421b
--- /dev/null
+++ b/pkg/query/v2/logical/analyzer_test.go
@@ -0,0 +1,232 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/stretchr/testify/require"
+
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	pb "github.com/apache/skywalking-banyandb/pkg/pb/v2"
+	"github.com/apache/skywalking-banyandb/pkg/query/v2/logical"
+)
+
+// TestAnalyzer_SimpleTimeScan checks that a bare time-range query is planned
+// as limit(offset(index-scan)) with an all-wildcard entity.
+func TestAnalyzer_SimpleTimeScan(t *testing.T) {
+	assert := require.New(t)
+
+	ana := logical.DefaultAnalyzer()
+
+	sT, eT := time.Now().Add(-3*time.Hour), time.Now()
+
+	criteria := pb.NewQueryRequestBuilder().
+		Limit(20).
+		Offset(0).
+		Metadata("default", "sw").
+		TimeRange(sT, eT).
+		Build()
+
+	metadata := criteria.GetMetadata()
+
+	schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+	assert.NoError(err)
+
+	plan, err := ana.Analyze(context.TODO(), criteria, metadata, schema)
+	assert.NoError(err)
+	assert.NotNil(plan)
+	correctPlan, err := logical.Limit(
+		logical.Offset(
+			logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+			0),
+		20).
+		Analyze(schema)
+	assert.NoError(err)
+	assert.NotNil(correctPlan)
+	assert.True(cmp.Equal(plan, correctPlan), "plan is not equal to correct plan")
+}
+
+// TestAnalyzer_ComplexQuery checks that entity tags (service_id) are pinned
+// into the entity while the remaining conditions become filter expressions,
+// and that orderBy and projection are carried into the index scan.
+func TestAnalyzer_ComplexQuery(t *testing.T) {
+	assert := require.New(t)
+
+	ana := logical.DefaultAnalyzer()
+
+	sT, eT := time.Now().Add(-3*time.Hour), time.Now()
+
+	criteria := pb.NewQueryRequestBuilder().
+		Limit(5).
+		Offset(10).
+		OrderBy("duration", modelv2.QueryOrder_SORT_DESC).
+		Metadata("default", "sw").
+		Projection("searchable", "http.method", "service_id", "duration").
+		FieldsInTagFamily("searchable", "service_id", "=", "my_app", "http.method", "=", "GET", "mq.topic", "=", "event_topic").
+		TimeRange(sT, eT).
+		Build()
+
+	metadata := criteria.GetMetadata()
+
+	schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+	assert.NoError(err)
+
+	plan, err := ana.Analyze(context.TODO(), criteria, metadata, schema)
+	assert.NoError(err)
+	assert.NotNil(plan)
+
+	correctPlan, err := logical.Limit(
+		logical.Offset(
+			logical.IndexScan(sT, eT, metadata,
+				[]logical.Expr{
+					logical.Eq(logical.NewSearchableFieldRef("mq.topic"), logical.Str("event_topic")),
+					logical.Eq(logical.NewSearchableFieldRef("http.method"), logical.Str("GET")),
+				}, tsdb.Entity{tsdb.Entry("my_app"), tsdb.AnyEntry, tsdb.AnyEntry},
+				logical.OrderBy("duration", modelv2.QueryOrder_SORT_DESC),
+				logical.NewTags("searchable", "http.method", "service_id", "duration")),
+			10),
+		5).
+		Analyze(schema)
+	assert.NoError(err)
+	assert.NotNil(correctPlan)
+	assert.True(cmp.Equal(plan, correctPlan), "plan is not equal to correct plan")
+}
+
+// TestAnalyzer_TraceIDQuery checks a trace_id lookup without a time range;
+// the plan should be an index scan driven solely by the trace_id condition.
+func TestAnalyzer_TraceIDQuery(t *testing.T) {
+	assert := require.New(t)
+
+	ana := logical.DefaultAnalyzer()
+
+	criteria := pb.NewQueryRequestBuilder().
+		Limit(100).
+		Offset(0).
+		Metadata("default", "sw").
+		FieldsInTagFamily("searchable", "trace_id", "=", "123").
+		Build()
+
+	metadata := criteria.GetMetadata()
+
+	schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+	assert.NoError(err)
+
+	plan, err := ana.Analyze(context.TODO(), criteria, metadata, schema)
+	assert.NoError(err)
+	assert.NotNil(plan)
+	correctPlan, err := logical.Limit(
+		logical.Offset(logical.IndexScan(time.Now(), time.Now(), metadata, []logical.Expr{
+			logical.Eq(logical.NewSearchableFieldRef("trace_id"), logical.Str("123")),
+		}, nil, nil),
+			0),
+		100).Analyze(schema)
+	assert.NoError(err)
+	assert.NotNil(correctPlan)
+	assert.True(cmp.Equal(plan, correctPlan), "plan is not equal to correct plan")
+}
+
+// TestAnalyzer_OrderBy_IndexNotDefined orders by a tag that has no index rule
+// and expects ErrIndexNotDefined.
+func TestAnalyzer_OrderBy_IndexNotDefined(t *testing.T) {
+	assert := require.New(t)
+
+	ana := logical.DefaultAnalyzer()
+
+	criteria := pb.NewQueryRequestBuilder().
+		Limit(5).
+		Offset(10).
+		OrderBy("service_instance_id", modelv2.QueryOrder_SORT_DESC).
+		Metadata("default", "sw").
+		Projection("searchable", "trace_id", "service_id").
+		FieldsInTagFamily("searchable", "duration", ">", 500).
+		TimeRange(time.Now().Add(-3*time.Hour), time.Now()).
+		Build()
+
+	metadata := criteria.GetMetadata()
+
+	schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+	assert.NoError(err)
+
+	_, err = ana.Analyze(context.TODO(), criteria, metadata, schema)
+	assert.ErrorIs(err, logical.ErrIndexNotDefined)
+}
+
+// TestAnalyzer_OrderBy_FieldNotDefined orders by a non-existent tag.
+// NOTE(review): despite the test name, the analyzer reports
+// ErrIndexNotDefined (no index rule named "duration2") rather than
+// ErrFieldNotDefined — confirm this is the intended error.
+func TestAnalyzer_OrderBy_FieldNotDefined(t *testing.T) {
+	assert := require.New(t)
+
+	ana := logical.DefaultAnalyzer()
+
+	criteria := pb.NewQueryRequestBuilder().
+		Limit(5).
+		Offset(10).
+		OrderBy("duration2", modelv2.QueryOrder_SORT_DESC).
+		Metadata("default", "sw").
+		Projection("searchable", "trace_id", "service_id").
+		TimeRange(time.Now().Add(-3*time.Hour), time.Now()).
+		Build()
+
+	metadata := criteria.GetMetadata()
+
+	schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+	assert.NoError(err)
+
+	_, err = ana.Analyze(context.TODO(), criteria, metadata, schema)
+	assert.ErrorIs(err, logical.ErrIndexNotDefined)
+}
+
+// TestAnalyzer_Projection_FieldNotDefined projects an unknown tag and expects
+// ErrFieldNotDefined.
+func TestAnalyzer_Projection_FieldNotDefined(t *testing.T) {
+	assert := require.New(t)
+
+	ana := logical.DefaultAnalyzer()
+
+	criteria := pb.NewQueryRequestBuilder().
+		Limit(5).
+		Offset(10).
+		OrderBy("duration", modelv2.QueryOrder_SORT_DESC).
+		Metadata("default", "sw").
+		Projection("searchable", "duration", "service_id", "unknown").
+		TimeRange(time.Now().Add(-3*time.Hour), time.Now()).
+		Build()
+
+	metadata := criteria.GetMetadata()
+
+	schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+	assert.NoError(err)
+
+	_, err = ana.Analyze(context.TODO(), criteria, metadata, schema)
+	assert.ErrorIs(err, logical.ErrFieldNotDefined)
+}
+
+// TestAnalyzer_Fields_IndexNotDefined filters on a tag without an index rule.
+// NOTE(review): Projection is called with "duration" in the family-name
+// position here, unlike the other tests which pass "searchable" first —
+// confirm the arguments are intentional.
+func TestAnalyzer_Fields_IndexNotDefined(t *testing.T) {
+	assert := require.New(t)
+
+	ana := logical.DefaultAnalyzer()
+
+	criteria := pb.NewQueryRequestBuilder().
+		Limit(5).
+		Offset(10).
+		Metadata("default", "sw").
+		Projection("duration", "service_id").
+		TimeRange(time.Now().Add(-3*time.Hour), time.Now()).
+		FieldsInTagFamily("searchable", "start_time", ">", 10000).
+		Build()
+
+	metadata := criteria.GetMetadata()
+
+	schema, err := ana.BuildStreamSchema(context.TODO(), metadata)
+	assert.NoError(err)
+
+	_, err = ana.Analyze(context.TODO(), criteria, metadata, schema)
+	assert.ErrorIs(err, logical.ErrIndexNotDefined)
+}
diff --git a/pkg/query/v2/logical/common.go b/pkg/query/v2/logical/common.go
new file mode 100644
index 0000000..c45c149
--- /dev/null
+++ b/pkg/query/v2/logical/common.go
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+	"bytes"
+
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/query/v2/executor"
+)
+
+type (
+	// seekerBuilder customizes a tsdb.SeekerBuilder before it is built.
+	seekerBuilder func(builder tsdb.SeekerBuilder)
+	// comparator reports whether item a should be ordered before item b.
+	comparator    func(a, b tsdb.Item) bool
+)
+
+// createComparator builds a comparator over the items' sorted fields;
+// a descending sort inverts the byte-wise comparison.
+func createComparator(sortDirection modelv2.QueryOrder_Sort) comparator {
+	return func(a, b tsdb.Item) bool {
+		comp := bytes.Compare(a.SortedField(), b.SortedField())
+		if sortDirection == modelv2.QueryOrder_SORT_DESC {
+			return comp == 1
+		}
+		return comp == -1
+	}
+}
+
+// projectItem parses the item within the ExecutionContext.
+// projectionFieldRefs must be prepared before calling this method, projectionFieldRefs should be a list of
+// tag list where the inner list must exist in the same tag family.
+// Strict order can be guaranteed in the result.
+// NOTE(review): each inner ref list must be non-empty — refs[0] below would
+// panic otherwise; confirm callers uphold this precondition.
+func projectItem(ec executor.ExecutionContext, item tsdb.Item, projectionFieldRefs [][]*FieldRef) ([]*modelv2.TagFamily, error) {
+	tagFamily := make([]*modelv2.TagFamily, len(projectionFieldRefs))
+	for i, refs := range projectionFieldRefs {
+		tags := make([]*modelv2.Tag, len(refs))
+		// All refs in one inner list share a family, so parse it once.
+		familyName := refs[0].tag.GetFamilyName()
+		parsedTagFamily, err := ec.ParseTagFamily(familyName, item)
+		if err != nil {
+			return nil, err
+		}
+		for j, ref := range refs {
+			tags[j] = parsedTagFamily.GetTags()[ref.Spec.TagIdx]
+		}
+
+		tagFamily[i] = &modelv2.TagFamily{
+			Name: familyName,
+			Tags: tags,
+		}
+	}
+
+	return tagFamily, nil
+}
+
+// executeForShard fetches elements from series within a single shard. A list of series must be prepared in advanced
+// with the help of Entity. The result is a list of element set, where the order of inner list is kept
+// as what the users specify in the seekerBuilder.
+// This method is used by the underlying tableScan and localIndexScan plans.
+func executeForShard(series tsdb.SeriesList, timeRange tsdb.TimeRange,
+	builders ...seekerBuilder) ([]tsdb.Iterator, error) {
+	var itersInShard []tsdb.Iterator
+	for _, seriesFound := range series {
+		// An inner closure is used so each span's deferred Close fires per
+		// series rather than accumulating until executeForShard returns.
+		itersInSeries, err := func() ([]tsdb.Iterator, error) {
+			sp, errInner := seriesFound.Span(timeRange)
+			// Check the error BEFORE deferring Close: when Span fails, sp may
+			// be nil and the deferred sp.Close() would panic. (Previously the
+			// defer was registered first.)
+			if errInner != nil {
+				return nil, errInner
+			}
+			defer func(sp tsdb.SeriesSpan) {
+				_ = sp.Close()
+			}(sp)
+			b := sp.SeekerBuilder()
+			for _, builder := range builders {
+				builder(b)
+			}
+			seeker, errInner := b.Build()
+			if errInner != nil {
+				return nil, errInner
+			}
+			iters, errInner := seeker.Seek()
+			if errInner != nil {
+				return nil, errInner
+			}
+			return iters, nil
+		}()
+		if err != nil {
+			return nil, err
+		}
+		if len(itersInSeries) > 0 {
+			itersInShard = append(itersInShard, itersInSeries...)
+		}
+	}
+	return itersInShard, nil
+}
diff --git a/pkg/query/v2/logical/common_test.go b/pkg/query/v2/logical/common_test.go
new file mode 100644
index 0000000..bb501fd
--- /dev/null
+++ b/pkg/query/v2/logical/common_test.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical_test
+
+import (
+	"context"
+	"embed"
+	"encoding/base64"
+	"encoding/json"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/golang/protobuf/jsonpb"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/stream"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+// dataFS embeds the JSON fixtures used to seed query test data.
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+// setupQueryData writes the elements described by the named JSON fixture into
+// the stream — one element per template, timestamps spaced 500ms apart from a
+// fresh base time — and returns that base time.
+func setupQueryData(testing *testing.T, dataFile string, stream stream.Stream) (baseTime time.Time) {
+	t := assert.New(testing)
+	var templates []interface{}
+	baseTime = time.Now()
+	content, err := dataFS.ReadFile("testdata/" + dataFile)
+	t.NoError(err)
+	t.NoError(json.Unmarshal(content, &templates))
+	// Fixed binary payload shared by every element's first tag family.
+	bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+	for i, template := range templates {
+		rawSearchTagFamily, errMarshal := json.Marshal(template)
+		t.NoError(errMarshal)
+		searchTagFamily := &streamv2.ElementValue_TagFamily{}
+		t.NoError(jsonpb.UnmarshalString(string(rawSearchTagFamily), searchTagFamily))
+		e := &streamv2.ElementValue{
+			ElementId: strconv.Itoa(i),
+			Timestamp: timestamppb.New(baseTime.Add(500 * time.Millisecond * time.Duration(i))),
+			TagFamilies: []*streamv2.ElementValue_TagFamily{
+				{
+					Tags: []*modelv2.TagValue{
+						{
+							Value: &modelv2.TagValue_BinaryData{
+								BinaryData: bb,
+							},
+						},
+					},
+				},
+			},
+		}
+		e.TagFamilies = append(e.TagFamilies, searchTagFamily)
+		errInner := stream.Write(e)
+		t.NoError(errInner)
+	}
+	return baseTime
+}
+
+// setup boots a stream service rooted in a temporary directory and returns
+// the "default/sw" stream plus a cleanup func that closes the stream and
+// removes the directory.
+func setup(t *assert.Assertions) (stream.Stream, func()) {
+	t.NoError(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "info",
+	}))
+
+	tempDir, deferFunc := test.Space(t)
+
+	metadataSvc, err := metadata.NewService(context.TODO())
+	t.NoError(err)
+	streamSvc, err := stream.NewService(context.TODO(), metadataSvc, nil)
+	t.NoError(err)
+
+	err = streamSvc.FlagSet().Parse([]string{"--root-path=" + tempDir})
+	t.NoError(err)
+
+	err = streamSvc.PreRun()
+	t.NoError(err)
+
+	s, err := streamSvc.Stream(&commonv2.Metadata{
+		Name:  "sw",
+		Group: "default",
+	})
+	t.NoError(err)
+
+	return s, func() {
+		_ = s.Close()
+		deferFunc()
+	}
+}
diff --git a/pkg/query/v2/logical/expr.go b/pkg/query/v2/logical/expr.go
new file mode 100644
index 0000000..f54ef4f
--- /dev/null
+++ b/pkg/query/v2/logical/expr.go
@@ -0,0 +1,200 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+	"fmt"
+
+	"github.com/pkg/errors"
+
+	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+)
+
+// binaryOpFactory maps each protobuf binary operator onto the constructor of
+// the corresponding binary expression.
+var binaryOpFactory = map[modelv2.Condition_BinaryOp]func(l, r Expr) Expr{
+	modelv2.Condition_BINARY_OP_EQ:         Eq,
+	modelv2.Condition_BINARY_OP_NE:         Ne,
+	modelv2.Condition_BINARY_OP_LT:         Lt,
+	modelv2.Condition_BINARY_OP_GT:         Gt,
+	modelv2.Condition_BINARY_OP_LE:         Le,
+	modelv2.Condition_BINARY_OP_GE:         Ge,
+	modelv2.Condition_BINARY_OP_HAVING:     Having,
+	modelv2.Condition_BINARY_OP_NOT_HAVING: NotHaving,
+}
+
+var _ ResolvableExpr = (*FieldRef)(nil)
+
+// FieldRef is the reference to the field
+// also it holds the definition/schema of the field
+type FieldRef struct {
+	// tag defines the family name and name of the Tag
+	tag *Tag
+	// spec contains the index of the key in the schema, as well as the underlying FieldSpec
+	Spec *tagSpec
+}
+
+// Equal reports whether expr is a FieldRef with the same tag name and type.
+// NOTE(review): it compares Spec types, so both refs must be resolved first.
+func (f *FieldRef) Equal(expr Expr) bool {
+	if other, ok := expr.(*FieldRef); ok {
+		return other.tag.GetTagName() == f.tag.GetTagName() && other.Spec.spec.GetType() == f.Spec.spec.GetType()
+	}
+	return false
+}
+
+// FieldType returns the resolved tag type; it panics if Resolve has not run.
+func (f *FieldRef) FieldType() databasev2.TagType {
+	if f.Spec == nil {
+		panic("should be resolved first")
+	}
+	return f.Spec.spec.GetType()
+}
+
+// Resolve looks the tag up in the schema and caches its spec on the ref.
+func (f *FieldRef) Resolve(s Schema) error {
+	specs, err := s.CreateRef([]*Tag{f.tag})
+	if err != nil {
+		return err
+	}
+	f.Spec = specs[0][0].Spec
+	return nil
+}
+
+// String renders the ref as "#family:name<type>" for diagnostics.
+func (f *FieldRef) String() string {
+	return fmt.Sprintf("#%s<%s>", f.tag.GetCompoundName(), f.Spec.spec.GetType().String())
+}
+
+// NewFieldRef creates an unresolved reference to a tag in the given family.
+func NewFieldRef(familyName, tagName string) *FieldRef {
+	return &FieldRef{
+		tag: NewTag(familyName, tagName),
+	}
+}
+
+// NewSearchableFieldRef is a short-hand method for creating a FieldRef to the tag in the searchable family
+func NewSearchableFieldRef(tagName string) *FieldRef {
+	return &FieldRef{
+		tag: NewTag("searchable", tagName),
+	}
+}
+
+var _ ResolvableExpr = (*binaryExpr)(nil)
+
+// binaryExpr is composed of two operands with one op as the operator
+// l is normally a reference to a field, while r is usually literals
+type binaryExpr struct {
+	op modelv2.Condition_BinaryOp
+	l  Expr
+	r  Expr
+}
+
+// Equal reports whether expr is a binaryExpr with the same operator and
+// pairwise-equal operands.
+func (b *binaryExpr) Equal(expr Expr) bool {
+	if other, ok := expr.(*binaryExpr); ok {
+		return b.op == other.op && b.l.Equal(other.l) && b.r.Equal(other.r)
+	}
+	return false
+}
+
+// FieldType is undefined for a binary expression: a boolean tag type would be
+// needed to represent its result, hence the deliberate panic.
+func (b *binaryExpr) FieldType() databasev2.TagType {
+	panic("Boolean should be added")
+}
+
+// Resolve resolves both operands against the schema and then verifies that
+// their field types are compatible.
+func (b *binaryExpr) Resolve(s Schema) error {
+	if lr, ok := b.l.(ResolvableExpr); ok {
+		err := lr.Resolve(s)
+		if err != nil {
+			return err
+		}
+	}
+	// BUG FIX: resolve the RIGHT operand here. Previously b.l was
+	// type-asserted and resolved a second time, leaving b.r unresolved.
+	if rr, ok := b.r.(ResolvableExpr); ok {
+		err := rr.Resolve(s)
+		if err != nil {
+			return err
+		}
+	}
+	if b.l.FieldType() != b.r.FieldType() {
+		return errors.Wrapf(ErrIncompatibleQueryCondition, "left is %s while right is %s",
+			b.l.FieldType().String(),
+			b.r.FieldType().String(),
+		)
+	}
+	return nil
+}
+
+// String renders the expression as "<left> <op> <right>" for diagnostics.
+func (b *binaryExpr) String() string {
+	return fmt.Sprintf("%s %s %s", b.l.String(), b.op.String(), b.r.String())
+}
+
+// Eq builds an equality (==) condition.
+func Eq(l, r Expr) Expr {
+	return &binaryExpr{
+		op: modelv2.Condition_BINARY_OP_EQ,
+		l:  l,
+		r:  r,
+	}
+}
+
+// Ne builds an inequality (!=) condition.
+func Ne(l, r Expr) Expr {
+	return &binaryExpr{
+		op: modelv2.Condition_BINARY_OP_NE,
+		l:  l,
+		r:  r,
+	}
+}
+
+// Lt builds a less-than (<) condition.
+func Lt(l, r Expr) Expr {
+	return &binaryExpr{
+		op: modelv2.Condition_BINARY_OP_LT,
+		l:  l,
+		r:  r,
+	}
+}
+
+// Le builds a less-than-or-equal (<=) condition.
+func Le(l, r Expr) Expr {
+	return &binaryExpr{
+		op: modelv2.Condition_BINARY_OP_LE,
+		l:  l,
+		r:  r,
+	}
+}
+
+// Gt builds a greater-than (>) condition.
+func Gt(l, r Expr) Expr {
+	return &binaryExpr{
+		op: modelv2.Condition_BINARY_OP_GT,
+		l:  l,
+		r:  r,
+	}
+}
+
+// Ge builds a greater-than-or-equal (>=) condition.
+func Ge(l, r Expr) Expr {
+	return &binaryExpr{
+		op: modelv2.Condition_BINARY_OP_GE,
+		l:  l,
+		r:  r,
+	}
+}
+
+// Having builds a HAVING (contains-all) condition.
+func Having(l, r Expr) Expr {
+	return &binaryExpr{
+		op: modelv2.Condition_BINARY_OP_HAVING,
+		l:  l,
+		r:  r,
+	}
+}
+
+// NotHaving builds a NOT HAVING condition.
+func NotHaving(l, r Expr) Expr {
+	return &binaryExpr{
+		op: modelv2.Condition_BINARY_OP_NOT_HAVING,
+		l:  l,
+		r:  r,
+	}
+}
diff --git a/pkg/query/v2/logical/expr_literal.go b/pkg/query/v2/logical/expr_literal.go
new file mode 100644
index 0000000..6bdc72a
--- /dev/null
+++ b/pkg/query/v2/logical/expr_literal.go
@@ -0,0 +1,160 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+	"fmt"
+	"strconv"
+
+	"github.com/google/go-cmp/cmp"
+
+	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+var _ LiteralExpr = (*int64Literal)(nil)
+
+// int64Literal wraps a single int64 constant.
+type int64Literal struct {
+	int64
+}
+
+// Bytes returns the binary encoding of the value as a single entry.
+func (i *int64Literal) Bytes() [][]byte {
+	return [][]byte{convert.Int64ToBytes(i.int64)}
+}
+
+// Equal reports whether expr is an int64Literal with the same value.
+func (i *int64Literal) Equal(expr Expr) bool {
+	if other, ok := expr.(*int64Literal); ok {
+		return other.int64 == i.int64
+	}
+
+	return false
+}
+
+// Int wraps num into an int64 literal expression.
+func Int(num int64) Expr {
+	return &int64Literal{num}
+}
+
+// FieldType identifies this literal as a scalar int tag.
+func (i *int64Literal) FieldType() databasev2.TagType {
+	return databasev2.TagType_TAG_TYPE_INT
+}
+
+// String renders the value in base 10.
+func (i *int64Literal) String() string {
+	return strconv.FormatInt(i.int64, 10)
+}
+
+var _ LiteralExpr = (*int64ArrLiteral)(nil)
+
+// int64ArrLiteral wraps a constant array of int64 values.
+type int64ArrLiteral struct {
+	arr []int64
+}
+
+// Bytes encodes every element, one entry per value, preserving order.
+func (i *int64ArrLiteral) Bytes() [][]byte {
+	b := make([][]byte, 0, len(i.arr))
+	for _, i := range i.arr {
+		b = append(b, convert.Int64ToBytes(i))
+	}
+	return b
+}
+
+// Equal reports whether expr is an int64ArrLiteral with an equal array.
+func (i *int64ArrLiteral) Equal(expr Expr) bool {
+	if other, ok := expr.(*int64ArrLiteral); ok {
+		return cmp.Equal(other.arr, i.arr)
+	}
+
+	return false
+}
+
+// Ints wraps the given values into an int64-array literal expression.
+func Ints(ints ...int64) Expr {
+	return &int64ArrLiteral{
+		arr: ints,
+	}
+}
+
+// FieldType identifies this literal as an int-array tag.
+func (i *int64ArrLiteral) FieldType() databasev2.TagType {
+	return databasev2.TagType_TAG_TYPE_INT_ARRAY
+}
+
+// String renders the array with the default %v formatting.
+func (i *int64ArrLiteral) String() string {
+	return fmt.Sprintf("%v", i.arr)
+}
+
+var _ LiteralExpr = (*strLiteral)(nil)
+
+// strLiteral wraps a single string constant.
+type strLiteral struct {
+	string
+}
+
+// Bytes returns the UTF-8 bytes of the string as a single entry.
+func (s *strLiteral) Bytes() [][]byte {
+	return [][]byte{[]byte(s.string)}
+}
+
+// Equal reports whether expr is a strLiteral with the same value.
+func (s *strLiteral) Equal(expr Expr) bool {
+	if other, ok := expr.(*strLiteral); ok {
+		return other.string == s.string
+	}
+
+	return false
+}
+
+// Str wraps str into a string literal expression.
+func Str(str string) Expr {
+	return &strLiteral{str}
+}
+
+// FieldType identifies this literal as a scalar string tag.
+func (s *strLiteral) FieldType() databasev2.TagType {
+	return databasev2.TagType_TAG_TYPE_STRING
+}
+
+// String returns the wrapped value itself.
+func (s *strLiteral) String() string {
+	return s.string
+}
+
+var _ LiteralExpr = (*strArrLiteral)(nil)
+
+// strArrLiteral wraps a constant array of strings.
+type strArrLiteral struct {
+	arr []string
+}
+
+// Bytes encodes every element, one entry per string, preserving order.
+func (s *strArrLiteral) Bytes() [][]byte {
+	b := make([][]byte, 0, len(s.arr))
+	for _, str := range s.arr {
+		b = append(b, []byte(str))
+	}
+	return b
+}
+
+// Equal reports whether expr is a strArrLiteral with an equal array.
+func (s *strArrLiteral) Equal(expr Expr) bool {
+	if other, ok := expr.(*strArrLiteral); ok {
+		return cmp.Equal(other.arr, s.arr)
+	}
+
+	return false
+}
+
+// Strs wraps the given values into a string-array literal expression.
+func Strs(strs ...string) Expr {
+	return &strArrLiteral{
+		arr: strs,
+	}
+}
+
+// FieldType identifies this literal as a string-array tag.
+func (s *strArrLiteral) FieldType() databasev2.TagType {
+	return databasev2.TagType_TAG_TYPE_STRING_ARRAY
+}
+
+// String renders the array with the default %v formatting.
+func (s *strArrLiteral) String() string {
+	return fmt.Sprintf("%v", s.arr)
+}
diff --git a/pkg/query/v2/logical/format.go b/pkg/query/v2/logical/format.go
new file mode 100644
index 0000000..c880b82
--- /dev/null
+++ b/pkg/query/v2/logical/format.go
@@ -0,0 +1,52 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+	"strings"
+)
+
+// Format renders a logical Plan tree as an indented, human-readable string.
+func Format(p Plan) string {
+	return formatWithIndent(p, 0)
+}
+
+func formatWithIndent(p Plan, indent int) string {
+	res := ""
+	if indent > 1 {
+		res += strings.Repeat(" ", 5*(indent-1))
+	}
+	if indent > 0 {
+		res += "+"
+		res += strings.Repeat("-", 4)
+	}
+	res += p.String() + "\n"
+	for _, child := range p.Children() {
+		res += formatWithIndent(child, indent+1)
+	}
+	return res
+}
+
+func formatExpr(sep string, exprGroup ...[]*FieldRef) string {
+	var exprsStr []string
+	for _, exprs := range exprGroup {
+		for i := 0; i < len(exprs); i++ {
+			exprsStr = append(exprsStr, exprs[i].String())
+		}
+	}
+	return strings.Join(exprsStr, sep)
+}
diff --git a/pkg/query/v2/logical/interface.go b/pkg/query/v2/logical/interface.go
new file mode 100644
index 0000000..937cdb1
--- /dev/null
+++ b/pkg/query/v2/logical/interface.go
@@ -0,0 +1,63 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+	"fmt"
+
+	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+	"github.com/apache/skywalking-banyandb/pkg/query/v2/executor"
+)
+
+// PlanType enumerates the concrete kinds of logical Plan nodes.
+type PlanType uint8
+
+const (
+	// PlanLimit caps the number of elements returned.
+	PlanLimit PlanType = iota
+	// PlanOffset skips a number of leading elements.
+	PlanOffset
+	// PlanLocalIndexScan scans shard-local indices within a time range.
+	PlanLocalIndexScan
+	// PlanGlobalIndexScan scans a global (cross-shard) index.
+	PlanGlobalIndexScan
+)
+
+// UnresolvedPlan is a logical plan whose tag/field references have not yet
+// been bound to a concrete schema.
+type UnresolvedPlan interface {
+	// Analyze binds the plan against the given Schema, returning the resolved Plan.
+	Analyze(Schema) (Plan, error)
+}
+
+// Plan is a resolved, executable node in the logical plan tree.
+type Plan interface {
+	fmt.Stringer
+	Type() PlanType
+	executor.Executable
+	Equal(Plan) bool
+	Children() []Plan
+	Schema() Schema
+}
+
+// Expr is a node in an expression tree, e.g. a literal or a field reference.
+type Expr interface {
+	fmt.Stringer
+	FieldType() databasev2.TagType
+	Equal(Expr) bool
+}
+
+// LiteralExpr is an Expr carrying a constant value encodable to raw bytes.
+type LiteralExpr interface {
+	Expr
+	Bytes() [][]byte
+}
+
+// ResolvableExpr is an Expr that must be bound to a Schema before use.
+type ResolvableExpr interface {
+	Expr
+	Resolve(Schema) error
+}
diff --git a/pkg/query/v2/logical/iter.go b/pkg/query/v2/logical/iter.go
new file mode 100644
index 0000000..5aa0bb7
--- /dev/null
+++ b/pkg/query/v2/logical/iter.go
@@ -0,0 +1,124 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+	"container/heap"
+
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+)
+
+var _ ItemIterator = (*itemIter)(nil)
+
+// ItemIterator iterates over tsdb Items in a caller-defined order.
+type ItemIterator interface {
+	// HasNext reports whether another item is available.
+	HasNext() bool
+	// Next returns the next item; call only after HasNext returns true.
+	Next() tsdb.Item
+}
+
+var _ heap.Interface = (*containerHeap)(nil)
+
+// container contains both iter and its current item
+type container struct {
+	c    comparator
+	item tsdb.Item
+	iter tsdb.Iterator
+}
+
+// containerHeap is a heap of containers ordered by each container's comparator.
+type containerHeap []*container
+
+func (h containerHeap) Len() int           { return len(h) }
+func (h containerHeap) Less(i, j int) bool { return h[i].c(h[i].item, h[j].item) }
+func (h containerHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
+
+// Push appends x; invoked by container/heap, not meant to be called directly.
+func (h *containerHeap) Push(x interface{}) {
+	*h = append(*h, x.(*container))
+}
+
+// Pop removes and returns the last element; invoked by container/heap only.
+func (h *containerHeap) Pop() interface{} {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[0 : n-1]
+	return x
+}
+
+// itemIter merges multiple sorted tsdb.Iterators into one sorted stream.
+type itemIter struct {
+	// c is the comparator to sort items
+	c comparator
+	// iters is the list of initial Iterator
+	iters []tsdb.Iterator
+	// h is a heap of containers, one per active iterator, ordered by each
+	// iterator's current item. Popping yields the globally next item; the
+	// source iterator's following item is then pushed back, keeping the
+	// heap ordered. (An earlier revision kept a sorted deque instead.)
+	h *containerHeap
+}
+
+// NewItemIter builds an ItemIterator that merges iters, yielding items in
+// the order defined by comparator c.
+func NewItemIter(iters []tsdb.Iterator, c comparator) ItemIterator {
+	it := &itemIter{
+		c:     c,
+		iters: iters,
+		h:     &containerHeap{},
+	}
+	it.init()
+	return it
+}
+
+// init function MUST be called while initialization.
+// 1. Move all iterator to the first item by invoking their Next.
+// 2. Load all first items into the heap.
+func (it *itemIter) init() {
+	for _, iter := range it.iters {
+		it.pushIterator(iter)
+	}
+	// heap initialization
+	heap.Init(it.h)
+}
+
+// pushIterator pushes the given iterator into the underlying heap.
+// Status will be immediately checked if the Iterator has a next value.
+// 1 - If not, it will be closed at once and will not be added,
+//     which means inactive iterators never exist in the heap.
+// 2 - If so, it will be wrapped into a container and pushed onto the heap,
+//     which re-establishes the heap ordering.
+func (it *itemIter) pushIterator(iter tsdb.Iterator) {
+	if !iter.Next() {
+		_ = iter.Close()
+		return
+	}
+	heap.Push(it.h, &container{
+		item: iter.Val(),
+		iter: iter,
+		c:    it.c,
+	})
+}
+
+// HasNext reports whether any active iterator still has an item to yield.
+func (it *itemIter) HasNext() bool {
+	return it.h.Len() > 0
+}
+
+// Next pops the minimal current item (per the comparator) and advances the
+// iterator it came from. Callers must check HasNext first: popping an
+// empty heap panics.
+func (it *itemIter) Next() tsdb.Item {
+	// Pop up the minimal item through the order value
+	c := heap.Pop(it.h).(*container)
+
+	// Advance the iterator whose value was just popped; its next value
+	// (if any) is pushed back onto the heap.
+	it.pushIterator(c.iter)
+
+	return c.item
+}
diff --git a/pkg/query/v2/logical/plan.go b/pkg/query/v2/logical/plan.go
new file mode 100644
index 0000000..4b568c7
--- /dev/null
+++ b/pkg/query/v2/logical/plan.go
@@ -0,0 +1,162 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+	"fmt"
+
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/pkg/query/v2/executor"
+)
+
+var _ Plan = (*limit)(nil)
+var _ UnresolvedPlan = (*limit)(nil)
+
+// parent links a plan node to its single input, in both unresolved and
+// resolved (analyzed) form.
+type parent struct {
+	unresolvedInput UnresolvedPlan
+	input           Plan
+}
+
+// limit caps the number of elements produced by its input plan.
+type limit struct {
+	*parent
+	limitNum uint32
+}
+
+// Execute runs the input plan and truncates the result to at most
+// limitNum elements.
+func (l *limit) Execute(ec executor.ExecutionContext) ([]*streamv2.Element, error) {
+	entities, err := l.parent.input.Execute(ec)
+	if err != nil {
+		return nil, err
+	}
+	if n := int(l.limitNum); len(entities) > n {
+		entities = entities[:n]
+	}
+	return entities, nil
+}
+
+// Equal reports whether plan is a limit node with the same bound and an
+// equal input.
+func (l *limit) Equal(plan Plan) bool {
+	if plan.Type() != PlanLimit {
+		return false
+	}
+	other := plan.(*limit)
+	return l.limitNum == other.limitNum && l.input.Equal(other.input)
+}
+
+// Analyze resolves the input plan against the schema.
+func (l *limit) Analyze(s Schema) (Plan, error) {
+	input, err := l.unresolvedInput.Analyze(s)
+	l.input = input
+	if err != nil {
+		return nil, err
+	}
+	return l, nil
+}
+
+// Schema exposes the input plan's schema unchanged.
+func (l *limit) Schema() Schema {
+	return l.input.Schema()
+}
+
+func (l *limit) String() string {
+	return fmt.Sprintf("Limit: %d", l.limitNum)
+}
+
+func (l *limit) Children() []Plan {
+	return []Plan{l.input}
+}
+
+func (l *limit) Type() PlanType {
+	return PlanLimit
+}
+
+// Limit wraps input with a plan node that returns at most num elements.
+func Limit(input UnresolvedPlan, num uint32) UnresolvedPlan {
+	return &limit{
+		parent:   &parent{unresolvedInput: input},
+		limitNum: num,
+	}
+}
+
+var _ Plan = (*offset)(nil)
+var _ UnresolvedPlan = (*offset)(nil)
+
+// offset drops the first offsetNum elements produced by its input plan.
+type offset struct {
+	*parent
+	offsetNum uint32
+}
+
+// Execute runs the input plan and skips the leading offsetNum elements,
+// returning an empty slice when fewer elements are available.
+func (o *offset) Execute(ec executor.ExecutionContext) ([]*streamv2.Element, error) {
+	elements, err := o.parent.input.Execute(ec)
+	if err != nil {
+		return nil, err
+	}
+	if len(elements) <= int(o.offsetNum) {
+		return []*streamv2.Element{}, nil
+	}
+	return elements[o.offsetNum:], nil
+}
+
+// Equal reports whether plan is an offset node with the same skip count
+// and an equal input.
+func (o *offset) Equal(plan Plan) bool {
+	if plan.Type() != PlanOffset {
+		return false
+	}
+	other := plan.(*offset)
+	return o.offsetNum == other.offsetNum && o.input.Equal(other.input)
+}
+
+// Analyze resolves the input plan against the schema.
+func (o *offset) Analyze(s Schema) (Plan, error) {
+	input, err := o.unresolvedInput.Analyze(s)
+	o.input = input
+	if err != nil {
+		return nil, err
+	}
+	return o, nil
+}
+
+// Schema exposes the input plan's schema unchanged.
+func (o *offset) Schema() Schema {
+	return o.input.Schema()
+}
+
+func (o *offset) String() string {
+	return fmt.Sprintf("Offset: %d", o.offsetNum)
+}
+
+func (o *offset) Children() []Plan {
+	return []Plan{o.input}
+}
+
+func (o *offset) Type() PlanType {
+	return PlanOffset
+}
+
+// Offset wraps input with a plan node that skips the first num elements.
+func Offset(input UnresolvedPlan, num uint32) UnresolvedPlan {
+	return &offset{
+		parent:    &parent{unresolvedInput: input},
+		offsetNum: num,
+	}
+}
diff --git a/pkg/query/v2/logical/plan_execution_test.go b/pkg/query/v2/logical/plan_execution_test.go
new file mode 100644
index 0000000..55ade27
--- /dev/null
+++ b/pkg/query/v2/logical/plan_execution_test.go
@@ -0,0 +1,365 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/query/v2/logical"
+)
+
+// TestPlanExecution_TableScan_Limit verifies that a Limit plan wrapped
+// around an unfiltered IndexScan caps the result size. The fixture
+// multiple_shards.json loads 5 elements, so a limit of 10 still yields 5.
+func TestPlanExecution_TableScan_Limit(t *testing.T) {
+	assertT := assert.New(t)
+	streamT, deferFunc := setup(assertT)
+	defer deferFunc()
+	baseTs := setupQueryData(t, "multiple_shards.json", streamT)
+
+	metadata := &commonv2.Metadata{
+		Name:  "sw",
+		Group: "default",
+	}
+
+	// Query window covering all fixture data.
+	sT, eT := baseTs, baseTs.Add(1*time.Hour)
+
+	tests := []struct {
+		name           string
+		unresolvedPlan logical.UnresolvedPlan
+		wantLength     int
+	}{
+		{
+			name:           "Limit 1",
+			unresolvedPlan: logical.Limit(logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil), 1),
+			wantLength:     1,
+		},
+		{
+			name:           "Limit 5",
+			unresolvedPlan: logical.Limit(logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil), 5),
+			wantLength:     5,
+		},
+		{
+			name:           "Limit 10",
+			unresolvedPlan: logical.Limit(logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil), 10),
+			wantLength:     5,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tester := require.New(t)
+			schema, err := logical.DefaultAnalyzer().BuildStreamSchema(context.TODO(), metadata)
+			tester.NoError(err)
+
+			plan, err := tt.unresolvedPlan.Analyze(schema)
+			tester.NoError(err)
+			tester.NotNil(plan)
+
+			entities, err := plan.Execute(streamT)
+			tester.NoError(err)
+			tester.Len(entities, tt.wantLength)
+			// Results must follow the default ascending timestamp order.
+			tester.True(logical.SortedByTimestamp(entities, modelv2.QueryOrder_SORT_ASC))
+		})
+	}
+}
+
+// TestPlanExecution_Offset verifies that an Offset plan skips the leading
+// elements of its input: with 5 fixture elements, offsets 0/3/5 leave
+// 5/2/0 results respectively.
+func TestPlanExecution_Offset(t *testing.T) {
+	assertT := assert.New(t)
+	streamT, deferFunc := setup(assertT)
+	defer deferFunc()
+	baseTs := setupQueryData(t, "multiple_shards.json", streamT)
+
+	metadata := &commonv2.Metadata{
+		Name:  "sw",
+		Group: "default",
+	}
+
+	sT, eT := baseTs, baseTs.Add(1*time.Hour)
+
+	tests := []struct {
+		name           string
+		unresolvedPlan logical.UnresolvedPlan
+		wantLength     int
+	}{
+		{
+			name:           "Offset 0",
+			unresolvedPlan: logical.Offset(logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil), 0),
+			wantLength:     5,
+		},
+		{
+			name:           "Offset 3",
+			unresolvedPlan: logical.Offset(logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil), 3),
+			wantLength:     2,
+		},
+		{
+			// Fixed copy-paste test-case name: this case offsets, not limits.
+			name:           "Offset 5",
+			unresolvedPlan: logical.Offset(logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil), 5),
+			wantLength:     0,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tester := require.New(t)
+			schema, err := logical.DefaultAnalyzer().BuildStreamSchema(context.TODO(), metadata)
+			tester.NoError(err)
+
+			plan, err := tt.unresolvedPlan.Analyze(schema)
+			tester.NoError(err)
+			tester.NotNil(plan)
+
+			entities, err := plan.Execute(streamT)
+			tester.NoError(err)
+			tester.Len(entities, tt.wantLength)
+		})
+	}
+}
+
+// TestPlanExecution_TraceIDFetch exercises GlobalIndexScan over the
+// trace_id index with a projection of only the trace_id tag, asserting
+// every returned element carries exactly that tag with the queried value.
+func TestPlanExecution_TraceIDFetch(t *testing.T) {
+	assertT := assert.New(t)
+	streamT, deferFunc := setup(assertT)
+	defer deferFunc()
+	_ = setupQueryData(t, "multiple_shards.json", streamT)
+
+	m := &commonv2.Metadata{
+		Name:  "sw",
+		Group: "default",
+	}
+
+	tests := []struct {
+		name       string
+		traceID    string
+		wantLength int
+	}{
+		{
+			name:       "traceID = 1",
+			traceID:    "1",
+			wantLength: 1,
+		},
+		{
+			name:       "traceID = 2",
+			traceID:    "2",
+			wantLength: 1,
+		},
+		{
+			name:       "traceID = 3",
+			traceID:    "3",
+			wantLength: 1,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tester := require.New(t)
+			s, err := logical.DefaultAnalyzer().BuildStreamSchema(context.TODO(), m)
+			tester.NoError(err)
+
+			// Global index scan: trace_id equality, projecting only trace_id.
+			p, err := logical.GlobalIndexScan(m, []logical.Expr{
+				logical.Eq(logical.NewFieldRef("searchable", "trace_id"), logical.Str(tt.traceID)),
+			}, logical.NewTags("searchable", "trace_id")).Analyze(s)
+			tester.NoError(err)
+			tester.NotNil(p)
+			entities, err := p.Execute(streamT)
+			tester.NoError(err)
+			for _, entity := range entities {
+				// Projection yields a single family holding the single trace_id tag.
+				tester.Len(entity.GetTagFamilies(), 1)
+				tester.Len(entity.GetTagFamilies()[0].GetTags(), 1)
+				tester.Equal(entity.GetTagFamilies()[0].GetTags()[0].GetValue().GetStr().GetValue(), tt.traceID)
+			}
+			tester.Len(entities, tt.wantLength)
+		})
+	}
+}
+
+// TestPlanExecution_IndexScan runs local index scans with various
+// equality/range conditions over the 5-element fixture and checks the
+// number of matches for single and combined conditions.
+func TestPlanExecution_IndexScan(t *testing.T) {
+	assertT := assert.New(t)
+	streamT, deferFunc := setup(assertT)
+	defer deferFunc()
+	baseTs := setupQueryData(t, "multiple_shards.json", streamT)
+
+	metadata := &commonv2.Metadata{
+		Name:  "sw",
+		Group: "default",
+	}
+
+	sT, eT := baseTs, baseTs.Add(1*time.Hour)
+
+	tests := []struct {
+		name           string
+		unresolvedPlan logical.UnresolvedPlan
+		wantLength     int
+	}{
+		{
+			name: "Single Index Search using POST without entity returns nothing",
+			unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+				logical.Eq(logical.NewFieldRef("searchable", "http.method"), logical.Str("POST")),
+			}, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+			wantLength: 0,
+		},
+		{
+			name: "Single Index Search using inverted index",
+			unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+				logical.Eq(logical.NewFieldRef("searchable", "http.method"), logical.Str("GET")),
+			}, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+			wantLength: 3,
+		},
+		{
+			name: "Single Index Search using LSM tree index",
+			unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+				logical.Lt(logical.NewFieldRef("searchable", "duration"), logical.Int(100)),
+			}, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+			wantLength: 2,
+		},
+		{
+			name: "Single Index Search without entity returns results",
+			unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+				logical.Eq(logical.NewFieldRef("searchable", "endpoint_id"), logical.Str("/home_id")),
+			}, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+			wantLength: 2,
+		},
+		{
+			// Conditions on the same scan are combined conjunctively (AND).
+			name: "Multiple Index Search",
+			unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+				logical.Eq(logical.NewFieldRef("searchable", "http.method"), logical.Str("GET")),
+				logical.Eq(logical.NewFieldRef("searchable", "endpoint_id"), logical.Str("/home_id")),
+			}, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+			wantLength: 1,
+		},
+		{
+			name: "Multiple Index Search with a combination of numerical index and textual index",
+			unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+				logical.Eq(logical.NewFieldRef("searchable", "http.method"), logical.Str("GET")),
+				logical.Lt(logical.NewFieldRef("searchable", "duration"), logical.Int(100)),
+			}, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+			wantLength: 2,
+		},
+		{
+			name: "Multiple Index With One Empty Result(ChunkID)",
+			unresolvedPlan: logical.IndexScan(sT, eT, metadata, []logical.Expr{
+				logical.Eq(logical.NewFieldRef("searchable", "http.method"), logical.Str("GET")),
+				logical.Eq(logical.NewFieldRef("searchable", "endpoint_id"), logical.Str("/unknown")),
+			}, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, nil),
+			wantLength: 0,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tester := require.New(t)
+			schema, err := logical.DefaultAnalyzer().BuildStreamSchema(context.TODO(), metadata)
+			tester.NoError(err)
+
+			plan, err := tt.unresolvedPlan.Analyze(schema)
+			tester.NoError(err)
+			tester.NotNil(plan)
+
+			entities, err := plan.Execute(streamT)
+			tester.NoError(err)
+			tester.Len(entities, tt.wantLength)
+		})
+	}
+}
+
+// TestPlanExecution_OrderBy checks index-based ordering (by the duration
+// index rule) and time-based ordering (an empty index rule name falls back
+// to sorting by timestamp/start_time), in both directions.
+func TestPlanExecution_OrderBy(t *testing.T) {
+	assertT := assert.New(t)
+	streamT, deferFunc := setup(assertT)
+	defer deferFunc()
+	baseTs := setupQueryData(t, "multiple_shards.json", streamT)
+
+	metadata := &commonv2.Metadata{
+		Name:  "sw",
+		Group: "default",
+	}
+
+	sT, eT := baseTs, baseTs.Add(1*time.Hour)
+
+	tests := []struct {
+		name            string
+		targetIndexRule string
+		sortDirection   modelv2.QueryOrder_Sort
+		// TODO: avoid hardcoded index?
+		targetFamilyIdx int
+		targetTagIdx    int
+	}{
+		{
+			name:            "Sort By duration ASC",
+			targetIndexRule: "duration",
+			sortDirection:   modelv2.QueryOrder_SORT_ASC,
+			targetFamilyIdx: 0,
+			targetTagIdx:    0,
+		},
+		{
+			name:            "Sort By duration DESC",
+			targetIndexRule: "duration",
+			sortDirection:   modelv2.QueryOrder_SORT_DESC,
+			targetFamilyIdx: 0,
+			targetTagIdx:    0,
+		},
+		{
+			name:            "Sort By start_time DESC",
+			targetIndexRule: "",
+			sortDirection:   modelv2.QueryOrder_SORT_DESC,
+		},
+		{
+			name:            "Sort By start_time ASC",
+			targetIndexRule: "",
+			sortDirection:   modelv2.QueryOrder_SORT_ASC,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tester := require.New(t)
+			schema, err := logical.DefaultAnalyzer().BuildStreamSchema(context.TODO(), metadata)
+			tester.NoError(err)
+			tester.NotNil(schema)
+
+			if tt.targetIndexRule == "" {
+				// No index rule: order is verified against the element timestamps.
+				p, err := logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+					logical.OrderBy("", tt.sortDirection), logical.NewTags("searchable", "start_time")).
+					Analyze(schema)
+				tester.NoError(err)
+				tester.NotNil(p)
+
+				entities, err := p.Execute(streamT)
+				tester.NoError(err)
+				tester.NotNil(entities)
+
+				tester.True(logical.SortedByTimestamp(entities, tt.sortDirection))
+			} else {
+				// Index rule given: order is verified against the projected tag value.
+				p, err := logical.IndexScan(sT, eT, metadata, nil, tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+					logical.OrderBy(tt.targetIndexRule, tt.sortDirection), logical.NewTags("searchable", tt.targetIndexRule)).
+					Analyze(schema)
+				tester.NoError(err)
+				tester.NotNil(p)
+
+				entities, err := p.Execute(streamT)
+				tester.NoError(err)
+				tester.NotNil(entities)
+
+				tester.True(logical.SortedByIndex(entities, tt.targetFamilyIdx, tt.targetTagIdx, tt.sortDirection))
+			}
+		})
+	}
+}
diff --git a/pkg/query/v2/logical/plan_indexscan_global.go b/pkg/query/v2/logical/plan_indexscan_global.go
new file mode 100644
index 0000000..fea123e
--- /dev/null
+++ b/pkg/query/v2/logical/plan_indexscan_global.go
@@ -0,0 +1,146 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+	"fmt"
+	"io"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/pkg/errors"
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/query/v2/executor"
+)
+
+var _ Plan = (*globalIndexScan)(nil)
+
+// globalIndexScan is a resolved plan that looks up items through a global
+// (cross-shard) index rule and materializes the matching elements.
+type globalIndexScan struct {
+	schema              Schema
+	metadata            *commonv2.Metadata
+	globalIndexRule     *databasev2.IndexRule
+	expr                Expr
+	projectionFieldRefs [][]*FieldRef
+}
+
+// String renders the scan for plan formatting; the projection reads "None"
+// when no field refs were requested.
+func (t *globalIndexScan) String() string {
+	if len(t.projectionFieldRefs) == 0 {
+		return fmt.Sprintf("GlobalIndexScan: Metadata{group=%s,name=%s},condition=%s; projection=None",
+			t.metadata.GetGroup(), t.metadata.GetName(), t.expr.String())
+	}
+	return fmt.Sprintf("GlobalIndexScan: Metadata{group=%s,name=%s},conditions=%s; projection=%s",
+		t.metadata.GetGroup(), t.metadata.GetName(),
+		t.expr.String(), formatExpr(", ", t.projectionFieldRefs...))
+}
+
+// Children returns an empty slice: a global index scan is a leaf plan.
+func (t *globalIndexScan) Children() []Plan {
+	return []Plan{}
+}
+
+// Type identifies this node as a global index scan.
+func (t *globalIndexScan) Type() PlanType {
+	return PlanGlobalIndexScan
+}
+
+// Schema returns the schema this plan was resolved against.
+func (t *globalIndexScan) Schema() Schema {
+	return t.schema
+}
+
+// Equal compares two global index scans by metadata, projection, schema,
+// index rule ID, and condition expression.
+func (t *globalIndexScan) Equal(plan Plan) bool {
+	if plan.Type() != PlanGlobalIndexScan {
+		return false
+	}
+	other := plan.(*globalIndexScan)
+	return t.metadata.GetGroup() == other.metadata.GetGroup() &&
+		t.metadata.GetName() == other.metadata.GetName() &&
+		cmp.Equal(t.projectionFieldRefs, other.projectionFieldRefs) &&
+		cmp.Equal(t.schema, other.schema) &&
+		cmp.Equal(t.globalIndexRule.GetMetadata().GetId(), other.globalIndexRule.GetMetadata().GetId()) &&
+		cmp.Equal(t.expr, other.expr)
+}
+
+// Execute runs the scan against every shard and concatenates the results;
+// a global index may point at items living in any shard.
+func (t *globalIndexScan) Execute(ec executor.ExecutionContext) ([]*streamv2.Element, error) {
+	shards, err := ec.Shards(nil)
+	if err != nil {
+		return nil, err
+	}
+	var elements []*streamv2.Element
+	for _, shard := range shards {
+		elementsInShard, err := t.executeForShard(ec, shard)
+		if err != nil {
+			return elements, err
+		}
+		elements = append(elements, elementsInShard...)
+	}
+	return elements, nil
+}
+
+// executeForShard resolves the scan condition against the shard's global
+// index, then loads, projects, and converts every matching item into a
+// stream Element.
+func (t *globalIndexScan) executeForShard(ec executor.ExecutionContext, shard tsdb.Shard) ([]*streamv2.Element, error) {
+	var elementsInShard []*streamv2.Element
+	itemIDs, err := shard.Index().Seek(index.Field{
+		Key: index.FieldKey{
+			IndexRuleID: t.globalIndexRule.GetMetadata().GetId(),
+		},
+		Term: t.expr.(*binaryExpr).r.(LiteralExpr).Bytes()[0],
+	})
+	if err != nil {
+		// Surface index failures instead of silently returning an empty result.
+		return nil, errors.WithStack(err)
+	}
+	if len(itemIDs) < 1 {
+		return elementsInShard, nil
+	}
+	for _, itemID := range itemIDs {
+		segShard, err := ec.Shard(itemID.ShardID)
+		if err != nil {
+			return elementsInShard, errors.WithStack(err)
+		}
+		series, err := segShard.Series().GetByID(itemID.SeriesID)
+		if err != nil {
+			return elementsInShard, errors.WithStack(err)
+		}
+		// Scope each item's closer to one loop iteration via a closure.
+		err = func() error {
+			item, closer, errInner := series.Get(itemID)
+			// Guard against a nil closer: deferring Close on a nil io.Closer
+			// (e.g. when Get fails) would panic.
+			if closer != nil {
+				defer func(closer io.Closer) {
+					_ = closer.Close()
+				}(closer)
+			}
+			if errInner != nil {
+				return errors.WithStack(errInner)
+			}
+			tagFamilies, errInner := projectItem(ec, item, t.projectionFieldRefs)
+			if errInner != nil {
+				return errors.WithStack(errInner)
+			}
+			elementID, errInner := ec.ParseElementID(item)
+			if errInner != nil {
+				return errors.WithStack(errInner)
+			}
+			elementsInShard = append(elementsInShard, &streamv2.Element{
+				ElementId:   elementID,
+				Timestamp:   timestamppb.New(time.Unix(0, int64(item.Time()))),
+				TagFamilies: tagFamilies,
+			})
+			return nil
+		}()
+		if err != nil {
+			return nil, err
+		}
+	}
+	return elementsInShard, nil
+}
diff --git a/pkg/query/v2/logical/plan_indexscan_local.go b/pkg/query/v2/logical/plan_indexscan_local.go
new file mode 100644
index 0000000..35619e0
--- /dev/null
+++ b/pkg/query/v2/logical/plan_indexscan_local.go
@@ -0,0 +1,291 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+	"bytes"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/pkg/errors"
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/query/v2/executor"
+)
+
+var _ UnresolvedPlan = (*unresolvedIndexScan)(nil)
+
+// unresolvedIndexScan carries the raw query inputs (time range, conditions,
+// projection, entity) until Analyze binds them against a concrete Schema.
+type unresolvedIndexScan struct {
+	// unresolvedOrderBy may be nil; analyze then yields a default sort.
+	unresolvedOrderBy *UnresolvedOrderBy
+	startTime         time.Time
+	endTime           time.Time
+	metadata          *commonv2.Metadata
+	conditions        []Expr
+	projectionFields  [][]*Tag
+	entity            tsdb.Entity
+}
+
+// Analyze resolves the conditions against the schema and splits them into
+// series-local and global index conditions. A single global condition
+// short-circuits to a globalIndexScan plan; otherwise a localIndexScan with
+// an optional order-by sub-plan is produced.
+func (uis *unresolvedIndexScan) Analyze(s Schema) (Plan, error) {
+	localConditionMap := make(map[*databasev2.IndexRule][]Expr)
+	globalConditions := make([]interface{}, 0)
+	for _, cond := range uis.conditions {
+		if resolvable, ok := cond.(ResolvableExpr); ok {
+			err := resolvable.Resolve(s)
+			if err != nil {
+				return nil, err
+			}
+
+			if bCond, ok := cond.(*binaryExpr); ok {
+				tag := bCond.l.(*FieldRef).tag
+				if defined, indexObj := s.IndexDefined(tag); defined {
+					if indexObj.GetLocation() == databasev2.IndexRule_LOCATION_SERIES {
+						// Group local conditions by their index rule; append
+						// on the zero value covers the first-seen case too.
+						localConditionMap[indexObj] = append(localConditionMap[indexObj], cond)
+					} else if indexObj.GetLocation() == databasev2.IndexRule_LOCATION_GLOBAL {
+						// Stored pairwise: (rule, expr).
+						globalConditions = append(globalConditions, indexObj, cond)
+					}
+				} else {
+					return nil, errors.Wrap(ErrIndexNotDefined, tag.GetCompoundName())
+				}
+			}
+		}
+	}
+
+	var projFieldsRefs [][]*FieldRef
+	// len() of a nil slice is zero, so no explicit nil check is needed.
+	if len(uis.projectionFields) > 0 {
+		var err error
+		projFieldsRefs, err = s.CreateRef(uis.projectionFields...)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	if len(globalConditions) > 0 {
+		// Only a single global index condition is supported per query.
+		if len(globalConditions)/2 > 1 {
+			return nil, ErrMultipleGlobalIndexes
+		}
+		return &globalIndexScan{
+			schema:              s,
+			projectionFieldRefs: projFieldsRefs,
+			metadata:            uis.metadata,
+			globalIndexRule:     globalConditions[0].(*databasev2.IndexRule),
+			expr:                globalConditions[1].(Expr),
+		}, nil
+	}
+
+	// resolve sub-plan with the projected view of schema
+	orderBySubPlan, err := uis.unresolvedOrderBy.analyze(s.Proj(projFieldsRefs...))
+	if err != nil {
+		return nil, err
+	}
+
+	return &localIndexScan{
+		orderBy:             orderBySubPlan,
+		timeRange:           tsdb.NewTimeRange(uis.startTime, uis.endTime),
+		schema:              s,
+		projectionFieldRefs: projFieldsRefs,
+		metadata:            uis.metadata,
+		conditionMap:        localConditionMap,
+		entity:              uis.entity,
+	}, nil
+}
+
+var _ Plan = (*localIndexScan)(nil)
+
+// localIndexScan scans the series matching the entity in every shard,
+// optionally ordered by a series-local index, filtered by per-index-rule
+// conditions, and projected to the requested tag families.
+type localIndexScan struct {
+	*orderBy
+	timeRange           tsdb.TimeRange
+	schema              Schema
+	metadata            *commonv2.Metadata
+	conditionMap        map[*databasev2.IndexRule][]Expr
+	projectionFieldRefs [][]*FieldRef
+	entity              tsdb.Entity
+}
+
+// Execute runs the local index scan: it gathers per-shard iterators, merges
+// them with a comparator derived from the sort direction, and materializes
+// every item into a stream Element carrying the projected tag families.
+func (i *localIndexScan) Execute(ec executor.ExecutionContext) ([]*streamv2.Element, error) {
+	shards, err := ec.Shards(i.entity)
+	if err != nil {
+		return nil, err
+	}
+	var iters []tsdb.Iterator
+	for _, shard := range shards {
+		shardIters, shardErr := i.executeInShard(shard)
+		if shardErr != nil {
+			return nil, shardErr
+		}
+		iters = append(iters, shardIters...)
+	}
+
+	var elems []*streamv2.Element
+	it := NewItemIter(iters, createComparator(i.sort))
+	for it.HasNext() {
+		item := it.Next()
+		tagFamilies, projErr := projectItem(ec, item, i.projectionFieldRefs)
+		if projErr != nil {
+			return nil, projErr
+		}
+		elementID, idErr := ec.ParseElementID(item)
+		if idErr != nil {
+			return nil, idErr
+		}
+		elems = append(elems, &streamv2.Element{
+			ElementId:   elementID,
+			Timestamp:   timestamppb.New(time.Unix(0, int64(item.Time()))),
+			TagFamilies: tagFamilies,
+		})
+	}
+	return elems, nil
+}
+
+// executeInShard lists the series matching the entity within one shard and
+// composes seeker builders that apply ordering (by index when an order-by
+// index is resolved, otherwise by time) and the local index filters.
+func (i *localIndexScan) executeInShard(shard tsdb.Shard) ([]tsdb.Iterator, error) {
+	seriesList, err := shard.Series().List(tsdb.NewPath(i.entity))
+	if err != nil {
+		return nil, err
+	}
+
+	var builders []seekerBuilder
+
+	if i.index != nil {
+		builders = append(builders, func(builder tsdb.SeekerBuilder) {
+			builder.OrderByIndex(i.index, i.sort)
+		})
+	} else {
+		// No order-by index resolved: fall back to the default time order.
+		builders = append(builders, func(builder tsdb.SeekerBuilder) {
+			builder.OrderByTime(i.sort)
+		})
+	}
+
+	// len() on a nil map is zero, so no explicit nil check is required.
+	if len(i.conditionMap) > 0 {
+		builders = append(builders, func(b tsdb.SeekerBuilder) {
+			for idxRule, exprs := range i.conditionMap {
+				b.Filter(idxRule, exprToCondition(exprs))
+			}
+		})
+	}
+
+	return executeForShard(seriesList, i.timeRange, builders...)
+}
+
+func (i *localIndexScan) String() string {
+	exprStr := make([]string, 0, len(i.conditionMap))
+	for _, conditions := range i.conditionMap {
+		var conditionStr []string
+		for _, cond := range conditions {
+			conditionStr = append(conditionStr, cond.String())
+		}
+		exprStr = append(exprStr, fmt.Sprintf("(%s)", strings.Join(conditionStr, " AND ")))
+	}
+	if len(i.projectionFieldRefs) == 0 {
+		return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=None",
+			i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(), strings.Join(exprStr, " AND "))
+	}
+	return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s",
+		i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(),
+		strings.Join(exprStr, " AND "), formatExpr(", ", i.projectionFieldRefs...))
+}
+
+// Type identifies this plan as a local index scan.
+func (i *localIndexScan) Type() PlanType {
+	return PlanLocalIndexScan
+}
+
+// Children returns no sub-plans: an index scan is a leaf of the plan tree.
+func (i *localIndexScan) Children() []Plan {
+	return []Plan{}
+}
+
+// Schema returns the projected view of the schema when a projection is set,
+// otherwise the full schema.
+func (i *localIndexScan) Schema() Schema {
+	// len() of a nil slice is zero, so a single check covers both the nil
+	// and the empty case.
+	if len(i.projectionFieldRefs) == 0 {
+		return i.schema
+	}
+	return i.schema.Proj(i.projectionFieldRefs...)
+}
+
+// Equal reports whether two local index-scan plans are semantically the same:
+// identical metadata, time range, entity, projection, schema, condition map
+// and order-by sub-plan.
+func (i *localIndexScan) Equal(plan Plan) bool {
+	if plan.Type() != PlanLocalIndexScan {
+		return false
+	}
+	other := plan.(*localIndexScan)
+	return i.metadata.GetGroup() == other.metadata.GetGroup() &&
+		i.metadata.GetName() == other.metadata.GetName() &&
+		i.timeRange.Start.UnixNano() == other.timeRange.Start.UnixNano() &&
+		i.timeRange.End.UnixNano() == other.timeRange.End.UnixNano() &&
+		bytes.Equal(i.entity.Marshal(), other.entity.Marshal()) &&
+		cmp.Equal(i.projectionFieldRefs, other.projectionFieldRefs) &&
+		cmp.Equal(i.schema, other.schema) &&
+		cmp.Equal(i.conditionMap, other.conditionMap) &&
+		cmp.Equal(i.orderBy, other.orderBy)
+}
+
+// IndexScan composes an unresolved index-scan plan over [startTime, endTime]
+// with the given conditions, entity, optional order-by and tag projection.
+func IndexScan(startTime, endTime time.Time, metadata *commonv2.Metadata, conditions []Expr, entity tsdb.Entity,
+	orderBy *UnresolvedOrderBy, projection ...[]*Tag) UnresolvedPlan {
+	return &unresolvedIndexScan{
+		unresolvedOrderBy: orderBy,
+		startTime:         startTime,
+		endTime:           endTime,
+		metadata:          metadata,
+		conditions:        conditions,
+		projectionFields:  projection,
+		entity:            entity,
+	}
+}
+
+// GlobalIndexScan is a short-hand method for composing a globalIndexScan plan.
+// Only metadata, conditions and projection are populated; no time range,
+// entity or order-by is involved for a global index lookup.
+func GlobalIndexScan(metadata *commonv2.Metadata, conditions []Expr, projection ...[]*Tag) UnresolvedPlan {
+	plan := unresolvedIndexScan{
+		metadata:         metadata,
+		conditions:       conditions,
+		projectionFields: projection,
+	}
+	return &plan
+}
+
+// exprToCondition converts binary expressions (tag op literal) into a tsdb
+// Condition keyed by tag name, grouping multiple expressions on the same tag.
+func exprToCondition(exprs []Expr) tsdb.Condition {
+	cond := make(map[string][]index.ConditionValue)
+	for _, expr := range exprs {
+		bExpr := expr.(*binaryExpr)
+		l := bExpr.l.(*FieldRef)
+		r := bExpr.r.(LiteralExpr)
+		// append on the map's zero value covers the first-seen tag, so the
+		// explicit exists-check branch is unnecessary.
+		tagName := l.tag.GetTagName()
+		cond[tagName] = append(cond[tagName], index.ConditionValue{
+			Values: r.Bytes(),
+			Op:     bExpr.op,
+		})
+	}
+	return cond
+}
diff --git a/pkg/query/v2/logical/plan_orderby.go b/pkg/query/v2/logical/plan_orderby.go
new file mode 100644
index 0000000..254f0b2
--- /dev/null
+++ b/pkg/query/v2/logical/plan_orderby.go
@@ -0,0 +1,171 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+	"bytes"
+	"fmt"
+	"sort"
+
+	"github.com/pkg/errors"
+
+	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+// UnresolvedOrderBy holds the user-specified sort direction and the name of
+// the index rule to sort by, before resolution against a Schema.
+type UnresolvedOrderBy struct {
+	sort                modelv2.QueryOrder_Sort
+	targetIndexRuleName string
+}
+
+// analyze resolves the order-by clause against the schema. A nil receiver
+// yields a default (unspecified, i.e. time-ordered) sub-plan; an empty index
+// rule name yields a sort-direction-only sub-plan.
+func (u *UnresolvedOrderBy) analyze(s Schema) (*orderBy, error) {
+	if u == nil {
+		// return a default orderBy sub-plan
+		return &orderBy{
+			sort: modelv2.QueryOrder_SORT_UNSPECIFIED,
+		}, nil
+	}
+
+	if u.targetIndexRuleName == "" {
+		return &orderBy{
+			sort: u.sort,
+		}, nil
+	}
+
+	defined, indexRule := s.IndexRuleDefined(u.targetIndexRuleName)
+	if !defined {
+		return nil, errors.Wrap(ErrIndexNotDefined, u.targetIndexRuleName)
+	}
+
+	projFieldSpecs, err := s.CreateRef(NewTags("", indexRule.GetTags()...))
+	if err != nil {
+		// Propagate the wrapped error (it already carries the offending tag
+		// name) instead of discarding it for a bare sentinel.
+		return nil, err
+	}
+
+	return &orderBy{
+		sort:      u.sort,
+		index:     indexRule,
+		fieldRefs: projFieldSpecs[0],
+	}, nil
+}
+
+// orderBy is the resolved order-by sub-plan.
+type orderBy struct {
+	// index describes the indexRule used to sort the elements.
+	// It can be null since by default we may sort by created-time.
+	index *databasev2.IndexRule
+	// sort describes the sort direction
+	sort modelv2.QueryOrder_Sort
+	// TODO: support multiple tags. Currently only the first member will be used for sorting.
+	fieldRefs []*FieldRef
+}
+
+// Equal reports whether two orderBy sub-plans share the same sort direction
+// and target the same index rule (compared by rule name).
+func (o *orderBy) Equal(other interface{}) bool {
+	that, ok := other.(*orderBy)
+	if !ok {
+		return false
+	}
+	switch {
+	case o == nil && that == nil:
+		return true
+	case o == nil || that == nil:
+		return false
+	}
+	return o.sort == that.sort &&
+		o.index.GetMetadata().GetName() == that.index.GetMetadata().GetName()
+}
+
+// String renders the order-by clause for EXPLAIN-style output.
+func (o *orderBy) String() string {
+	return fmt.Sprintf("OrderBy: %v, sort=%s", o.index.GetTags(), o.sort)
+}
+
+// OrderBy composes an unresolved order-by clause for the given index rule
+// name and sort direction.
+func OrderBy(indexRuleName string, sort modelv2.QueryOrder_Sort) *UnresolvedOrderBy {
+	return &UnresolvedOrderBy{
+		sort:                sort,
+		targetIndexRuleName: indexRuleName,
+	}
+}
+
+// getRawTagValue extracts the raw bytes of a tag value for byte-wise
+// comparison. Only string and int tag values are supported.
+func getRawTagValue(typedPair *modelv2.Tag) ([]byte, error) {
+	switch v := typedPair.GetValue().Value.(type) {
+	case *modelv2.TagValue_Str:
+		return []byte(v.Str.GetValue()), nil
+	case *modelv2.TagValue_Int:
+		return convert.Int64ToBytes(v.Int.GetValue()), nil
+	}
+	return nil, errors.New("unsupported data types")
+}
+
+// SortedByIndex is used to test whether the given entities are sorted by the sortDirection.
+// The given entities MUST satisfy both the positive check and the negative check for the reversed direction.
+func SortedByIndex(elements []*streamv2.Element, tagFamilyIdx, tagIdx int, sortDirection modelv2.QueryOrder_Sort) bool {
+	if sortDirection == modelv2.QueryOrder_SORT_UNSPECIFIED {
+		// An unspecified direction defaults to ascending.
+		sortDirection = modelv2.QueryOrder_SORT_ASC
+	}
+	if len(elements) == 1 {
+		return true
+	}
+	forward := sort.SliceIsSorted(elements, sortByIndex(elements, tagFamilyIdx, tagIdx, sortDirection))
+	backward := sort.SliceIsSorted(elements, sortByIndex(elements, tagFamilyIdx, tagIdx, reverseSortDirection(sortDirection)))
+	return forward && !backward
+}
+
+// SortedByTimestamp tests whether the given elements are ordered by timestamp
+// in the given direction, and strictly not in the reversed direction.
+func SortedByTimestamp(elements []*streamv2.Element, sortDirection modelv2.QueryOrder_Sort) bool {
+	if sortDirection == modelv2.QueryOrder_SORT_UNSPECIFIED {
+		// An unspecified direction defaults to ascending.
+		sortDirection = modelv2.QueryOrder_SORT_ASC
+	}
+	if len(elements) == 1 {
+		return true
+	}
+	forward := sort.SliceIsSorted(elements, sortByTimestamp(elements, sortDirection))
+	backward := sort.SliceIsSorted(elements, sortByTimestamp(elements, reverseSortDirection(sortDirection)))
+	return forward && !backward
+}
+
+// reverseSortDirection flips ASC and DESC; UNSPECIFIED is treated as ASC
+// and therefore reversed to DESC.
+func reverseSortDirection(sort modelv2.QueryOrder_Sort) modelv2.QueryOrder_Sort {
+	if sort != modelv2.QueryOrder_SORT_DESC {
+		return modelv2.QueryOrder_SORT_DESC
+	}
+	return modelv2.QueryOrder_SORT_ASC
+}
+
+// sortByIndex returns a less-func that compares the raw bytes of the tag at
+// (tagFamilyIdx, tagIdx) of two elements in the given direction.
+// NOTE(review): errors from getRawTagValue are discarded, so an unsupported
+// tag type compares as empty bytes — confirm this is intended.
+func sortByIndex(entities []*streamv2.Element, tagFamilyIdx, tagIdx int, sortDirection modelv2.QueryOrder_Sort) func(i, j int) bool {
+	return func(i, j int) bool {
+		iPair := entities[i].GetTagFamilies()[tagFamilyIdx].GetTags()[tagIdx]
+		jPair := entities[j].GetTagFamilies()[tagFamilyIdx].GetTags()[tagIdx]
+		lField, _ := getRawTagValue(iPair)
+		rField, _ := getRawTagValue(jPair)
+		comp := bytes.Compare(lField, rField)
+		if sortDirection == modelv2.QueryOrder_SORT_ASC {
+			return comp == -1
+		}
+		return comp == 1
+	}
+}
+
+// sortByTimestamp returns a less-func comparing element timestamps
+// (nanosecond precision) in the given direction.
+func sortByTimestamp(entities []*streamv2.Element, sortDirection modelv2.QueryOrder_Sort) func(i, j int) bool {
+	desc := sortDirection == modelv2.QueryOrder_SORT_DESC
+	return func(i, j int) bool {
+		ti := entities[i].GetTimestamp().AsTime().UnixNano()
+		tj := entities[j].GetTimestamp().AsTime().UnixNano()
+		if desc {
+			return ti > tj
+		}
+		return ti < tj
+	}
+}
diff --git a/pkg/query/v2/logical/schema.go b/pkg/query/v2/logical/schema.go
new file mode 100644
index 0000000..13130e4
--- /dev/null
+++ b/pkg/query/v2/logical/schema.go
@@ -0,0 +1,176 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+	"github.com/google/go-cmp/cmp"
+	"github.com/pkg/errors"
+
+	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
+)
+
+// Schema exposes the stream schema to the planner: entity composition,
+// index-rule lookups, tag references and projection views.
+type Schema interface {
+	EntityList() []string
+	IndexDefined(*Tag) (bool, *databasev2.IndexRule)
+	IndexRuleDefined(string) (bool, *databasev2.IndexRule)
+	FieldSubscript(string) (bool, int)
+	FieldDefined(string) bool
+	CreateRef(tags ...[]*Tag) ([][]*FieldRef, error)
+	Proj(refs ...[]*FieldRef) Schema
+	Equal(Schema) bool
+	ShardNumber() uint32
+	TraceIDFieldName() string
+}
+
+// tagSpec locates a tag inside its tag family and carries its declared spec.
+type tagSpec struct {
+	// Idx is defined as
+	// 1) the field index based on the (trace) schema for the underlying plans which
+	//    directly interact with the database and index modules,
+	// 2) the projection index given by the users for those plans which can only access the data from parent plans,
+	//    e.g. orderBy plan uses this projection index to access the data entities (normally a projection view)
+	//    from the parent plan.
+	TagFamilyIdx int
+	TagIdx       int
+	spec         *databasev2.TagSpec
+}
+
+// Equal reports whether two tag specs point at the same position and carry
+// the same declared name and type.
+func (fs *tagSpec) Equal(other *tagSpec) bool {
+	if fs.TagFamilyIdx != other.TagFamilyIdx || fs.TagIdx != other.TagIdx {
+		return false
+	}
+	return fs.spec.GetType() == other.spec.GetType() && fs.spec.GetName() == other.spec.GetName()
+}
+
+var _ Schema = (*schema)(nil)
+
+// schema is the default Schema implementation, backed by a stream definition
+// and its index rules; fieldMap indexes tag specs by tag name.
+type schema struct {
+	stream     *databasev2.Stream
+	indexRules []*databasev2.IndexRule
+	fieldMap   map[string]*tagSpec
+	entityList []string
+}
+
+// IndexRuleDefined looks up an index rule by name, returning it when found.
+func (s *schema) IndexRuleDefined(indexRuleName string) (bool, *databasev2.IndexRule) {
+	for _, rule := range s.indexRules {
+		if indexRuleName == rule.GetMetadata().GetName() {
+			return true, rule
+		}
+	}
+	return false, nil
+}
+
+// EntityList returns the tag names composing the entity.
+func (s *schema) EntityList() []string {
+	return s.entityList
+}
+
+// TraceIDFieldName returns the tag name used as the trace identifier.
+// NOTE(review): hard-coded for now — confirm how the schema should declare it.
+func (s *schema) TraceIDFieldName() string {
+	// TODO: how to extract trace_id?
+	return "trace_id"
+}
+
+// IndexDefined checks whether the field given is indexed, returning the
+// first index rule that references the tag's name.
+func (s *schema) IndexDefined(tag *Tag) (bool, *databasev2.IndexRule) {
+	target := tag.GetTagName()
+	for _, rule := range s.indexRules {
+		for _, name := range rule.GetTags() {
+			if name == target {
+				return true, rule
+			}
+		}
+	}
+	return false, nil
+}
+
+// FieldSubscript returns the position of the first index rule that covers
+// the given field name, or (false, -1) when no rule references it.
+func (s *schema) FieldSubscript(field string) (bool, int) {
+	// TODO: what does this mean
+	for i, indexObj := range s.indexRules {
+		for _, fieldName := range indexObj.GetTags() {
+			if field == fieldName {
+				return true, i
+			}
+		}
+	}
+	return false, -1
+}
+
+// Equal reports whether two schemas expose the same tag layout.
+func (s *schema) Equal(s2 Schema) bool {
+	other, ok := s2.(*schema)
+	if !ok {
+		return false
+	}
+	return cmp.Equal(other.fieldMap, s.fieldMap)
+}
+
+// registerField registers the field spec with given tagFamilyName, tagName and indexes.
+func (s *schema) registerField(tagName string, tagFamilyIdx, tagIdx int, spec *databasev2.TagSpec) {
+	entry := &tagSpec{
+		TagFamilyIdx: tagFamilyIdx,
+		TagIdx:       tagIdx,
+		spec:         spec,
+	}
+	s.fieldMap[tagName] = entry
+}
+
+// FieldDefined reports whether a tag with the given name exists in the schema.
+func (s *schema) FieldDefined(name string) bool {
+	// The comma-ok form already yields the boolean we need.
+	_, ok := s.fieldMap[name]
+	return ok
+}
+
+// CreateRef create FieldRef to the given tags.
+// The family name of the tag is actually not used
+// since the uniqueness of the tag names can be guaranteed across families.
+func (s *schema) CreateRef(tags ...[]*Tag) ([][]*FieldRef, error) {
+	fieldRefs := make([][]*FieldRef, len(tags))
+	for familyIdx, familyTags := range tags {
+		var refs []*FieldRef
+		for _, tag := range familyTags {
+			spec, defined := s.fieldMap[tag.GetTagName()]
+			if !defined {
+				return nil, errors.Wrap(ErrFieldNotDefined, tag.GetCompoundName())
+			}
+			refs = append(refs, &FieldRef{tag, spec})
+		}
+		fieldRefs[familyIdx] = refs
+	}
+	return fieldRefs, nil
+}
+
+// Proj creates a projection view from the present schema
+// with a given list of projections
+func (s *schema) Proj(refs ...[]*FieldRef) Schema {
+	if len(refs) == 0 {
+		return nil
+	}
+	projected := &schema{
+		stream:     s.stream,
+		indexRules: s.indexRules,
+		fieldMap:   make(map[string]*tagSpec),
+		entityList: s.entityList,
+	}
+	// Re-index every referenced tag by its position in the projected view.
+	for familyIdx, familyRefs := range refs {
+		for tagIdx, ref := range familyRefs {
+			projected.fieldMap[ref.tag.GetTagName()] = &tagSpec{
+				TagFamilyIdx: familyIdx,
+				TagIdx:       tagIdx,
+				spec:         ref.Spec.spec,
+			}
+		}
+	}
+	return projected
+}
+
+// ShardNumber returns the number of shards configured for the stream.
+func (s *schema) ShardNumber() uint32 {
+	return s.stream.ShardNum
+}
diff --git a/pkg/query/v2/logical/testdata/global_index.json b/pkg/query/v2/logical/testdata/global_index.json
new file mode 100644
index 0000000..9e81928
--- /dev/null
+++ b/pkg/query/v2/logical/testdata/global_index.json
@@ -0,0 +1,64 @@
+[
+  {
+    "tags": [
+      {"str":{"value": "1"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 1000}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "2"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.3_id"}},
+      {"str":{"value": "/product_id"}},
+      {"int":{"value": 500}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "1"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 30}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "2"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.5_id"}},
+      {"str":{"value": "/price_id"}},
+      {"int":{"value": 60}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "400"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "2"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/item_id"}},
+      {"int":{"value": 300}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  }
+  
+]
\ No newline at end of file
diff --git a/pkg/query/v2/logical/testdata/multiple_shards.json b/pkg/query/v2/logical/testdata/multiple_shards.json
new file mode 100644
index 0000000..ec6427e
--- /dev/null
+++ b/pkg/query/v2/logical/testdata/multiple_shards.json
@@ -0,0 +1,64 @@
+[
+  {
+    "tags": [
+      {"str":{"value": "1"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 1000}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "2"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.3_id"}},
+      {"str":{"value": "/product_id"}},
+      {"int":{"value": 500}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "3"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 30}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "4"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.5_id"}},
+      {"str":{"value": "/price_id"}},
+      {"int":{"value": 60}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "400"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "5"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/item_id"}},
+      {"int":{"value": 300}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  }
+  
+]
\ No newline at end of file