You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/05/10 13:06:13 UTC

[skywalking-banyandb] 01/01: Add trace API to storage module

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit df6f00ee73e52bab9aaa3e2c1dcd3898ad4bf9f0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon May 10 20:58:06 2021 +0800

    Add trace API to storage module
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/data/trace.go                                |   2 +-
 api/fbs/v1/database.fbs                          |   2 +-
 api/fbs/v1/trace.fbs                             |   2 +
 banyand/index/index.go                           |   2 +-
 banyand/internal/cmd/standalone.go               |   4 +-
 banyand/liaison/grpc/grpc.go                     |   4 +-
 banyand/liaison/liaison.go                       |   2 +-
 banyand/queue/local.go                           |  10 +++
 banyand/queue/queue.go                           |   7 +-
 banyand/storage/database.go                      | 106 ++++++++++++++++++++++-
 api/fbs/v1/trace.fbs => banyand/storage/kv/kv.go |  11 ++-
 banyand/storage/storage.go                       |   5 +-
 12 files changed, 140 insertions(+), 17 deletions(-)

diff --git a/api/data/trace.go b/api/data/trace.go
index a86ffbd..b48ccf2 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package trace
+package data
 
 import (
 	"github.com/apache/skywalking-banyandb/api/common"
diff --git a/api/fbs/v1/database.fbs b/api/fbs/v1/database.fbs
index d9d98cc..a446e32 100644
--- a/api/fbs/v1/database.fbs
+++ b/api/fbs/v1/database.fbs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-namespace database;
+namespace v1;
 
 table Node {
     id:string;
diff --git a/api/fbs/v1/trace.fbs b/api/fbs/v1/trace.fbs
index eaef6a0..8f31592 100644
--- a/api/fbs/v1/trace.fbs
+++ b/api/fbs/v1/trace.fbs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+namespace v1;
+
 table Trace {
     
 }
\ No newline at end of file
diff --git a/banyand/index/index.go b/banyand/index/index.go
index 717f71e..eb4706a 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -30,6 +30,6 @@ type Builder interface {
 	run.PreRunner
 }
 
-func NewBuilder(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Pipeline) (Builder, error) {
+func NewBuilder(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Builder, error) {
 	return nil, nil
 }
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 5f3ac30..b9f6e90 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -49,11 +49,11 @@ func newStandaloneCmd() *cobra.Command {
 	if err != nil {
 		l.Fatal("failed to initiate service repository", logger.Error(err))
 	}
-	pipeline, err := queue.NewPipeline(ctx, repo)
+	pipeline, err := queue.NewQueue(ctx, repo)
 	if err != nil {
 		l.Fatal("failed to initiate data pipeline", logger.Error(err))
 	}
-	db, err := storage.NewDB(ctx, repo)
+	db, err := storage.NewDB(ctx, repo, pipeline)
 	if err != nil {
 		l.Fatal("failed to initiate database", logger.Error(err))
 	}
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index f19dce3..cdf7e71 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -34,10 +34,10 @@ type Server struct {
 	addr     string
 	log      *logger.Logger
 	ser      *grpclib.Server
-	pipeline queue.Pipeline
+	pipeline queue.Queue
 }
 
-func NewServer(ctx context.Context, pipeline queue.Pipeline) *Server {
+func NewServer(ctx context.Context, pipeline queue.Queue) *Server {
 	return &Server{pipeline: pipeline}
 }
 
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index e13a83f..117b361 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -30,6 +30,6 @@ type Endpoint interface {
 	run.Service
 }
 
-func NewEndpoint(ctx context.Context, pipeline queue.Pipeline) (Endpoint, error) {
+func NewEndpoint(ctx context.Context, pipeline queue.Queue) (Endpoint, error) {
 	return grpc.NewServer(ctx, pipeline), nil
 }
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 0e99b90..aeabda5 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -44,6 +44,8 @@ type DataPublisher interface {
 
 var _ run.PreRunner = (*Local)(nil)
 var _ run.Config = (*Local)(nil)
+var _ bus.Publisher = (*Local)(nil)
+var _ bus.Subscriber = (*Local)(nil)
 
 type Local struct {
 	logger  *logger.Logger
@@ -54,6 +56,14 @@ type Local struct {
 	repo    discovery.ServiceRepo
 }
 
+func (e *Local) Subscribe(topic bus.Topic, listener bus.MessageListener) error {
+	panic("implement me")
+}
+
+func (e *Local) Publish(topic bus.Topic, message ...bus.Message) error {
+	panic("implement me")
+}
+
 func (e *Local) FlagSet() *run.FlagSet {
 	e.logger = logger.GetLogger(name)
 	fs := run.NewFlagSet("storage")
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 43b0ebc..ff59cee 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -21,14 +21,17 @@ import (
 	"context"
 
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-type Pipeline interface {
+type Queue interface {
 	run.Config
 	run.PreRunner
+	bus.Subscriber
+	bus.Publisher
 }
 
-func NewPipeline(ctx context.Context, repo discovery.ServiceRepo) (Pipeline, error) {
+func NewQueue(ctx context.Context, repo discovery.ServiceRepo) (Queue, error) {
 	return &Local{repo: repo}, nil
 }
diff --git a/banyand/storage/database.go b/banyand/storage/database.go
index 7cba5da..b8fa32b 100644
--- a/banyand/storage/database.go
+++ b/banyand/storage/database.go
@@ -18,16 +18,29 @@
 package storage
 
 import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"sync"
+
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/api/data"
 	"github.com/apache/skywalking-banyandb/api/event"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/storage/kv"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 var _ Database = (*DB)(nil)
 
 type DB struct {
-	repo discovery.ServiceRepo
+	root   string
+	shards int
+	repo   discovery.ServiceRepo
+	q      queue.Queue
 }
 
 func (d *DB) Name() string {
@@ -35,7 +48,10 @@ func (d *DB) Name() string {
 }
 
 func (d *DB) FlagSet() *run.FlagSet {
-	return nil
+	fs := run.NewFlagSet("storage")
+	fs.StringVar(&d.root, "root-path", "", "the root path of database")
+	fs.IntVar(&d.shards, "shards", 1, "total shards size")
+	return fs
 }
 
 func (d *DB) Validate() error {
@@ -43,5 +59,91 @@ func (d *DB) Validate() error {
 }
 
 func (d *DB) PreRun() error {
+	if err := d.init(); err != nil {
+		return fmt.Errorf("failed to initialize db: %v", err)
+	}
+	if err := d.start(); err != nil {
+		return fmt.Errorf("failed to start db: %v", err)
+	}
 	return d.repo.Publish(bus.Topic(event.ShardEventKindVersion.String()), bus.NewMessage(1, event.NewShard()))
 }
+
+type segment struct {
+	lst []kv.Block
+	sync.Mutex
+}
+
+func (s *segment) AddBlock(b kv.Block) {
+	s.Lock()
+	defer s.Unlock()
+	s.lst = append(s.lst, b)
+}
+
+type shard struct {
+	id  int
+	lst []*segment
+	sync.Mutex
+}
+
+func (s *shard) newSeg() *segment {
+	s.Lock()
+	defer s.Unlock()
+	seg := &segment{}
+	s.lst = append(s.lst, seg)
+	return seg
+}
+
+func (s *shard) init() error {
+	seg := s.newSeg()
+	b, err := kv.NewBlock()
+	if err != nil {
+		return fmt.Errorf("failed to create segment: %v", err)
+	}
+	seg.AddBlock(b)
+	return nil
+}
+
+func (d *DB) init() (err error) {
+	if err = os.MkdirAll(d.root, os.ModeDir); err != nil {
+		return fmt.Errorf("failed to create %s: %v", d.root, err)
+	}
+	var isEmpty bool
+	if isEmpty, err = isEmptyDir(d.root); err != nil {
+		return fmt.Errorf("checking directory contents failed: %v", err)
+	}
+	if !isEmpty {
+		return nil
+	}
+	for i := 0; i < d.shards; i++ {
+		s := newShard(i)
+		err = multierr.Append(err, s.init())
+	}
+	if err != nil {
+		return fmt.Errorf("failed to init shards: %v", err)
+	}
+	return nil
+}
+
+func (d *DB) start() error {
+	return d.q.Subscribe(bus.Topic(data.TraceKindVersion.String()), d)
+}
+
+func (d *DB) Rev(message bus.Message) {
+	_, ok := message.Data().(data.Trace)
+	if !ok {
+		return
+	}
+	//TODO: save data into target shard
+}
+
+func newShard(id int) *shard {
+	return &shard{id: id}
+}
+
+func isEmptyDir(name string) (bool, error) {
+	entries, err := ioutil.ReadDir(name)
+	if err != nil {
+		return false, err
+	}
+	return len(entries) == 0, nil
+}
diff --git a/api/fbs/v1/trace.fbs b/banyand/storage/kv/kv.go
similarity index 90%
copy from api/fbs/v1/trace.fbs
copy to banyand/storage/kv/kv.go
index eaef6a0..c145286 100644
--- a/api/fbs/v1/trace.fbs
+++ b/banyand/storage/kv/kv.go
@@ -15,6 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-table Trace {
-    
-}
\ No newline at end of file
+package kv
+
+type Block interface {
+}
+
+func NewBlock() (Block, error) {
+	return nil, nil
+}
diff --git a/banyand/storage/storage.go b/banyand/storage/storage.go
index c78cde5..feb408c 100644
--- a/banyand/storage/storage.go
+++ b/banyand/storage/storage.go
@@ -21,6 +21,7 @@ import (
 	"context"
 
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -29,6 +30,6 @@ type Database interface {
 	run.PreRunner
 }
 
-func NewDB(ctx context.Context, repo discovery.ServiceRepo) (Database, error) {
-	return &DB{repo: repo}, nil
+func NewDB(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Database, error) {
+	return &DB{repo: repo, q: pipeline}, nil
 }