You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2021/06/03 10:51:14 UTC

[GitHub] [skywalking-banyandb] hanahmily opened a new pull request #7: Add query module dependent interfaces

hanahmily opened a new pull request #7:
URL: https://github.com/apache/skywalking-banyandb/pull/7


   @lujiajing1126 Pls review these changes, which include:
   
    1. Series module interface. In up comping PR, I will add a metadata interface to it.
    2. Index module interface.
    3. Databus system adds a new feat: a `bidirectional` channel. You can leverage it to implement the local queue in the Queue module which is the bridge between liaison and query. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] hanahmily merged pull request #7: Add query module dependent interfaces

Posted by GitBox <gi...@apache.org>.
hanahmily merged pull request #7:
URL: https://github.com/apache/skywalking-banyandb/pull/7


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #7: Add query module dependent interfaces

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #7:
URL: https://github.com/apache/skywalking-banyandb/pull/7#discussion_r644717841



##########
File path: banyand/internal/bus/bus.go
##########
@@ -77,31 +102,94 @@ var (
 	ErrTopicEmpty    = errors.New("the topic is empty")
 	ErrTopicNotExist = errors.New("the topic does not exist")
 	ErrListenerEmpty = errors.New("the message listener is empty")
+	ErrEmptyFuture   = errors.New("can't invoke Get() on an empty future")
 )
 
