You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2022/01/14 01:10:09 UTC
[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #66: Reload stream when metadata changes
hanahmily commented on a change in pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#discussion_r784430384
##########
File path: banyand/stream/service.go
##########
@@ -52,23 +53,29 @@ type Service interface {
var _ Service = (*service)(nil)
type service struct {
- schemaMap map[string]*stream
+ schemaMap sync.Map
writeListener *writeCallback
l *logger.Logger
metadata metadata.Repo
root string
pipeline queue.Queue
repo discovery.ServiceRepo
- stopCh chan struct{}
+ // stop channel for the service
+ stopCh chan struct{}
+ // stop channel for the inner worker
+ stopCh2 chan struct{}
Review comment:
Could you provide a more specific name for this channel?
##########
File path: banyand/stream/service.go
##########
@@ -52,23 +53,29 @@ type Service interface {
var _ Service = (*service)(nil)
type service struct {
- schemaMap map[string]*stream
+ schemaMap sync.Map
writeListener *writeCallback
l *logger.Logger
metadata metadata.Repo
root string
pipeline queue.Queue
repo discovery.ServiceRepo
- stopCh chan struct{}
+ // stop channel for the service
+ stopCh chan struct{}
+ // stop channel for the inner worker
+ stopCh2 chan struct{}
+
+ updateCh chan *databasev1.Stream
+ deleteCh chan *commonv1.Metadata
Review comment:
How about merging them to a single channel? They are the data management operations, conflicting on the concurrent execution.
##########
File path: banyand/stream/service.go
##########
@@ -94,101 +101,266 @@ func (s *service) PreRun() error {
return err
}
- s.schemaMap = make(map[string]*stream, len(schemas))
+ s.schemaMap = sync.Map{}
s.l = logger.GetLogger(s.Name())
for _, sa := range schemas {
- iRules, errIndexRules := s.metadata.IndexRules(context.TODO(), sa.Metadata)
- if errIndexRules != nil {
- return errIndexRules
- }
- sm, errTS := openStream(s.root, streamSpec{
- schema: sa,
- indexRules: iRules,
- }, s.l)
- if errTS != nil {
- return errTS
+ if _, innerErr := s.initStream(sa); innerErr != nil {
+ s.l.Error().Err(innerErr).Msg("fail to initialize stream")
}
- id := formatStreamID(sm.name, sm.group)
- s.schemaMap[id] = sm
- s.l.Info().Str("id", id).Msg("initialize stream")
}
- s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+ s.writeListener = setUpWriteCallback(s.l, &s.schemaMap)
return err
}
func (s *service) Serve() error {
- t := time.Now()
- now := time.Now().UnixNano()
- nowBp := timestamppb.New(t)
- for _, sMeta := range s.schemaMap {
- locator := make([]*databasev1.EntityEvent_TagLocator, 0, len(sMeta.entityLocator))
- for _, tagLocator := range sMeta.entityLocator {
- locator = append(locator, &databasev1.EntityEvent_TagLocator{
- FamilyOffset: uint32(tagLocator.FamilyOffset),
- TagOffset: uint32(tagLocator.TagOffset),
+ s.schemaMap.Range(func(key, value interface{}) bool {
+ s.l.Debug().Str("streamID", key.(string)).Msg("serve stream")
+ s.serveStream(value.(*stream))
+ return true
+ })
+
+ errWrite := s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
+ if errWrite != nil {
+ return errWrite
+ }
+
+ s.stopCh2 = make(chan struct{})
+ // run a serial reconciler
+ go s.reconcile()
+
+ s.metadata.StreamRegistry().RegisterHandler(schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule, s)
+
+ s.stopCh = make(chan struct{})
+ <-s.stopCh
+
+ return nil
+}
+
+func (s *service) reconcile() {
+ for {
+ select {
+ case streamEntity := <-s.updateCh:
+ s.reloadStream(streamEntity)
+ case metadataEntity := <-s.deleteCh:
+ s.removeStream(metadataEntity)
+ case <-s.stopCh2:
+ return
+ }
+ }
+}
+
+func (s *service) OnAddOrUpdate(m schema.Metadata) {
+ switch m.Kind {
+ case schema.KindStream:
+ s.updateCh <- m.Spec.(*databasev1.Stream)
+ case schema.KindIndexRuleBinding:
+ if m.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
+ stm, err := s.metadata.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{
+ Name: m.Name,
+ Group: m.Group,
})
+ if err != nil {
+ s.l.Error().Err(err).Msg("fail to get subject")
+ return
+ }
+ s.updateCh <- stm
}
- _, err := s.repo.Publish(event.StreamTopicEntityEvent, bus.NewMessage(bus.MessageID(now), &databasev1.EntityEvent{
- Subject: &commonv1.Metadata{
- Name: sMeta.name,
- Group: sMeta.group,
+ case schema.KindIndexRule:
+ subjects, err := s.metadata.Subjects(context.TODO(), m.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_STREAM)
+ if err != nil {
+ s.l.Error().Err(err).Msg("fail to get subjects(stream)")
+ return
+ }
+ for _, sub := range subjects {
+ s.updateCh <- sub.(*databasev1.Stream)
+ }
+ default:
+ }
+}
+
+func (s *service) OnDelete(m schema.Metadata) {
+ switch m.Kind {
+ case schema.KindStream:
+ s.deleteCh <- &commonv1.Metadata{
+ Name: m.Name,
+ Group: m.Group,
+ }
+ case schema.KindIndexRuleBinding:
+ // TODO: we only have metadata(group and name) here, what shall we remove?
Review comment:
just ignore them
##########
File path: banyand/stream/service.go
##########
@@ -94,101 +101,266 @@ func (s *service) PreRun() error {
return err
}
- s.schemaMap = make(map[string]*stream, len(schemas))
+ s.schemaMap = sync.Map{}
s.l = logger.GetLogger(s.Name())
for _, sa := range schemas {
- iRules, errIndexRules := s.metadata.IndexRules(context.TODO(), sa.Metadata)
- if errIndexRules != nil {
- return errIndexRules
- }
- sm, errTS := openStream(s.root, streamSpec{
- schema: sa,
- indexRules: iRules,
- }, s.l)
- if errTS != nil {
- return errTS
+ if _, innerErr := s.initStream(sa); innerErr != nil {
+ s.l.Error().Err(innerErr).Msg("fail to initialize stream")
}
- id := formatStreamID(sm.name, sm.group)
- s.schemaMap[id] = sm
- s.l.Info().Str("id", id).Msg("initialize stream")
}
- s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+ s.writeListener = setUpWriteCallback(s.l, &s.schemaMap)
return err
}
func (s *service) Serve() error {
- t := time.Now()
- now := time.Now().UnixNano()
- nowBp := timestamppb.New(t)
- for _, sMeta := range s.schemaMap {
- locator := make([]*databasev1.EntityEvent_TagLocator, 0, len(sMeta.entityLocator))
- for _, tagLocator := range sMeta.entityLocator {
- locator = append(locator, &databasev1.EntityEvent_TagLocator{
- FamilyOffset: uint32(tagLocator.FamilyOffset),
- TagOffset: uint32(tagLocator.TagOffset),
+ s.schemaMap.Range(func(key, value interface{}) bool {
+ s.l.Debug().Str("streamID", key.(string)).Msg("serve stream")
+ s.serveStream(value.(*stream))
+ return true
+ })
+
+ errWrite := s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
+ if errWrite != nil {
+ return errWrite
+ }
+
+ s.stopCh2 = make(chan struct{})
+ // run a serial reconciler
+ go s.reconcile()
+
+ s.metadata.StreamRegistry().RegisterHandler(schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule, s)
+
+ s.stopCh = make(chan struct{})
+ <-s.stopCh
+
+ return nil
+}
+
+func (s *service) reconcile() {
+ for {
+ select {
+ case streamEntity := <-s.updateCh:
+ s.reloadStream(streamEntity)
+ case metadataEntity := <-s.deleteCh:
+ s.removeStream(metadataEntity)
+ case <-s.stopCh2:
+ return
+ }
+ }
+}
+
+func (s *service) OnAddOrUpdate(m schema.Metadata) {
+ switch m.Kind {
+ case schema.KindStream:
+ s.updateCh <- m.Spec.(*databasev1.Stream)
+ case schema.KindIndexRuleBinding:
+ if m.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
+ stm, err := s.metadata.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{
+ Name: m.Name,
+ Group: m.Group,
})
+ if err != nil {
+ s.l.Error().Err(err).Msg("fail to get subject")
Review comment:
An error handler should be introduced here to sync the metadata. We should ensure all changes should be applied to this module successfully. We want eventually consistency, not never consistency 😄
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org