You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/07/12 03:26:27 UTC
[skywalking-banyandb] 02/02: Add local discovery and shard event
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch series-trace
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit ccb0d430c5222a7dfbc0a90ca694082c631401a8
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Jul 12 09:13:34 2021 +0800
Add local discovery and shard event
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
api/event/discovery.go | 5 +++
api/fbs/v1/database.fbs | 4 +-
api/fbs/v1/database_generated.go | 61 +++++++++++++++++++++++--------
banyand/discovery/discovery.go | 36 ++++++++++++++----
banyand/internal/cmd/standalone.go | 4 +-
banyand/liaison/grpc/grpc.go | 32 ++++++++++++++--
banyand/liaison/liaison.go | 6 ++-
banyand/queue/local.go | 18 ++++-----
banyand/queue/queue.go | 6 +--
banyand/series/series.go | 1 +
banyand/series/trace/common_test.go | 2 +-
banyand/series/trace/trace.go | 41 ++++++++++++++++++++-
{banyand/internal => pkg}/bus/bus.go | 0
{banyand/internal => pkg}/bus/bus_test.go | 0
pkg/convert/hash.go | 4 ++
pkg/fb/database.go | 59 ++++++++++++++++++++++++++++++
16 files changed, 234 insertions(+), 45 deletions(-)
diff --git a/api/event/discovery.go b/api/event/discovery.go
index 6916c65..40360c4 100644
--- a/api/event/discovery.go
+++ b/api/event/discovery.go
@@ -20,6 +20,7 @@ package event
import (
"github.com/apache/skywalking-banyandb/api/common"
v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
)
var (
@@ -27,6 +28,10 @@ var (
Version: "v1",
Kind: "event-shard",
}
+ TopicShardEvent = bus.Topic{
+ ID: ShardEventKindVersion.String(),
+ Type: bus.ChTypeUnidirectional,
+ }
)
type Shard struct {
diff --git a/api/fbs/v1/database.fbs b/api/fbs/v1/database.fbs
index 836553c..d71c1f1 100644
--- a/api/fbs/v1/database.fbs
+++ b/api/fbs/v1/database.fbs
@@ -25,8 +25,10 @@ table Node {
}
table Shard {
- id:int64;
+ id:uint64;
+ series:Metadata;
node:Node;
+ total:uint8;
update_time:int64;
create_time:int64;
}
diff --git a/api/fbs/v1/database_generated.go b/api/fbs/v1/database_generated.go
index 0021dc6..4236dff 100644
--- a/api/fbs/v1/database_generated.go
+++ b/api/fbs/v1/database_generated.go
@@ -145,23 +145,36 @@ func (rcv *Shard) Table() flatbuffers.Table {
return rcv._tab
}
-func (rcv *Shard) Id() int64 {
+func (rcv *Shard) Id() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
- return rcv._tab.GetInt64(o + rcv._tab.Pos)
+ return rcv._tab.GetUint64(o + rcv._tab.Pos)
}
return 0
}
-func (rcv *Shard) MutateId(n int64) bool {
- return rcv._tab.MutateInt64Slot(4, n)
+func (rcv *Shard) MutateId(n uint64) bool {
+ return rcv._tab.MutateUint64Slot(4, n)
}
-func (rcv *Shard) Node(obj *Node) *Node {
+func (rcv *Shard) Series(obj *Metadata) *Metadata {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
x := rcv._tab.Indirect(o + rcv._tab.Pos)
if obj == nil {
+ obj = new(Metadata)
+ }
+ obj.Init(rcv._tab.Bytes, x)
+ return obj
+ }
+ return nil
+}
+
+func (rcv *Shard) Node(obj *Node) *Node {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
+ x := rcv._tab.Indirect(o + rcv._tab.Pos)
+ if obj == nil {
obj = new(Node)
}
obj.Init(rcv._tab.Bytes, x)
@@ -170,8 +183,20 @@ func (rcv *Shard) Node(obj *Node) *Node {
return nil
}
+func (rcv *Shard) Total() byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ if o != 0 {
+ return rcv._tab.GetByte(o + rcv._tab.Pos)
+ }
+ return 0
+}
+
+func (rcv *Shard) MutateTotal(n byte) bool {
+ return rcv._tab.MutateByteSlot(10, n)
+}
+
func (rcv *Shard) UpdateTime() int64 {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
if o != 0 {
return rcv._tab.GetInt64(o + rcv._tab.Pos)
}
@@ -179,11 +204,11 @@ func (rcv *Shard) UpdateTime() int64 {
}
func (rcv *Shard) MutateUpdateTime(n int64) bool {
- return rcv._tab.MutateInt64Slot(8, n)
+ return rcv._tab.MutateInt64Slot(12, n)
}
func (rcv *Shard) CreateTime() int64 {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(14))
if o != 0 {
return rcv._tab.GetInt64(o + rcv._tab.Pos)
}
@@ -191,23 +216,29 @@ func (rcv *Shard) CreateTime() int64 {
}
func (rcv *Shard) MutateCreateTime(n int64) bool {
- return rcv._tab.MutateInt64Slot(10, n)
+ return rcv._tab.MutateInt64Slot(14, n)
}
func ShardStart(builder *flatbuffers.Builder) {
- builder.StartObject(4)
+ builder.StartObject(6)
+}
+func ShardAddId(builder *flatbuffers.Builder, id uint64) {
+ builder.PrependUint64Slot(0, id, 0)
}
-func ShardAddId(builder *flatbuffers.Builder, id int64) {
- builder.PrependInt64Slot(0, id, 0)
+func ShardAddSeries(builder *flatbuffers.Builder, series flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(series), 0)
}
func ShardAddNode(builder *flatbuffers.Builder, node flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(node), 0)
+ builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(node), 0)
+}
+func ShardAddTotal(builder *flatbuffers.Builder, total byte) {
+ builder.PrependByteSlot(3, total, 0)
}
func ShardAddUpdateTime(builder *flatbuffers.Builder, updateTime int64) {
- builder.PrependInt64Slot(2, updateTime, 0)
+ builder.PrependInt64Slot(4, updateTime, 0)
}
func ShardAddCreateTime(builder *flatbuffers.Builder, createTime int64) {
- builder.PrependInt64Slot(3, createTime, 0)
+ builder.PrependInt64Slot(5, createTime, 0)
}
func ShardEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
diff --git a/banyand/discovery/discovery.go b/banyand/discovery/discovery.go
index 9e7197c..8856166 100644
--- a/banyand/discovery/discovery.go
+++ b/banyand/discovery/discovery.go
@@ -20,17 +20,39 @@ package discovery
import (
"context"
- "github.com/apache/skywalking-banyandb/banyand/internal/bus"
+ bus2 "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/run"
)
type ServiceRepo interface {
- run.Config
- run.Service
- bus.Subscriber
- bus.Publisher
+ NodeID() string
+ run.Unit
+ bus2.Subscriber
+ bus2.Publisher
}
-func NewServiceRepo(ctx context.Context) (ServiceRepo, error) {
- return nil, nil
+type repo struct {
+ local *bus2.Bus
+}
+
+func (r *repo) NodeID() string {
+ return "local"
+}
+
+func (r *repo) Name() string {
+ return "service-discovery"
+}
+
+func (r *repo) Subscribe(topic bus2.Topic, listener bus2.MessageListener) error {
+ return r.local.Subscribe(topic, listener)
+}
+
+func (r *repo) Publish(topic bus2.Topic, message ...bus2.Message) (bus2.Future, error) {
+ return r.local.Publish(topic, message...)
+}
+
+func NewServiceRepo(_ context.Context) (ServiceRepo, error) {
+ return &repo{
+ local: bus2.NewBus(),
+ }, nil
}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 69e069e..2d50601 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -61,7 +61,7 @@ func newStandaloneCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate index builder")
}
- traceSeries, err := trace.NewService(ctx, db)
+ traceSeries, err := trace.NewService(ctx, db, repo)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate trace series")
}
@@ -69,7 +69,7 @@ func newStandaloneCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate query executor")
}
- tcp, err := liaison.NewEndpoint(ctx, pipeline)
+ tcp, err := liaison.NewEndpoint(ctx, pipeline, repo)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer")
}
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 36f658c..deade08 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -25,7 +25,11 @@ import (
grpclib "google.golang.org/grpc"
"google.golang.org/grpc/encoding"
+ "github.com/apache/skywalking-banyandb/api/event"
+ v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -35,10 +39,33 @@ type Server struct {
log *logger.Logger
ser *grpclib.Server
pipeline queue.Queue
+ repo discovery.ServiceRepo
}
-func NewServer(ctx context.Context, pipeline queue.Queue) *Server {
- return &Server{pipeline: pipeline}
+func (s *Server) Rev(message bus.Message) (resp bus.Message) {
+ data, ok := message.Data().([]byte)
+ if !ok {
+ s.log.Warn().Msg("invalid event data type")
+ return
+ }
+ shardEvent := v1.GetRootAsShardEvent(data, 0)
+ s.log.Info().
+ Str("action", shardEvent.Action().String()).
+ Uint64("shardID", shardEvent.Shard(nil).Id()).
+ Msg("received a shard event")
+ return
+}
+
+func (s *Server) PreRun() error {
+ s.log = logger.GetLogger("liaison-grpc")
+ return s.repo.Subscribe(event.TopicShardEvent, s)
+}
+
+func NewServer(ctx context.Context, pipeline queue.Queue, repo discovery.ServiceRepo) *Server {
+ return &Server{
+ pipeline: pipeline,
+ repo: repo,
+ }
}
func (s *Server) Name() string {
@@ -56,7 +83,6 @@ func (s *Server) Validate() error {
}
func (s *Server) Serve() error {
- s.log = logger.GetLogger("grpc")
lis, err := net.Listen("tcp", s.addr)
if err != nil {
s.log.Fatal().Err(err).Msg("Failed to listen")
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index 117b361..8eb9274 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -20,6 +20,7 @@ package liaison
import (
"context"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -27,9 +28,10 @@ import (
type Endpoint interface {
run.Config
+ run.PreRunner
run.Service
}
-func NewEndpoint(ctx context.Context, pipeline queue.Queue) (Endpoint, error) {
- return grpc.NewServer(ctx, pipeline), nil
+func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo discovery.ServiceRepo) (Endpoint, error) {
+ return grpc.NewServer(ctx, pipeline, repo), nil
}
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 2a4fe7a..45bad1d 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -21,7 +21,7 @@ import (
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/banyand/discovery"
- "github.com/apache/skywalking-banyandb/banyand/internal/bus"
+ bus2 "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -34,33 +34,33 @@ type Component interface {
type DataSubscriber interface {
Component
- Sub(subscriber bus.Subscriber) error
+ Sub(subscriber bus2.Subscriber) error
}
type DataPublisher interface {
Component
- Pub(publisher bus.Publisher) error
+ Pub(publisher bus2.Publisher) error
}
var _ run.PreRunner = (*Local)(nil)
var _ run.Config = (*Local)(nil)
-var _ bus.Publisher = (*Local)(nil)
-var _ bus.Subscriber = (*Local)(nil)
+var _ bus2.Publisher = (*Local)(nil)
+var _ bus2.Subscriber = (*Local)(nil)
type Local struct {
logger *logger.Logger
test string
- dataBus *bus.Bus
+ dataBus *bus2.Bus
dps []DataPublisher
dss []DataSubscriber
repo discovery.ServiceRepo
}
-func (e *Local) Subscribe(topic bus.Topic, listener bus.MessageListener) error {
+func (e *Local) Subscribe(topic bus2.Topic, listener bus2.MessageListener) error {
return nil
}
-func (e *Local) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, error) {
+func (e *Local) Publish(topic bus2.Topic, message ...bus2.Message) (bus2.Future, error) {
return nil, nil
}
@@ -82,7 +82,7 @@ func (e Local) Name() string {
func (e *Local) PreRun() error {
var err error
- e.dataBus = bus.NewBus()
+ e.dataBus = bus2.NewBus()
for _, dp := range e.dps {
err = multierr.Append(err, dp.Pub(e.dataBus))
}
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index ff59cee..e2cd6af 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -21,15 +21,15 @@ import (
"context"
"github.com/apache/skywalking-banyandb/banyand/discovery"
- "github.com/apache/skywalking-banyandb/banyand/internal/bus"
+ bus2 "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/run"
)
type Queue interface {
run.Config
run.PreRunner
- bus.Subscriber
- bus.Publisher
+ bus2.Subscriber
+ bus2.Publisher
}
func NewQueue(ctx context.Context, repo discovery.ServiceRepo) (Queue, error) {
diff --git a/banyand/series/series.go b/banyand/series/series.go
index e75db05..db280f3 100644
--- a/banyand/series/series.go
+++ b/banyand/series/series.go
@@ -81,4 +81,5 @@ type Service interface {
SchemaRepo
IndexFilter
run.PreRunner
+ run.Service
}
diff --git a/banyand/series/trace/common_test.go b/banyand/series/trace/common_test.go
index 205b15e..7616ebd 100644
--- a/banyand/series/trace/common_test.go
+++ b/banyand/series/trace/common_test.go
@@ -58,7 +58,7 @@ func setup(t *testing.T) (*traceSeries, func()) {
db, err := storage.NewDB(context.TODO(), nil)
assert.NoError(t, err)
assert.NoError(t, db.FlagSet().Parse(nil))
- svc, err := NewService(context.TODO(), db)
+ svc, err := NewService(context.TODO(), db, nil)
assert.NoError(t, err)
assert.NoError(t, svc.PreRun())
assert.NoError(t, db.PreRun())
diff --git a/banyand/series/trace/trace.go b/banyand/series/trace/trace.go
index 3bd6c89..a785c7b 100644
--- a/banyand/series/trace/trace.go
+++ b/banyand/series/trace/trace.go
@@ -22,17 +22,22 @@ import (
"context"
"fmt"
"strconv"
+ "time"
flatbuffers "github.com/google/flatbuffers/go"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
+ "github.com/apache/skywalking-banyandb/api/event"
v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
apischema "github.com/apache/skywalking-banyandb/api/schema"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/series"
"github.com/apache/skywalking-banyandb/banyand/series/schema"
"github.com/apache/skywalking-banyandb/banyand/storage"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/fb"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -78,11 +83,16 @@ type service struct {
db storage.Database
schemaMap map[string]*traceSeries
l *logger.Logger
+ repo discovery.ServiceRepo
+ stopCh chan struct{}
}
//NewService returns a new service
-func NewService(_ context.Context, db storage.Database) (series.Service, error) {
- return &service{db: db}, nil
+func NewService(_ context.Context, db storage.Database, repo discovery.ServiceRepo) (series.Service, error) {
+ return &service{
+ db: db,
+ repo: repo,
+ }, nil
}
func (s *service) Name() string {
@@ -109,6 +119,33 @@ func (s *service) PreRun() error {
return err
}
+func (s *service) Serve() error {
+ for _, sMeta := range s.schemaMap {
+ for i := 0; i < int(sMeta.shardNum); i++ {
+ now := time.Now().UnixNano()
+ _, err := s.repo.Publish(event.TopicShardEvent, bus.NewMessage(bus.MessageID(now), fb.BuildShardEvent(
+ s.repo.NodeID(),
+ sMeta.name,
+ sMeta.group,
+ uint(i),
+ sMeta.shardNum,
+ )))
+ if err != nil {
+ return err
+ }
+ }
+ }
+ s.stopCh = make(chan struct{})
+ <-s.stopCh
+ return nil
+}
+
+func (s *service) GracefulStop() {
+ if s.stopCh != nil {
+ close(s.stopCh)
+ }
+}
+
func (s *service) FetchTrace(traceSeries common.Metadata, traceID string, opt series.ScanOptions) (data.Trace, error) {
ts, err := s.getSeries(traceSeries)
if err != nil {
diff --git a/banyand/internal/bus/bus.go b/pkg/bus/bus.go
similarity index 100%
rename from banyand/internal/bus/bus.go
rename to pkg/bus/bus.go
diff --git a/banyand/internal/bus/bus_test.go b/pkg/bus/bus_test.go
similarity index 100%
rename from banyand/internal/bus/bus_test.go
rename to pkg/bus/bus_test.go
diff --git a/pkg/convert/hash.go b/pkg/convert/hash.go
index e7014fe..b08c2a5 100644
--- a/pkg/convert/hash.go
+++ b/pkg/convert/hash.go
@@ -22,3 +22,7 @@ import "github.com/cespare/xxhash"
func Hash(key []byte) uint64 {
return xxhash.Sum64(key)
}
+
+func HashStr(key string) uint64 {
+ return xxhash.Sum64String(key)
+}
diff --git a/pkg/fb/database.go b/pkg/fb/database.go
new file mode 100644
index 0000000..0d18fc0
--- /dev/null
+++ b/pkg/fb/database.go
@@ -0,0 +1,59 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fb
+
+import (
+ "time"
+
+ flatbuffers "github.com/google/flatbuffers/go"
+
+ v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+)
+
+func BuildShardEvent(nodeID, seriesName, seriesGroup string, shardID, totalShards uint) []byte {
+ now := time.Now().UnixNano()
+ b := flatbuffers.NewBuilder(0)
+ nID := b.CreateString(nodeID)
+ addr := b.CreateString("localhost")
+ v1.NodeStart(b)
+ v1.NodeAddId(b, nID)
+ v1.NodeAddAddr(b, addr)
+ v1.NodeAddCreateTime(b, now)
+ v1.NodeAddUpdateTime(b, now)
+ node := v1.NodeEnd(b)
+ name := b.CreateString(seriesName)
+ group := b.CreateString(seriesGroup)
+ v1.MetadataStart(b)
+ v1.MetadataAddName(b, name)
+ v1.MetadataAddGroup(b, group)
+ md := v1.MetadataEnd(b)
+ v1.ShardStart(b)
+ v1.ShardAddId(b, uint64(shardID))
+ v1.ShardAddNode(b, node)
+ v1.ShardAddSeries(b, md)
+ v1.ShardAddTotal(b, uint8(totalShards))
+ v1.ShardAddCreateTime(b, now)
+ v1.ShardAddUpdateTime(b, now)
+ shard := v1.ShardEnd(b)
+ v1.ShardEventStart(b)
+ v1.ShardEventAddShard(b, shard)
+ v1.ShardEventAddTime(b, time.Now().UnixNano())
+ v1.ShardEventAddAction(b, v1.ActionPut)
+ b.Finish(v1.ShardEventEnd(b))
+ return b.FinishedBytes()
+}