You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/05/10 13:06:12 UTC

[skywalking-banyandb] branch main updated (0c2eeca -> df6f00e)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


 discard 0c2eeca  Add trace API to storage module
     new df6f00e  Add trace API to storage module

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates
a repository containing something like this:

 * -- * -- B -- O -- O -- O   (0c2eeca)
            \
             N -- N -- N   refs/heads/main (df6f00e)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 banyand/storage/database.go | 9 +++------
 banyand/storage/kv/kv.go    | 5 +++--
 2 files changed, 6 insertions(+), 8 deletions(-)

[skywalking-banyandb] 01/01: Add trace API to storage module

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit df6f00ee73e52bab9aaa3e2c1dcd3898ad4bf9f0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon May 10 20:58:06 2021 +0800

    Add trace API to storage module
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/data/trace.go                                |   2 +-
 api/fbs/v1/database.fbs                          |   2 +-
 api/fbs/v1/trace.fbs                             |   2 +
 banyand/index/index.go                           |   2 +-
 banyand/internal/cmd/standalone.go               |   4 +-
 banyand/liaison/grpc/grpc.go                     |   4 +-
 banyand/liaison/liaison.go                       |   2 +-
 banyand/queue/local.go                           |  10 +++
 banyand/queue/queue.go                           |   7 +-
 banyand/storage/database.go                      | 106 ++++++++++++++++++++++-
 api/fbs/v1/trace.fbs => banyand/storage/kv/kv.go |  11 ++-
 banyand/storage/storage.go                       |   5 +-
 12 files changed, 140 insertions(+), 17 deletions(-)

