You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/06/03 10:44:18 UTC
[skywalking-banyandb] branch query-dependent-coms created (now 05f20f9)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a change to branch query-dependent-coms
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.
at 05f20f9 Add query module dependent interfaces
This branch includes the following new commits:
new 05f20f9 Add query module dependent interfaces
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. Revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[skywalking-banyandb] 01/01: Add query module dependent interfaces
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch query-dependent-coms
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 05f20f937dfb280acc105e693df1ae0164112135
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Jun 3 18:39:38 2021 +0800
Add query module dependent interfaces
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
api/{fbs/v1/trace.fbs => common/id.go} | 6 +-
api/data/trace.go | 6 +-
api/fbs/v1/trace_generated.go | 41 ----------
banyand/index/index.go | 16 +++-
banyand/internal/bus/bus.go | 118 ++++++++++++++++++++++++---
banyand/internal/bus/bus_test.go | 102 ++++++++++++++++-------
banyand/internal/cmd/standalone.go | 16 ++--
banyand/query/query.go | 7 +-
banyand/queue/local.go | 4 +-
banyand/{query/query.go => series/series.go} | 21 ++++-
10 files changed, 236 insertions(+), 101 deletions(-)
diff --git a/api/fbs/v1/trace.fbs b/api/common/id.go
similarity index 95%
rename from api/fbs/v1/trace.fbs
rename to api/common/id.go
index 8f31592..64d4fbf 100644
--- a/api/fbs/v1/trace.fbs
+++ b/api/common/id.go
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-namespace v1;
+package common
-table Trace {
-
-}
\ No newline at end of file
+type ChunkID uint64
diff --git a/api/data/trace.go b/api/data/trace.go
index b48ccf2..8339577 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -26,7 +26,11 @@ var TraceKindVersion = common.KindVersion{Version: "v1", Kind: "data-trace"}
type Trace struct {
common.KindVersion
- PayLoad v1.Trace
+ entities []Entity
+}
+
+type Entity struct {
+ v1.Entity
}
func NewTrace() *Trace {
diff --git a/api/fbs/v1/trace_generated.go b/api/fbs/v1/trace_generated.go
deleted file mode 100644
index df3dd7a..0000000
--- a/api/fbs/v1/trace_generated.go
+++ /dev/null
@@ -1,41 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package v1
-
-import (
- flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Trace struct {
- _tab flatbuffers.Table
-}
-
-func GetRootAsTrace(buf []byte, offset flatbuffers.UOffsetT) *Trace {
- n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &Trace{}
- x.Init(buf, n+offset)
- return x
-}
-
-func GetSizePrefixedRootAsTrace(buf []byte, offset flatbuffers.UOffsetT) *Trace {
- n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
- x := &Trace{}
- x.Init(buf, n+offset+flatbuffers.SizeUint32)
- return x
-}
-
-func (rcv *Trace) Init(buf []byte, i flatbuffers.UOffsetT) {
- rcv._tab.Bytes = buf
- rcv._tab.Pos = i
-}
-
-func (rcv *Trace) Table() flatbuffers.Table {
- return rcv._tab
-}
-
-func TraceStart(builder *flatbuffers.Builder) {
- builder.StartObject(0)
-}
-func TraceEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
- return builder.EndObject()
-}
diff --git a/banyand/index/index.go b/banyand/index/index.go
index eb4706a..8db5fd2 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -15,21 +15,35 @@
// specific language governing permissions and limitations
// under the License.
+//go:generate mockgen -destination=./index_mock.go -package=index . Repo
package index
import (
"context"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/run"
)
+type Condition struct {
+}
+
+type Repo interface {
+ Search(indexName string, startTime, endTime uint64, conditions []Condition) ([]common.ChunkID, error)
+}
+
type Builder interface {
run.Config
run.PreRunner
}
-func NewBuilder(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Builder, error) {
+type Service interface {
+ Repo
+ Builder
+}
+
+func NewService(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Service, error) {
return nil, nil
}
diff --git a/banyand/internal/bus/bus.go b/banyand/internal/bus/bus.go
index 4dac157..8752933 100644
--- a/banyand/internal/bus/bus.go
+++ b/banyand/internal/bus/bus.go
@@ -19,12 +19,19 @@ package bus
import (
"errors"
+ "io"
"sync"
+
+ "go.uber.org/multierr"
)
// Payload represents a simple data
type Payload interface{}
type MessageID uint64
+type Future interface {
+ Get() (Message, error)
+ GetAll() ([]Message, error)
+}
// Message is send on the bus to all subscribed listeners
type Message struct {
@@ -46,7 +53,7 @@ func NewMessage(id MessageID, data interface{}) Message {
//MessageListener is the signature of functions that can handle an EventMessage.
type MessageListener interface {
- Rev(message Message)
+ Rev(message Message) Message
}
type Subscriber interface {
@@ -54,12 +61,30 @@ type Subscriber interface {
}
type Publisher interface {
- Publish(topic Topic, message ...Message) error
+ Publish(topic Topic, message ...Message) (Future, error)
}
-type Channel chan Message
+type Channel chan Event
+
+type ChType int
-type Topic string
+var (
+ ChTypeUnidirectional ChType = 0
+ ChTypeBidirectional ChType = 1
+)
+
+type Topic struct {
+ ID string
+ Type ChType
+}
+
+func UniTopic(ID string) Topic {
+ return Topic{ID: ID, Type: ChTypeUnidirectional}
+}
+
+func BiTopic(ID string) Topic {
+ return Topic{ID: ID, Type: ChTypeBidirectional}
+}
// The Bus allows publish-subscribe-style communication between components
type Bus struct {
@@ -77,31 +102,94 @@ var (
ErrTopicEmpty = errors.New("the topic is empty")
ErrTopicNotExist = errors.New("the topic does not exist")
ErrListenerEmpty = errors.New("the message listener is empty")
+ ErrEmptyFuture = errors.New("can't invoke Get() on an empty future")
)
-func (b *Bus) Publish(topic Topic, message ...Message) error {
- if topic == "" {
- return ErrTopicEmpty
+type emptyFuture struct {
+}
+
+func (e *emptyFuture) Get() (Message, error) {
+ return Message{}, ErrEmptyFuture
+}
+
+func (e *emptyFuture) GetAll() ([]Message, error) {
+ return nil, ErrEmptyFuture
+}
+
+type localFuture struct {
+ retCh chan Message
+ retCount int
+}
+
+func (l *localFuture) Get() (Message, error) {
+ if l.retCount < 1 {
+ return Message{}, io.EOF
+ }
+ m, ok := <-l.retCh
+ if ok {
+ l.retCount = l.retCount - 1
+ return m, nil
+ }
+ return Message{}, io.EOF
+}
+
+func (l *localFuture) GetAll() ([]Message, error) {
+ var globalErr error
+ ret := make([]Message, 0, l.retCount)
+ for {
+ m, err := l.Get()
+ if err == io.EOF {
+ return ret, globalErr
+ }
+ if err != nil {
+ globalErr = multierr.Append(globalErr, err)
+ continue
+ }
+ ret = append(ret, m)
+ }
+}
+
+type Event struct {
+ m Message
+ f Future
+}
+
+func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) {
+ if topic.ID == "" {
+ return nil, ErrTopicEmpty
}
cc, exit := b.topics[topic]
if !exit {
- return ErrTopicNotExist
+ return nil, ErrTopicNotExist
}
b.mutex.RLock()
defer b.mutex.RUnlock()
+ var f Future
+ switch topic.Type {
+ case ChTypeUnidirectional:
+ f = nil
+ case ChTypeBidirectional:
+ f = &localFuture{retCount: len(message), retCh: make(chan Message)}
+ }
for _, each := range cc {
for _, m := range message {
go func(ch Channel, message Message) {
- ch <- message
+ ch <- Event{
+ m: message,
+ f: f,
+ }
}(each, m)
}
}
- return nil
+ if f == nil {
+ return &emptyFuture{}, nil
+ }
+ return f, nil
}
// Subscribe adds an MessageListener to be called when a message of a Topic is posted.
func (b *Bus) Subscribe(topic Topic, listener MessageListener) error {
- if topic == "" {
+ if topic.ID == "" {
return ErrTopicEmpty
}
if listener == nil {
@@ -120,7 +208,13 @@ func (b *Bus) Subscribe(topic Topic, listener MessageListener) error {
for {
c, ok := <-ch
if ok {
- listener.Rev(c)
+ ret := listener.Rev(c.m)
+ if c.f == nil {
+ continue
+ }
+ if lf, ok := c.f.(*localFuture); ok {
+ lf.retCh <- ret
+ }
} else {
break
}
diff --git a/banyand/internal/bus/bus_test.go b/banyand/internal/bus/bus_test.go
index 2c18077..34e9b7a 100644
--- a/banyand/internal/bus/bus_test.go
+++ b/banyand/internal/bus/bus_test.go
@@ -30,11 +30,13 @@ func TestBus_PubAndSub(t *testing.T) {
topic Topic
messageIDS []MessageID
wantErr bool
+ wantRet []MessageID
}
type listener struct {
wantTopic Topic
wantMessages []MessageID
wantErr bool
+ ret []MessageID
}
tests := []struct {
name string
@@ -45,14 +47,16 @@ func TestBus_PubAndSub(t *testing.T) {
name: "golden path",
messages: []message{
{
- topic: Topic("default"),
+ topic: BiTopic("default"),
messageIDS: []MessageID{12, 33},
+ wantRet: []MessageID{22, 43},
},
},
listeners: []listener{
{
- wantTopic: Topic("default"),
+ wantTopic: BiTopic("default"),
wantMessages: []MessageID{12, 33},
+ ret: []MessageID{22, 43},
},
},
},
@@ -60,21 +64,21 @@ func TestBus_PubAndSub(t *testing.T) {
name: "two topics",
messages: []message{
{
- topic: Topic("t1"),
+ topic: UniTopic("t1"),
messageIDS: []MessageID{12, 33},
},
{
- topic: Topic("t2"),
+ topic: UniTopic("t2"),
messageIDS: []MessageID{101, 102},
},
},
listeners: []listener{
{
- wantTopic: Topic("t1"),
+ wantTopic: UniTopic("t1"),
wantMessages: []MessageID{12, 33},
},
{
- wantTopic: Topic("t2"),
+ wantTopic: UniTopic("t2"),
wantMessages: []MessageID{101, 102},
},
},
@@ -83,29 +87,29 @@ func TestBus_PubAndSub(t *testing.T) {
name: "two topics with two listeners",
messages: []message{
{
- topic: Topic("t1"),
+ topic: UniTopic("t1"),
messageIDS: []MessageID{12, 33},
},
{
- topic: Topic("t2"),
+ topic: UniTopic("t2"),
messageIDS: []MessageID{101, 102},
},
},
listeners: []listener{
{
- wantTopic: Topic("t1"),
+ wantTopic: UniTopic("t1"),
wantMessages: []MessageID{12, 33},
},
{
- wantTopic: Topic("t1"),
+ wantTopic: UniTopic("t1"),
wantMessages: []MessageID{12, 33},
},
{
- wantTopic: Topic("t2"),
+ wantTopic: UniTopic("t2"),
wantMessages: []MessageID{101, 102},
},
{
- wantTopic: Topic("t2"),
+ wantTopic: UniTopic("t2"),
wantMessages: []MessageID{101, 102},
},
},
@@ -114,7 +118,7 @@ func TestBus_PubAndSub(t *testing.T) {
name: "publish invalid topic",
messages: []message{
{
- topic: Topic(""),
+ topic: UniTopic(""),
messageIDS: []MessageID{12, 33},
wantErr: true,
},
@@ -124,13 +128,13 @@ func TestBus_PubAndSub(t *testing.T) {
name: "publish empty message",
messages: []message{
{
- topic: Topic("default"),
+ topic: UniTopic("default"),
messageIDS: []MessageID{},
},
},
listeners: []listener{
{
- wantTopic: Topic("default"),
+ wantTopic: UniTopic("default"),
wantMessages: []MessageID{},
},
},
@@ -139,7 +143,7 @@ func TestBus_PubAndSub(t *testing.T) {
name: "subscribe invalid topic",
listeners: []listener{
{
- wantTopic: Topic(""),
+ wantTopic: UniTopic(""),
wantErr: true,
},
},
@@ -151,27 +155,56 @@ func TestBus_PubAndSub(t *testing.T) {
wg := sync.WaitGroup{}
mll := make([]*mockListener, 0)
for _, l := range tt.listeners {
- ml := &mockListener{wg: &wg}
+ var ret chan Message
+ if len(l.ret) > 0 {
+ ret = make(chan Message, len(l.ret))
+ for _, id := range l.ret {
+ ret <- NewMessage(id, nil)
+ }
+ }
+ ml := &mockListener{wg: &wg, ret: ret}
mll = append(mll, ml)
wg.Add(len(l.wantMessages))
if err := e.Subscribe(l.wantTopic, ml); (err != nil) != l.wantErr {
t.Errorf("Subscribe() error = %v, wantErr %v", err, l.wantErr)
}
}
- for _, m := range tt.messages {
- mm := make([]Message, 0)
- for _, id := range m.messageIDS {
- mm = append(mm, NewMessage(id, nil))
- }
- err := e.Publish(m.topic, mm...)
- if (err != nil) != m.wantErr {
- t.Errorf("Publish() error = %v, wantErr %v", err, m.wantErr)
+ go func() {
+ for _, m := range tt.messages {
+ mm := make([]Message, 0)
+ for _, id := range m.messageIDS {
+ mm = append(mm, NewMessage(id, nil))
+ }
+ f, err := e.Publish(m.topic, mm...)
+ if (err != nil) != m.wantErr {
+ t.Errorf("Publish() error = %v, wantErr %v", err, m.wantErr)
+ continue
+ }
+ if f == nil {
+ continue
+ }
+ ret, errRet := f.GetAll()
+ if errRet == ErrEmptyFuture {
+ continue
+ } else if errRet != nil {
+ t.Errorf("Publish()'s return message error = %v", err)
+ }
+ ids := make([]MessageID, 0, len(ret))
+ for i := range ret {
+ ids = append(ids, ret[i].ID())
+ }
+ ids = sortMessage(ids)
+ if !reflect.DeepEqual(ids, m.wantRet) {
+ t.Errorf("Publish()'s return = %v, want %v", ret, m.wantRet)
+ }
+ for i := 0; i < len(ret); i++ {
+ wg.Done()
+ }
}
- }
+ }()
if waitTimeout(&wg, 10*time.Second) {
t.Error("message receiving is time out")
}
- wg.Wait()
for i, l := range tt.listeners {
if len(mll[i].queue) > 0 && len(l.wantMessages) > 0 &&
!reflect.DeepEqual(mll[i].queue, l.wantMessages) {
@@ -202,14 +235,20 @@ type mockListener struct {
queue []MessageID
wg *sync.WaitGroup
closeWg *sync.WaitGroup
+ ret chan Message
}
-func (m *mockListener) Rev(message Message) {
+func (m *mockListener) Rev(message Message) Message {
m.queue = append(m.queue, message.id)
sort.SliceStable(m.queue, func(i, j int) bool {
return uint64(m.queue[i]) < uint64(m.queue[j])
})
+ if m.ret != nil {
+ r := <-m.ret
+ return r
+ }
m.wg.Done()
+ return Message{}
}
func (m *mockListener) Close() error {
@@ -217,3 +256,10 @@ func (m *mockListener) Close() error {
m.closeWg.Done()
return nil
}
+
+func sortMessage(ids []MessageID) []MessageID {
+ sort.SliceStable(ids, func(i, j int) bool {
+ return uint64(ids[i]) < uint64(ids[j])
+ })
+ return ids
+}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index d4f4291..480e377 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/liaison"
"github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/series"
"github.com/apache/skywalking-banyandb/banyand/storage"
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -57,13 +58,17 @@ func newStandaloneCmd() *cobra.Command {
if err != nil {
l.Fatal("failed to initiate database", logger.Error(err))
}
- idxBuilder, err := index.NewBuilder(ctx, repo, pipeline)
+ idx, err := index.NewService(ctx, repo, pipeline)
if err != nil {
l.Fatal("failed to initiate index builder", logger.Error(err))
}
- composer, err := query.NewPlanComposer(ctx, repo)
+ s, err := series.NewService(ctx, db)
if err != nil {
- l.Fatal("failed to initiate execution plan composer", logger.Error(err))
+ l.Fatal("failed to initiate series", logger.Error(err))
+ }
+ q, err := query.NewExecutor(ctx, idx, s)
+ if err != nil {
+ l.Fatal("failed to initiate query executor", logger.Error(err))
}
tcp, err := liaison.NewEndpoint(ctx, pipeline)
if err != nil {
@@ -75,8 +80,9 @@ func newStandaloneCmd() *cobra.Command {
new(signal.Handler),
repo,
db,
- idxBuilder,
- composer,
+ s,
+ idx,
+ q,
tcp,
)
logging := logger.Logging{}
diff --git a/banyand/query/query.go b/banyand/query/query.go
index b9a2091..56d0b0f 100644
--- a/banyand/query/query.go
+++ b/banyand/query/query.go
@@ -20,14 +20,15 @@ package query
import (
"context"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/index"
+ "github.com/apache/skywalking-banyandb/banyand/series"
"github.com/apache/skywalking-banyandb/pkg/run"
)
-type PlanComposer interface {
+type Executor interface {
run.PreRunner
}
-func NewPlanComposer(ctx context.Context, repo discovery.ServiceRepo) (PlanComposer, error) {
+func NewExecutor(ctx context.Context, idx index.Repo, s series.UniModel) (Executor, error) {
return nil, nil
}
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 5a88d0a..e3696b4 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -60,8 +60,8 @@ func (e *Local) Subscribe(topic bus.Topic, listener bus.MessageListener) error {
return nil
}
-func (e *Local) Publish(topic bus.Topic, message ...bus.Message) error {
- return nil
+func (e *Local) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, error) {
+ return nil, nil
}
func (e *Local) FlagSet() *run.FlagSet {
diff --git a/banyand/query/query.go b/banyand/series/series.go
similarity index 61%
copy from banyand/query/query.go
copy to banyand/series/series.go
index b9a2091..de81f84 100644
--- a/banyand/query/query.go
+++ b/banyand/series/series.go
@@ -15,19 +15,32 @@
// specific language governing permissions and limitations
// under the License.
-package query
+//go:generate mockgen -destination=./series_mock.go -package=series . UniModel
+package series
import (
"context"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/api/data"
+ "github.com/apache/skywalking-banyandb/banyand/storage"
"github.com/apache/skywalking-banyandb/pkg/run"
)
-type PlanComposer interface {
+type Trace interface {
+ FetchTrace(traceID string) (data.Trace, error)
+ FetchEntity(chunkIDs []string, fields []string) ([]data.Entity, error)
+ ScanEntity(startTime, endTime uint64, fields []string) ([]data.Entity, error)
+}
+
+type UniModel interface {
+ Trace
+}
+
+type Service interface {
+ UniModel
run.PreRunner
}
-func NewPlanComposer(ctx context.Context, repo discovery.ServiceRepo) (PlanComposer, error) {
+func NewService(ctx context.Context, db storage.Database) (Service, error) {
return nil, nil
}