You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/11 13:05:48 UTC
[skywalking-banyandb] branch main updated: feat: Implement Write in the liaison (#30)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 32524d2 feat: Implement Write in the liaison (#30)
32524d2 is described below
commit 32524d2fbd3e42d6d7693bb78f1d1236fda38e84
Author: Fine0830 <fi...@outlook.com>
AuthorDate: Wed Aug 11 21:05:28 2021 +0800
feat: Implement Write in the liaison (#30)
* feat: implement Write and Query
* feat: add test server
* feat: get field subscripts
* feat: generate shardID
* fix: compute shardID
* fix: update test data
* update
* feat: use protobuf
* fix: update incorrect syntax
* Fix service register failures
Signed-off-by: Gao Hongtao <ha...@gmail.com>
* feat: update grpc
* feat: publish and subscribe write data
* feat: add series id
* fix: write data
* feat: add pipeline in series
* feat: write trace data
* feat: use pipeline
* fix: update custom event
* fix: update repo
* fix: grpc test
* fix: add param
* fix: grpc test
* fix: lint
* fix: update header ignore files
* fix: use embed to access to files
* fix: update header ignore files
* fix: format code
* refactor: address pr
* revert: flags
* fix: update schemaMap in writeListener
* fix: use assert in tests
* fix: add sync.RWMutex to protect seriesEvent and shardEvent
* fix: update server opts
* fix: remove dead code
* fix: update header ignore files
* fix: set maxRecMsgSize flag
* fix: typo
* fix: address pr
* fix: update flags
* fix: add tls tests
* fix: update test
* fix: lint
* revert version
* refactor: update flags
* fix: add sync.RWMutex to protect events
* fix: lint
* feat: separate shardid and seriesid func
* feat: add maps for events
* fix: update RWMutex
* fix: update test
* feat: generate tls certificate
* fix: update flags
* fix: update grpc test
Co-authored-by: Gao Hongtao <ha...@gmail.com>
---
.licenserc.yaml | 3 +-
api/data/trace.go | 16 ++
api/event/discovery.go | 6 +-
api/proto/banyandb/v1/write.pb.go | 2 +-
api/proto/banyandb/v1/write.proto | 2 +-
banyand/internal/cmd/standalone.go | 2 +-
banyand/liaison/grpc/data/server_cert.pem | 23 +++
banyand/liaison/grpc/data/server_key.pem | 27 +++
banyand/liaison/grpc/grpc.go | 262 +++++++++++++++++++++++++++---
banyand/liaison/grpc/grpc_test.go | 250 ++++++++++++++++++++++++++++
banyand/query/processor_test.go | 8 +-
banyand/series/trace/common_test.go | 4 +-
banyand/series/trace/query.go | 5 +-
banyand/series/trace/service.go | 57 +++++--
banyand/series/trace/trace.go | 6 +-
banyand/series/trace/write.go | 5 +-
banyand/series/trace/write_test.go | 6 +-
pkg/partition/route.go | 13 +-
pkg/query/logical/expr.go | 12 +-
pkg/query/logical/plan_orderby.go | 2 +-
pkg/query/logical/schema.go | 21 ++-
21 files changed, 668 insertions(+), 64 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 3049c0b..3f25016 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -74,10 +74,9 @@ header: # `header` section is configurations for source codes license header.
- '**/*.json'
- '**/*_mock.go'
- '**/*_mock_test.go'
- - '**/*_generated.go'
- - '**/Trace_grpc.go'
- '**/*.pb.go'
- '**/*.textproto'
+ - '**/*.pem'
comment: on-failure # on what condition license-eye will comment on the pull request, `on-failure`, `always`, `never`.
diff --git a/api/data/trace.go b/api/data/trace.go
index 5351990..6d59308 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -20,10 +20,17 @@ package data
import (
"github.com/apache/skywalking-banyandb/api/common"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
)
var TraceKindVersion = common.KindVersion{Version: "v1", Kind: "data-trace"}
+var WriteEventKindVersion = common.KindVersion{
+ Version: "v1",
+ Kind: "trace-write",
+}
+var TopicWriteEvent = bus.UniTopic(WriteEventKindVersion.String())
+
type Trace struct {
common.KindVersion
Entities []Entity
@@ -36,6 +43,15 @@ type Entity struct {
type EntityValue struct {
*v1.EntityValue
}
+type TraceWriteDate struct {
+ ShardID uint
+ SeriesID uint64
+ WriteRequest *v1.WriteRequest
+}
+type Write struct {
+ common.KindVersion
+ Payload *TraceWriteDate
+}
func NewTrace() *Trace {
return &Trace{KindVersion: TraceKindVersion}
diff --git a/api/event/discovery.go b/api/event/discovery.go
index 10476b8..7b55de8 100644
--- a/api/event/discovery.go
+++ b/api/event/discovery.go
@@ -28,12 +28,14 @@ var (
Version: "v1",
Kind: "event-shard",
}
- TopicShardEvent = bus.UniTopic(ShardEventKindVersion.String())
+ TopicShardEvent = bus.UniTopic(ShardEventKindVersion.String())
+
SeriesEventKindVersion = common.KindVersion{
Version: "v1",
Kind: "event-series",
}
- TopicSeriesEvent = bus.UniTopic(SeriesEventKindVersion.String())
+ TopicSeriesEvent = bus.UniTopic(SeriesEventKindVersion.String())
+
IndexRuleKindVersion = common.KindVersion{Version: "v1", Kind: "index-rule"}
TopicIndexRule = bus.UniTopic(IndexRuleKindVersion.String())
)
diff --git a/api/proto/banyandb/v1/write.pb.go b/api/proto/banyandb/v1/write.pb.go
index 6808076..c2c1026 100644
--- a/api/proto/banyandb/v1/write.pb.go
+++ b/api/proto/banyandb/v1/write.pb.go
@@ -364,7 +364,7 @@ type EntityValue struct {
// binary representation of segments, including tags, spans...
DataBinary []byte `protobuf:"bytes,3,opt,name=data_binary,json=dataBinary,proto3" json:"data_binary,omitempty"`
// support all of indexed fields in the fields.
- // Pair only has value, as the value of PairValue match with the key
+ // Field only has value, as the value of value_type match with the key
// by the index rules and index rule bindings of Metadata group.
// indexed fields of multiple entities are compression in the fields.
Fields []*Field `protobuf:"bytes,4,rep,name=fields,proto3" json:"fields,omitempty"`
diff --git a/api/proto/banyandb/v1/write.proto b/api/proto/banyandb/v1/write.proto
index 3d7ddc0..02e2d1a 100644
--- a/api/proto/banyandb/v1/write.proto
+++ b/api/proto/banyandb/v1/write.proto
@@ -62,7 +62,7 @@ message EntityValue {
// binary representation of segments, including tags, spans...
bytes data_binary = 3;
// support all of indexed fields in the fields.
- // Pair only has value, as the value of PairValue match with the key
+ // Field only has value, as the value of value_type match with the key
// by the index rules and index rule bindings of Metadata group.
// indexed fields of multiple entities are compression in the fields.
repeated Field fields = 4;
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index d694895..1aa0b4f 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -61,7 +61,7 @@ func newStandaloneCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate index builder")
}
- traceSeries, err := trace.NewService(ctx, db, repo, idx)
+ traceSeries, err := trace.NewService(ctx, db, repo, idx, pipeline)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate trace series")
}
diff --git a/banyand/liaison/grpc/data/server_cert.pem b/banyand/liaison/grpc/data/server_cert.pem
new file mode 100644
index 0000000..d7e693f
--- /dev/null
+++ b/banyand/liaison/grpc/data/server_cert.pem
@@ -0,0 +1,23 @@
+-----BEGIN CERTIFICATE-----
+MIIDyzCCArOgAwIBAgIJAMtUjj8gb0gnMA0GCSqGSIb3DQEBCwUAMIGIMQswCQYD
+VQQGEwJDTjELMAkGA1UECAwCU0MxCzAJBgNVBAcMAkNEMRMwEQYDVQQKDApza3l3
+YWxraW5nMREwDwYDVQQLDAhiYW55YW5kYjEjMCEGCSqGSIb3DQEJARYUZmFueHVl
+MDgzMEBlbWFpbC5jb20xEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yMTA4MTEwNjU0
+MjNaFw0yNjA4MTAwNjU0MjNaMIGIMQswCQYDVQQGEwJDTjELMAkGA1UECAwCU0Mx
+CzAJBgNVBAcMAkNEMRMwEQYDVQQKDApza3l3YWxraW5nMREwDwYDVQQLDAhiYW55
+YW5kYjEjMCEGCSqGSIb3DQEJARYUZmFueHVlMDgzMEBlbWFpbC5jb20xEjAQBgNV
+BAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMwN
+2N/JCT64EMySDMqwA4hJ2lDY6ro1BMgnfjFXZNg2dZagMYRwNao+D6HSl1y9Apr4
+EGC46hGuL92zrAUHEeMNZrEWBEB16UrnbyQeT4d1/Lq9tywGG0j2uvjCO5zMN43e
+WX8o7c9gZOPPQp/MKFFDZtxbj5+Ve2LWBdx1VWkIvJzITlLbmnASjBXzWs1kPrJY
+GiSrAASyb9nKNn5Lvw3bdNgOGLBQmd6jRKOkKHAEvFCHSKpQV4R2lEWAOb1NSfYr
+/YFz+GGc+HjZeSZt/4nIpMa6+jOs/2LQybz0jVtAprBE8lVRMSn0L+Ud9Gj7oJcJ
+hOJCoCTG+0vfGl0P3I0CAwEAAaM2MDQwMgYDVR0RBCswKYIJbG9jYWxob3N0ggtl
+eGFtcGxlLmNvbYIPd3d3LmV4YW1wbGUuY29tMA0GCSqGSIb3DQEBCwUAA4IBAQCJ
+L7mG1mjG+9zg5Hl4TbakQ/zxnz3VuAUN+8WEVcTY65Tc4LbabD56mf5PumDQaGzz
+mFhblsi2mXk4H+dbUCh3wEgpcbmCjZKa5Kuyypfcg8JgFzIS3PwNp/BCzxdP2uqA
+Lw1lHYczyWIaW+M7tVG4V9ewpo/DOX5xlkMw0kDJatYxCJ8CzSzFropNIeINkEX1
+dT/vDtH0qwWNUDGf5yr8dnjowKCRczmc1FY7hV+Q0SBZKhqHkTKrxgjT7eVNG5CR
+5xajFa96ut8UXTFd9+TXJNyPaYVotMD4WcLShc31yfwGR+Y3YpmH8uHzN3NRNwPJ
+wcIM5AFYEIf0ing6eIws
+-----END CERTIFICATE-----
diff --git a/banyand/liaison/grpc/data/server_key.pem b/banyand/liaison/grpc/data/server_key.pem
new file mode 100644
index 0000000..fc80ebe
--- /dev/null
+++ b/banyand/liaison/grpc/data/server_key.pem
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpAIBAAKCAQEAzA3Y38kJPrgQzJIMyrADiEnaUNjqujUEyCd+MVdk2DZ1lqAx
+hHA1qj4PodKXXL0CmvgQYLjqEa4v3bOsBQcR4w1msRYEQHXpSudvJB5Ph3X8ur23
+LAYbSPa6+MI7nMw3jd5Zfyjtz2Bk489Cn8woUUNm3FuPn5V7YtYF3HVVaQi8nMhO
+UtuacBKMFfNazWQ+slgaJKsABLJv2co2fku/Ddt02A4YsFCZ3qNEo6QocAS8UIdI
+qlBXhHaURYA5vU1J9iv9gXP4YZz4eNl5Jm3/icikxrr6M6z/YtDJvPSNW0CmsETy
+VVExKfQv5R30aPuglwmE4kKgJMb7S98aXQ/cjQIDAQABAoIBAQCL4VowXmHmCswJ
+UH1QXMSvIuFz1p9iMoIqq1gIfv50cTC+puYLAdjn8U9KAVEdk7w7e53OkDR1FlFd
+y5M6hxQt77vb3VngznO0k15PBjBCjhFH+lGc7jq6E9ksOgoffKcAq9HyJ56OMGg4
++pWTcaKZwni6ylF0dkZ1BH5UGGMKcmX+E6NW13tUgU2fRGttjGFvUbHflCNg2vJh
+3IEFUmDGoDvJt4tdK2C1sBv7+cbW1+XBnPa9Gja2MF+v6DIs6AIpnHKwg33Bk33Q
+FX9oNErRYdtT3A9dcvR0qP6uVyVBPRREiuyBFJ0LUGXsbWwIjoPQ8SBZv1LaB1sW
+lo37BX/JAoGBAPuFJ07YNxkCniItKybPzT8mKKUPWS5iJy5pe5hBYSuIKSXZXhBl
+ETwhO/zpilwN8khXWokSW6QU/JmLQnHyG9ox3o5B/eAhCVCBZMe7eJaHJhXifj2A
+1i6KzAePIykUi4i5XKy5XSgRjyfO7E3ofW7MyDvg/OLWYX66MMcDVAf/AoGBAM+w
+Q7twak11okLckJKdtkasy7g5bHhKvksnw2G/CKave1c/CULPnSYYkiffiinUvVoP
+eUM8ANnFY36p1Lz+tE7g7VN1LAzN10ZwCPEdWZ0sjCveasiQrpNCXNWl5IRyRkQT
+m70Cr78zX5ahMIrU0ArLVRLVFB2ZX3lpnOK7h7tzAoGAIPKjYI+wQAV4w49ZLL9h
+6pjMEDs/enT/HvRQbXR7DyHKChw8Vzd2F4NfAVVye3aUO2e+A2C1QnxBTrfQX27Q
+uTd5KPd6E0cgmjwpAIUNWeKgWZOO5+2doQErkv3sJDB9ys5FVpb9ngcW0qcni1ke
+PUp0HGvvlKNyqBAp3ZgRBO8CgYBdEwcnpxdco00WXbZEnn0jayjY5JMhzY0+LRG4
+al48JQRHcy55TIWGnxhQ2jMW0AoTpD+Zy/gtn/IYv49hK1wuxUpWTnpxOoYxQOAg
+/iA8+cvPlRuRypUR1Xm5HWEtofCvbYIr0Fpme2VpIc+ZSAn77Gexyt/669MHnDb8
+vUH01QKBgQC6f88E/9nOb+Xwf2ZXSpK3BQQixPdi+wRY+smqYomSH+Vvou/ny8Cu
+AExWLc1KIH39tvs3e7RarB+hVDTGoHDD86basJ2eaaZdJfGUU0GGA39yRayn9xWz
+3eGVnQLmbguB9R4XYyF5lkp6qzopqp0eGNYMSxtvIhIYroJfOkb6vA==
+-----END RSA PRIVATE KEY-----
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 91a75d7..39451c5 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -19,64 +19,142 @@ package grpc
import (
"context"
+ "fmt"
+ "io"
+ "log"
"net"
+ "path/filepath"
+ "runtime"
+ "strings"
+ "sync"
+ "time"
- "google.golang.org/grpc"
+ "github.com/pkg/errors"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/data"
"github.com/apache/skywalking-banyandb/api/event"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ apischema "github.com/apache/skywalking-banyandb/api/schema"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/partition"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
"github.com/apache/skywalking-banyandb/pkg/run"
)
+var (
+ ErrSeriesEvents = errors.New("no seriesEvent")
+ ErrShardEvents = errors.New("no shardEvent")
+ ErrInvalidSeriesID = errors.New("invalid seriesID")
+ ErrServerCert = errors.New("invalid server cert file")
+ ErrServerKey = errors.New("invalid server key file")
+ ErrNoAddr = errors.New("no address")
+)
+
type Server struct {
- addr string
- log *logger.Logger
- ser *grpc.Server
- pipeline queue.Queue
- repo discovery.ServiceRepo
- shardInfo *shardInfo
- seriesInfo *seriesInfo
+ addr string
+ maxRecvMsgSize int
+ tlsVal bool
+ certFile string
+ keyFile string
+ log *logger.Logger
+ ser *grpclib.Server
+ pipeline queue.Queue
+ repo discovery.ServiceRepo
+ shardInfo *shardInfo
+ seriesInfo *seriesInfo
+ v1.UnimplementedTraceServiceServer
}
type shardInfo struct {
- log *logger.Logger
+ log *logger.Logger
+ shardEvent *shardEvent
}
func (s *shardInfo) Rev(message bus.Message) (resp bus.Message) {
- shardEvent, ok := message.Data().(*v1.ShardEvent)
+ event, ok := message.Data().(*v1.ShardEvent)
if !ok {
s.log.Warn().Msg("invalid event data type")
return
}
+ s.shardEvent.setShardEvents(event)
s.log.Info().
- Str("action", v1.Action_name[int32(shardEvent.Action)]).
- Uint64("shardID", shardEvent.Shard.Id).
+ Str("action", v1.Action_name[int32(event.Action)]).
+ Uint64("shardID", event.Shard.Id).
Msg("received a shard event")
return
}
+type shardEvent struct {
+ shardEventsMap map[string]*v1.ShardEvent
+ sync.RWMutex
+}
+
+func (s *shardEvent) setShardEvents(eventVal *v1.ShardEvent) {
+ s.RWMutex.Lock()
+ defer s.RWMutex.Unlock()
+ idx := eventVal.Shard.Series.GetName() + "-" + eventVal.Shard.Series.GetGroup()
+ if eventVal.Action == v1.Action_ACTION_PUT {
+ s.shardEventsMap[idx] = eventVal
+ } else if eventVal.Action == v1.Action_ACTION_DELETE {
+ delete(s.shardEventsMap, idx)
+ }
+}
+
+func (s *shardEvent) getShardEvent(idx string) *v1.ShardEvent {
+ s.RWMutex.RLock()
+ defer s.RWMutex.RUnlock()
+ return s.shardEventsMap[idx]
+}
+
type seriesInfo struct {
- log *logger.Logger
+ log *logger.Logger
+ seriesEvent *seriesEvent
}
func (s *seriesInfo) Rev(message bus.Message) (resp bus.Message) {
- seriesEvent, ok := message.Data().(*v1.SeriesEvent)
+ event, ok := message.Data().(*v1.SeriesEvent)
if !ok {
s.log.Warn().Msg("invalid event data type")
return
}
+ s.seriesEvent.setSeriesEvents(event)
s.log.Info().
- Str("action", v1.Action_name[int32(seriesEvent.Action)]).
- Str("name", seriesEvent.Series.Name).
- Str("group", seriesEvent.Series.Group).
+ Str("action", v1.Action_name[int32(event.Action)]).
+ Str("name", event.Series.Name).
+ Str("group", event.Series.Group).
Msg("received a shard event")
return
}
+type seriesEvent struct {
+ seriesEventsMap map[string]*v1.SeriesEvent
+ sync.RWMutex
+}
+
+func (s *seriesEvent) setSeriesEvents(seriesEventVal *v1.SeriesEvent) {
+ s.RWMutex.Lock()
+ defer s.RWMutex.Unlock()
+ str := seriesEventVal.Series.GetName() + "-" + seriesEventVal.Series.GetGroup()
+ if seriesEventVal.Action == v1.Action_ACTION_PUT {
+ s.seriesEventsMap[str] = seriesEventVal
+ } else if seriesEventVal.Action == v1.Action_ACTION_DELETE {
+ delete(s.seriesEventsMap, str)
+ }
+}
+
+func (s *seriesEvent) getSeriesEvent(idx string) *v1.SeriesEvent {
+ s.RWMutex.RLock()
+ defer s.RWMutex.RUnlock()
+ return s.seriesEventsMap[idx]
+}
+
func (s *Server) PreRun() error {
s.log = logger.GetLogger("liaison-grpc")
s.shardInfo.log = s.log
@@ -92,8 +170,8 @@ func NewServer(ctx context.Context, pipeline queue.Queue, repo discovery.Service
return &Server{
pipeline: pipeline,
repo: repo,
- shardInfo: &shardInfo{},
- seriesInfo: &seriesInfo{},
+ shardInfo: &shardInfo{shardEvent: &shardEvent{shardEventsMap: map[string]*v1.ShardEvent{}}},
+ seriesInfo: &seriesInfo{seriesEvent: &seriesEvent{seriesEventsMap: map[string]*v1.SeriesEvent{}}},
}
}
@@ -102,12 +180,38 @@ func (s *Server) Name() string {
}
func (s *Server) FlagSet() *run.FlagSet {
+ size := 1024 * 1024 * 10
+ _, currentFile, _, _ := runtime.Caller(0)
+ basePath := filepath.Dir(currentFile)
+ serverCert := filepath.Join(basePath, "data/server_cert.pem")
+ serverKey := filepath.Join(basePath, "data/server_key.pem")
+
fs := run.NewFlagSet("grpc")
- fs.StringVarP(&s.addr, "addr", "", ":17912", "the address of banyand listens")
+ fs.IntVarP(&s.maxRecvMsgSize, "maxRecvMsgSize", "", size, "The size of max receiving message")
+ fs.BoolVarP(&s.tlsVal, "tlsVal", "", true, "Connection uses TLS if true, else plain TCP")
+ fs.StringVarP(&s.certFile, "certFile", "", serverCert, "The TLS cert file")
+ fs.StringVarP(&s.keyFile, "keyFile", "", serverKey, "The TLS key file")
+ fs.StringVarP(&s.addr, "addr", "", ":17912", "The address of banyand listens")
+
return fs
}
func (s *Server) Validate() error {
+ if s.addr == "" {
+ return ErrNoAddr
+ }
+ if s.tlsVal {
+ if s.certFile == "" {
+ return ErrServerCert
+ }
+ if s.keyFile == "" {
+ return ErrServerKey
+ }
+ _, errTLS := credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
+ if errTLS != nil {
+ return errTLS
+ }
+ }
return nil
}
@@ -116,10 +220,17 @@ func (s *Server) Serve() error {
if err != nil {
s.log.Fatal().Err(err).Msg("Failed to listen")
}
-
- s.ser = grpc.NewServer()
- // TODO: add server implementation here
- v1.RegisterTraceServiceServer(s.ser, v1.UnimplementedTraceServiceServer{})
+ if errValidate := s.Validate(); errValidate != nil {
+ s.log.Fatal().Err(errValidate).Msg("Failed to validate data")
+ }
+ var opts []grpclib.ServerOption
+ if s.tlsVal {
+ creds, _ := credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
+ opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+ }
+ opts = append(opts, grpclib.MaxRecvMsgSize(s.maxRecvMsgSize))
+ s.ser = grpclib.NewServer(opts...)
+ v1.RegisterTraceServiceServer(s.ser, s)
return s.ser.Serve(lis)
}
@@ -128,3 +239,106 @@ func (s *Server) GracefulStop() {
s.log.Info().Msg("stopping")
s.ser.GracefulStop()
}
+
+func (s *Server) computeSeriesID(writeEntity *v1.WriteRequest, mapIndexName string) (SeriesID []byte, err error) {
+ ana := logical.DefaultAnalyzer()
+ metadata := common.Metadata{
+ KindVersion: apischema.SeriesKindVersion,
+ Spec: writeEntity.GetMetadata(),
+ }
+ schema, ruleError := ana.BuildTraceSchema(context.TODO(), metadata)
+ if ruleError != nil {
+ return nil, ruleError
+ }
+ seriesEventVal := s.seriesInfo.seriesEvent.getSeriesEvent(mapIndexName)
+ if seriesEventVal == nil {
+ return nil, ErrSeriesEvents
+ }
+ var str string
+ var arr []string
+ fieldRefs, errField := schema.CreateRef(seriesEventVal.FieldNamesCompositeSeriesId...)
+ if errField != nil {
+ return nil, errField
+ }
+ for _, ref := range fieldRefs {
+ field := writeEntity.GetEntity().GetFields()[ref.Spec.Idx]
+ switch v := field.GetValueType().(type) {
+ case *v1.Field_StrArray:
+ for j := 0; j < len(v.StrArray.Value); j++ {
+ arr = append(arr, v.StrArray.Value[j])
+ }
+ case *v1.Field_IntArray:
+ for t := 0; t < len(v.IntArray.Value); t++ {
+ arr = append(arr, fmt.Sprint(v.IntArray.Value[t]))
+ }
+ case *v1.Field_Int:
+ arr = append(arr, fmt.Sprint(v.Int.Value))
+ case *v1.Field_Str:
+ arr = append(arr, fmt.Sprint(v.Str.Value))
+ }
+ }
+ str = strings.Join(arr, "")
+ if str == "" {
+ return nil, ErrInvalidSeriesID
+ }
+ seriesID := []byte(str)
+
+ return seriesID, nil
+}
+
+func (s *Server) computeShardID(seriesID []byte, mapIndexName string) (shardID uint, err error) {
+ shardEventVal := s.shardInfo.shardEvent.getShardEvent(mapIndexName)
+ if shardEventVal == nil {
+ return 0, ErrShardEvents
+ }
+ shardNum := shardEventVal.GetShard().GetId()
+ if shardNum < 1 {
+ shardNum = 1
+ }
+ shardID, shardIDError := partition.ShardID(seriesID, uint32(shardNum))
+ if shardIDError != nil {
+ return 0, shardIDError
+ }
+
+ return shardID, nil
+}
+
+func (s *Server) Write(TraceWriteServer v1.TraceService_WriteServer) error {
+ for {
+ writeEntity, err := TraceWriteServer.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ mapIndexName := writeEntity.GetMetadata().GetName() + "-" + writeEntity.GetMetadata().GetGroup()
+ seriesID, err := s.computeSeriesID(writeEntity, mapIndexName)
+ if err != nil {
+ return err
+ }
+ shardID, err := s.computeShardID(seriesID, mapIndexName)
+ if err != nil {
+ return err
+ }
+ mergeData := assemblyWriteData(shardID, writeEntity, convert.BytesToUint64(seriesID))
+ message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), mergeData)
+ _, errWritePub := s.pipeline.Publish(data.TopicWriteEvent, message)
+ if errWritePub != nil {
+ return errWritePub
+ }
+ if errSend := TraceWriteServer.Send(&v1.WriteResponse{}); errSend != nil {
+ return errSend
+ }
+ }
+}
+
+func (s *Server) Query(ctx context.Context, entityCriteria *v1.QueryRequest) (*v1.QueryResponse, error) {
+ log.Println("entityCriteria:", entityCriteria)
+
+ return &v1.QueryResponse{}, nil
+}
+
+func assemblyWriteData(shardID uint, writeEntity *v1.WriteRequest, seriesID uint64) data.TraceWriteDate {
+ return data.TraceWriteDate{ShardID: shardID, SeriesID: seriesID, WriteRequest: writeEntity}
+}
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
new file mode 100644
index 0000000..5aea70f
--- /dev/null
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -0,0 +1,250 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc_test
+
+import (
+ "context"
+ "io"
+ "os"
+ "path"
+ "path/filepath"
+ "runtime"
+ "testing"
+ "time"
+
+ googleUUID "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+
+ v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/index"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+ "github.com/apache/skywalking-banyandb/banyand/query"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/series/trace"
+ "github.com/apache/skywalking-banyandb/banyand/storage"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pb"
+)
+
+type testData struct {
+ certFile string
+ serverHostOverride string
+ TLS bool
+ addr string
+}
+
+func setup(tester *require.Assertions) (*grpc.Server, *grpc.Server, func()) {
+ tester.NoError(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "warn",
+ }))
+ // Init `Discovery` module
+ repo, err := discovery.NewServiceRepo(context.Background())
+ tester.NoError(err)
+ tester.NotNil(repo)
+ // Init `Queue` module
+ pipeline, err := queue.NewQueue(context.TODO(), repo)
+ tester.NoError(err)
+ // Init `Database` module
+ db, err := storage.NewDB(context.TODO(), repo)
+ tester.NoError(err)
+ uuid, err := googleUUID.NewUUID()
+ tester.NoError(err)
+ rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
+ tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
+ // Init `Index` module
+ indexSvc, err := index.NewService(context.TODO(), repo)
+ tester.NoError(err)
+ // Init `Trace` module
+ traceSvc, err := trace.NewService(context.TODO(), db, repo, indexSvc, pipeline)
+ tester.NoError(err)
+ // Init `Query` module
+ executor, err := query.NewExecutor(context.TODO(), repo, indexSvc, traceSvc, traceSvc)
+ tester.NoError(err)
+ // Init `liaison` module
+ tcp := grpc.NewServer(context.TODO(), pipeline, repo)
+ tester.NoError(tcp.FlagSet().Parse([]string{"--tlsVal=false", "--addr=:17912"}))
+ tcpTLS := grpc.NewServer(context.TODO(), pipeline, repo)
+ tester.NoError(tcpTLS.FlagSet().Parse([]string{"--tlsVal=true", "--addr=:17913"}))
+
+ err = indexSvc.PreRun()
+ tester.NoError(err)
+
+ err = traceSvc.PreRun()
+ tester.NoError(err)
+
+ err = db.PreRun()
+ tester.NoError(err)
+
+ err = executor.PreRun()
+ tester.NoError(err)
+
+ err = tcp.PreRun()
+ tester.NoError(err)
+ tester.NoError(tcpTLS.PreRun())
+
+ go func() {
+ tester.NoError(traceSvc.Serve())
+ }()
+
+ go func() {
+ tester.NoError(tcpTLS.Serve())
+ }()
+
+ go func() {
+ tester.NoError(tcp.Serve())
+ }()
+
+ ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancelFunc()
+
+ tester.True(indexSvc.Ready(ctx, index.MetaExists("default", "sw")))
+
+ return tcp, tcpTLS, func() {
+ db.GracefulStop()
+ _ = os.RemoveAll(rootPath)
+ }
+}
+
+func TestTraceService(t *testing.T) {
+ tester := require.New(t)
+ tcp, tcpTLS, gracefulStop := setup(tester)
+ defer gracefulStop()
+ _, currentFile, _, _ := runtime.Caller(0)
+ basePath := filepath.Dir(currentFile)
+ certFile := filepath.Join(basePath, "data/server_cert.pem")
+ testCases := []struct {
+ name string
+ args testData
+ wantErr bool
+ }{
+ {
+ name: "isTLS",
+ args: testData{
+ TLS: true,
+ certFile: certFile,
+ serverHostOverride: "localhost",
+ addr: "localhost:17913",
+ },
+ },
+ {
+ name: "noTLS",
+ args: testData{
+ TLS: false,
+ addr: "localhost:17912",
+ },
+ },
+ }
+ for _, tc := range testCases {
+ if tc.args.TLS {
+ errValidate := tcpTLS.Validate()
+ assert.NoError(t, errValidate)
+ var opts []grpclib.DialOption
+ creds, err := credentials.NewClientTLSFromFile(tc.args.certFile, tc.args.serverHostOverride)
+ assert.NoError(t, err)
+ opts = append(opts, grpclib.WithTransportCredentials(creds))
+ linkService(t, tc.args.addr, opts)
+ } else {
+ errValidate := tcp.Validate()
+ assert.NoError(t, errValidate)
+ var opts []grpclib.DialOption
+ opts = append(opts, grpclib.WithInsecure())
+ linkService(t, tc.args.addr, opts)
+ }
+ }
+}
+
+func linkService(t *testing.T, addr string, opts []grpclib.DialOption) {
+ conn, err := grpclib.Dial(addr, opts...)
+ assert.NoError(t, err)
+ defer conn.Close()
+ traceWrite(t, conn)
+ traceQuery(t, conn)
+}
+
+func traceWrite(t *testing.T, conn *grpclib.ClientConn) {
+ client := v1.NewTraceServiceClient(conn)
+ ctx := context.Background()
+ entityValue := pb.NewEntityValueBuilder().
+ EntityID("entityId").
+ DataBinary([]byte{12}).
+ Fields("trace_id-xxfff.111323",
+ 0,
+ "webapp_id",
+ "10.0.0.1_id",
+ "/home_id",
+ "webapp",
+ "10.0.0.1",
+ "/home",
+ 300,
+ 1622933202000000000).
+ Timestamp(time.Now()).
+ Build()
+ criteria := pb.NewWriteEntityBuilder().
+ EntityValue(entityValue).
+ Metadata("default", "sw").
+ Build()
+ stream, errorWrite := client.Write(ctx)
+ if errorWrite != nil {
+ t.Errorf("%v.Write(_) = _, %v", client, errorWrite)
+ }
+ waitc := make(chan struct{})
+ go func() {
+ for {
+ writeResponse, errRecv := stream.Recv()
+ if errRecv == io.EOF {
+ // read done.
+ close(waitc)
+ return
+ }
+ assert.NoError(t, errRecv)
+ assert.NotNil(t, writeResponse)
+ }
+ }()
+ if errSend := stream.Send(criteria); errSend != nil {
+ t.Errorf("Failed to send a note: %v", errSend)
+ }
+ if errorSend := stream.CloseSend(); errorSend != nil {
+ t.Errorf("Failed to send a note: %v", errorSend)
+ }
+ <-waitc
+}
+
+func traceQuery(t *testing.T, conn *grpclib.ClientConn) {
+ client := v1.NewTraceServiceClient(conn)
+ ctx := context.Background()
+ sT, eT := time.Now().Add(-3*time.Hour), time.Now()
+ criteria := pb.NewQueryRequestBuilder().
+ Limit(5).
+ Offset(10).
+ OrderBy("service_instance_id", v1.QueryOrder_SORT_DESC).
+ Metadata("default", "trace").
+ Projection("http.method", "service_id", "service_instance_id").
+ Fields("service_id", "=", "my_app", "http.method", "=", "GET").
+ TimeRange(sT, eT).
+ Build()
+ stream, errRev := client.Query(ctx, criteria)
+ if errRev != nil {
+ t.Errorf("Retrieve client failed: %v", errRev)
+ }
+ assert.NotNil(t, stream)
+}
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index dbea2ae..f3da0a2 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/index"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/series"
"github.com/apache/skywalking-banyandb/banyand/series/trace"
"github.com/apache/skywalking-banyandb/banyand/storage"
@@ -62,6 +63,9 @@ func setupServices(t *testing.T, tester *require.Assertions) (discovery.ServiceR
repo, err := discovery.NewServiceRepo(context.Background())
tester.NoError(err)
tester.NotNil(repo)
+ // Init `Queue` module
+ pipeline, err := queue.NewQueue(context.TODO(), repo)
+ tester.NoError(err)
// Init `Index` module
indexSvc, err := index.NewService(context.TODO(), repo)
@@ -76,7 +80,7 @@ func setupServices(t *testing.T, tester *require.Assertions) (discovery.ServiceR
tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
// Init `Trace` module
- traceSvc, err := trace.NewService(context.TODO(), db, repo, indexSvc)
+ traceSvc, err := trace.NewService(context.TODO(), db, repo, indexSvc, pipeline)
tester.NoError(err)
// Init `Query` module
@@ -84,7 +88,7 @@ func setupServices(t *testing.T, tester *require.Assertions) (discovery.ServiceR
tester.NoError(err)
// Init `Liaison` module
- liaison := grpc.NewServer(context.TODO(), nil, repo)
+ liaison := grpc.NewServer(context.TODO(), pipeline, repo)
// :PreRun:
// 1) TraceSeries,
diff --git a/banyand/series/trace/common_test.go b/banyand/series/trace/common_test.go
index f8b40ae..8d13a7c 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -68,7 +68,7 @@ func setup(t *testing.T) (*traceSeries, func()) {
ctrl := gomock.NewController(t)
mockIndex := index.NewMockService(ctrl)
mockIndex.EXPECT().Insert(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
- svc, err := NewService(context.TODO(), db, nil, mockIndex)
+ svc, err := NewService(context.TODO(), db, nil, mockIndex, nil)
assert.NoError(t, err)
assert.NoError(t, svc.PreRun())
assert.NoError(t, db.PreRun())
@@ -257,7 +257,7 @@ func setupTestData(t *testing.T, ts *traceSeries, seriesEntities []seriesEntity)
Timestamp(se.entity.t).
Fields(se.entity.items...).
Build()
- shardID := partition.ShardID(seriesID, 2)
+ shardID, _ := partition.ShardID(seriesID, 2)
got, err := ts.Write(common.SeriesID(convert.Hash(seriesID)), shardID, data.EntityValue{
EntityValue: ev,
})
diff --git a/banyand/series/trace/query.go b/banyand/series/trace/query.go
index 64985f3..770c877 100644
--- a/banyand/series/trace/query.go
+++ b/banyand/series/trace/query.go
@@ -42,7 +42,10 @@ func (t *traceSeries) FetchTrace(traceID string, opt series.ScanOptions) (trace
return trace, ErrInvalidTraceID
}
traceIDBytes := []byte(traceID)
- traceIDShardID := partition.ShardID(traceIDBytes, t.shardNum)
+ traceIDShardID, shardIDError := partition.ShardID(traceIDBytes, t.shardNum)
+ if shardIDError != nil {
+ return trace, shardIDError
+ }
bb, errTraceID := t.reader.TimeSeriesReader(traceIDShardID, traceIndex, 0, 0).GetAll(traceIDBytes)
if errTraceID != nil {
return trace, errTraceID
diff --git a/banyand/series/trace/service.go b/banyand/series/trace/service.go
index 1e77d4b..d646447 100644
--- a/banyand/series/trace/service.go
+++ b/banyand/series/trace/service.go
@@ -23,10 +23,13 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/data"
"github.com/apache/skywalking-banyandb/api/event"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/index"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/series"
"github.com/apache/skywalking-banyandb/banyand/series/schema"
"github.com/apache/skywalking-banyandb/banyand/storage"
@@ -38,20 +41,24 @@ import (
var _ series.Service = (*service)(nil)
type service struct {
- db storage.Database
- schemaMap map[string]*traceSeries
- l *logger.Logger
- repo discovery.ServiceRepo
- stopCh chan struct{}
- idx index.Service
+ db storage.Database
+ schemaMap map[string]*traceSeries
+ l *logger.Logger
+ repo discovery.ServiceRepo
+ stopCh chan struct{}
+ idx index.Service
+ writeListener *writeCallback
+ pipeline queue.Queue
}
//NewService returns a new service
-func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo, idx index.Service) (series.Service, error) {
+func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo, idx index.Service, pipeline queue.Queue) (series.Service, error) {
return &service{
- db: db,
- repo: repo,
- idx: idx,
+ db: db,
+ repo: repo,
+ idx: idx,
+ pipeline: pipeline,
+ writeListener: &writeCallback{},
}, nil
}
@@ -65,7 +72,9 @@ func (s *service) PreRun() error {
return err
}
s.schemaMap = make(map[string]*traceSeries, len(schemas))
+ s.writeListener.schemaMap = s.schemaMap
s.l = logger.GetLogger(s.Name())
+ s.writeListener.l = s.l
for _, sa := range schemas {
ts, errTS := newTraceSeries(sa, s.l, s.idx)
if errTS != nil {
@@ -74,6 +83,7 @@ func (s *service) PreRun() error {
s.db.Register(ts)
id := formatTraceSeriesID(ts.name, ts.group)
s.schemaMap[id] = ts
+ s.writeListener.schemaMap[id] = ts
s.l.Info().Str("id", id).Msg("initialize Trace series")
}
return err
@@ -136,6 +146,10 @@ func (s *service) Serve() error {
return errPublishRules
}
}
+ errWrite := s.pipeline.Subscribe(data.TopicWriteEvent, s.writeListener)
+ if errWrite != nil {
+ return errWrite
+ }
s.stopCh = make(chan struct{})
<-s.stopCh
return nil
@@ -146,3 +160,26 @@ func (s *service) GracefulStop() {
close(s.stopCh)
}
}
+
+type writeCallback struct {
+ l *logger.Logger
+ schemaMap map[string]*traceSeries
+}
+
+func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
+ writeEvent, ok := message.Data().(data.TraceWriteDate)
+ if !ok {
+ w.l.Warn().Msg("invalid event data type")
+ return
+ }
+ entityValue := writeEvent.WriteRequest.GetEntity()
+ ts := writeEvent.WriteRequest.GetMetadata()
+ id := formatTraceSeriesID(ts.GetName(), ts.GetGroup())
+ _, err := w.schemaMap[id].Write(common.SeriesID(writeEvent.SeriesID), writeEvent.ShardID, data.EntityValue{
+ EntityValue: entityValue,
+ })
+ if err != nil {
+ w.l.Warn().Err(err)
+ }
+ return
+}
diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go
index 1c5f639..96af476 100644
--- a/banyand/series/trace/trace.go
+++ b/banyand/series/trace/trace.go
@@ -120,8 +120,10 @@ func (s *service) Write(traceSeriesMetadata common.Metadata, ts time.Time, serie
Build()
seriesIDBytes := []byte(seriesID)
- shardID := partition.ShardID(seriesIDBytes, traceSeries.shardNum)
-
+ shardID, shardIDError := partition.ShardID(seriesIDBytes, traceSeries.shardNum)
+ if shardIDError != nil {
+ return err == nil, shardIDError
+ }
_, err = traceSeries.Write(common.SeriesID(convert.Hash(seriesIDBytes)), shardID, data.EntityValue{
EntityValue: ev,
})
diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go
index 115ba15..1b362d1 100644
--- a/banyand/series/trace/write.go
+++ b/banyand/series/trace/write.go
@@ -75,7 +75,10 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data.
if err = wp.Writer(shardID, chunkIDMapping).Put(chunkIDBytes, bydb_bytes.Join(stateBytes, seriesIDBytes, wallTsBytes)); err != nil {
return 0, errors.Wrap(err, "failed to write chunkID index")
}
- traceIDShardID := partition.ShardID(traceID, t.shardNum)
+ traceIDShardID, shardIDError := partition.ShardID(traceID, t.shardNum)
+ if shardIDError != nil {
+ return 0, shardIDError
+ }
if err = wp.TimeSeriesWriter(traceIDShardID, traceIndex).
Put(traceID, bydb_bytes.Join(convert.Uint16ToBytes(uint16(shardID)), chunkIDBytes), entityTs); err != nil {
return 0, errors.Wrap(err, "failed to Trace index")
diff --git a/banyand/series/trace/write_test.go b/banyand/series/trace/write_test.go
index 0ca51d8..a671cb2 100644
--- a/banyand/series/trace/write_test.go
+++ b/banyand/series/trace/write_test.go
@@ -180,13 +180,17 @@ func Test_traceSeries_Write(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
seriesID := []byte(tt.args.seriesID)
+ shardID, shardIDError := partition.ShardID(seriesID, 2)
+ if shardIDError != nil {
+ return
+ }
ev := pb.NewEntityValueBuilder().
DataBinary(tt.args.entity.binary).
EntityID(tt.args.entity.id).
Fields(tt.args.entity.items...).
Timestamp(time.Now()).
Build()
- got, err := ts.Write(common.SeriesID(convert.Hash(seriesID)), partition.ShardID(seriesID, 2), data.EntityValue{
+ got, err := ts.Write(common.SeriesID(convert.Hash(seriesID)), shardID, data.EntityValue{
EntityValue: ev,
})
if (err != nil) != tt.wantErr {
diff --git a/pkg/partition/route.go b/pkg/partition/route.go
index 9b4d93c..bcb18af 100644
--- a/pkg/partition/route.go
+++ b/pkg/partition/route.go
@@ -17,9 +17,16 @@
package partition
-import "github.com/apache/skywalking-banyandb/pkg/convert"
+import (
+ "github.com/pkg/errors"
-func ShardID(key []byte, shardNum uint32) uint {
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
+
+func ShardID(key []byte, shardNum uint32) (uint, error) {
+ if shardNum < 1 {
+ return 0, errors.New("invalid shardNum")
+ }
encodeKey := convert.Hash(key)
- return uint(encodeKey % uint64(shardNum))
+ return uint(encodeKey % uint64(shardNum)), nil
}
diff --git a/pkg/query/logical/expr.go b/pkg/query/logical/expr.go
index 03898da..caf0e92 100644
--- a/pkg/query/logical/expr.go
+++ b/pkg/query/logical/expr.go
@@ -44,21 +44,21 @@ type FieldRef struct {
// name defines the key of the field
name string
// spec contains the index of the key in the schema, as well as the underlying FieldSpec
- spec *fieldSpec
+ Spec *fieldSpec
}
func (f *FieldRef) Equal(expr Expr) bool {
if other, ok := expr.(*FieldRef); ok {
- return other.name == f.name && other.spec.spec.GetType() == f.spec.spec.GetType()
+ return other.name == f.name && other.Spec.spec.GetType() == f.Spec.spec.GetType()
}
return false
}
func (f *FieldRef) FieldType() apiv1.FieldSpec_FieldType {
- if f.spec == nil {
+ if f.Spec == nil {
panic("should be resolved first")
}
- return f.spec.spec.GetType()
+ return f.Spec.spec.GetType()
}
func (f *FieldRef) Resolve(s Schema) error {
@@ -66,12 +66,12 @@ func (f *FieldRef) Resolve(s Schema) error {
if err != nil {
return err
}
- f.spec = specs[0].spec
+ f.Spec = specs[0].Spec
return nil
}
func (f *FieldRef) String() string {
- return fmt.Sprintf("#%s<%s>", f.name, f.spec.spec.GetType().String())
+ return fmt.Sprintf("#%s<%s>", f.name, f.Spec.spec.GetType().String())
}
func NewFieldRef(fieldName string) *FieldRef {
diff --git a/pkg/query/logical/plan_orderby.go b/pkg/query/logical/plan_orderby.go
index 0c087d4..92cb50d 100644
--- a/pkg/query/logical/plan_orderby.go
+++ b/pkg/query/logical/plan_orderby.go
@@ -77,7 +77,7 @@ func (o *orderBy) Execute(ec executor.ExecutionContext) ([]data.Entity, error) {
return entities, nil
}
- sort.Slice(entities, sortMethod(entities, o.targetRef.spec.idx, o.sort))
+ sort.Slice(entities, sortMethod(entities, o.targetRef.Spec.Idx, o.sort))
return entities, nil
}
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 00542a5..6f33f3a 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -27,6 +27,7 @@ import (
type Schema interface {
IndexDefined(string) (bool, *apiv1.IndexObject)
+ FieldSubscript(string) (bool, int)
FieldDefined(string) bool
CreateRef(names ...string) ([]*FieldRef, error)
Map(refs ...*FieldRef) Schema
@@ -37,12 +38,12 @@ type Schema interface {
}
type fieldSpec struct {
- idx int
+ Idx int
spec *apiv1.FieldSpec
}
func (fs *fieldSpec) Equal(other *fieldSpec) bool {
- return fs.idx == other.idx && fs.spec.GetType() == other.spec.GetType() && fs.spec.GetName() == other.spec.GetName()
+ return fs.Idx == other.Idx && fs.spec.GetType() == other.spec.GetType() && fs.spec.GetName() == other.spec.GetName()
}
var _ Schema = (*schema)(nil)
@@ -74,6 +75,18 @@ func (s *schema) IndexDefined(field string) (bool, *apiv1.IndexObject) {
return false, nil
}
+func (s *schema) FieldSubscript(field string) (bool, int) {
+ idxRule := s.indexRule.Spec
+ for i, indexObj := range idxRule.GetObjects() {
+ for _, fieldName := range indexObj.GetFields() {
+ if field == fieldName {
+ return true, i
+ }
+ }
+ }
+ return false, -1
+}
+
func (s *schema) Equal(s2 Schema) bool {
if other, ok := s2.(*schema); ok {
return cmp.Equal(other.fieldMap, s.fieldMap)
@@ -83,7 +96,7 @@ func (s *schema) Equal(s2 Schema) bool {
func (s *schema) RegisterField(name string, i int, spec *apiv1.FieldSpec) {
s.fieldMap[name] = &fieldSpec{
- idx: i,
+ Idx: i,
spec: spec,
}
}
@@ -117,7 +130,7 @@ func (s *schema) Map(refs ...*FieldRef) Schema {
fieldMap: make(map[string]*fieldSpec),
}
for _, ref := range refs {
- newS.fieldMap[ref.name] = ref.spec
+ newS.fieldMap[ref.name] = ref.Spec
}
return newS
}