You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2022/01/14 01:10:09 UTC

[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #66: Reload stream when metadata changes

hanahmily commented on a change in pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#discussion_r784430384



##########
File path: banyand/stream/service.go
##########
@@ -52,23 +53,29 @@ type Service interface {
 var _ Service = (*service)(nil)
 
 type service struct {
-	schemaMap     map[string]*stream
+	schemaMap     sync.Map
 	writeListener *writeCallback
 	l             *logger.Logger
 	metadata      metadata.Repo
 	root          string
 	pipeline      queue.Queue
 	repo          discovery.ServiceRepo
-	stopCh        chan struct{}
+	// stop channel for the service
+	stopCh chan struct{}
+	// stop channel for the inner worker
+	stopCh2 chan struct{}

Review comment:
       Could you provide a more specific name for this channel?

##########
File path: banyand/stream/service.go
##########
@@ -52,23 +53,29 @@ type Service interface {
 var _ Service = (*service)(nil)
 
 type service struct {
-	schemaMap     map[string]*stream
+	schemaMap     sync.Map
 	writeListener *writeCallback
 	l             *logger.Logger
 	metadata      metadata.Repo
 	root          string
 	pipeline      queue.Queue
 	repo          discovery.ServiceRepo
-	stopCh        chan struct{}
+	// stop channel for the service
+	stopCh chan struct{}
+	// stop channel for the inner worker
+	stopCh2 chan struct{}
+
+	updateCh chan *databasev1.Stream
+	deleteCh chan *commonv1.Metadata

Review comment:
       How about merging them into a single channel? Both carry data-management operations, which conflict with each other when executed concurrently.
   
   

##########
File path: banyand/stream/service.go
##########
@@ -94,101 +101,266 @@ func (s *service) PreRun() error {
 		return err
 	}
 
-	s.schemaMap = make(map[string]*stream, len(schemas))
+	s.schemaMap = sync.Map{}
 	s.l = logger.GetLogger(s.Name())
 	for _, sa := range schemas {
-		iRules, errIndexRules := s.metadata.IndexRules(context.TODO(), sa.Metadata)
-		if errIndexRules != nil {
-			return errIndexRules
-		}
-		sm, errTS := openStream(s.root, streamSpec{
-			schema:     sa,
-			indexRules: iRules,
-		}, s.l)
-		if errTS != nil {
-			return errTS
+		if _, innerErr := s.initStream(sa); innerErr != nil {
+			s.l.Error().Err(innerErr).Msg("fail to initialize stream")
 		}
-		id := formatStreamID(sm.name, sm.group)
-		s.schemaMap[id] = sm
-		s.l.Info().Str("id", id).Msg("initialize stream")
 	}
-	s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+	s.writeListener = setUpWriteCallback(s.l, &s.schemaMap)
 	return err
 }
 
 func (s *service) Serve() error {
-	t := time.Now()
-	now := time.Now().UnixNano()
-	nowBp := timestamppb.New(t)
-	for _, sMeta := range s.schemaMap {
-		locator := make([]*databasev1.EntityEvent_TagLocator, 0, len(sMeta.entityLocator))
-		for _, tagLocator := range sMeta.entityLocator {
-			locator = append(locator, &databasev1.EntityEvent_TagLocator{
-				FamilyOffset: uint32(tagLocator.FamilyOffset),
-				TagOffset:    uint32(tagLocator.TagOffset),
+	s.schemaMap.Range(func(key, value interface{}) bool {
+		s.l.Debug().Str("streamID", key.(string)).Msg("serve stream")
+		s.serveStream(value.(*stream))
+		return true
+	})
+
+	errWrite := s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
+	if errWrite != nil {
+		return errWrite
+	}
+
+	s.stopCh2 = make(chan struct{})
+	// run a serial reconciler
+	go s.reconcile()
+
+	s.metadata.StreamRegistry().RegisterHandler(schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule, s)
+
+	s.stopCh = make(chan struct{})
+	<-s.stopCh
+
+	return nil
+}
+
+func (s *service) reconcile() {
+	for {
+		select {
+		case streamEntity := <-s.updateCh:
+			s.reloadStream(streamEntity)
+		case metadataEntity := <-s.deleteCh:
+			s.removeStream(metadataEntity)
+		case <-s.stopCh2:
+			return
+		}
+	}
+}
+
+func (s *service) OnAddOrUpdate(m schema.Metadata) {
+	switch m.Kind {
+	case schema.KindStream:
+		s.updateCh <- m.Spec.(*databasev1.Stream)
+	case schema.KindIndexRuleBinding:
+		if m.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
+			stm, err := s.metadata.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{
+				Name:  m.Name,
+				Group: m.Group,
 			})
+			if err != nil {
+				s.l.Error().Err(err).Msg("fail to get subject")
+				return
+			}
+			s.updateCh <- stm
 		}
-		_, err := s.repo.Publish(event.StreamTopicEntityEvent, bus.NewMessage(bus.MessageID(now), &databasev1.EntityEvent{
-			Subject: &commonv1.Metadata{
-				Name:  sMeta.name,
-				Group: sMeta.group,
+	case schema.KindIndexRule:
+		subjects, err := s.metadata.Subjects(context.TODO(), m.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_STREAM)
+		if err != nil {
+			s.l.Error().Err(err).Msg("fail to get subjects(stream)")
+			return
+		}
+		for _, sub := range subjects {
+			s.updateCh <- sub.(*databasev1.Stream)
+		}
+	default:
+	}
+}
+
+func (s *service) OnDelete(m schema.Metadata) {
+	switch m.Kind {
+	case schema.KindStream:
+		s.deleteCh <- &commonv1.Metadata{
+			Name:  m.Name,
+			Group: m.Group,
+		}
+	case schema.KindIndexRuleBinding:
+		// TODO: we only have metadata(group and name) here, what shall we remove?

Review comment:
       Just ignore them.

##########
File path: banyand/stream/service.go
##########
@@ -94,101 +101,266 @@ func (s *service) PreRun() error {
 		return err
 	}
 
-	s.schemaMap = make(map[string]*stream, len(schemas))
+	s.schemaMap = sync.Map{}
 	s.l = logger.GetLogger(s.Name())
 	for _, sa := range schemas {
-		iRules, errIndexRules := s.metadata.IndexRules(context.TODO(), sa.Metadata)
-		if errIndexRules != nil {
-			return errIndexRules
-		}
-		sm, errTS := openStream(s.root, streamSpec{
-			schema:     sa,
-			indexRules: iRules,
-		}, s.l)
-		if errTS != nil {
-			return errTS
+		if _, innerErr := s.initStream(sa); innerErr != nil {
+			s.l.Error().Err(innerErr).Msg("fail to initialize stream")
 		}
-		id := formatStreamID(sm.name, sm.group)
-		s.schemaMap[id] = sm
-		s.l.Info().Str("id", id).Msg("initialize stream")
 	}
-	s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+	s.writeListener = setUpWriteCallback(s.l, &s.schemaMap)
 	return err
 }
 
 func (s *service) Serve() error {
-	t := time.Now()
-	now := time.Now().UnixNano()
-	nowBp := timestamppb.New(t)
-	for _, sMeta := range s.schemaMap {
-		locator := make([]*databasev1.EntityEvent_TagLocator, 0, len(sMeta.entityLocator))
-		for _, tagLocator := range sMeta.entityLocator {
-			locator = append(locator, &databasev1.EntityEvent_TagLocator{
-				FamilyOffset: uint32(tagLocator.FamilyOffset),
-				TagOffset:    uint32(tagLocator.TagOffset),
+	s.schemaMap.Range(func(key, value interface{}) bool {
+		s.l.Debug().Str("streamID", key.(string)).Msg("serve stream")
+		s.serveStream(value.(*stream))
+		return true
+	})
+
+	errWrite := s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
+	if errWrite != nil {
+		return errWrite
+	}
+
+	s.stopCh2 = make(chan struct{})
+	// run a serial reconciler
+	go s.reconcile()
+
+	s.metadata.StreamRegistry().RegisterHandler(schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule, s)
+
+	s.stopCh = make(chan struct{})
+	<-s.stopCh
+
+	return nil
+}
+
+func (s *service) reconcile() {
+	for {
+		select {
+		case streamEntity := <-s.updateCh:
+			s.reloadStream(streamEntity)
+		case metadataEntity := <-s.deleteCh:
+			s.removeStream(metadataEntity)
+		case <-s.stopCh2:
+			return
+		}
+	}
+}
+
+func (s *service) OnAddOrUpdate(m schema.Metadata) {
+	switch m.Kind {
+	case schema.KindStream:
+		s.updateCh <- m.Spec.(*databasev1.Stream)
+	case schema.KindIndexRuleBinding:
+		if m.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
+			stm, err := s.metadata.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{
+				Name:  m.Name,
+				Group: m.Group,
 			})
+			if err != nil {
+				s.l.Error().Err(err).Msg("fail to get subject")

Review comment:
       An error handler should be introduced here to sync the metadata. We should ensure that all changes are eventually applied to this module successfully. We want eventual consistency, not never-consistency 😄 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org