You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/06/03 10:44:18 UTC

[skywalking-banyandb] branch query-dependent-coms created (now 05f20f9)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch query-dependent-coms
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


      at 05f20f9  Add query module dependent interfaces

This branch includes the following new commits:

     new 05f20f9  Add query module dependent interfaces

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-banyandb] 01/01: Add query module dependent interfaces

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch query-dependent-coms
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 05f20f937dfb280acc105e693df1ae0164112135
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Jun 3 18:39:38 2021 +0800

    Add query module dependent interfaces
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/{fbs/v1/trace.fbs => common/id.go}       |   6 +-
 api/data/trace.go                            |   6 +-
 api/fbs/v1/trace_generated.go                |  41 ----------
 banyand/index/index.go                       |  16 +++-
 banyand/internal/bus/bus.go                  | 118 ++++++++++++++++++++++++---
 banyand/internal/bus/bus_test.go             | 102 ++++++++++++++++-------
 banyand/internal/cmd/standalone.go           |  16 ++--
 banyand/query/query.go                       |   7 +-
 banyand/queue/local.go                       |   4 +-
 banyand/{query/query.go => series/series.go} |  21 ++++-
 10 files changed, 236 insertions(+), 101 deletions(-)

diff --git a/api/fbs/v1/trace.fbs b/api/common/id.go
similarity index 95%
rename from api/fbs/v1/trace.fbs
rename to api/common/id.go
index 8f31592..64d4fbf 100644
--- a/api/fbs/v1/trace.fbs
+++ b/api/common/id.go
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-namespace v1;
+package common
 
