You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/11 13:05:48 UTC

[skywalking-banyandb] branch main updated: feat: Implement Write in the liaison (#30)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 32524d2  feat: Implement Write in the liaison (#30)
32524d2 is described below

commit 32524d2fbd3e42d6d7693bb78f1d1236fda38e84
Author: Fine0830 <fi...@outlook.com>
AuthorDate: Wed Aug 11 21:05:28 2021 +0800

    feat: Implement Write in the liaison (#30)
    
    * feat: implement Write and Query
    
    * feat: add test server
    
    * feat: get field subscripts
    
    * feat: generate shardID
    
    * fix: compute shardID
    
    * fix: update test data
    
    * update
    
    * feat: use protobuf
    
    * fix: update incorrect syntax
    
    * Fix service register failures
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
    
    * feat: update grpc
    
    * feat: publish and subscribe write data
    
    * feat: add series id
    
    * fix: write data
    
    * feat: add pipeline in series
    
    * feat: write trace data
    
    * feat: use pipeline
    
    * fix: update custom event
    
    * fix: update repo
    
    * fix: grpc test
    
    * fix: add param
    
    * fix: grpc test
    
    * fix: lint
    
    * fix: update header ignore files
    
    * fix: use embed to access to files
    
    * fix: update header ignore files
    
    * fix: format code
    
    * refactor: address pr
    
    * revert: flags
    
    * fix: update schemaMap in writeListener
    
    * fix: use assert in tests
    
    * fix: add sync.RWMutex to protect seriesEvent and shardEvent
    
    * fix: update server opts
    
    * fix: remove dead code
    
    * fix: update header ignore files
    
    * fix: set maxRecMsgSize flag
    
    * fix: typo
    
    * fix: address pr
    
    * fix: update flags
    
    * fix: add tls tests
    
    * fix: update test
    
    * fix: lint
    
    * revert version
    
    * refactor: update flags
    
    * fix: add sync.RWMutex to protect events
    
    * fix: lint
    
    * feat: separate shardid and seriesid func
    
    * feat: add maps for events
    
    * fix: update RWMutex
    
    * fix: update test
    
    * feat: generate tls certificate
    
    * fix: update flags
    
    * fix: update grpc test
    
    Co-authored-by: Gao Hongtao <ha...@gmail.com>
---
 .licenserc.yaml                           |   3 +-
 api/data/trace.go                         |  16 ++
 api/event/discovery.go                    |   6 +-
 api/proto/banyandb/v1/write.pb.go         |   2 +-
 api/proto/banyandb/v1/write.proto         |   2 +-
 banyand/internal/cmd/standalone.go        |   2 +-
 banyand/liaison/grpc/data/server_cert.pem |  23 +++
 banyand/liaison/grpc/data/server_key.pem  |  27 +++
 banyand/liaison/grpc/grpc.go              | 262 +++++++++++++++++++++++++++---
 banyand/liaison/grpc/grpc_test.go         | 250 ++++++++++++++++++++++++++++
 banyand/query/processor_test.go           |   8 +-
 banyand/series/trace/common_test.go       |   4 +-
 banyand/series/trace/query.go             |   5 +-
 banyand/series/trace/service.go           |  57 +++++--
 banyand/series/trace/trace.go             |   6 +-
 banyand/series/trace/write.go             |   5 +-
 banyand/series/trace/write_test.go        |   6 +-
 pkg/partition/route.go                    |  13 +-
 pkg/query/logical/expr.go                 |  12 +-
 pkg/query/logical/plan_orderby.go         |   2 +-
 pkg/query/logical/schema.go               |  21 ++-
 21 files changed, 668 insertions(+), 64 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 3049c0b..3f25016 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -74,10 +74,9 @@ header: # `header` section is configurations for source codes license header.
     - '**/*.json'
     - '**/*_mock.go'
     - '**/*_mock_test.go'
-    - '**/*_generated.go'
-    - '**/Trace_grpc.go'
     - '**/*.pb.go'
     - '**/*.textproto'
+    - '**/*.pem'
 
   comment: on-failure # on what condition license-eye will comment on the pull request, `on-failure`, `always`, `never`.
 
diff --git a/api/data/trace.go b/api/data/trace.go
index 5351990..6d59308 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -20,10 +20,17 @@ package data
 import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
 )
 
 var TraceKindVersion = common.KindVersion{Version: "v1", Kind: "data-trace"}
 
+var WriteEventKindVersion = common.KindVersion{
+	Version: "v1",
+	Kind:    "trace-write",
+}
+var TopicWriteEvent = bus.UniTopic(WriteEventKindVersion.String())
+
 type Trace struct {
 	common.KindVersion
 	Entities []Entity
@@ -36,6 +43,15 @@ type Entity struct {
 type EntityValue struct {
 	*v1.EntityValue
 }
+type TraceWriteDate struct {
+	ShardID      uint
+	SeriesID     uint64
+	WriteRequest *v1.WriteRequest
+}
+type Write struct {
+	common.KindVersion
+	Payload *TraceWriteDate
+}
 
 func NewTrace() *Trace {
 	return &Trace{KindVersion: TraceKindVersion}
diff --git a/api/event/discovery.go b/api/event/discovery.go
index 10476b8..7b55de8 100644
--- a/api/event/discovery.go
+++ b/api/event/discovery.go
@@ -28,12 +28,14 @@ var (
 		Version: "v1",
 		Kind:    "event-shard",
 	}
-	TopicShardEvent        = bus.UniTopic(ShardEventKindVersion.String())
+	TopicShardEvent = bus.UniTopic(ShardEventKindVersion.String())
+
 	SeriesEventKindVersion = common.KindVersion{
 		Version: "v1",
 		Kind:    "event-series",
 	}
-	TopicSeriesEvent     = bus.UniTopic(SeriesEventKindVersion.String())
+	TopicSeriesEvent = bus.UniTopic(SeriesEventKindVersion.String())
+
 	IndexRuleKindVersion = common.KindVersion{Version: "v1", Kind: "index-rule"}
 	TopicIndexRule       = bus.UniTopic(IndexRuleKindVersion.String())
 )
diff --git a/api/proto/banyandb/v1/write.pb.go b/api/proto/banyandb/v1/write.pb.go
index 6808076..c2c1026 100644
--- a/api/proto/banyandb/v1/write.pb.go
+++ b/api/proto/banyandb/v1/write.pb.go
@@ -364,7 +364,7 @@ type EntityValue struct {
 	// binary representation of segments, including tags, spans...
 	DataBinary []byte `protobuf:"bytes,3,opt,name=data_binary,json=dataBinary,proto3" json:"data_binary,omitempty"`
 	// support all of indexed fields in the fields.
-	// Pair only has value, as the value of PairValue match with the key
+	// Field only has value, as the value of value_type match with the key
 	// by the index rules and index rule bindings of Metadata group.
 	// indexed fields of multiple entities are compression in the fields.
 	Fields []*Field `protobuf:"bytes,4,rep,name=fields,proto3" json:"fields,omitempty"`
diff --git a/api/proto/banyandb/v1/write.proto b/api/proto/banyandb/v1/write.proto
index 3d7ddc0..02e2d1a 100644
--- a/api/proto/banyandb/v1/write.proto
+++ b/api/proto/banyandb/v1/write.proto
@@ -62,7 +62,7 @@ message EntityValue {
   // binary representation of segments, including tags, spans...
   bytes data_binary = 3;
   // support all of indexed fields in the fields.
-  // Pair only has value, as the value of PairValue match with the key
+  // Field only has value, as the value of value_type match with the key
   // by the index rules and index rule bindings of Metadata group.
   // indexed fields of multiple entities are compression in the fields.
   repeated Field fields = 4;
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index d694895..1aa0b4f 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -61,7 +61,7 @@ func newStandaloneCmd() *cobra.Command {
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate index builder")
 	}
-	traceSeries, err := trace.NewService(ctx, db, repo, idx)
+	traceSeries, err := trace.NewService(ctx, db, repo, idx, pipeline)
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate trace series")
 	}
diff --git a/banyand/liaison/grpc/data/server_cert.pem b/banyand/liaison/grpc/data/server_cert.pem
new file mode 100644
index 0000000..d7e693f
--- /dev/null
+++ b/banyand/liaison/grpc/data/server_cert.pem
@@ -0,0 +1,23 @@
+-----BEGIN CERTIFICATE-----
+MIIDyzCCArOgAwIBAgIJAMtUjj8gb0gnMA0GCSqGSIb3DQEBCwUAMIGIMQswCQYD
+VQQGEwJDTjELMAkGA1UECAwCU0MxCzAJBgNVBAcMAkNEMRMwEQYDVQQKDApza3l3
+YWxraW5nMREwDwYDVQQLDAhiYW55YW5kYjEjMCEGCSqGSIb3DQEJARYUZmFueHVl
+MDgzMEBlbWFpbC5jb20xEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yMTA4MTEwNjU0
+MjNaFw0yNjA4MTAwNjU0MjNaMIGIMQswCQYDVQQGEwJDTjELMAkGA1UECAwCU0Mx
+CzAJBgNVBAcMAkNEMRMwEQYDVQQKDApza3l3YWxraW5nMREwDwYDVQQLDAhiYW55
+YW5kYjEjMCEGCSqGSIb3DQEJARYUZmFueHVlMDgzMEBlbWFpbC5jb20xEjAQBgNV
+BAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMwN
+2N/JCT64EMySDMqwA4hJ2lDY6ro1BMgnfjFXZNg2dZagMYRwNao+D6HSl1y9Apr4
+EGC46hGuL92zrAUHEeMNZrEWBEB16UrnbyQeT4d1/Lq9tywGG0j2uvjCO5zMN43e
+WX8o7c9gZOPPQp/MKFFDZtxbj5+Ve2LWBdx1VWkIvJzITlLbmnASjBXzWs1kPrJY
+GiSrAASyb9nKNn5Lvw3bdNgOGLBQmd6jRKOkKHAEvFCHSKpQV4R2lEWAOb1NSfYr
+/YFz+GGc+HjZeSZt/4nIpMa6+jOs/2LQybz0jVtAprBE8lVRMSn0L+Ud9Gj7oJcJ
+hOJCoCTG+0vfGl0P3I0CAwEAAaM2MDQwMgYDVR0RBCswKYIJbG9jYWxob3N0ggtl
+eGFtcGxlLmNvbYIPd3d3LmV4YW1wbGUuY29tMA0GCSqGSIb3DQEBCwUAA4IBAQCJ
+L7mG1mjG+9zg5Hl4TbakQ/zxnz3VuAUN+8WEVcTY65Tc4LbabD56mf5PumDQaGzz
+mFhblsi2mXk4H+dbUCh3wEgpcbmCjZKa5Kuyypfcg8JgFzIS3PwNp/BCzxdP2uqA
+Lw1lHYczyWIaW+M7tVG4V9ewpo/DOX5xlkMw0kDJatYxCJ8CzSzFropNIeINkEX1
+dT/vDtH0qwWNUDGf5yr8dnjowKCRczmc1FY7hV+Q0SBZKhqHkTKrxgjT7eVNG5CR
+5xajFa96ut8UXTFd9+TXJNyPaYVotMD4WcLShc31yfwGR+Y3YpmH8uHzN3NRNwPJ
+wcIM5AFYEIf0ing6eIws
+-----END CERTIFICATE-----
diff --git a/banyand/liaison/grpc/data/server_key.pem b/banyand/liaison/grpc/data/server_key.pem
new file mode 100644
index 0000000..fc80ebe
--- /dev/null
+++ b/banyand/liaison/grpc/data/server_key.pem
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpAIBAAKCAQEAzA3Y38kJPrgQzJIMyrADiEnaUNjqujUEyCd+MVdk2DZ1lqAx
+hHA1qj4PodKXXL0CmvgQYLjqEa4v3bOsBQcR4w1msRYEQHXpSudvJB5Ph3X8ur23
+LAYbSPa6+MI7nMw3jd5Zfyjtz2Bk489Cn8woUUNm3FuPn5V7YtYF3HVVaQi8nMhO
+UtuacBKMFfNazWQ+slgaJKsABLJv2co2fku/Ddt02A4YsFCZ3qNEo6QocAS8UIdI
+qlBXhHaURYA5vU1J9iv9gXP4YZz4eNl5Jm3/icikxrr6M6z/YtDJvPSNW0CmsETy
+VVExKfQv5R30aPuglwmE4kKgJMb7S98aXQ/cjQIDAQABAoIBAQCL4VowXmHmCswJ
+UH1QXMSvIuFz1p9iMoIqq1gIfv50cTC+puYLAdjn8U9KAVEdk7w7e53OkDR1FlFd
+y5M6hxQt77vb3VngznO0k15PBjBCjhFH+lGc7jq6E9ksOgoffKcAq9HyJ56OMGg4
++pWTcaKZwni6ylF0dkZ1BH5UGGMKcmX+E6NW13tUgU2fRGttjGFvUbHflCNg2vJh
+3IEFUmDGoDvJt4tdK2C1sBv7+cbW1+XBnPa9Gja2MF+v6DIs6AIpnHKwg33Bk33Q
+FX9oNErRYdtT3A9dcvR0qP6uVyVBPRREiuyBFJ0LUGXsbWwIjoPQ8SBZv1LaB1sW
+lo37BX/JAoGBAPuFJ07YNxkCniItKybPzT8mKKUPWS5iJy5pe5hBYSuIKSXZXhBl
+ETwhO/zpilwN8khXWokSW6QU/JmLQnHyG9ox3o5B/eAhCVCBZMe7eJaHJhXifj2A
+1i6KzAePIykUi4i5XKy5XSgRjyfO7E3ofW7MyDvg/OLWYX66MMcDVAf/AoGBAM+w
+Q7twak11okLckJKdtkasy7g5bHhKvksnw2G/CKave1c/CULPnSYYkiffiinUvVoP
+eUM8ANnFY36p1Lz+tE7g7VN1LAzN10ZwCPEdWZ0sjCveasiQrpNCXNWl5IRyRkQT
+m70Cr78zX5ahMIrU0ArLVRLVFB2ZX3lpnOK7h7tzAoGAIPKjYI+wQAV4w49ZLL9h
+6pjMEDs/enT/HvRQbXR7DyHKChw8Vzd2F4NfAVVye3aUO2e+A2C1QnxBTrfQX27Q
+uTd5KPd6E0cgmjwpAIUNWeKgWZOO5+2doQErkv3sJDB9ys5FVpb9ngcW0qcni1ke
+PUp0HGvvlKNyqBAp3ZgRBO8CgYBdEwcnpxdco00WXbZEnn0jayjY5JMhzY0+LRG4
+al48JQRHcy55TIWGnxhQ2jMW0AoTpD+Zy/gtn/IYv49hK1wuxUpWTnpxOoYxQOAg
+/iA8+cvPlRuRypUR1Xm5HWEtofCvbYIr0Fpme2VpIc+ZSAn77Gexyt/669MHnDb8
+vUH01QKBgQC6f88E/9nOb+Xwf2ZXSpK3BQQixPdi+wRY+smqYomSH+Vvou/ny8Cu
+AExWLc1KIH39tvs3e7RarB+hVDTGoHDD86basJ2eaaZdJfGUU0GGA39yRayn9xWz
+3eGVnQLmbguB9R4XYyF5lkp6qzopqp0eGNYMSxtvIhIYroJfOkb6vA==
+-----END RSA PRIVATE KEY-----
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 91a75d7..39451c5 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -19,64 +19,142 @@ package grpc
 
 import (
 	"context"
+	"fmt"
+	"io"
+	"log"
 	"net"
+	"path/filepath"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
 
-	"google.golang.org/grpc"
+	"github.com/pkg/errors"
+	grpclib "google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
 
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/api/data"
 	"github.com/apache/skywalking-banyandb/api/event"
 	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	apischema "github.com/apache/skywalking-banyandb/api/schema"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/partition"
+	"github.com/apache/skywalking-banyandb/pkg/query/logical"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+var (
+	ErrSeriesEvents    = errors.New("no seriesEvent")
+	ErrShardEvents     = errors.New("no shardEvent")
+	ErrInvalidSeriesID = errors.New("invalid seriesID")
+	ErrServerCert      = errors.New("invalid server cert file")
+	ErrServerKey       = errors.New("invalid server key file")
+	ErrNoAddr          = errors.New("no address")
+)
+
 type Server struct {
-	addr       string
-	log        *logger.Logger
-	ser        *grpc.Server
-	pipeline   queue.Queue
-	repo       discovery.ServiceRepo
-	shardInfo  *shardInfo
-	seriesInfo *seriesInfo
+	addr           string
+	maxRecvMsgSize int
+	tlsVal         bool
+	certFile       string
+	keyFile        string
+	log            *logger.Logger
+	ser            *grpclib.Server
+	pipeline       queue.Queue
+	repo           discovery.ServiceRepo
+	shardInfo      *shardInfo
+	seriesInfo     *seriesInfo
+	v1.UnimplementedTraceServiceServer
 }
 
 type shardInfo struct {
-	log *logger.Logger
+	log        *logger.Logger
+	shardEvent *shardEvent
 }
 
 func (s *shardInfo) Rev(message bus.Message) (resp bus.Message) {
-	shardEvent, ok := message.Data().(*v1.ShardEvent)
+	event, ok := message.Data().(*v1.ShardEvent)
 	if !ok {
 		s.log.Warn().Msg("invalid event data type")
 		return
 	}
+	s.shardEvent.setShardEvents(event)
 	s.log.Info().
-		Str("action", v1.Action_name[int32(shardEvent.Action)]).
-		Uint64("shardID", shardEvent.Shard.Id).
+		Str("action", v1.Action_name[int32(event.Action)]).
+		Uint64("shardID", event.Shard.Id).
 		Msg("received a shard event")
 	return
 }
 
+type shardEvent struct {
+	shardEventsMap map[string]*v1.ShardEvent
+	sync.RWMutex
+}
+
+func (s *shardEvent) setShardEvents(eventVal *v1.ShardEvent) {
+	s.RWMutex.Lock()
+	defer s.RWMutex.Unlock()
+	idx := eventVal.Shard.Series.GetName() + "-" + eventVal.Shard.Series.GetGroup()
+	if eventVal.Action == v1.Action_ACTION_PUT {
+		s.shardEventsMap[idx] = eventVal
+	} else if eventVal.Action == v1.Action_ACTION_DELETE {
+		delete(s.shardEventsMap, idx)
+	}
+}
+
+func (s *shardEvent) getShardEvent(idx string) *v1.ShardEvent {
+	s.RWMutex.RLock()
+	defer s.RWMutex.RUnlock()
+	return s.shardEventsMap[idx]
+}
+
 type seriesInfo struct {
-	log *logger.Logger
+	log         *logger.Logger
+	seriesEvent *seriesEvent
 }
 
 func (s *seriesInfo) Rev(message bus.Message) (resp bus.Message) {
-	seriesEvent, ok := message.Data().(*v1.SeriesEvent)
+	event, ok := message.Data().(*v1.SeriesEvent)
 	if !ok {
 		s.log.Warn().Msg("invalid event data type")
 		return
 	}
+	s.seriesEvent.setSeriesEvents(event)
 	s.log.Info().
-		Str("action", v1.Action_name[int32(seriesEvent.Action)]).
-		Str("name", seriesEvent.Series.Name).
-		Str("group", seriesEvent.Series.Group).
+		Str("action", v1.Action_name[int32(event.Action)]).
+		Str("name", event.Series.Name).
+		Str("group", event.Series.Group).
 		Msg("received a shard event")
 	return
 }
 
+type seriesEvent struct {
+	seriesEventsMap map[string]*v1.SeriesEvent
+	sync.RWMutex
+}
+
+func (s *seriesEvent) setSeriesEvents(seriesEventVal *v1.SeriesEvent) {
+	s.RWMutex.Lock()
+	defer s.RWMutex.Unlock()
+	str := seriesEventVal.Series.GetName() + "-" + seriesEventVal.Series.GetGroup()
+	if seriesEventVal.Action == v1.Action_ACTION_PUT {
+		s.seriesEventsMap[str] = seriesEventVal
+	} else if seriesEventVal.Action == v1.Action_ACTION_DELETE {
+		delete(s.seriesEventsMap, str)
+	}
+}
+
+func (s *seriesEvent) getSeriesEvent(idx string) *v1.SeriesEvent {
+	s.RWMutex.RLock()
+	defer s.RWMutex.RUnlock()
+	return s.seriesEventsMap[idx]
+}
+
 func (s *Server) PreRun() error {
 	s.log = logger.GetLogger("liaison-grpc")
 	s.shardInfo.log = s.log
@@ -92,8 +170,8 @@ func NewServer(ctx context.Context, pipeline queue.Queue, repo discovery.Service
 	return &Server{
 		pipeline:   pipeline,
 		repo:       repo,
-		shardInfo:  &shardInfo{},
-		seriesInfo: &seriesInfo{},
+		shardInfo:  &shardInfo{shardEvent: &shardEvent{shardEventsMap: map[string]*v1.ShardEvent{}}},
+		seriesInfo: &seriesInfo{seriesEvent: &seriesEvent{seriesEventsMap: map[string]*v1.SeriesEvent{}}},
 	}
 }
 
@@ -102,12 +180,38 @@ func (s *Server) Name() string {
 }
 
 func (s *Server) FlagSet() *run.FlagSet {
+	size := 1024 * 1024 * 10
+	_, currentFile, _, _ := runtime.Caller(0)
+	basePath := filepath.Dir(currentFile)
+	serverCert := filepath.Join(basePath, "data/server_cert.pem")
+	serverKey := filepath.Join(basePath, "data/server_key.pem")
+
 	fs := run.NewFlagSet("grpc")
-	fs.StringVarP(&s.addr, "addr", "", ":17912", "the address of banyand listens")
+	fs.IntVarP(&s.maxRecvMsgSize, "maxRecvMsgSize", "", size, "The size of max receiving message")
+	fs.BoolVarP(&s.tlsVal, "tlsVal", "", true, "Connection uses TLS if true, else plain TCP")
+	fs.StringVarP(&s.certFile, "certFile", "", serverCert, "The TLS cert file")
+	fs.StringVarP(&s.keyFile, "keyFile", "", serverKey, "The TLS key file")
+	fs.StringVarP(&s.addr, "addr", "", ":17912", "The address of banyand listens")
+
 	return fs
 }
 
 func (s *Server) Validate() error {
+	if s.addr == "" {
+		return ErrNoAddr
+	}
+	if s.tlsVal {
+		if s.certFile == "" {
+			return ErrServerCert
+		}
+		if s.keyFile == "" {
+			return ErrServerKey
+		}
+		_, errTLS := credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
+		if errTLS != nil {
+			return errTLS
+		}
+	}
 	return nil
 }
 
@@ -116,10 +220,17 @@ func (s *Server) Serve() error {
 	if err != nil {
 		s.log.Fatal().Err(err).Msg("Failed to listen")
 	}
-
-	s.ser = grpc.NewServer()
-	// TODO: add server implementation here
-	v1.RegisterTraceServiceServer(s.ser, v1.UnimplementedTraceServiceServer{})
+	if errValidate := s.Validate(); errValidate != nil {
+		s.log.Fatal().Err(errValidate).Msg("Failed to validate data")
+	}
+	var opts []grpclib.ServerOption
+	if s.tlsVal {
+		creds, _ := credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
+		opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+	}
+	opts = append(opts, grpclib.MaxRecvMsgSize(s.maxRecvMsgSize))
+	s.ser = grpclib.NewServer(opts...)
+	v1.RegisterTraceServiceServer(s.ser, s)
 
 	return s.ser.Serve(lis)
 }
@@ -128,3 +239,106 @@ func (s *Server) GracefulStop() {
 	s.log.Info().Msg("stopping")
 	s.ser.GracefulStop()
 }
+
+func (s *Server) computeSeriesID(writeEntity *v1.WriteRequest, mapIndexName string) (SeriesID []byte, err error) {
+	ana := logical.DefaultAnalyzer()
+	metadata := common.Metadata{
+		KindVersion: apischema.SeriesKindVersion,
+		Spec:        writeEntity.GetMetadata(),
+	}
+	schema, ruleError := ana.BuildTraceSchema(context.TODO(), metadata)
+	if ruleError != nil {
+		return nil, ruleError
+	}
+	seriesEventVal := s.seriesInfo.seriesEvent.getSeriesEvent(mapIndexName)
+	if seriesEventVal == nil {
+		return nil, ErrSeriesEvents
+	}
+	var str string
+	var arr []string
+	fieldRefs, errField := schema.CreateRef(seriesEventVal.FieldNamesCompositeSeriesId...)
+	if errField != nil {
+		return nil, errField
+	}
+	for _, ref := range fieldRefs {
+		field := writeEntity.GetEntity().GetFields()[ref.Spec.Idx]
+		switch v := field.GetValueType().(type) {
+		case *v1.Field_StrArray:
+			for j := 0; j < len(v.StrArray.Value); j++ {
+				arr = append(arr, v.StrArray.Value[j])
+			}
+		case *v1.Field_IntArray:
+			for t := 0; t < len(v.IntArray.Value); t++ {
+				arr = append(arr, fmt.Sprint(v.IntArray.Value[t]))
+			}
+		case *v1.Field_Int:
+			arr = append(arr, fmt.Sprint(v.Int.Value))
+		case *v1.Field_Str:
+			arr = append(arr, fmt.Sprint(v.Str.Value))
+		}
+	}
+	str = strings.Join(arr, "")
+	if str == "" {
+		return nil, ErrInvalidSeriesID
+	}
+	seriesID := []byte(str)
+
+	return seriesID, nil
+}
+
+func (s *Server) computeShardID(seriesID []byte, mapIndexName string) (shardID uint, err error) {
+	shardEventVal := s.shardInfo.shardEvent.getShardEvent(mapIndexName)
+	if shardEventVal == nil {
+		return 0, ErrShardEvents
+	}
+	shardNum := shardEventVal.GetShard().GetId()
+	if shardNum < 1 {
+		shardNum = 1
+	}
+	shardID, shardIDError := partition.ShardID(seriesID, uint32(shardNum))
+	if shardIDError != nil {
+		return 0, shardIDError
+	}
+
+	return shardID, nil
+}
+
+func (s *Server) Write(TraceWriteServer v1.TraceService_WriteServer) error {
+	for {
+		writeEntity, err := TraceWriteServer.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+		mapIndexName := writeEntity.GetMetadata().GetName() + "-" + writeEntity.GetMetadata().GetGroup()
+		seriesID, err := s.computeSeriesID(writeEntity, mapIndexName)
+		if err != nil {
+			return err
+		}
+		shardID, err := s.computeShardID(seriesID, mapIndexName)
+		if err != nil {
+			return err
+		}
+		mergeData := assemblyWriteData(shardID, writeEntity, convert.BytesToUint64(seriesID))
+		message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), mergeData)
+		_, errWritePub := s.pipeline.Publish(data.TopicWriteEvent, message)
+		if errWritePub != nil {
+			return errWritePub
+		}
+		if errSend := TraceWriteServer.Send(&v1.WriteResponse{}); errSend != nil {
+			return errSend
+		}
+	}
+}
+
+func (s *Server) Query(ctx context.Context, entityCriteria *v1.QueryRequest) (*v1.QueryResponse, error) {
+	log.Println("entityCriteria:", entityCriteria)
+
+	return &v1.QueryResponse{}, nil
+}
+
+func assemblyWriteData(shardID uint, writeEntity *v1.WriteRequest, seriesID uint64) data.TraceWriteDate {
+	return data.TraceWriteDate{ShardID: shardID, SeriesID: seriesID, WriteRequest: writeEntity}
+}
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
new file mode 100644
index 0000000..5aea70f
--- /dev/null
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -0,0 +1,250 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc_test
+
+import (
+	"context"
+	"io"
+	"os"
+	"path"
+	"path/filepath"
+	"runtime"
+	"testing"
+	"time"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	grpclib "google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+
+	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/index"
+	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+	"github.com/apache/skywalking-banyandb/banyand/query"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/series/trace"
+	"github.com/apache/skywalking-banyandb/banyand/storage"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/pb"
+)
+
+type testData struct {
+	certFile           string
+	serverHostOverride string
+	TLS                bool
+	addr               string
+}
+
+func setup(tester *require.Assertions) (*grpc.Server, *grpc.Server, func()) {
+	tester.NoError(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "warn",
+	}))
+	// Init `Discovery` module
+	repo, err := discovery.NewServiceRepo(context.Background())
+	tester.NoError(err)
+	tester.NotNil(repo)
+	// Init `Queue` module
+	pipeline, err := queue.NewQueue(context.TODO(), repo)
+	tester.NoError(err)
+	// Init `Database` module
+	db, err := storage.NewDB(context.TODO(), repo)
+	tester.NoError(err)
+	uuid, err := googleUUID.NewUUID()
+	tester.NoError(err)
+	rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
+	tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
+	// Init `Index` module
+	indexSvc, err := index.NewService(context.TODO(), repo)
+	tester.NoError(err)
+	// Init `Trace` module
+	traceSvc, err := trace.NewService(context.TODO(), db, repo, indexSvc, pipeline)
+	tester.NoError(err)
+	// Init `Query` module
+	executor, err := query.NewExecutor(context.TODO(), repo, indexSvc, traceSvc, traceSvc)
+	tester.NoError(err)
+	// Init `liaison` module
+	tcp := grpc.NewServer(context.TODO(), pipeline, repo)
+	tester.NoError(tcp.FlagSet().Parse([]string{"--tlsVal=false", "--addr=:17912"}))
+	tcpTLS := grpc.NewServer(context.TODO(), pipeline, repo)
+	tester.NoError(tcpTLS.FlagSet().Parse([]string{"--tlsVal=true", "--addr=:17913"}))
+
+	err = indexSvc.PreRun()
+	tester.NoError(err)
+
+	err = traceSvc.PreRun()
+	tester.NoError(err)
+
+	err = db.PreRun()
+	tester.NoError(err)
+
+	err = executor.PreRun()
+	tester.NoError(err)
+
+	err = tcp.PreRun()
+	tester.NoError(err)
+	tester.NoError(tcpTLS.PreRun())
+
+	go func() {
+		tester.NoError(traceSvc.Serve())
+	}()
+
+	go func() {
+		tester.NoError(tcpTLS.Serve())
+	}()
+
+	go func() {
+		tester.NoError(tcp.Serve())
+	}()
+
+	ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancelFunc()
+
+	tester.True(indexSvc.Ready(ctx, index.MetaExists("default", "sw")))
+
+	return tcp, tcpTLS, func() {
+		db.GracefulStop()
+		_ = os.RemoveAll(rootPath)
+	}
+}
+
+func TestTraceService(t *testing.T) {
+	tester := require.New(t)
+	tcp, tcpTLS, gracefulStop := setup(tester)
+	defer gracefulStop()
+	_, currentFile, _, _ := runtime.Caller(0)
+	basePath := filepath.Dir(currentFile)
+	certFile := filepath.Join(basePath, "data/server_cert.pem")
+	testCases := []struct {
+		name    string
+		args    testData
+		wantErr bool
+	}{
+		{
+			name: "isTLS",
+			args: testData{
+				TLS:                true,
+				certFile:           certFile,
+				serverHostOverride: "localhost",
+				addr:               "localhost:17913",
+			},
+		},
+		{
+			name: "noTLS",
+			args: testData{
+				TLS:  false,
+				addr: "localhost:17912",
+			},
+		},
+	}
+	for _, tc := range testCases {
+		if tc.args.TLS {
+			errValidate := tcpTLS.Validate()
+			assert.NoError(t, errValidate)
+			var opts []grpclib.DialOption
+			creds, err := credentials.NewClientTLSFromFile(tc.args.certFile, tc.args.serverHostOverride)
+			assert.NoError(t, err)
+			opts = append(opts, grpclib.WithTransportCredentials(creds))
+			linkService(t, tc.args.addr, opts)
+		} else {
+			errValidate := tcp.Validate()
+			assert.NoError(t, errValidate)
+			var opts []grpclib.DialOption
+			opts = append(opts, grpclib.WithInsecure())
+			linkService(t, tc.args.addr, opts)
+		}
+	}
+}
+
+func linkService(t *testing.T, addr string, opts []grpclib.DialOption) {
+	conn, err := grpclib.Dial(addr, opts...)
+	assert.NoError(t, err)
+	defer conn.Close()
+	traceWrite(t, conn)
+	traceQuery(t, conn)
+}
+
+func traceWrite(t *testing.T, conn *grpclib.ClientConn) {
+	client := v1.NewTraceServiceClient(conn)
+	ctx := context.Background()
+	entityValue := pb.NewEntityValueBuilder().
+		EntityID("entityId").
+		DataBinary([]byte{12}).
+		Fields("trace_id-xxfff.111323",
+			0,
+			"webapp_id",
+			"10.0.0.1_id",
+			"/home_id",
+			"webapp",
+			"10.0.0.1",
+			"/home",
+			300,
+			1622933202000000000).
+		Timestamp(time.Now()).
+		Build()
+	criteria := pb.NewWriteEntityBuilder().
+		EntityValue(entityValue).
+		Metadata("default", "sw").
+		Build()
+	stream, errorWrite := client.Write(ctx)
+	if errorWrite != nil {
+		t.Errorf("%v.Write(_) = _, %v", client, errorWrite)
+	}
+	waitc := make(chan struct{})
+	go func() {
+		for {
+			writeResponse, errRecv := stream.Recv()
+			if errRecv == io.EOF {
+				// read done.
+				close(waitc)
+				return
+			}
+			assert.NoError(t, errRecv)
+			assert.NotNil(t, writeResponse)
+		}
+	}()
+	if errSend := stream.Send(criteria); errSend != nil {
+		t.Errorf("Failed to send a note: %v", errSend)
+	}
+	if errorSend := stream.CloseSend(); errorSend != nil {
+		t.Errorf("Failed to send a note: %v", errorSend)
+	}
+	<-waitc
+}
+
+func traceQuery(t *testing.T, conn *grpclib.ClientConn) {
+	client := v1.NewTraceServiceClient(conn)
+	ctx := context.Background()
+	sT, eT := time.Now().Add(-3*time.Hour), time.Now()
+	criteria := pb.NewQueryRequestBuilder().
+		Limit(5).
+		Offset(10).
+		OrderBy("service_instance_id", v1.QueryOrder_SORT_DESC).
+		Metadata("default", "trace").
+		Projection("http.method", "service_id", "service_instance_id").
+		Fields("service_id", "=", "my_app", "http.method", "=", "GET").
+		TimeRange(sT, eT).
+		Build()
+	stream, errRev := client.Query(ctx, criteria)
+	if errRev != nil {
+		t.Errorf("Retrieve client failed: %v", errRev)
+	}
+	assert.NotNil(t, stream)
+}
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index dbea2ae..f3da0a2 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -33,6 +33,7 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/index"
 	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/banyand/series"
 	"github.com/apache/skywalking-banyandb/banyand/series/trace"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
@@ -62,6 +63,9 @@ func setupServices(t *testing.T, tester *require.Assertions) (discovery.ServiceR
 	repo, err := discovery.NewServiceRepo(context.Background())
 	tester.NoError(err)
 	tester.NotNil(repo)
+	// Init `Queue` module
+	pipeline, err := queue.NewQueue(context.TODO(), repo)
+	tester.NoError(err)
 
 	// Init `Index` module
 	indexSvc, err := index.NewService(context.TODO(), repo)
@@ -76,7 +80,7 @@ func setupServices(t *testing.T, tester *require.Assertions) (discovery.ServiceR
 	tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
 
 	// Init `Trace` module
-	traceSvc, err := trace.NewService(context.TODO(), db, repo, indexSvc)
+	traceSvc, err := trace.NewService(context.TODO(), db, repo, indexSvc, pipeline)
 	tester.NoError(err)
 
 	// Init `Query` module
@@ -84,7 +88,7 @@ func setupServices(t *testing.T, tester *require.Assertions) (discovery.ServiceR
 	tester.NoError(err)
 
 	// Init `Liaison` module
-	liaison := grpc.NewServer(context.TODO(), nil, repo)
+	liaison := grpc.NewServer(context.TODO(), pipeline, repo)
 
 	// :PreRun:
 	// 1) TraceSeries,
diff --git a/banyand/series/trace/common_test.go b/banyand/series/trace/common_test.go
index f8b40ae..8d13a7c 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -68,7 +68,7 @@ func setup(t *testing.T) (*traceSeries, func()) {
 	ctrl := gomock.NewController(t)
 	mockIndex := index.NewMockService(ctrl)
 	mockIndex.EXPECT().Insert(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
-	svc, err := NewService(context.TODO(), db, nil, mockIndex)
+	svc, err := NewService(context.TODO(), db, nil, mockIndex, nil)
 	assert.NoError(t, err)
 	assert.NoError(t, svc.PreRun())
 	assert.NoError(t, db.PreRun())
@@ -257,7 +257,7 @@ func setupTestData(t *testing.T, ts *traceSeries, seriesEntities []seriesEntity)
 			Timestamp(se.entity.t).
 			Fields(se.entity.items...).
 			Build()
-		shardID := partition.ShardID(seriesID, 2)
+		shardID, _ := partition.ShardID(seriesID, 2)
 		got, err := ts.Write(common.SeriesID(convert.Hash(seriesID)), shardID, data.EntityValue{
 			EntityValue: ev,
 		})
diff --git a/banyand/series/trace/query.go b/banyand/series/trace/query.go
index 64985f3..770c877 100644
--- a/banyand/series/trace/query.go
+++ b/banyand/series/trace/query.go
@@ -42,7 +42,10 @@ func (t *traceSeries) FetchTrace(traceID string, opt series.ScanOptions) (trace
 		return trace, ErrInvalidTraceID
 	}
 	traceIDBytes := []byte(traceID)
-	traceIDShardID := partition.ShardID(traceIDBytes, t.shardNum)
+	traceIDShardID, shardIDError := partition.ShardID(traceIDBytes, t.shardNum)
+	if shardIDError != nil {
+		return trace, shardIDError
+	}
 	bb, errTraceID := t.reader.TimeSeriesReader(traceIDShardID, traceIndex, 0, 0).GetAll(traceIDBytes)
 	if errTraceID != nil {
 		return trace, errTraceID
diff --git a/banyand/series/trace/service.go b/banyand/series/trace/service.go
index 1e77d4b..d646447 100644
--- a/banyand/series/trace/service.go
+++ b/banyand/series/trace/service.go
@@ -23,10 +23,13 @@ import (
 
 	"google.golang.org/protobuf/types/known/timestamppb"
 
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/api/data"
 	"github.com/apache/skywalking-banyandb/api/event"
 	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/index"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/banyand/series"
 	"github.com/apache/skywalking-banyandb/banyand/series/schema"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
@@ -38,20 +41,24 @@ import (
 var _ series.Service = (*service)(nil)
 
 type service struct {
-	db        storage.Database
-	schemaMap map[string]*traceSeries
-	l         *logger.Logger
-	repo      discovery.ServiceRepo
-	stopCh    chan struct{}
-	idx       index.Service
+	db            storage.Database
+	schemaMap     map[string]*traceSeries
+	l             *logger.Logger
+	repo          discovery.ServiceRepo
+	stopCh        chan struct{}
+	idx           index.Service
+	writeListener *writeCallback
+	pipeline      queue.Queue
 }
 
 //NewService returns a new service
-func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo, idx index.Service) (series.Service, error) {
+func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo, idx index.Service, pipeline queue.Queue) (series.Service, error) {
 	return &service{
-		db:   db,
-		repo: repo,
-		idx:  idx,
+		db:            db,
+		repo:          repo,
+		idx:           idx,
+		pipeline:      pipeline,
+		writeListener: &writeCallback{},
 	}, nil
 }
 
@@ -65,7 +72,9 @@ func (s *service) PreRun() error {
 		return err
 	}
 	s.schemaMap = make(map[string]*traceSeries, len(schemas))
+	s.writeListener.schemaMap = s.schemaMap
 	s.l = logger.GetLogger(s.Name())
+	s.writeListener.l = s.l
 	for _, sa := range schemas {
 		ts, errTS := newTraceSeries(sa, s.l, s.idx)
 		if errTS != nil {
@@ -74,6 +83,7 @@ func (s *service) PreRun() error {
 		s.db.Register(ts)
 		id := formatTraceSeriesID(ts.name, ts.group)
 		s.schemaMap[id] = ts
+		s.writeListener.schemaMap[id] = ts
 		s.l.Info().Str("id", id).Msg("initialize Trace series")
 	}
 	return err
@@ -136,6 +146,10 @@ func (s *service) Serve() error {
 			return errPublishRules
 		}
 	}
+	errWrite := s.pipeline.Subscribe(data.TopicWriteEvent, s.writeListener)
+	if errWrite != nil {
+		return errWrite
+	}
 	s.stopCh = make(chan struct{})
 	<-s.stopCh
 	return nil
@@ -146,3 +160,26 @@ func (s *service) GracefulStop() {
 		close(s.stopCh)
 	}
 }
+
+type writeCallback struct {
+	l         *logger.Logger
+	schemaMap map[string]*traceSeries
+}
+
+func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
+	writeEvent, ok := message.Data().(data.TraceWriteDate)
+	if !ok {
+		w.l.Warn().Msg("invalid event data type")
+		return
+	}
+	entityValue := writeEvent.WriteRequest.GetEntity()
+	ts := writeEvent.WriteRequest.GetMetadata()
+	id := formatTraceSeriesID(ts.GetName(), ts.GetGroup())
+	_, err := w.schemaMap[id].Write(common.SeriesID(writeEvent.SeriesID), writeEvent.ShardID, data.EntityValue{
+		EntityValue: entityValue,
+	})
+	if err != nil {
+		w.l.Warn().Err(err)
+	}
+	return
+}
diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go
index 1c5f639..96af476 100644
--- a/banyand/series/trace/trace.go
+++ b/banyand/series/trace/trace.go
@@ -120,8 +120,10 @@ func (s *service) Write(traceSeriesMetadata common.Metadata, ts time.Time, serie
 		Build()
 
 	seriesIDBytes := []byte(seriesID)
-	shardID := partition.ShardID(seriesIDBytes, traceSeries.shardNum)
-
+	shardID, shardIDError := partition.ShardID(seriesIDBytes, traceSeries.shardNum)
+	if shardIDError != nil {
+		return err == nil, shardIDError
+	}
 	_, err = traceSeries.Write(common.SeriesID(convert.Hash(seriesIDBytes)), shardID, data.EntityValue{
 		EntityValue: ev,
 	})
diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go
index 115ba15..1b362d1 100644
--- a/banyand/series/trace/write.go
+++ b/banyand/series/trace/write.go
@@ -75,7 +75,10 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data.
 	if err = wp.Writer(shardID, chunkIDMapping).Put(chunkIDBytes, bydb_bytes.Join(stateBytes, seriesIDBytes, wallTsBytes)); err != nil {
 		return 0, errors.Wrap(err, "failed to write chunkID index")
 	}
-	traceIDShardID := partition.ShardID(traceID, t.shardNum)
+	traceIDShardID, shardIDError := partition.ShardID(traceID, t.shardNum)
+	if shardIDError != nil {
+		return 0, shardIDError
+	}
 	if err = wp.TimeSeriesWriter(traceIDShardID, traceIndex).
 		Put(traceID, bydb_bytes.Join(convert.Uint16ToBytes(uint16(shardID)), chunkIDBytes), entityTs); err != nil {
 		return 0, errors.Wrap(err, "failed to Trace index")
diff --git a/banyand/series/trace/write_test.go b/banyand/series/trace/write_test.go
index 0ca51d8..a671cb2 100644
--- a/banyand/series/trace/write_test.go
+++ b/banyand/series/trace/write_test.go
@@ -180,13 +180,17 @@ func Test_traceSeries_Write(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			seriesID := []byte(tt.args.seriesID)
+			shardID, shardIDError := partition.ShardID(seriesID, 2)
+			if shardIDError != nil {
+				return
+			}
 			ev := pb.NewEntityValueBuilder().
 				DataBinary(tt.args.entity.binary).
 				EntityID(tt.args.entity.id).
 				Fields(tt.args.entity.items...).
 				Timestamp(time.Now()).
 				Build()
-			got, err := ts.Write(common.SeriesID(convert.Hash(seriesID)), partition.ShardID(seriesID, 2), data.EntityValue{
+			got, err := ts.Write(common.SeriesID(convert.Hash(seriesID)), shardID, data.EntityValue{
 				EntityValue: ev,
 			})
 			if (err != nil) != tt.wantErr {
diff --git a/pkg/partition/route.go b/pkg/partition/route.go
index 9b4d93c..bcb18af 100644
--- a/pkg/partition/route.go
+++ b/pkg/partition/route.go
@@ -17,9 +17,16 @@
 
 package partition
 
-import "github.com/apache/skywalking-banyandb/pkg/convert"
+import (
+	"github.com/pkg/errors"
 
-func ShardID(key []byte, shardNum uint32) uint {
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func ShardID(key []byte, shardNum uint32) (uint, error) {
+	if shardNum < 1 {
+		return 0, errors.New("invalid shardNum")
+	}
 	encodeKey := convert.Hash(key)
-	return uint(encodeKey % uint64(shardNum))
+	return uint(encodeKey % uint64(shardNum)), nil
 }
diff --git a/pkg/query/logical/expr.go b/pkg/query/logical/expr.go
index 03898da..caf0e92 100644
--- a/pkg/query/logical/expr.go
+++ b/pkg/query/logical/expr.go
@@ -44,21 +44,21 @@ type FieldRef struct {
 	// name defines the key of the field
 	name string
 	// spec contains the index of the key in the schema, as well as the underlying FieldSpec
-	spec *fieldSpec
+	Spec *fieldSpec
 }
 
 func (f *FieldRef) Equal(expr Expr) bool {
 	if other, ok := expr.(*FieldRef); ok {
-		return other.name == f.name && other.spec.spec.GetType() == f.spec.spec.GetType()
+		return other.name == f.name && other.Spec.spec.GetType() == f.Spec.spec.GetType()
 	}
 	return false
 }
 
 func (f *FieldRef) FieldType() apiv1.FieldSpec_FieldType {
-	if f.spec == nil {
+	if f.Spec == nil {
 		panic("should be resolved first")
 	}
-	return f.spec.spec.GetType()
+	return f.Spec.spec.GetType()
 }
 
 func (f *FieldRef) Resolve(s Schema) error {
@@ -66,12 +66,12 @@ func (f *FieldRef) Resolve(s Schema) error {
 	if err != nil {
 		return err
 	}
-	f.spec = specs[0].spec
+	f.Spec = specs[0].Spec
 	return nil
 }
 
 func (f *FieldRef) String() string {
-	return fmt.Sprintf("#%s<%s>", f.name, f.spec.spec.GetType().String())
+	return fmt.Sprintf("#%s<%s>", f.name, f.Spec.spec.GetType().String())
 }
 
 func NewFieldRef(fieldName string) *FieldRef {
diff --git a/pkg/query/logical/plan_orderby.go b/pkg/query/logical/plan_orderby.go
index 0c087d4..92cb50d 100644
--- a/pkg/query/logical/plan_orderby.go
+++ b/pkg/query/logical/plan_orderby.go
@@ -77,7 +77,7 @@ func (o *orderBy) Execute(ec executor.ExecutionContext) ([]data.Entity, error) {
 		return entities, nil
 	}
 
-	sort.Slice(entities, sortMethod(entities, o.targetRef.spec.idx, o.sort))
+	sort.Slice(entities, sortMethod(entities, o.targetRef.Spec.Idx, o.sort))
 
 	return entities, nil
 }
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 00542a5..6f33f3a 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -27,6 +27,7 @@ import (
 
 type Schema interface {
 	IndexDefined(string) (bool, *apiv1.IndexObject)
+	FieldSubscript(string) (bool, int)
 	FieldDefined(string) bool
 	CreateRef(names ...string) ([]*FieldRef, error)
 	Map(refs ...*FieldRef) Schema
@@ -37,12 +38,12 @@ type Schema interface {
 }
 
 type fieldSpec struct {
-	idx  int
+	Idx  int
 	spec *apiv1.FieldSpec
 }
 
 func (fs *fieldSpec) Equal(other *fieldSpec) bool {
-	return fs.idx == other.idx && fs.spec.GetType() == other.spec.GetType() && fs.spec.GetName() == other.spec.GetName()
+	return fs.Idx == other.Idx && fs.spec.GetType() == other.spec.GetType() && fs.spec.GetName() == other.spec.GetName()
 }
 
 var _ Schema = (*schema)(nil)
@@ -74,6 +75,18 @@ func (s *schema) IndexDefined(field string) (bool, *apiv1.IndexObject) {
 	return false, nil
 }
 
+func (s *schema) FieldSubscript(field string) (bool, int) {
+	idxRule := s.indexRule.Spec
+	for i, indexObj := range idxRule.GetObjects() {
+		for _, fieldName := range indexObj.GetFields() {
+			if field == fieldName {
+				return true, i
+			}
+		}
+	}
+	return false, -1
+}
+
 func (s *schema) Equal(s2 Schema) bool {
 	if other, ok := s2.(*schema); ok {
 		return cmp.Equal(other.fieldMap, s.fieldMap)
@@ -83,7 +96,7 @@ func (s *schema) Equal(s2 Schema) bool {
 
 func (s *schema) RegisterField(name string, i int, spec *apiv1.FieldSpec) {
 	s.fieldMap[name] = &fieldSpec{
-		idx:  i,
+		Idx:  i,
 		spec: spec,
 	}
 }
@@ -117,7 +130,7 @@ func (s *schema) Map(refs ...*FieldRef) Schema {
 		fieldMap:    make(map[string]*fieldSpec),
 	}
 	for _, ref := range refs {
-		newS.fieldMap[ref.name] = ref.spec
+		newS.fieldMap[ref.name] = ref.Spec
 	}
 	return newS
 }