You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/05 11:58:53 UTC

[skywalking-banyandb] branch main updated: Implement local pipeline

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new a2898d7  Implement local pipeline
a2898d7 is described below

commit a2898d7a6243e7feabeb3b7267acc60fc2a64446
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Aug 5 19:55:30 2021 +0800

    Implement local pipeline
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/queue/local.go | 83 +++++++-------------------------------------------
 banyand/queue/queue.go |  7 ++---
 2 files changed, 14 insertions(+), 76 deletions(-)

diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index f79315b..b96d82c 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -18,87 +18,26 @@
 package queue
 
 import (
-	"go.uber.org/multierr"
-
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
-	"github.com/apache/skywalking-banyandb/pkg/logger"
-	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-const name = "storage-engine"
-
-type Component interface {
-	ComponentName() string
-}
-
-type DataSubscriber interface {
-	Component
-	Sub(subscriber bus.Subscriber) error
-}
-
-type DataPublisher interface {
-	Component
-	Pub(publisher bus.Publisher) error
-}
-
-var _ run.PreRunner = (*Local)(nil)
-var _ run.Config = (*Local)(nil)
-var _ bus.Publisher = (*Local)(nil)
-var _ bus.Subscriber = (*Local)(nil)
-
-type Local struct {
-	logger  *logger.Logger
-	test    string
-	dataBus *bus.Bus
-	dps     []DataPublisher
-	dss     []DataSubscriber
-	repo    discovery.ServiceRepo
-}
-
-func (e *Local) Subscribe(topic bus.Topic, listener bus.MessageListener) error {
-	return nil
-}
-
-func (e *Local) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, error) {
-	return nil, nil
-}
-
-func (e *Local) FlagSet() *run.FlagSet {
-	e.logger = logger.GetLogger(name)
-	fs := run.NewFlagSet("storage")
-	fs.StringVarP(&e.test, "storage.test", "", "a", "test config")
-	return fs
-}
+var _ bus.Publisher = (*local)(nil)
+var _ bus.Subscriber = (*local)(nil)
 
-func (e *Local) Validate() error {
-	e.logger.Info().Str("val", e.test).Msg("test")
-	return nil
+type local struct {
+	local *bus.Bus
+	repo  discovery.ServiceRepo
 }
 
-func (e Local) Name() string {
-	return name
+func (l *local) Subscribe(topic bus.Topic, listener bus.MessageListener) error {
+	return l.local.Subscribe(topic, listener)
 }
 
-func (e *Local) PreRun() error {
-	var err error
-	e.dataBus = bus.NewBus()
-	for _, dp := range e.dps {
-		err = multierr.Append(err, dp.Pub(e.dataBus))
-	}
-	for _, ds := range e.dss {
-		err = multierr.Append(err, ds.Sub(e.dataBus))
-	}
-	return err
+func (l *local) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, error) {
+	return l.local.Publish(topic, message...)
 }
 
-func (e *Local) Register(component ...Component) {
-	for _, c := range component {
-		if ds, ok := c.(DataSubscriber); ok {
-			e.dss = append(e.dss, ds)
-		}
-		if ps, ok := c.(DataPublisher); ok {
-			e.dps = append(e.dps, ps)
-		}
-	}
+func (l local) Name() string {
+	return "local-pipeline"
 }
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 1e5f861..57c7b37 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -26,12 +26,11 @@ import (
 )
 
 type Queue interface {
-	run.Config
-	run.PreRunner
+	run.Unit
 	bus.Subscriber
 	bus.Publisher
 }
 
-func NewQueue(ctx context.Context, repo discovery.ServiceRepo) (Queue, error) {
-	return &Local{repo: repo}, nil
+func NewQueue(_ context.Context, repo discovery.ServiceRepo) (Queue, error) {
+	return &local{repo: repo}, nil
 }