-table Trace {
-    
-}
\ No newline at end of file
+type ChunkID uint64
diff --git a/api/data/trace.go b/api/data/trace.go
index b48ccf2..8339577 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -26,7 +26,11 @@ var TraceKindVersion = common.KindVersion{Version: "v1", Kind: "data-trace"}
 
 type Trace struct {
 	common.KindVersion
-	PayLoad v1.Trace
+	entities []Entity
+}
+
+type Entity struct {
+	v1.Entity
 }
 
 func NewTrace() *Trace {
diff --git a/api/fbs/v1/trace_generated.go b/api/fbs/v1/trace_generated.go
deleted file mode 100644
index df3dd7a..0000000
--- a/api/fbs/v1/trace_generated.go
+++ /dev/null
@@ -1,41 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package v1
-
-import (
-	flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Trace struct {
-	_tab flatbuffers.Table
-}
-
-func GetRootAsTrace(buf []byte, offset flatbuffers.UOffsetT) *Trace {
-	n := flatbuffers.GetUOffsetT(buf[offset:])
-	x := &Trace{}
-	x.Init(buf, n+offset)
-	return x
-}
-
-func GetSizePrefixedRootAsTrace(buf []byte, offset flatbuffers.UOffsetT) *Trace {
-	n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
-	x := &Trace{}
-	x.Init(buf, n+offset+flatbuffers.SizeUint32)
-	return x
-}
-
-func (rcv *Trace) Init(buf []byte, i flatbuffers.UOffsetT) {
-	rcv._tab.Bytes = buf
-	rcv._tab.Pos = i
-}
-
-func (rcv *Trace) Table() flatbuffers.Table {
-	return rcv._tab
-}
-
-func TraceStart(builder *flatbuffers.Builder) {
-	builder.StartObject(0)
-}
-func TraceEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
-	return builder.EndObject()
-}
diff --git a/banyand/index/index.go b/banyand/index/index.go
index eb4706a..8db5fd2 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -15,21 +15,35 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//go:generate mockgen -destination=./index_mock.go -package=index . Repo
 package index
 
 import (
 	"context"
 
+	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+type Condition struct {
+}
+
+type Repo interface {
+	Search(indexName string, startTime, endTime uint64, conditions []Condition) ([]common.ChunkID, error)
+}
+
 type Builder interface {
 	run.Config
 	run.PreRunner
 }
 
-func NewBuilder(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Builder, error) {
+type Service interface {
+	Repo
+	Builder
+}
+
+func NewService(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Service, error) {
 	return nil, nil
 }
diff --git a/banyand/internal/bus/bus.go b/banyand/internal/bus/bus.go
index 4dac157..8752933 100644
--- a/banyand/internal/bus/bus.go
+++ b/banyand/internal/bus/bus.go
@@ -19,12 +19,19 @@ package bus
 
 import (
 	"errors"
+	"io"
 	"sync"
+
+	"go.uber.org/multierr"
 )
 
 // Payload represents a simple data
 type Payload interface{}
 type MessageID uint64
+type Future interface {
+	Get() (Message, error)
+	GetAll() ([]Message, error)
+}
 
 // Message is send on the bus to all subscribed listeners
 type Message struct {
@@ -46,7 +53,7 @@ func NewMessage(id MessageID, data interface{}) Message {
 
 //MessageListener is the signature of functions that can handle an EventMessage.
 type MessageListener interface {
-	Rev(message Message)
+	Rev(message Message) Message
 }
 
 type Subscriber interface {
@@ -54,12 +61,30 @@ type Subscriber interface {
 }
 
 type Publisher interface {
-	Publish(topic Topic, message ...Message) error
+	Publish(topic Topic, message ...Message) (Future, error)
 }
 
-type Channel chan Message
+type Channel chan Event
+
+type ChType int
 
-type Topic string
+var (
+	ChTypeUnidirectional ChType = 0
+	ChTypeBidirectional  ChType = 1
+)
+
+type Topic struct {
+	ID   string
+	Type ChType
+}
+
+func UniTopic(ID string) Topic {
+	return Topic{ID: ID, Type: ChTypeUnidirectional}
+}
+
+func BiTopic(ID string) Topic {
+	return Topic{ID: ID, Type: ChTypeBidirectional}
+}
 
 // The Bus allows publish-subscribe-style communication between components
 type Bus struct {
@@ -77,31 +102,94 @@ var (
 	ErrTopicEmpty    = errors.New("the topic is empty")
 	ErrTopicNotExist = errors.New("the topic does not exist")
 	ErrListenerEmpty = errors.New("the message listener is empty")
+	ErrEmptyFuture   = errors.New("can't invoke Get() on an empty future")
 )
 
-func (b *Bus) Publish(topic Topic, message ...Message) error {
-	if topic == "" {
-		return ErrTopicEmpty
+type emptyFuture struct {
+}
+
+func (e *emptyFuture) Get() (Message, error) {
+	return Message{}, ErrEmptyFuture
+}
+
+func (e *emptyFuture) GetAll() ([]Message, error) {
+	return nil, ErrEmptyFuture
+}
+
+type localFuture struct {
+	retCh    chan Message
+	retCount int
+}
+
+func (l *localFuture) Get() (Message, error) {
+	if l.retCount < 1 {
+		return Message{}, io.EOF
+	}
+	m, ok := <-l.retCh
+	if ok {
+		l.retCount = l.retCount - 1
+		return m, nil
+	}
+	return Message{}, io.EOF
+}
+
+func (l *localFuture) GetAll() ([]Message, error) {
+	var globalErr error
+	ret := make([]Message, 0, l.retCount)
+	for {
+		m, err := l.Get()
+		if err == io.EOF {
+			return ret, globalErr
+		}
+		if err != nil {
+			globalErr = multierr.Append(globalErr, err)
+			continue
+		}
+		ret = append(ret, m)
+	}
+}
+
+type Event struct {
+	m Message
+	f Future
+}
+
+func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) {
+	if topic.ID == "" {
+		return nil, ErrTopicEmpty
 	}
 	cc, exit := b.topics[topic]
 	if !exit {
-		return ErrTopicNotExist
+		return nil, ErrTopicNotExist
 	}
 	b.mutex.RLock()
 	defer b.mutex.RUnlock()
+	var f Future
+	switch topic.Type {
+	case ChTypeUnidirectional:
+		f = nil
+	case ChTypeBidirectional:
+		f = &localFuture{retCount: len(message), retCh: make(chan Message)}
+	}
 	for _, each := range cc {
 		for _, m := range message {
 			go func(ch Channel, message Message) {
-				ch <- message
+				ch <- Event{
+					m: message,
+					f: f,
+				}
 			}(each, m)
 		}
 	}
-	return nil
+	if f == nil {
+		return &emptyFuture{}, nil
+	}
+	return f, nil
 }
 
 // Subscribe adds an MessageListener to be called when a message of a Topic is posted.
 func (b *Bus) Subscribe(topic Topic, listener MessageListener) error {
-	if topic == "" {
+	if topic.ID == "" {
 		return ErrTopicEmpty
 	}
 	if listener == nil {
@@ -120,7 +208,13 @@ func (b *Bus) Subscribe(topic Topic, listener MessageListener) error {
 		for {
 			c, ok := <-ch
 			if ok {
-				listener.Rev(c)
+				ret := listener.Rev(c.m)
+				if c.f == nil {
+					continue
+				}
+				if lf, ok := c.f.(*localFuture); ok {
+					lf.retCh <- ret
+				}
 			} else {
 				break
 			}
diff --git a/banyand/internal/bus/bus_test.go b/banyand/internal/bus/bus_test.go
index 2c18077..34e9b7a 100644
--- a/banyand/internal/bus/bus_test.go
+++ b/banyand/internal/bus/bus_test.go
@@ -30,11 +30,13 @@ func TestBus_PubAndSub(t *testing.T) {
 		topic      Topic
 		messageIDS []MessageID
 		wantErr    bool
+		wantRet    []MessageID
 	}
 	type listener struct {
 		wantTopic    Topic
 		wantMessages []MessageID
 		wantErr      bool
+		ret          []MessageID
 	}
 	tests := []struct {
 		name      string
@@ -45,14 +47,16 @@ func TestBus_PubAndSub(t *testing.T) {
 			name: "golden path",
 			messages: []message{
 				{
-					topic:      Topic("default"),
+					topic:      BiTopic("default"),
 					messageIDS: []MessageID{12, 33},
+					wantRet:    []MessageID{22, 43},
 				},
 			},
 			listeners: []listener{
 				{
-					wantTopic:    Topic("default"),
+					wantTopic:    BiTopic("default"),
 					wantMessages: []MessageID{12, 33},
+					ret:          []MessageID{22, 43},
 				},
 			},
 		},
@@ -60,21 +64,21 @@ func TestBus_PubAndSub(t *testing.T) {
 			name: "two topics",
 			messages: []message{
 				{
-					topic:      Topic("t1"),
+					topic:      UniTopic("t1"),
 					messageIDS: []MessageID{12, 33},
 				},
 				{
-					topic:      Topic("t2"),
+					topic:      UniTopic("t2"),
 					messageIDS: []MessageID{101, 102},
 				},
 			},
 			listeners: []listener{
 				{
-					wantTopic:    Topic("t1"),
+					wantTopic:    UniTopic("t1"),
 					wantMessages: []MessageID{12, 33},
 				},
 				{
-					wantTopic:    Topic("t2"),
+					wantTopic:    UniTopic("t2"),
 					wantMessages: []MessageID{101, 102},
 				},
 			},
@@ -83,29 +87,29 @@ func TestBus_PubAndSub(t *testing.T) {
 			name: "two topics with two listeners",
 			messages: []message{
 				{
-					topic:      Topic("t1"),
+					topic:      UniTopic("t1"),
 					messageIDS: []MessageID{12, 33},
 				},
 				{
-					topic:      Topic("t2"),
+					topic:      UniTopic("t2"),
 					messageIDS: []MessageID{101, 102},
 				},
 			},
 			listeners: []listener{
 				{
-					wantTopic:    Topic("t1"),
+					wantTopic:    UniTopic("t1"),
 					wantMessages: []MessageID{12, 33},
 				},
 				{
-					wantTopic:    Topic("t1"),
+					wantTopic:    UniTopic("t1"),
 					wantMessages: []MessageID{12, 33},
 				},
 				{
-					wantTopic:    Topic("t2"),
+					wantTopic:    UniTopic("t2"),
 					wantMessages: []MessageID{101, 102},
 				},
 				{
-					wantTopic:    Topic("t2"),
+					wantTopic:    UniTopic("t2"),
 					wantMessages: []MessageID{101, 102},
 				},
 			},
@@ -114,7 +118,7 @@ func TestBus_PubAndSub(t *testing.T) {
 			name: "publish invalid topic",
 			messages: []message{
 				{
-					topic:      Topic(""),
+					topic:      UniTopic(""),
 					messageIDS: []MessageID{12, 33},
 					wantErr:    true,
 				},
@@ -124,13 +128,13 @@ func TestBus_PubAndSub(t *testing.T) {
 			name: "publish empty message",
 			messages: []message{
 				{
-					topic:      Topic("default"),
+					topic:      UniTopic("default"),
 					messageIDS: []MessageID{},
 				},
 			},
 			listeners: []listener{
 				{
-					wantTopic:    Topic("default"),
+					wantTopic:    UniTopic("default"),
 					wantMessages: []MessageID{},
 				},
 			},
@@ -139,7 +143,7 @@ func TestBus_PubAndSub(t *testing.T) {
 			name: "subscribe invalid topic",
 			listeners: []listener{
 				{
-					wantTopic: Topic(""),
+					wantTopic: UniTopic(""),
 					wantErr:   true,
 				},
 			},
@@ -151,27 +155,56 @@ func TestBus_PubAndSub(t *testing.T) {
 			wg := sync.WaitGroup{}
 			mll := make([]*mockListener, 0)
 			for _, l := range tt.listeners {
-				ml := &mockListener{wg: &wg}
+				var ret chan Message
+				if len(l.ret) > 0 {
+					ret = make(chan Message, len(l.ret))
+					for _, id := range l.ret {
+						ret <- NewMessage(id, nil)
+					}
+				}
+				ml := &mockListener{wg: &wg, ret: ret}
 				mll = append(mll, ml)
 				wg.Add(len(l.wantMessages))
 				if err := e.Subscribe(l.wantTopic, ml); (err != nil) != l.wantErr {
 					t.Errorf("Subscribe() error = %v, wantErr %v", err, l.wantErr)
 				}
 			}
-			for _, m := range tt.messages {
-				mm := make([]Message, 0)
-				for _, id := range m.messageIDS {
-					mm = append(mm, NewMessage(id, nil))
-				}
-				err := e.Publish(m.topic, mm...)
-				if (err != nil) != m.wantErr {
-					t.Errorf("Publish() error = %v, wantErr %v", err, m.wantErr)
+			go func() {
+				for _, m := range tt.messages {
+					mm := make([]Message, 0)
+					for _, id := range m.messageIDS {
+						mm = append(mm, NewMessage(id, nil))
+					}
+					f, err := e.Publish(m.topic, mm...)
+					if (err != nil) != m.wantErr {
+						t.Errorf("Publish() error = %v, wantErr %v", err, m.wantErr)
+						continue
+					}
+					if f == nil {
+						continue
+					}
+					ret, errRet := f.GetAll()
+					if errRet == ErrEmptyFuture {
+						continue
+					} else if errRet != nil {
+						t.Errorf("Publish()'s return message error = %v", err)
+					}
+					ids := make([]MessageID, 0, len(ret))
+					for i := range ret {
+						ids = append(ids, ret[i].ID())
+					}
+					ids = sortMessage(ids)
+					if !reflect.DeepEqual(ids, m.wantRet) {
+						t.Errorf("Publish()'s return = %v, want %v", ret, m.wantRet)
+					}
+					for i := 0; i < len(ret); i++ {
+						wg.Done()
+					}
 				}
-			}
+			}()
 			if waitTimeout(&wg, 10*time.Second) {
 				t.Error("message receiving is time out")
 			}
-			wg.Wait()
 			for i, l := range tt.listeners {
 				if len(mll[i].queue) > 0 && len(l.wantMessages) > 0 &&
 					!reflect.DeepEqual(mll[i].queue, l.wantMessages) {
@@ -202,14 +235,20 @@ type mockListener struct {
 	queue   []MessageID
 	wg      *sync.WaitGroup
 	closeWg *sync.WaitGroup
+	ret     chan Message
 }
 
-func (m *mockListener) Rev(message Message) {
+func (m *mockListener) Rev(message Message) Message {
 	m.queue = append(m.queue, message.id)
 	sort.SliceStable(m.queue, func(i, j int) bool {
 		return uint64(m.queue[i]) < uint64(m.queue[j])
 	})
+	if m.ret != nil {
+		r := <-m.ret
+		return r
+	}
 	m.wg.Done()
+	return Message{}
 }
 
 func (m *mockListener) Close() error {
@@ -217,3 +256,10 @@ func (m *mockListener) Close() error {
 	m.closeWg.Done()
 	return nil
 }
+
+func sortMessage(ids []MessageID) []MessageID {
+	sort.SliceStable(ids, func(i, j int) bool {
+		return uint64(ids[i]) < uint64(ids[j])
+	})
+	return ids
+}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index d4f4291..480e377 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -29,6 +29,7 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/liaison"
 	"github.com/apache/skywalking-banyandb/banyand/query"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/series"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
 	"github.com/apache/skywalking-banyandb/pkg/config"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -57,13 +58,17 @@ func newStandaloneCmd() *cobra.Command {
 	if err != nil {
 		l.Fatal("failed to initiate database", logger.Error(err))
 	}
-	idxBuilder, err := index.NewBuilder(ctx, repo, pipeline)
+	idx, err := index.NewService(ctx, repo, pipeline)
 	if err != nil {
 		l.Fatal("failed to initiate index builder", logger.Error(err))
 	}
-	composer, err := query.NewPlanComposer(ctx, repo)
+	s, err := series.NewService(ctx, db)
 	if err != nil {
-		l.Fatal("failed to initiate execution plan composer", logger.Error(err))
+		l.Fatal("failed to initiate series", logger.Error(err))
+	}
+	q, err := query.NewExecutor(ctx, idx, s)
+	if err != nil {
+		l.Fatal("failed to initiate query executor", logger.Error(err))
 	}
 	tcp, err := liaison.NewEndpoint(ctx, pipeline)
 	if err != nil {
@@ -75,8 +80,9 @@ func newStandaloneCmd() *cobra.Command {
 		new(signal.Handler),
 		repo,
 		db,
-		idxBuilder,
-		composer,
+		s,
+		idx,
+		q,
 		tcp,
 	)
 	logging := logger.Logging{}
diff --git a/banyand/query/query.go b/banyand/query/query.go
index b9a2091..56d0b0f 100644
--- a/banyand/query/query.go
+++ b/banyand/query/query.go
@@ -20,14 +20,15 @@ package query
 import (
 	"context"
 
-	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/index"
+	"github.com/apache/skywalking-banyandb/banyand/series"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-type PlanComposer interface {
+type Executor interface {
 	run.PreRunner
 }
 
-func NewPlanComposer(ctx context.Context, repo discovery.ServiceRepo) (PlanComposer, error) {
+func NewExecutor(ctx context.Context, idx index.Repo, s series.UniModel) (Executor, error) {
 	return nil, nil
 }
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 5a88d0a..e3696b4 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -60,8 +60,8 @@ func (e *Local) Subscribe(topic bus.Topic, listener bus.MessageListener) error {
 	return nil
 }
 
-func (e *Local) Publish(topic bus.Topic, message ...bus.Message) error {
-	return nil
+func (e *Local) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, error) {
+	return nil, nil
 }
 
 func (e *Local) FlagSet() *run.FlagSet {
diff --git a/banyand/query/query.go b/banyand/series/series.go
similarity index 61%
copy from banyand/query/query.go
copy to banyand/series/series.go
index b9a2091..de81f84 100644
--- a/banyand/query/query.go
+++ b/banyand/series/series.go
@@ -15,19 +15,32 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package query
+//go:generate mockgen -destination=./series_mock.go -package=series . UniModel
+package series
 
 import (
 	"context"
 
-	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/api/data"
+	"github.com/apache/skywalking-banyandb/banyand/storage"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-type PlanComposer interface {
+type Trace interface {
+	FetchTrace(traceID string) (data.Trace, error)
+	FetchEntity(chunkIDs []string, fields []string) ([]data.Entity, error)
+	ScanEntity(startTime, endTime uint64, fields []string) ([]data.Entity, error)
+}
+
+type UniModel interface {
+	Trace
+}
+
+type Service interface {
+	UniModel
 	run.PreRunner
 }
 
-func NewPlanComposer(ctx context.Context, repo discovery.ServiceRepo) (PlanComposer, error) {
+func NewService(ctx context.Context, db storage.Database) (Service, error) {
 	return nil, nil
 }