Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2022/01/13 11:05:17 UTC

[GitHub] [skywalking-banyandb] lujiajing1126 opened a new pull request #66: Reload stream when metadata changes

lujiajing1126 opened a new pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66


   This PR supersedes PR #65. It still allows metadata to be reloaded, but it puts most of the logic in the stream module instead of pursuing strongly consistent metadata as the previous PR did.
   
   As a result, the stream module starts a background job that runs serially and continuously reconciles the opened streams (together with their underlying storage).
   
   Please review the new design, @hanahmily.
   
   More test cases will be added later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] lujiajing1126 edited a comment on pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
lujiajing1126 edited a comment on pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#issuecomment-1013618832


   > What I am focusing on is how syncing failures are handled, which is missing at this stage. The module that receives events should ensure it can handle every event successfully. Otherwise, the conflicting metadata is bound to cause us a series of problems.
   
   Currently, I just skip processing the given entity, since the failure was probably caused by other modifications and the entity will probably become consistent in the next round of syncing.





[GitHub] [skywalking-banyandb] hanahmily commented on pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
hanahmily commented on pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#issuecomment-1012638673


   
   > More test cases will be added later. I suppose the `Eventually` method is necessary for these kinds of tests.
   
   @lujiajing1126 
   https://github.com/apache/skywalking-banyandb/pull/67 will introduce `Eventually`. I also noticed that `AfterEach` is able to clean up a test when it hits a `panic`. Is the `flow` you introduced still necessary for testing?
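For readers unfamiliar with why an `Eventually`-style assertion matters here: the reload happens in a background goroutine, so a test cannot assert on the result immediately after changing metadata. The following is a minimal, stdlib-only sketch of the polling idea. It is a hand-rolled stand-in, not Gomega's implementation, and the helper name `eventually` and the `reloaded` flag are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// eventually polls cond every interval until it returns true or the timeout
// elapses. It reports whether cond became true in time.
// Hypothetical helper: it only illustrates the idea behind Gomega's Eventually.
func eventually(cond func() bool, timeout, interval time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for {
		if cond() {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(interval)
	}
}

func main() {
	// reloaded stands in for "the stream was reloaded by the background job".
	var reloaded int32
	go func() {
		time.Sleep(20 * time.Millisecond)
		atomic.StoreInt32(&reloaded, 1)
	}()

	ok := eventually(func() bool {
		return atomic.LoadInt32(&reloaded) == 1
	}, time.Second, 5*time.Millisecond)
	fmt.Println("reload observed:", ok)
}
```

In a real Ginkgo/Gomega suite the same check would normally be written with the library's own `Eventually` rather than a custom loop.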





[GitHub] [skywalking-banyandb] lujiajing1126 edited a comment on pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
lujiajing1126 edited a comment on pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#issuecomment-1014664247


   > > > What I am focusing on is how syncing failures are handled, which is missing at this stage. The module that receives events should ensure it can handle every event successfully. Otherwise, the conflicting metadata is bound to cause us a series of problems.
   > > 
   > > 
   > > Currently, I just skip processing the given entity, since the failure was probably caused by other modifications and the entity will probably become consistent in the next round of syncing.
   > 
   > As far as I know, metadata syncing is triggered only by changes. As I mentioned, metadata updates rarely happen, so an inconsistency would persist for a long time. How about introducing a periodic syncing strategy as a complementary solution?
   
   Periodic syncing has been added with the help of etcd's [`mod_revision`](https://etcd.io/docs/v3.5/learning/data_model/#logical-view).
   
   The basic logic is to first list all schemas and then, for each one:
   
   1. Serve the schema if we have never seen it.
   2. Compare the `mod_revision` of the stream; reload if we observe a larger one.
   3. Compare the number of index rules; reload if it differs.
   4. Compare the maximum observed `mod_revision` of the index rules; reload if we observe a larger one.
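The four checks above can be sketched as a pure decision function. This is only an illustration under stated assumptions: the types `streamSchema`, `indexRule` and `observed`, and the function `decide`, are hypothetical names and not the PR's actual code. The `modRevision` fields stand in for the `mod_revision` that etcd reports for each stored key.

```go
package main

import "fmt"

// indexRule and streamSchema are hypothetical stand-ins for the listed schemas.
type indexRule struct {
	name        string
	modRevision int64
}

type streamSchema struct {
	id          string
	modRevision int64
	indexRules  []indexRule
}

// observed records what the module saw when it last served or reloaded a stream.
type observed struct {
	streamRevision       int64
	indexRuleCount       int
	maxIndexRuleRevision int64
}

type action string

const (
	actionServe  action = "serve"
	actionReload action = "reload"
	actionNone   action = "none"
)

// maxRevision returns the largest modRevision among the rules (0 if empty).
func maxRevision(rules []indexRule) int64 {
	var highest int64
	for _, r := range rules {
		if r.modRevision > highest {
			highest = r.modRevision
		}
	}
	return highest
}

// decide applies the four checks in order for one listed schema.
func decide(known map[string]observed, s streamSchema) action {
	prev, seen := known[s.id]
	switch {
	case !seen: // 1. never seen: serve it
		return actionServe
	case s.modRevision > prev.streamRevision: // 2. stream itself changed
		return actionReload
	case len(s.indexRules) != prev.indexRuleCount: // 3. a rule was added or removed
		return actionReload
	case maxRevision(s.indexRules) > prev.maxIndexRuleRevision: // 4. a rule changed
		return actionReload
	default:
		return actionNone
	}
}

func main() {
	known := map[string]observed{
		"default/sw": {streamRevision: 5, indexRuleCount: 2, maxIndexRuleRevision: 7},
	}
	ruleRemoved := streamSchema{
		id:          "default/sw",
		modRevision: 5,
		indexRules:  []indexRule{{name: "b", modRevision: 7}},
	}
	fmt.Println(decide(known, ruleRemoved))
}
```

One plausible reason for keeping check 3 alongside check 4 is that removing a rule lowers the count without raising the maximum revision, so the revision comparison alone would miss it.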





[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#discussion_r786362600



##########
File path: banyand/stream/service_test.go
##########
@@ -0,0 +1,302 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/golang/mock/gomock"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	"github.com/apache/skywalking-banyandb/api/event"
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+	teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+)
+
+var (
+	_ gomock.Matcher = (*shardEventMatcher)(nil)
+	_ gomock.Matcher = (*entityEventMatcher)(nil)
+)
+
+type shardEventMatcher struct {
+	action databasev1.Action
+}
+
+func (s *shardEventMatcher) Matches(x interface{}) bool {
+	if m, messageOk := x.(bus.Message); messageOk {
+		if evt, dataOk := m.Data().(*databasev1.ShardEvent); dataOk {
+			return evt.Action == s.action
+		}
+	}
+
+	return false
+}
+
+func (s *shardEventMatcher) String() string {
+	return fmt.Sprintf("shard-event-matcher(%s)", databasev1.Action_name[int32(s.action)])
+}
+
+type entityEventMatcher struct {
+	action databasev1.Action
+}
+
+func (s *entityEventMatcher) Matches(x interface{}) bool {
+	if m, messageOk := x.(bus.Message); messageOk {
+		if evt, dataOk := m.Data().(*databasev1.EntityEvent); dataOk {
+			return evt.Action == s.action
+		}
+	}
+
+	return false
+}
+
+func (s *entityEventMatcher) String() string {
+	return fmt.Sprintf("entity-event-matcher(%s)", databasev1.Action_name[int32(s.action)])
+}
+
+type streamEventSubscriber struct {
+	repo                 discovery.ServiceRepo
+	shardEventSubscriber bus.MessageListener
+	entityEventListener  bus.MessageListener
+}
+
+func (ses *streamEventSubscriber) Name() string {
+	return "stream-event-subscriber"
+}
+
+func (ses *streamEventSubscriber) PreRun() error {
+	components := []struct {
+		shardEvent  bus.Topic
+		entityEvent bus.Topic
+	}{
+		{
+			shardEvent:  event.StreamTopicShardEvent,
+			entityEvent: event.StreamTopicEntityEvent,
+		},
+	}
+	for _, c := range components {
+		err := ses.repo.Subscribe(c.shardEvent, ses.shardEventSubscriber)
+		if err != nil {
+			return err
+		}
+		err = ses.repo.Subscribe(c.entityEvent, ses.entityEventListener)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// service to preload stream
+type preloadStreamService struct {
+	metaSvc metadata.Service
+}
+
+func (p *preloadStreamService) Name() string {
+	return "preload-stream"
+}
+
+func (p *preloadStreamService) PreRun() error {
+	return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
+}
+
+var ctrl *gomock.Controller
+
+// BeforeSuite - Init logger
+var _ = BeforeSuite(func() {
+	Expect(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "info",
+	})).To(Succeed())
+})
+
+// BeforeEach - Create Mock Controller
+var _ = BeforeEach(func() {
+	ctrl = gomock.NewController(GinkgoT())
+	Expect(ctrl).ShouldNot(BeNil())
+})
+
+var _ = Describe("Stream Service", func() {
+	var closer run.Service
+	var streamService Service
+	var metadataService metadata.Service
+	var shardEventListener *bus.MockMessageListener
+	var entityEventListener *bus.MockMessageListener
+
+	BeforeEach(func() {
+		var flags []string
+
+		// Init Discovery
+		repo, err := discovery.NewServiceRepo(context.TODO())
+		Expect(err).NotTo(HaveOccurred())
+
+		// Init Pipeline
+		pipeline, err := queue.NewQueue(context.TODO(), repo)
+		Expect(err).NotTo(HaveOccurred())
+
+		// Init Metadata Service
+		metadataService, err = metadata.NewService(context.TODO())
+		Expect(err).NotTo(HaveOccurred())
+		etcdRootDir := teststream.RandomTempDir()
+		flags = append(flags, "--metadata-root-path="+etcdRootDir)
+		DeferCleanup(func() {
+			_ = os.RemoveAll(etcdRootDir)
+		})
+
+		// Init Stream Service
+		streamService, err = NewService(context.TODO(), metadataService, repo, pipeline)
+		Expect(err).NotTo(HaveOccurred())
+		rootPath, deferFunc, err := test.NewSpace()
+		Expect(err).NotTo(HaveOccurred())
+		DeferCleanup(func() {
+			deferFunc()

Review comment:
       Fixed







[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#discussion_r786362718



##########
File path: banyand/stream/service.go
##########
@@ -94,101 +101,266 @@ func (s *service) PreRun() error {
 		return err
 	}
 
-	s.schemaMap = make(map[string]*stream, len(schemas))
+	s.schemaMap = sync.Map{}
 	s.l = logger.GetLogger(s.Name())
 	for _, sa := range schemas {
-		iRules, errIndexRules := s.metadata.IndexRules(context.TODO(), sa.Metadata)
-		if errIndexRules != nil {
-			return errIndexRules
-		}
-		sm, errTS := openStream(s.root, streamSpec{
-			schema:     sa,
-			indexRules: iRules,
-		}, s.l)
-		if errTS != nil {
-			return errTS
+		if _, innerErr := s.initStream(sa); innerErr != nil {
+			s.l.Error().Err(innerErr).Msg("fail to initialize stream")
 		}
-		id := formatStreamID(sm.name, sm.group)
-		s.schemaMap[id] = sm
-		s.l.Info().Str("id", id).Msg("initialize stream")
 	}
-	s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+	s.writeListener = setUpWriteCallback(s.l, &s.schemaMap)
 	return err
 }
 
 func (s *service) Serve() error {
-	t := time.Now()
-	now := time.Now().UnixNano()
-	nowBp := timestamppb.New(t)
-	for _, sMeta := range s.schemaMap {
-		locator := make([]*databasev1.EntityEvent_TagLocator, 0, len(sMeta.entityLocator))
-		for _, tagLocator := range sMeta.entityLocator {
-			locator = append(locator, &databasev1.EntityEvent_TagLocator{
-				FamilyOffset: uint32(tagLocator.FamilyOffset),
-				TagOffset:    uint32(tagLocator.TagOffset),
+	s.schemaMap.Range(func(key, value interface{}) bool {
+		s.l.Debug().Str("streamID", key.(string)).Msg("serve stream")
+		s.serveStream(value.(*stream))
+		return true
+	})
+
+	errWrite := s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
+	if errWrite != nil {
+		return errWrite
+	}
+
+	s.stopCh2 = make(chan struct{})
+	// run a serial reconciler
+	go s.reconcile()
+
+	s.metadata.StreamRegistry().RegisterHandler(schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule, s)
+
+	s.stopCh = make(chan struct{})
+	<-s.stopCh
+
+	return nil
+}
+
+func (s *service) reconcile() {
+	for {
+		select {
+		case streamEntity := <-s.updateCh:
+			s.reloadStream(streamEntity)
+		case metadataEntity := <-s.deleteCh:
+			s.removeStream(metadataEntity)
+		case <-s.stopCh2:
+			return
+		}
+	}
+}
+
+func (s *service) OnAddOrUpdate(m schema.Metadata) {
+	switch m.Kind {
+	case schema.KindStream:
+		s.updateCh <- m.Spec.(*databasev1.Stream)
+	case schema.KindIndexRuleBinding:
+		if m.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
+			stm, err := s.metadata.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{
+				Name:  m.Name,
+				Group: m.Group,
 			})
+			if err != nil {
+				s.l.Error().Err(err).Msg("fail to get subject")

Review comment:
       Periodic syncing has been introduced.







[GitHub] [skywalking-banyandb] lujiajing1126 commented on pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#issuecomment-1013668271


   > > More test cases will be added later. I suppose the `Eventually` method is necessary for these kinds of tests.
   > 
   > @lujiajing1126 #67 will introduce `Eventually`. I also noticed that `AfterEach` is able to clean up a test when it hits a `panic`. Is the `flow` you introduced still necessary for testing?
   
   I agree. We may fully migrate to `ginkgo`/`gomega` in the next PR.
   
   I suppose we can make use of `DeferCleanup`, `BeforeSuite`, and `Before/AfterEach` to better organize the cleanup code. You may check the latest commits.





[GitHub] [skywalking-banyandb] hanahmily commented on pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
hanahmily commented on pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#issuecomment-1014079609


   > > What I am focusing on is how syncing failures are handled, which is missing at this stage. The module that receives events should ensure it can handle every event successfully. Otherwise, the conflicting metadata is bound to cause us a series of problems.
   > 
   > Currently, I just skip processing the given entity, since the failure was probably caused by other modifications and the entity will probably become consistent in the next round of syncing.
   
   As far as I know, metadata syncing is triggered only by changes. As I mentioned, metadata updates rarely happen, so an inconsistency would persist for a long time. How about introducing a periodic syncing strategy as a complementary solution?





[GitHub] [skywalking-banyandb] lujiajing1126 edited a comment on pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
lujiajing1126 edited a comment on pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#issuecomment-1013668271









[GitHub] [skywalking-banyandb] lujiajing1126 commented on pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#issuecomment-1014664247


   > > > What I am focusing on is how syncing failures are handled, which is missing at this stage. The module that receives events should ensure it can handle every event successfully. Otherwise, the conflicting metadata is bound to cause us a series of problems.
   > > 
   > > 
   > > Currently, I just skip processing the given entity, since the failure was probably caused by other modifications and the entity will probably become consistent in the next round of syncing.
   > 
   > As far as I know, metadata syncing is triggered only by changes. As I mentioned, metadata updates rarely happen, so an inconsistency would persist for a long time. How about introducing a periodic syncing strategy as a complementary solution?
   
   Periodic syncing has been added with the help of etcd's [`mod_revision`](https://etcd.io/docs/v3.5/learning/data_model/#logical-view).
   
   The basic logic is to first list all schemas and then, for each one:
   
   1. Serve the schema if we have never seen it.
   2. Compare the `mod_revision` of the stream; reload if we observe a larger one.
   3. Compare the number of index rules; reload if it differs.
   4. Compare the maximum observed `mod_revision` of the index rules; reload if we observe a larger one.





[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#discussion_r785274500



##########
File path: banyand/stream/service.go
##########
@@ -52,23 +53,29 @@ type Service interface {
 var _ Service = (*service)(nil)
 
 type service struct {
-	schemaMap     map[string]*stream
+	schemaMap     sync.Map
 	writeListener *writeCallback
 	l             *logger.Logger
 	metadata      metadata.Repo
 	root          string
 	pipeline      queue.Queue
 	repo          discovery.ServiceRepo
-	stopCh        chan struct{}
+	// stop channel for the service
+	stopCh chan struct{}
+	// stop channel for the inner worker
+	stopCh2 chan struct{}
+
+	updateCh chan *databasev1.Stream
+	deleteCh chan *commonv1.Metadata

Review comment:
       A single event channel is used now.

##########
File path: banyand/stream/service.go
##########
@@ -52,23 +53,29 @@ type Service interface {
 var _ Service = (*service)(nil)
 
 type service struct {
-	schemaMap     map[string]*stream
+	schemaMap     sync.Map
 	writeListener *writeCallback
 	l             *logger.Logger
 	metadata      metadata.Repo
 	root          string
 	pipeline      queue.Queue
 	repo          discovery.ServiceRepo
-	stopCh        chan struct{}
+	// stop channel for the service
+	stopCh chan struct{}
+	// stop channel for the inner worker
+	stopCh2 chan struct{}

Review comment:
       Fixed







[GitHub] [skywalking-banyandb] lujiajing1126 commented on pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#issuecomment-1013618832


   > What I am focusing on is how syncing failures are handled, which is missing at this stage. The module that receives events should ensure it can handle every event successfully. Otherwise, the conflicting metadata is bound to cause us a series of problems.
   
   Currently, I just skip processing the given entity, since the failure was probably caused by other modifications and the entity will probably become consistent in the next round of syncing.
   
   So I suppose we can focus only on the metadata of the stream/measure, without depending on the event type, right?
   
   If we cannot fetch the schema with the given metadata, it naturally means the entity has been deleted, so we delete the opened stream.
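The idea of reconciling from the metadata alone, regardless of event type, could look roughly like the sketch below. Everything here is assumed for illustration: `registry`, `errNotFound` and `reconcileOne` are hypothetical names, and an in-memory map stands in for the real schema registry.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical sentinel for "no schema stored under this metadata".
var errNotFound = errors.New("schema not found")

// registry is an in-memory stand-in for the metadata registry: id -> revision.
type registry map[string]int64

func (r registry) get(id string) (int64, error) {
	rev, ok := r[id]
	if !ok {
		return 0, errNotFound
	}
	return rev, nil
}

// reconcileOne ignores what kind of event arrived. It looks up the current
// schema and makes the opened streams match it.
func reconcileOne(reg registry, opened map[string]int64, id string) error {
	rev, err := reg.get(id)
	if errors.Is(err, errNotFound) {
		// The entity no longer exists, so drop the opened stream.
		delete(opened, id)
		return nil
	}
	if err != nil {
		// Any other failure: leave the state untouched and let a later round retry.
		return err
	}
	// The entity exists: (re)load it at the revision we just read.
	opened[id] = rev
	return nil
}

func main() {
	reg := registry{"default/sw": 3}
	opened := map[string]int64{"default/sw": 1, "default/gone": 2}
	for _, id := range []string{"default/sw", "default/gone"} {
		if err := reconcileOne(reg, opened, id); err != nil {
			fmt.Println("skip:", err)
		}
	}
	fmt.Println(opened)
}
```

Note the design caveat this sketch makes explicit: only a genuine "not found" result should be treated as a deletion, since treating every fetch error that way would close streams on transient failures.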





[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#discussion_r784430384



##########
File path: banyand/stream/service.go
##########
@@ -52,23 +53,29 @@ type Service interface {
 var _ Service = (*service)(nil)
 
 type service struct {
-	schemaMap     map[string]*stream
+	schemaMap     sync.Map
 	writeListener *writeCallback
 	l             *logger.Logger
 	metadata      metadata.Repo
 	root          string
 	pipeline      queue.Queue
 	repo          discovery.ServiceRepo
-	stopCh        chan struct{}
+	// stop channel for the service
+	stopCh chan struct{}
+	// stop channel for the inner worker
+	stopCh2 chan struct{}

Review comment:
       Could you provide a more specific name for this channel?

##########
File path: banyand/stream/service.go
##########
@@ -52,23 +53,29 @@ type Service interface {
 var _ Service = (*service)(nil)
 
 type service struct {
-	schemaMap     map[string]*stream
+	schemaMap     sync.Map
 	writeListener *writeCallback
 	l             *logger.Logger
 	metadata      metadata.Repo
 	root          string
 	pipeline      queue.Queue
 	repo          discovery.ServiceRepo
-	stopCh        chan struct{}
+	// stop channel for the service
+	stopCh chan struct{}
+	// stop channel for the inner worker
+	stopCh2 chan struct{}
+
+	updateCh chan *databasev1.Stream
+	deleteCh chan *commonv1.Metadata

Review comment:
       How about merging them into a single channel? Both are data management operations, and they conflict when executed concurrently.
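A minimal sketch of the single-channel variant, assuming a hypothetical `metadataEvent` envelope and `reconciler` type (neither name comes from the PR). With two channels in one `select`, Go picks among ready cases at random, so an update and a delete for the same stream could be applied in either order. A single channel keeps them in the order they were sent.

```go
package main

import "fmt"

type eventKind int

const (
	eventAddOrUpdate eventKind = iota
	eventDelete
)

// metadataEvent is a hypothetical envelope carrying both kinds of operation.
type metadataEvent struct {
	kind  eventKind
	group string
	name  string
}

// reconciler applies events one at a time from a single channel.
type reconciler struct {
	eventCh chan metadataEvent
	stopCh  chan struct{}
	done    chan struct{}
	opened  map[string]bool // touched only by the run goroutine
}

func newReconciler() *reconciler {
	return &reconciler{
		eventCh: make(chan metadataEvent),
		stopCh:  make(chan struct{}),
		done:    make(chan struct{}),
		opened:  make(map[string]bool),
	}
}

// run is the serial worker loop. It exits when stopCh is closed.
func (r *reconciler) run() {
	defer close(r.done)
	for {
		select {
		case evt := <-r.eventCh:
			id := evt.group + "/" + evt.name
			switch evt.kind {
			case eventAddOrUpdate:
				r.opened[id] = true
			case eventDelete:
				delete(r.opened, id)
			}
		case <-r.stopCh:
			return
		}
	}
}

// stop signals the worker and waits until it has finished its current event.
func (r *reconciler) stop() {
	close(r.stopCh)
	<-r.done
}

func main() {
	r := newReconciler()
	go r.run()
	r.eventCh <- metadataEvent{kind: eventAddOrUpdate, group: "default", name: "sw"}
	r.eventCh <- metadataEvent{kind: eventDelete, group: "default", name: "sw"}
	r.stop()
	fmt.Println("opened streams:", len(r.opened))
}
```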
   
   

##########
File path: banyand/stream/service.go
##########
@@ -94,101 +101,266 @@ func (s *service) PreRun() error {
 		return err
 	}
 
-	s.schemaMap = make(map[string]*stream, len(schemas))
+	s.schemaMap = sync.Map{}
 	s.l = logger.GetLogger(s.Name())
 	for _, sa := range schemas {
-		iRules, errIndexRules := s.metadata.IndexRules(context.TODO(), sa.Metadata)
-		if errIndexRules != nil {
-			return errIndexRules
-		}
-		sm, errTS := openStream(s.root, streamSpec{
-			schema:     sa,
-			indexRules: iRules,
-		}, s.l)
-		if errTS != nil {
-			return errTS
+		if _, innerErr := s.initStream(sa); innerErr != nil {
+			s.l.Error().Err(innerErr).Msg("fail to initialize stream")
 		}
-		id := formatStreamID(sm.name, sm.group)
-		s.schemaMap[id] = sm
-		s.l.Info().Str("id", id).Msg("initialize stream")
 	}
-	s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+	s.writeListener = setUpWriteCallback(s.l, &s.schemaMap)
 	return err
 }
 
 func (s *service) Serve() error {
-	t := time.Now()
-	now := time.Now().UnixNano()
-	nowBp := timestamppb.New(t)
-	for _, sMeta := range s.schemaMap {
-		locator := make([]*databasev1.EntityEvent_TagLocator, 0, len(sMeta.entityLocator))
-		for _, tagLocator := range sMeta.entityLocator {
-			locator = append(locator, &databasev1.EntityEvent_TagLocator{
-				FamilyOffset: uint32(tagLocator.FamilyOffset),
-				TagOffset:    uint32(tagLocator.TagOffset),
+	s.schemaMap.Range(func(key, value interface{}) bool {
+		s.l.Debug().Str("streamID", key.(string)).Msg("serve stream")
+		s.serveStream(value.(*stream))
+		return true
+	})
+
+	errWrite := s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
+	if errWrite != nil {
+		return errWrite
+	}
+
+	s.stopCh2 = make(chan struct{})
+	// run a serial reconciler
+	go s.reconcile()
+
+	s.metadata.StreamRegistry().RegisterHandler(schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule, s)
+
+	s.stopCh = make(chan struct{})
+	<-s.stopCh
+
+	return nil
+}
+
+func (s *service) reconcile() {
+	for {
+		select {
+		case streamEntity := <-s.updateCh:
+			s.reloadStream(streamEntity)
+		case metadataEntity := <-s.deleteCh:
+			s.removeStream(metadataEntity)
+		case <-s.stopCh2:
+			return
+		}
+	}
+}
+
+func (s *service) OnAddOrUpdate(m schema.Metadata) {
+	switch m.Kind {
+	case schema.KindStream:
+		s.updateCh <- m.Spec.(*databasev1.Stream)
+	case schema.KindIndexRuleBinding:
+		if m.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
+			stm, err := s.metadata.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{
+				Name:  m.Name,
+				Group: m.Group,
 			})
+			if err != nil {
+				s.l.Error().Err(err).Msg("fail to get subject")
+				return
+			}
+			s.updateCh <- stm
 		}
-		_, err := s.repo.Publish(event.StreamTopicEntityEvent, bus.NewMessage(bus.MessageID(now), &databasev1.EntityEvent{
-			Subject: &commonv1.Metadata{
-				Name:  sMeta.name,
-				Group: sMeta.group,
+	case schema.KindIndexRule:
+		subjects, err := s.metadata.Subjects(context.TODO(), m.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_STREAM)
+		if err != nil {
+			s.l.Error().Err(err).Msg("fail to get subjects(stream)")
+			return
+		}
+		for _, sub := range subjects {
+			s.updateCh <- sub.(*databasev1.Stream)
+		}
+	default:
+	}
+}
+
+func (s *service) OnDelete(m schema.Metadata) {
+	switch m.Kind {
+	case schema.KindStream:
+		s.deleteCh <- &commonv1.Metadata{
+			Name:  m.Name,
+			Group: m.Group,
+		}
+	case schema.KindIndexRuleBinding:
+		// TODO: we only have metadata(group and name) here, what shall we remove?

Review comment:
       Just ignore them.

##########
File path: banyand/stream/service.go
##########
@@ -94,101 +101,266 @@ func (s *service) PreRun() error {
 		return err
 	}
 
-	s.schemaMap = make(map[string]*stream, len(schemas))
+	s.schemaMap = sync.Map{}
 	s.l = logger.GetLogger(s.Name())
 	for _, sa := range schemas {
-		iRules, errIndexRules := s.metadata.IndexRules(context.TODO(), sa.Metadata)
-		if errIndexRules != nil {
-			return errIndexRules
-		}
-		sm, errTS := openStream(s.root, streamSpec{
-			schema:     sa,
-			indexRules: iRules,
-		}, s.l)
-		if errTS != nil {
-			return errTS
+		if _, innerErr := s.initStream(sa); innerErr != nil {
+			s.l.Error().Err(innerErr).Msg("fail to initialize stream")
 		}
-		id := formatStreamID(sm.name, sm.group)
-		s.schemaMap[id] = sm
-		s.l.Info().Str("id", id).Msg("initialize stream")
 	}
-	s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+	s.writeListener = setUpWriteCallback(s.l, &s.schemaMap)
 	return err
 }
 
 func (s *service) Serve() error {
-	t := time.Now()
-	now := time.Now().UnixNano()
-	nowBp := timestamppb.New(t)
-	for _, sMeta := range s.schemaMap {
-		locator := make([]*databasev1.EntityEvent_TagLocator, 0, len(sMeta.entityLocator))
-		for _, tagLocator := range sMeta.entityLocator {
-			locator = append(locator, &databasev1.EntityEvent_TagLocator{
-				FamilyOffset: uint32(tagLocator.FamilyOffset),
-				TagOffset:    uint32(tagLocator.TagOffset),
+	s.schemaMap.Range(func(key, value interface{}) bool {
+		s.l.Debug().Str("streamID", key.(string)).Msg("serve stream")
+		s.serveStream(value.(*stream))
+		return true
+	})
+
+	errWrite := s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
+	if errWrite != nil {
+		return errWrite
+	}
+
+	s.stopCh2 = make(chan struct{})
+	// run a serial reconciler
+	go s.reconcile()
+
+	s.metadata.StreamRegistry().RegisterHandler(schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule, s)
+
+	s.stopCh = make(chan struct{})
+	<-s.stopCh
+
+	return nil
+}
+
+func (s *service) reconcile() {
+	for {
+		select {
+		case streamEntity := <-s.updateCh:
+			s.reloadStream(streamEntity)
+		case metadataEntity := <-s.deleteCh:
+			s.removeStream(metadataEntity)
+		case <-s.stopCh2:
+			return
+		}
+	}
+}
+
+func (s *service) OnAddOrUpdate(m schema.Metadata) {
+	switch m.Kind {
+	case schema.KindStream:
+		s.updateCh <- m.Spec.(*databasev1.Stream)
+	case schema.KindIndexRuleBinding:
+		if m.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
+			stm, err := s.metadata.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{
+				Name:  m.Name,
+				Group: m.Group,
 			})
+			if err != nil {
+				s.l.Error().Err(err).Msg("fail to get subject")

Review comment:
       An error handler should be introduced here to sync the metadata. We should ensure that all changes are applied to this module successfully. We want eventual consistency, not "never" consistency 😄
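One possible shape for such an error handler, sketched under assumptions: instead of only logging and returning, the module remembers which entities failed and retries them later. The `pending` type, `errSyncFailed`, and the `syncFn` callback are hypothetical and are not taken from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// errSyncFailed is a hypothetical error used to simulate a failed lookup.
var errSyncFailed = errors.New("fail to get subject")

// pending tracks entities whose last sync attempt failed.
type pending map[string]struct{}

// apply runs syncFn for one entity and remembers the entity if it fails.
func (p pending) apply(id string, syncFn func(string) error) {
	if err := syncFn(id); err != nil {
		p[id] = struct{}{}
		return
	}
	delete(p, id)
}

// retryAll re-runs syncFn for every remembered entity and reports how many
// are still failing afterwards.
func (p pending) retryAll(syncFn func(string) error) int {
	for id := range p {
		p.apply(id, syncFn)
	}
	return len(p)
}

func main() {
	attempts := 0
	// flaky fails on the first attempt only, like a transient registry error.
	flaky := func(id string) error {
		attempts++
		if attempts == 1 {
			return errSyncFailed
		}
		return nil
	}

	p := pending{}
	p.apply("default/sw", flaky)
	fmt.Println("pending after event:", len(p))
	fmt.Println("pending after retry:", p.retryAll(flaky))
}
```

Calling `retryAll` from a timer would give the module a second chance at every failed change even when no further metadata events arrive.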







[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#discussion_r785274566



##########
File path: banyand/stream/service.go
##########
@@ -94,101 +101,266 @@ func (s *service) PreRun() error {
 		return err
 	}
 
-	s.schemaMap = make(map[string]*stream, len(schemas))
+	s.schemaMap = sync.Map{}
 	s.l = logger.GetLogger(s.Name())
 	for _, sa := range schemas {
-		iRules, errIndexRules := s.metadata.IndexRules(context.TODO(), sa.Metadata)
-		if errIndexRules != nil {
-			return errIndexRules
-		}
-		sm, errTS := openStream(s.root, streamSpec{
-			schema:     sa,
-			indexRules: iRules,
-		}, s.l)
-		if errTS != nil {
-			return errTS
+		if _, innerErr := s.initStream(sa); innerErr != nil {
+			s.l.Error().Err(innerErr).Msg("fail to initialize stream")
 		}
-		id := formatStreamID(sm.name, sm.group)
-		s.schemaMap[id] = sm
-		s.l.Info().Str("id", id).Msg("initialize stream")
 	}
-	s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+	s.writeListener = setUpWriteCallback(s.l, &s.schemaMap)
 	return err
 }
 
 func (s *service) Serve() error {
-	t := time.Now()
-	now := time.Now().UnixNano()
-	nowBp := timestamppb.New(t)
-	for _, sMeta := range s.schemaMap {
-		locator := make([]*databasev1.EntityEvent_TagLocator, 0, len(sMeta.entityLocator))
-		for _, tagLocator := range sMeta.entityLocator {
-			locator = append(locator, &databasev1.EntityEvent_TagLocator{
-				FamilyOffset: uint32(tagLocator.FamilyOffset),
-				TagOffset:    uint32(tagLocator.TagOffset),
+	s.schemaMap.Range(func(key, value interface{}) bool {
+		s.l.Debug().Str("streamID", key.(string)).Msg("serve stream")
+		s.serveStream(value.(*stream))
+		return true
+	})
+
+	errWrite := s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
+	if errWrite != nil {
+		return errWrite
+	}
+
+	s.stopCh2 = make(chan struct{})
+	// run a serial reconciler
+	go s.reconcile()
+
+	s.metadata.StreamRegistry().RegisterHandler(schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule, s)
+
+	s.stopCh = make(chan struct{})
+	<-s.stopCh
+
+	return nil
+}
+
+func (s *service) reconcile() {
+	for {
+		select {
+		case streamEntity := <-s.updateCh:
+			s.reloadStream(streamEntity)
+		case metadataEntity := <-s.deleteCh:
+			s.removeStream(metadataEntity)
+		case <-s.stopCh2:
+			return
+		}
+	}
+}
+
+func (s *service) OnAddOrUpdate(m schema.Metadata) {
+	switch m.Kind {
+	case schema.KindStream:
+		s.updateCh <- m.Spec.(*databasev1.Stream)
+	case schema.KindIndexRuleBinding:
+		if m.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
+			stm, err := s.metadata.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{
+				Name:  m.Name,
+				Group: m.Group,
 			})
+			if err != nil {
+				s.l.Error().Err(err).Msg("fail to get subject")
+				return
+			}
+			s.updateCh <- stm
 		}
-		_, err := s.repo.Publish(event.StreamTopicEntityEvent, bus.NewMessage(bus.MessageID(now), &databasev1.EntityEvent{
-			Subject: &commonv1.Metadata{
-				Name:  sMeta.name,
-				Group: sMeta.group,
+	case schema.KindIndexRule:
+		subjects, err := s.metadata.Subjects(context.TODO(), m.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_STREAM)
+		if err != nil {
+			s.l.Error().Err(err).Msg("fail to get subjects(stream)")
+			return
+		}
+		for _, sub := range subjects {
+			s.updateCh <- sub.(*databasev1.Stream)
+		}
+	default:
+	}
+}
+
+func (s *service) OnDelete(m schema.Metadata) {
+	switch m.Kind {
+	case schema.KindStream:
+		s.deleteCh <- &commonv1.Metadata{
+			Name:  m.Name,
+			Group: m.Group,
+		}
+	case schema.KindIndexRuleBinding:
+		// TODO: we only have metadata(group and name) here, what shall we remove?

Review comment:
       I now use an option in the etcd client to fetch the previous value when deleting keys, so it is possible to know afterwards which entity has been deleted.
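
The idea can be illustrated without a running etcd. The sketch below uses a hypothetical in-memory `store` whose `deleteWithPrev` method returns the value a key held before removal; the key layout and the stored value are made up for the example. In the etcd Go client the equivalent option is presumably `clientv3.WithPrevKV()`, which makes the delete response carry the removed key-value pairs.

```go
package main

import "fmt"

// store is a hypothetical in-memory stand-in for the metadata registry.
type store struct {
	data map[string]string
}

// deleteWithPrev removes key and reports the value it held, so that the
// delete handler still knows what the removed entity referred to.
func (s *store) deleteWithPrev(key string) (prev string, existed bool) {
	prev, existed = s.data[key]
	delete(s.data, key)
	return prev, existed
}

func main() {
	// The key path and value format are assumptions made for this example.
	key := "/index-rule-bindings/default/sw-binding"
	s := &store{data: map[string]string{key: "subject=stream/sw"}}

	if prev, ok := s.deleteWithPrev(key); ok {
		// With the previous value in hand, the handler can resolve the
		// subject of the deleted binding and reload the affected stream.
		fmt.Println("deleted binding pointed at:", prev)
	}
}
```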







[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#discussion_r785594400



##########
File path: banyand/stream/service.go
##########
@@ -94,101 +101,266 @@ func (s *service) PreRun() error {
 		return err
 	}
 
-	s.schemaMap = make(map[string]*stream, len(schemas))
+	s.schemaMap = sync.Map{}
 	s.l = logger.GetLogger(s.Name())
 	for _, sa := range schemas {
-		iRules, errIndexRules := s.metadata.IndexRules(context.TODO(), sa.Metadata)
-		if errIndexRules != nil {
-			return errIndexRules
-		}
-		sm, errTS := openStream(s.root, streamSpec{
-			schema:     sa,
-			indexRules: iRules,
-		}, s.l)
-		if errTS != nil {
-			return errTS
+		if _, innerErr := s.initStream(sa); innerErr != nil {
+			s.l.Error().Err(innerErr).Msg("fail to initialize stream")
 		}
-		id := formatStreamID(sm.name, sm.group)
-		s.schemaMap[id] = sm
-		s.l.Info().Str("id", id).Msg("initialize stream")
 	}
-	s.writeListener = setUpWriteCallback(s.l, s.schemaMap)
+	s.writeListener = setUpWriteCallback(s.l, &s.schemaMap)
 	return err
 }
 
 func (s *service) Serve() error {
-	t := time.Now()
-	now := time.Now().UnixNano()
-	nowBp := timestamppb.New(t)
-	for _, sMeta := range s.schemaMap {
-		locator := make([]*databasev1.EntityEvent_TagLocator, 0, len(sMeta.entityLocator))
-		for _, tagLocator := range sMeta.entityLocator {
-			locator = append(locator, &databasev1.EntityEvent_TagLocator{
-				FamilyOffset: uint32(tagLocator.FamilyOffset),
-				TagOffset:    uint32(tagLocator.TagOffset),
+	s.schemaMap.Range(func(key, value interface{}) bool {
+		s.l.Debug().Str("streamID", key.(string)).Msg("serve stream")
+		s.serveStream(value.(*stream))
+		return true
+	})
+
+	errWrite := s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
+	if errWrite != nil {
+		return errWrite
+	}
+
+	s.stopCh2 = make(chan struct{})
+	// run a serial reconciler
+	go s.reconcile()
+
+	s.metadata.StreamRegistry().RegisterHandler(schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule, s)
+
+	s.stopCh = make(chan struct{})
+	<-s.stopCh
+
+	return nil
+}
+
+func (s *service) reconcile() {
+	for {
+		select {
+		case streamEntity := <-s.updateCh:
+			s.reloadStream(streamEntity)
+		case metadataEntity := <-s.deleteCh:
+			s.removeStream(metadataEntity)
+		case <-s.stopCh2:
+			return
+		}
+	}
+}
+
+func (s *service) OnAddOrUpdate(m schema.Metadata) {
+	switch m.Kind {
+	case schema.KindStream:
+		s.updateCh <- m.Spec.(*databasev1.Stream)
+	case schema.KindIndexRuleBinding:
+		if m.Spec.(*databasev1.IndexRuleBinding).GetSubject().Catalog == commonv1.Catalog_CATALOG_STREAM {
+			stm, err := s.metadata.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{
+				Name:  m.Name,
+				Group: m.Group,
 			})
+			if err != nil {
+				s.l.Error().Err(err).Msg("fail to get subject")
+				return
+			}
+			s.updateCh <- stm
 		}
-		_, err := s.repo.Publish(event.StreamTopicEntityEvent, bus.NewMessage(bus.MessageID(now), &databasev1.EntityEvent{
-			Subject: &commonv1.Metadata{
-				Name:  sMeta.name,
-				Group: sMeta.group,
+	case schema.KindIndexRule:
+		subjects, err := s.metadata.Subjects(context.TODO(), m.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_STREAM)
+		if err != nil {
+			s.l.Error().Err(err).Msg("fail to get subjects(stream)")
+			return
+		}
+		for _, sub := range subjects {
+			s.updateCh <- sub.(*databasev1.Stream)
+		}
+	default:
+	}
+}
+
+func (s *service) OnDelete(m schema.Metadata) {
+	switch m.Kind {
+	case schema.KindStream:
+		s.deleteCh <- &commonv1.Metadata{
+			Name:  m.Name,
+			Group: m.Group,
+		}
+	case schema.KindIndexRuleBinding:
+		// TODO: we only have metadata(group and name) here, what shall we remove?

Review comment:
       nice 







[GitHub] [skywalking-banyandb] hanahmily merged pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
hanahmily merged pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66


   





[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #66: Reload stream when metadata changes

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #66:
URL: https://github.com/apache/skywalking-banyandb/pull/66#discussion_r785599033



##########
File path: banyand/stream/service_test.go
##########
@@ -0,0 +1,302 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/golang/mock/gomock"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+
+	"github.com/apache/skywalking-banyandb/api/event"
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+	teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+)
+
+var (
+	_ gomock.Matcher = (*shardEventMatcher)(nil)
+	_ gomock.Matcher = (*entityEventMatcher)(nil)
+)
+
+type shardEventMatcher struct {
+	action databasev1.Action
+}
+
+func (s *shardEventMatcher) Matches(x interface{}) bool {
+	if m, messageOk := x.(bus.Message); messageOk {
+		if evt, dataOk := m.Data().(*databasev1.ShardEvent); dataOk {
+			return evt.Action == s.action
+		}
+	}
+
+	return false
+}
+
+func (s *shardEventMatcher) String() string {
+	return fmt.Sprintf("shard-event-matcher(%s)", databasev1.Action_name[int32(s.action)])
+}
+
+type entityEventMatcher struct {
+	action databasev1.Action
+}
+
+func (s *entityEventMatcher) Matches(x interface{}) bool {
+	if m, messageOk := x.(bus.Message); messageOk {
+		if evt, dataOk := m.Data().(*databasev1.EntityEvent); dataOk {
+			return evt.Action == s.action
+		}
+	}
+
+	return false
+}
+
+func (s *entityEventMatcher) String() string {
+	return fmt.Sprintf("entity-event-matcher(%s)", databasev1.Action_name[int32(s.action)])
+}
+
+type streamEventSubscriber struct {
+	repo                 discovery.ServiceRepo
+	shardEventSubscriber bus.MessageListener
+	entityEventListener  bus.MessageListener
+}
+
+func (ses *streamEventSubscriber) Name() string {
+	return "stream-event-subscriber"
+}
+
+func (ses *streamEventSubscriber) PreRun() error {
+	components := []struct {
+		shardEvent  bus.Topic
+		entityEvent bus.Topic
+	}{
+		{
+			shardEvent:  event.StreamTopicShardEvent,
+			entityEvent: event.StreamTopicEntityEvent,
+		},
+	}
+	for _, c := range components {
+		err := ses.repo.Subscribe(c.shardEvent, ses.shardEventSubscriber)
+		if err != nil {
+			return err
+		}
+		err = ses.repo.Subscribe(c.entityEvent, ses.entityEventListener)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// service to preload stream
+type preloadStreamService struct {
+	metaSvc metadata.Service
+}
+
+func (p *preloadStreamService) Name() string {
+	return "preload-stream"
+}
+
+func (p *preloadStreamService) PreRun() error {
+	return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
+}
+
+var ctrl *gomock.Controller
+
+// BeforeSuite - Init logger
+var _ = BeforeSuite(func() {
+	Expect(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "info",
+	})).To(Succeed())
+})
+
+// BeforeEach - Create Mock Controller
+var _ = BeforeEach(func() {
+	ctrl = gomock.NewController(GinkgoT())
+	Expect(ctrl).ShouldNot(BeNil())
+})
+
+var _ = Describe("Stream Service", func() {
+	var closer run.Service
+	var streamService Service
+	var metadataService metadata.Service
+	var shardEventListener *bus.MockMessageListener
+	var entityEventListener *bus.MockMessageListener
+
+	BeforeEach(func() {
+		var flags []string
+
+		// Init Discovery
+		repo, err := discovery.NewServiceRepo(context.TODO())
+		Expect(err).NotTo(HaveOccurred())
+
+		// Init Pipeline
+		pipeline, err := queue.NewQueue(context.TODO(), repo)
+		Expect(err).NotTo(HaveOccurred())
+
+		// Init Metadata Service
+		metadataService, err = metadata.NewService(context.TODO())
+		Expect(err).NotTo(HaveOccurred())
+		etcdRootDir := teststream.RandomTempDir()
+		flags = append(flags, "--metadata-root-path="+etcdRootDir)
+		DeferCleanup(func() {
+			_ = os.RemoveAll(etcdRootDir)
+		})
+
+		// Init Stream Service
+		streamService, err = NewService(context.TODO(), metadataService, repo, pipeline)
+		Expect(err).NotTo(HaveOccurred())
+		rootPath, deferFunc, err := test.NewSpace()
+		Expect(err).NotTo(HaveOccurred())
+		DeferCleanup(func() {
+			deferFunc()

Review comment:
       The test temp dir is removed before the stream module, which is managed by the run group, gets a chance to close the kv storage.
   This causes a panic when the kv storage is closed.
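
A minimal sketch of the safe ordering, assuming cleanups run last-in-first-out: register the directory removal first and the store shutdown afterwards, so that the store is closed while its files still exist. The `cleanupStack` type and the `lifecycle` function are hypothetical, and a plain file stands in for the kv storage.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// cleanupStack runs registered functions in last-in-first-out order.
type cleanupStack struct {
	fns []func() error
}

func (c *cleanupStack) add(fn func() error) { c.fns = append(c.fns, fn) }

func (c *cleanupStack) run() error {
	for i := len(c.fns) - 1; i >= 0; i-- {
		if err := c.fns[i](); err != nil {
			return err
		}
	}
	return nil
}

// lifecycle creates a temp dir, opens a file inside it as a stand-in for the
// kv storage, then tears both down. It returns the order in which the
// cleanup steps actually ran.
func lifecycle() ([]string, error) {
	var order []string
	var cleanups cleanupStack

	dir, err := os.MkdirTemp("", "stream-test-")
	if err != nil {
		return nil, err
	}
	// Registered first, therefore executed last.
	cleanups.add(func() error {
		order = append(order, "remove dir")
		return os.RemoveAll(dir)
	})

	f, err := os.Create(filepath.Join(dir, "kv.db"))
	if err != nil {
		_ = cleanups.run()
		return nil, err
	}
	// Registered last, therefore executed first, while the dir still exists.
	cleanups.add(func() error {
		order = append(order, "close store")
		return f.Close()
	})

	err = cleanups.run()
	return order, err
}

func main() {
	order, err := lifecycle()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(order)
}
```

In the test under review, the same effect would be obtained by making sure the run group has stopped the stream module before `deferFunc()` deletes the directory.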



