You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/05/10 13:06:13 UTC
[skywalking-banyandb] 01/01: Add trace API to storage module
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit df6f00ee73e52bab9aaa3e2c1dcd3898ad4bf9f0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon May 10 20:58:06 2021 +0800
Add trace API to storage module
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
api/data/trace.go | 2 +-
api/fbs/v1/database.fbs | 2 +-
api/fbs/v1/trace.fbs | 2 +
banyand/index/index.go | 2 +-
banyand/internal/cmd/standalone.go | 4 +-
banyand/liaison/grpc/grpc.go | 4 +-
banyand/liaison/liaison.go | 2 +-
banyand/queue/local.go | 10 +++
banyand/queue/queue.go | 7 +-
banyand/storage/database.go | 106 ++++++++++++++++++++++-
api/fbs/v1/trace.fbs => banyand/storage/kv/kv.go | 11 ++-
banyand/storage/storage.go | 5 +-
12 files changed, 140 insertions(+), 17 deletions(-)
diff --git a/api/data/trace.go b/api/data/trace.go
index a86ffbd..b48ccf2 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package trace
+package data
import (
"github.com/apache/skywalking-banyandb/api/common"
diff --git a/api/fbs/v1/database.fbs b/api/fbs/v1/database.fbs
index d9d98cc..a446e32 100644
--- a/api/fbs/v1/database.fbs
+++ b/api/fbs/v1/database.fbs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-namespace database;
+namespace v1;
table Node {
id:string;
diff --git a/api/fbs/v1/trace.fbs b/api/fbs/v1/trace.fbs
index eaef6a0..8f31592 100644
--- a/api/fbs/v1/trace.fbs
+++ b/api/fbs/v1/trace.fbs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+namespace v1;
+
table Trace {
}
\ No newline at end of file
diff --git a/banyand/index/index.go b/banyand/index/index.go
index 717f71e..eb4706a 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -30,6 +30,6 @@ type Builder interface {
run.PreRunner
}
-func NewBuilder(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Pipeline) (Builder, error) {
+func NewBuilder(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Builder, error) {
return nil, nil
}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 5f3ac30..b9f6e90 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -49,11 +49,11 @@ func newStandaloneCmd() *cobra.Command {
if err != nil {
l.Fatal("failed to initiate service repository", logger.Error(err))
}
- pipeline, err := queue.NewPipeline(ctx, repo)
+ pipeline, err := queue.NewQueue(ctx, repo)
if err != nil {
l.Fatal("failed to initiate data pipeline", logger.Error(err))
}
- db, err := storage.NewDB(ctx, repo)
+ db, err := storage.NewDB(ctx, repo, pipeline)
if err != nil {
l.Fatal("failed to initiate database", logger.Error(err))
}
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index f19dce3..cdf7e71 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -34,10 +34,10 @@ type Server struct {
addr string
log *logger.Logger
ser *grpclib.Server
- pipeline queue.Pipeline
+ pipeline queue.Queue
}
-func NewServer(ctx context.Context, pipeline queue.Pipeline) *Server {
+func NewServer(ctx context.Context, pipeline queue.Queue) *Server {
return &Server{pipeline: pipeline}
}
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index e13a83f..117b361 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -30,6 +30,6 @@ type Endpoint interface {
run.Service
}
-func NewEndpoint(ctx context.Context, pipeline queue.Pipeline) (Endpoint, error) {
+func NewEndpoint(ctx context.Context, pipeline queue.Queue) (Endpoint, error) {
return grpc.NewServer(ctx, pipeline), nil
}
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 0e99b90..aeabda5 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -44,6 +44,8 @@ type DataPublisher interface {
var _ run.PreRunner = (*Local)(nil)
var _ run.Config = (*Local)(nil)
+var _ bus.Publisher = (*Local)(nil)
+var _ bus.Subscriber = (*Local)(nil)
type Local struct {
logger *logger.Logger
@@ -54,6 +56,14 @@ type Local struct {
repo discovery.ServiceRepo
}
+func (e *Local) Subscribe(topic bus.Topic, listener bus.MessageListener) error {
+ panic("implement me")
+}
+
+func (e *Local) Publish(topic bus.Topic, message ...bus.Message) error {
+ panic("implement me")
+}
+
func (e *Local) FlagSet() *run.FlagSet {
e.logger = logger.GetLogger(name)
fs := run.NewFlagSet("storage")
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 43b0ebc..ff59cee 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -21,14 +21,17 @@ import (
"context"
"github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/internal/bus"
"github.com/apache/skywalking-banyandb/pkg/run"
)
-type Pipeline interface {
+type Queue interface {
run.Config
run.PreRunner
+ bus.Subscriber
+ bus.Publisher
}
-func NewPipeline(ctx context.Context, repo discovery.ServiceRepo) (Pipeline, error) {
+func NewQueue(ctx context.Context, repo discovery.ServiceRepo) (Queue, error) {
return &Local{repo: repo}, nil
}
diff --git a/banyand/storage/database.go b/banyand/storage/database.go
index 7cba5da..b8fa32b 100644
--- a/banyand/storage/database.go
+++ b/banyand/storage/database.go
@@ -18,16 +18,29 @@
package storage
import (
+ "fmt"
+ "io/ioutil"
+ "os"
+ "sync"
+
+ "go.uber.org/multierr"
+
+ "github.com/apache/skywalking-banyandb/api/data"
"github.com/apache/skywalking-banyandb/api/event"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/storage/kv"
"github.com/apache/skywalking-banyandb/pkg/run"
)
var _ Database = (*DB)(nil)
type DB struct {
- repo discovery.ServiceRepo
+ root string
+ shards int
+ repo discovery.ServiceRepo
+ q queue.Queue
}
func (d *DB) Name() string {
@@ -35,7 +48,10 @@ func (d *DB) Name() string {
}
func (d *DB) FlagSet() *run.FlagSet {
- return nil
+ fs := run.NewFlagSet("storage")
+ fs.StringVar(&d.root, "root-path", "", "the root path of database")
+ fs.IntVar(&d.shards, "shards", 1, "total shards size")
+ return fs
}
func (d *DB) Validate() error {
@@ -43,5 +59,91 @@ func (d *DB) Validate() error {
}
func (d *DB) PreRun() error {
+ if err := d.init(); err != nil {
+ return fmt.Errorf("failed to initialize db: %v", err)
+ }
+ if err := d.start(); err != nil {
+ return fmt.Errorf("failed to start db: %v", err)
+ }
return d.repo.Publish(bus.Topic(event.ShardEventKindVersion.String()), bus.NewMessage(1, event.NewShard()))
}
+
+type segment struct {
+ lst []kv.Block
+ sync.Mutex
+}
+
+func (s *segment) AddBlock(b kv.Block) {
+ s.Lock()
+ defer s.Unlock()
+ s.lst = append(s.lst, b)
+}
+
+type shard struct {
+ id int
+ lst []*segment
+ sync.Mutex
+}
+
+func (s *shard) newSeg() *segment {
+ s.Lock()
+ defer s.Unlock()
+ seg := &segment{}
+ s.lst = append(s.lst, seg)
+ return seg
+}
+
+func (s *shard) init() error {
+ seg := s.newSeg()
+ b, err := kv.NewBlock()
+ if err != nil {
+ return fmt.Errorf("failed to create segment: %v", err)
+ }
+ seg.AddBlock(b)
+ return nil
+}
+
+func (d *DB) init() (err error) {
+ if err = os.MkdirAll(d.root, os.ModeDir); err != nil {
+ return fmt.Errorf("failed to create %s: %v", d.root, err)
+ }
+ var isEmpty bool
+ if isEmpty, err = isEmptyDir(d.root); err != nil {
+ return fmt.Errorf("checking directory contents failed: %v", err)
+ }
+ if !isEmpty {
+ return nil
+ }
+ for i := 0; i < d.shards; i++ {
+ s := newShard(i)
+ err = multierr.Append(err, s.init())
+ }
+ if err != nil {
+ return fmt.Errorf("failed to init shards: %v", err)
+ }
+ return nil
+}
+
+func (d *DB) start() error {
+ return d.q.Subscribe(bus.Topic(data.TraceKindVersion.String()), d)
+}
+
+func (d *DB) Rev(message bus.Message) {
+ _, ok := message.Data().(data.Trace)
+ if !ok {
+ return
+ }
+ //TODO: save data into target shard
+}
+
+func newShard(id int) *shard {
+ return &shard{id: id}
+}
+
+func isEmptyDir(name string) (bool, error) {
+ entries, err := ioutil.ReadDir(name)
+ if err != nil {
+ return false, err
+ }
+ return len(entries) == 0, nil
+}
diff --git a/api/fbs/v1/trace.fbs b/banyand/storage/kv/kv.go
similarity index 90%
copy from api/fbs/v1/trace.fbs
copy to banyand/storage/kv/kv.go
index eaef6a0..c145286 100644
--- a/api/fbs/v1/trace.fbs
+++ b/banyand/storage/kv/kv.go
@@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-table Trace {
-
-}
\ No newline at end of file
+package kv
+
+type Block interface {
+}
+
+func NewBlock() (Block, error) {
+ return nil, nil
+}
diff --git a/banyand/storage/storage.go b/banyand/storage/storage.go
index c78cde5..feb408c 100644
--- a/banyand/storage/storage.go
+++ b/banyand/storage/storage.go
@@ -21,6 +21,7 @@ import (
"context"
"github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -29,6 +30,6 @@ type Database interface {
run.PreRunner
}
-func NewDB(ctx context.Context, repo discovery.ServiceRepo) (Database, error) {
- return &DB{repo: repo}, nil
+func NewDB(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Database, error) {
+ return &DB{repo: repo, q: pipeline}, nil
}