-func (b *Bus) Publish(topic Topic, message ...Message) error {
-	if topic == "" {
-		return ErrTopicEmpty
+type emptyFuture struct {
+}
+
+func (e *emptyFuture) Get() (Message, error) {
+	return Message{}, ErrEmptyFuture
+}
+
+func (e *emptyFuture) GetAll() ([]Message, error) {
+	return nil, ErrEmptyFuture
+}
+
+type localFuture struct {
+	retCh    chan Message
+	retCount int
+}
+
+func (l *localFuture) Get() (Message, error) {
+	if l.retCount < 1 {
+		return Message{}, io.EOF
+	}
+	m, ok := <-l.retCh
+	if ok {
+		l.retCount = l.retCount - 1
+		return m, nil
+	}
+	return Message{}, io.EOF
+}
+
+func (l *localFuture) GetAll() ([]Message, error) {
+	var globalErr error
+	ret := make([]Message, 0, l.retCount)
+	for {
+		m, err := l.Get()
+		if err == io.EOF {
+			return ret, globalErr
+		}
+		if err != nil {
+			globalErr = multierr.Append(globalErr, err)
+			continue
+		}
+		ret = append(ret, m)
+	}
+}
+
+type Event struct {
+	m Message
+	f Future
+}
+
+func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) {
+	if topic.ID == "" {
+		return nil, ErrTopicEmpty
 	}
 	cc, exit := b.topics[topic]
 	if !exit {
-		return ErrTopicNotExist
+		return nil, ErrTopicNotExist
 	}
 	b.mutex.RLock()

Review comment:
       `RLock` should be called before actual read `b.topics[topic]`?

##########
File path: banyand/internal/bus/bus.go
##########
@@ -77,31 +102,94 @@ var (
 	ErrTopicEmpty    = errors.New("the topic is empty")
 	ErrTopicNotExist = errors.New("the topic does not exist")
 	ErrListenerEmpty = errors.New("the message listener is empty")
+	ErrEmptyFuture   = errors.New("can't invoke Get() on an empty future")
 )
 
-func (b *Bus) Publish(topic Topic, message ...Message) error {
-	if topic == "" {
-		return ErrTopicEmpty
+type emptyFuture struct {
+}
+
+func (e *emptyFuture) Get() (Message, error) {
+	return Message{}, ErrEmptyFuture
+}
+
+func (e *emptyFuture) GetAll() ([]Message, error) {
+	return nil, ErrEmptyFuture
+}
+
+type localFuture struct {
+	retCh    chan Message
+	retCount int
+}
+
+func (l *localFuture) Get() (Message, error) {
+	if l.retCount < 1 {
+		return Message{}, io.EOF
+	}
+	m, ok := <-l.retCh

Review comment:
       Suppose multiple messages are sent, let's say N, if actually if there are replies less than N are returned, I guess we will stuck here since we never close this channel.
   
   A timeout may be necessary

##########
File path: banyand/internal/bus/bus.go
##########
@@ -77,31 +102,94 @@ var (
 	ErrTopicEmpty    = errors.New("the topic is empty")
 	ErrTopicNotExist = errors.New("the topic does not exist")
 	ErrListenerEmpty = errors.New("the message listener is empty")
+	ErrEmptyFuture   = errors.New("can't invoke Get() on an empty future")
 )
 
-func (b *Bus) Publish(topic Topic, message ...Message) error {
-	if topic == "" {
-		return ErrTopicEmpty
+type emptyFuture struct {
+}
+
+func (e *emptyFuture) Get() (Message, error) {
+	return Message{}, ErrEmptyFuture
+}
+
+func (e *emptyFuture) GetAll() ([]Message, error) {
+	return nil, ErrEmptyFuture
+}
+
+type localFuture struct {
+	retCh    chan Message
+	retCount int
+}
+
+func (l *localFuture) Get() (Message, error) {
+	if l.retCount < 1 {
+		return Message{}, io.EOF
+	}
+	m, ok := <-l.retCh
+	if ok {
+		l.retCount = l.retCount - 1
+		return m, nil
+	}
+	return Message{}, io.EOF
+}
+
+func (l *localFuture) GetAll() ([]Message, error) {
+	var globalErr error
+	ret := make([]Message, 0, l.retCount)
+	for {
+		m, err := l.Get()
+		if err == io.EOF {
+			return ret, globalErr
+		}
+		if err != nil {
+			globalErr = multierr.Append(globalErr, err)
+			continue
+		}
+		ret = append(ret, m)
+	}
+}
+
+type Event struct {
+	m Message
+	f Future
+}
+
+func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) {
+	if topic.ID == "" {
+		return nil, ErrTopicEmpty
 	}
 	cc, exit := b.topics[topic]
 	if !exit {
-		return ErrTopicNotExist
+		return nil, ErrTopicNotExist
 	}
 	b.mutex.RLock()

Review comment:
       Do we actually need `sync.RWMutex`? Or we can use `sync.Map` directly.
   
   It seems we may face frequent read and very few write which would match the scenario of `sync.Map`

##########
File path: banyand/internal/bus/bus.go
##########
@@ -77,31 +102,94 @@ var (
 	ErrTopicEmpty    = errors.New("the topic is empty")
 	ErrTopicNotExist = errors.New("the topic does not exist")
 	ErrListenerEmpty = errors.New("the message listener is empty")
+	ErrEmptyFuture   = errors.New("can't invoke Get() on an empty future")
 )
 
-func (b *Bus) Publish(topic Topic, message ...Message) error {
-	if topic == "" {
-		return ErrTopicEmpty
+type emptyFuture struct {
+}
+
+func (e *emptyFuture) Get() (Message, error) {
+	return Message{}, ErrEmptyFuture
+}
+
+func (e *emptyFuture) GetAll() ([]Message, error) {
+	return nil, ErrEmptyFuture
+}
+
+type localFuture struct {
+	retCh    chan Message
+	retCount int
+}
+
+func (l *localFuture) Get() (Message, error) {
+	if l.retCount < 1 {
+		return Message{}, io.EOF
+	}
+	m, ok := <-l.retCh
+	if ok {
+		l.retCount = l.retCount - 1
+		return m, nil
+	}
+	return Message{}, io.EOF
+}
+
+func (l *localFuture) GetAll() ([]Message, error) {
+	var globalErr error
+	ret := make([]Message, 0, l.retCount)
+	for {
+		m, err := l.Get()
+		if err == io.EOF {
+			return ret, globalErr
+		}
+		if err != nil {
+			globalErr = multierr.Append(globalErr, err)
+			continue
+		}
+		ret = append(ret, m)
+	}
+}
+
+type Event struct {
+	m Message
+	f Future
+}
+
+func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) {
+	if topic.ID == "" {
+		return nil, ErrTopicEmpty
 	}
 	cc, exit := b.topics[topic]
 	if !exit {
-		return ErrTopicNotExist
+		return nil, ErrTopicNotExist
 	}
 	b.mutex.RLock()
 	defer b.mutex.RUnlock()
+	var f Future
+	switch topic.Type {
+	case ChTypeUnidirectional:
+		f = nil
+	case ChTypeBidirectional:
+		f = &localFuture{retCount: len(message), retCh: make(chan Message)}
+	}
 	for _, each := range cc {
 		for _, m := range message {
 			go func(ch Channel, message Message) {
-				ch <- message
+				ch <- Event{
+					m: message,
+					f: f,
+				}
 			}(each, m)
 		}
 	}
-	return nil
+	if f == nil {
+		return &emptyFuture{}, nil
+	}
+	return f, nil
 }
 
 // Subscribe adds an MessageListener to be called when a message of a Topic is posted.
 func (b *Bus) Subscribe(topic Topic, listener MessageListener) error {

Review comment:
       Can we support subscribe in parallel with a parameter?

##########
File path: banyand/internal/bus/bus.go
##########
@@ -46,20 +53,38 @@ func NewMessage(id MessageID, data interface{}) Message {
 
 //MessageListener is the signature of functions that can handle an EventMessage.
 type MessageListener interface {
-	Rev(message Message)
+	Rev(message Message) Message
 }
 
 type Subscriber interface {
 	Subscribe(topic Topic, listener MessageListener) error
 }
 
 type Publisher interface {
-	Publish(topic Topic, message ...Message) error
+	Publish(topic Topic, message ...Message) (Future, error)
 }
 
-type Channel chan Message
+type Channel chan Event

Review comment:
       `Channel` should contain references of `*Event` instead of `Event`?

##########
File path: banyand/internal/bus/bus.go
##########
@@ -46,20 +53,38 @@ func NewMessage(id MessageID, data interface{}) Message {
 
 //MessageListener is the signature of functions that can handle an EventMessage.
 type MessageListener interface {
-	Rev(message Message)
+	Rev(message Message) Message

Review comment:
       `Recv` seems better for me.
   
   Do we need an error as return object? Probably we can trigger re-send in the future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #7: Add query module dependent interfaces

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #7:
URL: https://github.com/apache/skywalking-banyandb/pull/7#discussion_r644717841



##########
File path: banyand/internal/bus/bus.go
##########
@@ -77,31 +102,94 @@ var (
 	ErrTopicEmpty    = errors.New("the topic is empty")
 	ErrTopicNotExist = errors.New("the topic does not exist")
 	ErrListenerEmpty = errors.New("the message listener is empty")
+	ErrEmptyFuture   = errors.New("can't invoke Get() on an empty future")
 )
 
-func (b *Bus) Publish(topic Topic, message ...Message) error {
-	if topic == "" {
-		return ErrTopicEmpty
+type emptyFuture struct {
+}
+
+func (e *emptyFuture) Get() (Message, error) {
+	return Message{}, ErrEmptyFuture
+}
+
+func (e *emptyFuture) GetAll() ([]Message, error) {
+	return nil, ErrEmptyFuture
+}
+
+type localFuture struct {
+	retCh    chan Message
+	retCount int
+}
+
+func (l *localFuture) Get() (Message, error) {
+	if l.retCount < 1 {
+		return Message{}, io.EOF
+	}
+	m, ok := <-l.retCh
+	if ok {
+		l.retCount = l.retCount - 1
+		return m, nil
+	}
+	return Message{}, io.EOF
+}
+
+func (l *localFuture) GetAll() ([]Message, error) {
+	var globalErr error
+	ret := make([]Message, 0, l.retCount)
+	for {
+		m, err := l.Get()
+		if err == io.EOF {
+			return ret, globalErr
+		}
+		if err != nil {
+			globalErr = multierr.Append(globalErr, err)
+			continue
+		}
+		ret = append(ret, m)
+	}
+}
+
+type Event struct {
+	m Message
+	f Future
+}
+
+func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) {
+	if topic.ID == "" {
+		return nil, ErrTopicEmpty
 	}
 	cc, exit := b.topics[topic]
 	if !exit {
-		return ErrTopicNotExist
+		return nil, ErrTopicNotExist
 	}
 	b.mutex.RLock()

Review comment:
       `RLock` should be called before actual read `b.topics[topic]`?

##########
File path: banyand/internal/bus/bus.go
##########
@@ -77,31 +102,94 @@ var (
 	ErrTopicEmpty    = errors.New("the topic is empty")
 	ErrTopicNotExist = errors.New("the topic does not exist")
 	ErrListenerEmpty = errors.New("the message listener is empty")
+	ErrEmptyFuture   = errors.New("can't invoke Get() on an empty future")
 )
 
-func (b *Bus) Publish(topic Topic, message ...Message) error {
-	if topic == "" {
-		return ErrTopicEmpty
+type emptyFuture struct {
+}
+
+func (e *emptyFuture) Get() (Message, error) {
+	return Message{}, ErrEmptyFuture
+}
+
+func (e *emptyFuture) GetAll() ([]Message, error) {
+	return nil, ErrEmptyFuture
+}
+
+type localFuture struct {
+	retCh    chan Message
+	retCount int
+}
+
+func (l *localFuture) Get() (Message, error) {
+	if l.retCount < 1 {
+		return Message{}, io.EOF
+	}
+	m, ok := <-l.retCh

Review comment:
       Suppose multiple messages are sent, let's say N, if actually if there are replies less than N are returned, I guess we will stuck here since we never close this channel.
   
   A timeout may be necessary

##########
File path: banyand/internal/bus/bus.go
##########
@@ -77,31 +102,94 @@ var (
 	ErrTopicEmpty    = errors.New("the topic is empty")
 	ErrTopicNotExist = errors.New("the topic does not exist")
 	ErrListenerEmpty = errors.New("the message listener is empty")
+	ErrEmptyFuture   = errors.New("can't invoke Get() on an empty future")
 )
 
-func (b *Bus) Publish(topic Topic, message ...Message) error {
-	if topic == "" {
-		return ErrTopicEmpty
+type emptyFuture struct {
+}
+
+func (e *emptyFuture) Get() (Message, error) {
+	return Message{}, ErrEmptyFuture
+}
+
+func (e *emptyFuture) GetAll() ([]Message, error) {
+	return nil, ErrEmptyFuture
+}
+
+type localFuture struct {
+	retCh    chan Message
+	retCount int
+}
+
+func (l *localFuture) Get() (Message, error) {
+	if l.retCount < 1 {
+		return Message{}, io.EOF
+	}
+	m, ok := <-l.retCh
+	if ok {
+		l.retCount = l.retCount - 1
+		return m, nil
+	}
+	return Message{}, io.EOF
+}
+
+func (l *localFuture) GetAll() ([]Message, error) {
+	var globalErr error
+	ret := make([]Message, 0, l.retCount)
+	for {
+		m, err := l.Get()
+		if err == io.EOF {
+			return ret, globalErr
+		}
+		if err != nil {
+			globalErr = multierr.Append(globalErr, err)
+			continue
+		}
+		ret = append(ret, m)
+	}
+}
+
+type Event struct {
+	m Message
+	f Future
+}
+
+func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) {
+	if topic.ID == "" {
+		return nil, ErrTopicEmpty
 	}
 	cc, exit := b.topics[topic]
 	if !exit {
-		return ErrTopicNotExist
+		return nil, ErrTopicNotExist
 	}
 	b.mutex.RLock()

Review comment:
       Do we actually need `sync.RWMutex`? Or we can use `sync.Map` directly.
   
   It seems we may face frequent read and very few write which would match the scenario of `sync.Map`

##########
File path: banyand/internal/bus/bus.go
##########
@@ -77,31 +102,94 @@ var (
 	ErrTopicEmpty    = errors.New("the topic is empty")
 	ErrTopicNotExist = errors.New("the topic does not exist")
 	ErrListenerEmpty = errors.New("the message listener is empty")
+	ErrEmptyFuture   = errors.New("can't invoke Get() on an empty future")
 )
 
-func (b *Bus) Publish(topic Topic, message ...Message) error {
-	if topic == "" {
-		return ErrTopicEmpty
+type emptyFuture struct {
+}
+
+func (e *emptyFuture) Get() (Message, error) {
+	return Message{}, ErrEmptyFuture
+}
+
+func (e *emptyFuture) GetAll() ([]Message, error) {
+	return nil, ErrEmptyFuture
+}
+
+type localFuture struct {
+	retCh    chan Message
+	retCount int
+}
+
+func (l *localFuture) Get() (Message, error) {
+	if l.retCount < 1 {
+		return Message{}, io.EOF
+	}
+	m, ok := <-l.retCh
+	if ok {
+		l.retCount = l.retCount - 1
+		return m, nil
+	}
+	return Message{}, io.EOF
+}
+
+func (l *localFuture) GetAll() ([]Message, error) {
+	var globalErr error
+	ret := make([]Message, 0, l.retCount)
+	for {
+		m, err := l.Get()
+		if err == io.EOF {
+			return ret, globalErr
+		}
+		if err != nil {
+			globalErr = multierr.Append(globalErr, err)
+			continue
+		}
+		ret = append(ret, m)
+	}
+}
+
+type Event struct {
+	m Message
+	f Future
+}
+
+func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) {
+	if topic.ID == "" {
+		return nil, ErrTopicEmpty
 	}
 	cc, exit := b.topics[topic]
 	if !exit {
-		return ErrTopicNotExist
+		return nil, ErrTopicNotExist
 	}
 	b.mutex.RLock()
 	defer b.mutex.RUnlock()
+	var f Future
+	switch topic.Type {
+	case ChTypeUnidirectional:
+		f = nil
+	case ChTypeBidirectional:
+		f = &localFuture{retCount: len(message), retCh: make(chan Message)}
+	}
 	for _, each := range cc {
 		for _, m := range message {
 			go func(ch Channel, message Message) {
-				ch <- message
+				ch <- Event{
+					m: message,
+					f: f,
+				}
 			}(each, m)
 		}
 	}
-	return nil
+	if f == nil {
+		return &emptyFuture{}, nil
+	}
+	return f, nil
 }
 
 // Subscribe adds an MessageListener to be called when a message of a Topic is posted.
 func (b *Bus) Subscribe(topic Topic, listener MessageListener) error {

Review comment:
       Can we support subscribe in parallel with a parameter?

##########
File path: banyand/internal/bus/bus.go
##########
@@ -46,20 +53,38 @@ func NewMessage(id MessageID, data interface{}) Message {
 
 //MessageListener is the signature of functions that can handle an EventMessage.
 type MessageListener interface {
-	Rev(message Message)
+	Rev(message Message) Message
 }
 
 type Subscriber interface {
 	Subscribe(topic Topic, listener MessageListener) error
 }
 
 type Publisher interface {
-	Publish(topic Topic, message ...Message) error
+	Publish(topic Topic, message ...Message) (Future, error)
 }
 
-type Channel chan Message
+type Channel chan Event

Review comment:
       `Channel` should contain references of `*Event` instead of `Event`?

##########
File path: banyand/internal/bus/bus.go
##########
@@ -46,20 +53,38 @@ func NewMessage(id MessageID, data interface{}) Message {
 
 //MessageListener is the signature of functions that can handle an EventMessage.
 type MessageListener interface {
-	Rev(message Message)
+	Rev(message Message) Message

Review comment:
       `Recv` seems better for me.
   
   Do we need an error as return object? Probably we can trigger re-send in the future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] hanahmily merged pull request #7: Add query module dependent interfaces

Posted by GitBox <gi...@apache.org>.
hanahmily merged pull request #7:
URL: https://github.com/apache/skywalking-banyandb/pull/7


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org