diff --git a/api/data/trace.go b/api/data/trace.go
index a86ffbd..b48ccf2 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package trace
+package data
 
 import (
 	"github.com/apache/skywalking-banyandb/api/common"
diff --git a/api/fbs/v1/database.fbs b/api/fbs/v1/database.fbs
index d9d98cc..a446e32 100644
--- a/api/fbs/v1/database.fbs
+++ b/api/fbs/v1/database.fbs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-namespace database;
+namespace v1;
 
 table Node {
     id:string;
diff --git a/api/fbs/v1/trace.fbs b/api/fbs/v1/trace.fbs
index eaef6a0..8f31592 100644
--- a/api/fbs/v1/trace.fbs
+++ b/api/fbs/v1/trace.fbs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+namespace v1;
+
 table Trace {
     
 }
\ No newline at end of file
diff --git a/banyand/index/index.go b/banyand/index/index.go
index 717f71e..eb4706a 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -30,6 +30,6 @@ type Builder interface {
 	run.PreRunner
 }
 
-func NewBuilder(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Pipeline) (Builder, error) {
+func NewBuilder(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Builder, error) {
 	return nil, nil
 }
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 5f3ac30..b9f6e90 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -49,11 +49,11 @@ func newStandaloneCmd() *cobra.Command {
 	if err != nil {
 		l.Fatal("failed to initiate service repository", logger.Error(err))
 	}
-	pipeline, err := queue.NewPipeline(ctx, repo)
+	pipeline, err := queue.NewQueue(ctx, repo)
 	if err != nil {
 		l.Fatal("failed to initiate data pipeline", logger.Error(err))
 	}
-	db, err := storage.NewDB(ctx, repo)
+	db, err := storage.NewDB(ctx, repo, pipeline)
 	if err != nil {
 		l.Fatal("failed to initiate database", logger.Error(err))
 	}
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index f19dce3..cdf7e71 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -34,10 +34,10 @@ type Server struct {
 	addr     string
 	log      *logger.Logger
 	ser      *grpclib.Server
-	pipeline queue.Pipeline
+	pipeline queue.Queue
 }
 
-func NewServer(ctx context.Context, pipeline queue.Pipeline) *Server {
+func NewServer(ctx context.Context, pipeline queue.Queue) *Server {
 	return &Server{pipeline: pipeline}
 }
 
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index e13a83f..117b361 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -30,6 +30,6 @@ type Endpoint interface {
 	run.Service
 }
 
-func NewEndpoint(ctx context.Context, pipeline queue.Pipeline) (Endpoint, error) {
+func NewEndpoint(ctx context.Context, pipeline queue.Queue) (Endpoint, error) {
 	return grpc.NewServer(ctx, pipeline), nil
 }
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 0e99b90..aeabda5 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -44,6 +44,8 @@ type DataPublisher interface {
 
 var _ run.PreRunner = (*Local)(nil)
 var _ run.Config = (*Local)(nil)
+var _ bus.Publisher = (*Local)(nil)
+var _ bus.Subscriber = (*Local)(nil)
 
 type Local struct {
 	logger  *logger.Logger
@@ -54,6 +56,14 @@ type Local struct {
 	repo    discovery.ServiceRepo
 }
 
+func (e *Local) Subscribe(topic bus.Topic, listener bus.MessageListener) error {
+	panic("implement me")
+}
+
+func (e *Local) Publish(topic bus.Topic, message ...bus.Message) error {
+	panic("implement me")
+}
+
 func (e *Local) FlagSet() *run.FlagSet {
 	e.logger = logger.GetLogger(name)
 	fs := run.NewFlagSet("storage")
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 43b0ebc..ff59cee 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -21,14 +21,17 @@ import (
 	"context"
 
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-type Pipeline interface {
+type Queue interface {
 	run.Config
 	run.PreRunner
+	bus.Subscriber
+	bus.Publisher
 }
 
-func NewPipeline(ctx context.Context, repo discovery.ServiceRepo) (Pipeline, error) {
+func NewQueue(ctx context.Context, repo discovery.ServiceRepo) (Queue, error) {
 	return &Local{repo: repo}, nil
 }
diff --git a/banyand/storage/database.go b/banyand/storage/database.go
index 7cba5da..b8fa32b 100644
--- a/banyand/storage/database.go
+++ b/banyand/storage/database.go
@@ -18,16 +18,29 @@
 package storage
 
 import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"sync"
+
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/api/data"
 	"github.com/apache/skywalking-banyandb/api/event"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/storage/kv"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 var _ Database = (*DB)(nil)
 
 type DB struct {
-	repo discovery.ServiceRepo
+	root   string
+	shards int
+	repo   discovery.ServiceRepo
+	q      queue.Queue
 }
 
 func (d *DB) Name() string {
@@ -35,7 +48,10 @@ func (d *DB) Name() string {
 }
 
 func (d *DB) FlagSet() *run.FlagSet {
-	return nil
+	fs := run.NewFlagSet("storage")
+	fs.StringVar(&d.root, "root-path", "", "the root path of database")
+	fs.IntVar(&d.shards, "shards", 1, "total shards size")
+	return fs
 }
 
 func (d *DB) Validate() error {
@@ -43,5 +59,91 @@ func (d *DB) Validate() error {
 }
 
 func (d *DB) PreRun() error {
+	if err := d.init(); err != nil {
+		return fmt.Errorf("failed to initialize db: %v", err)
+	}
+	if err := d.start(); err != nil {
+		return fmt.Errorf("failed to start db: %v", err)
+	}
 	return d.repo.Publish(bus.Topic(event.ShardEventKindVersion.String()), bus.NewMessage(1, event.NewShard()))
 }
+
+type segment struct {
+	lst []kv.Block
+	sync.Mutex
+}
+
+func (s *segment) AddBlock(b kv.Block) {
+	s.Lock()
+	defer s.Unlock()
+	s.lst = append(s.lst, b)
+}
+
+type shard struct {
+	id  int
+	lst []*segment
+	sync.Mutex
+}
+
+func (s *shard) newSeg() *segment {
+	s.Lock()
+	defer s.Unlock()
+	seg := &segment{}
+	s.lst = append(s.lst, seg)
+	return seg
+}
+
+func (s *shard) init() error {
+	seg := s.newSeg()
+	b, err := kv.NewBlock()
+	if err != nil {
+		return fmt.Errorf("failed to create segment: %v", err)
+	}
+	seg.AddBlock(b)
+	return nil
+}
+
+func (d *DB) init() (err error) {
+	if err = os.MkdirAll(d.root, os.ModeDir); err != nil {
+		return fmt.Errorf("failed to create %s: %v", d.root, err)
+	}
+	var isEmpty bool
+	if isEmpty, err = isEmptyDir(d.root); err != nil {
+		return fmt.Errorf("checking directory contents failed: %v", err)
+	}
+	if !isEmpty {
+		return nil
+	}
+	for i := 0; i < d.shards; i++ {
+		s := newShard(i)
+		err = multierr.Append(err, s.init())
+	}
+	if err != nil {
+		return fmt.Errorf("failed to init shards: %v", err)
+	}
+	return nil
+}
+
+func (d *DB) start() error {
+	return d.q.Subscribe(bus.Topic(data.TraceKindVersion.String()), d)
+}
+
+func (d *DB) Rev(message bus.Message) {
+	_, ok := message.Data().(data.Trace)
+	if !ok {
+		return
+	}
+	//TODO: save data into target shard
+}
+
+func newShard(id int) *shard {
+	return &shard{id: id}
+}
+
+func isEmptyDir(name string) (bool, error) {
+	entries, err := ioutil.ReadDir(name)
+	if err != nil {
+		return false, err
+	}
+	return len(entries) == 0, nil
+}
diff --git a/api/fbs/v1/trace.fbs b/banyand/storage/kv/kv.go
similarity index 90%
copy from api/fbs/v1/trace.fbs
copy to banyand/storage/kv/kv.go
index eaef6a0..c145286 100644
--- a/api/fbs/v1/trace.fbs
+++ b/banyand/storage/kv/kv.go
@@ -15,6 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-table Trace {
-    
-}
\ No newline at end of file
+package kv
+
+type Block interface {
+}
+
+func NewBlock() (Block, error) {
+	return nil, nil
+}
diff --git a/banyand/storage/storage.go b/banyand/storage/storage.go
index c78cde5..feb408c 100644
--- a/banyand/storage/storage.go
+++ b/banyand/storage/storage.go
@@ -21,6 +21,7 @@ import (
 	"context"
 
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -29,6 +30,6 @@ type Database interface {
 	run.PreRunner
 }
 
-func NewDB(ctx context.Context, repo discovery.ServiceRepo) (Database, error) {
-	return &DB{repo: repo}, nil
+func NewDB(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Database, error) {
+	return &DB{repo: repo, q: pipeline}, nil
 }