You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2020/12/26 03:29:18 UTC

[GitHub] [skywalking-satellite] surlymo commented on a change in pull request #10: mmap-queue-plugin

surlymo commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548934392



##########
File path: plugins/queue/mmap/meta/meta.go
##########
@@ -0,0 +1,152 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package meta
+
+import (
+	"fmt"
+	"path/filepath"
+	"syscall"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+const (
+	metaSize    = 80
+	metaName    = "meta.dat"
+	metaVersion = 1
+)
+
+// Metadata only needs 80B to store the Metadata for the pipe. But for memory alignment,
+// it takes at least one memory page size, which is generally 4K.
+//
+// [    8Bit   ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit  ]
+// [metaVersion][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][capacity]
+// [metaVersion][writing   offset][watermark offset][committed offset][reading   offset][capacity]
+type Metadata struct {
+	metaFile *mmap.File
+	name     string
+	size     int
+	capacity int
+}
+
+// NewMetaData read or create a Metadata with supported metaVersion
+func NewMetaData(metaDir string, capacity int) (*Metadata, error) {
+	path := filepath.Join(metaDir, metaName)
+	metaFile, err := segment.NewSegment(path, metaSize)
+	if err != nil {
+		return nil, fmt.Errorf("error in crating the Metadata memory mapped file: %v", err)
+	}
+
+	m := &Metadata{
+		metaFile: metaFile,
+		name:     metaName,
+		size:     metaSize,
+		capacity: capacity,
+	}
+
+	v := m.GetVersion()
+	if v != 0 && v != metaVersion {
+		return nil, fmt.Errorf("metadata metaVersion is not matching, the Metadata metaVersion is %d", v)
+	}
+	c := m.GetCapacity()
+	if c != 0 && c != capacity {
+		return nil, fmt.Errorf("metadata catapacity is not equal to the old capacity, the old capacity is %d", c)
+	}
+	m.PutVersion(metaVersion)
+	m.PutCapacity(int64(capacity))
+	return m, nil
+}
+
+// GetVersion returns the meta version.
+func (m *Metadata) GetVersion() int {
+	return int(m.metaFile.ReadUint64At(0))
+}
+
+// PutVersion put the version into the memory mapped file.
+func (m *Metadata) PutVersion(version int64) {
+	m.metaFile.WriteUint64At(uint64(version), 0)
+}
+
+// GetWritingOffset returns the writing offset, which contains the segment ID and the offset of the segment.
+func (m *Metadata) GetWritingOffset() (segmentID, offset int64) {
+	return int64(m.metaFile.ReadUint64At(8)), int64(m.metaFile.ReadUint64At(16))
+}
+
+// PutWritingOffset put the segment ID and the offset of the segment into the writing offset.
+func (m *Metadata) PutWritingOffset(segmentID, offset int64) {
+	m.metaFile.WriteUint64At(uint64(segmentID), 8)

Review comment:
       magic number

##########
File path: internal/satellite/module/sender/sender.go
##########
@@ -130,12 +131,12 @@ func (s *Sender) Shutdown() {
 func (s *Sender) consume(batch *buffer.BatchBuffer) {
 	log.Logger.Infof("sender module of %s namespace is flushing a new batch buffer."+
 		" the start offset is %s, and the size is %d", s.config.NamespaceName, batch.Last(), batch.Len())
-	var events = make(map[event.Type]event.BatchEvents)
+	var events = make(map[protocol.EventType]event.BatchEvents)
 	for i := 0; i < batch.Len(); i++ {
 		eventContext := batch.Buf()[i]
 		for _, e := range eventContext.Context {
-			if e.IsRemote() {
-				events[e.Type()] = append(events[e.Type()], e)
+			if e.Remote {

Review comment:
       why replace the name?

##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,266 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/grandecola/mmap"
+
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+	"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int32  `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.
+	mmapCount              int32          // The number of the memory mapped files.
+	unflushedNum           int            // The unflushed number.
+	flushChannel           chan struct{}  // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
+	insufficientMemChannel chan struct{}  // Notify when memory is insufficient
+	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
+
+	// control components
+	ctx        context.Context    // Parent ctx
+	cancel     context.CancelFunc // Parent ctx cancel function
+	showDownWg sync.WaitGroup     // The shutdown wait group.
+
+}
+
+func (q *Queue) Name() string {
+	return "mmap-queue"
+}
+
+func (q *Queue) Description() string {
+	return "this is a mmap queue"
+}
+
+func (q *Queue) DefaultConfig() string {
+	return `
+# The size of each segment. Default value is 128K. The unit is Byte.
+segment_size: 131072
+# The max num of segments in memory. Default value is 10.
+max_in_mem_segments: 10
+# The capacity of Queue = segment_size * queue_capacity_segments.
+queue_capacity_segments: 4000
+# The period flush time. The unit is ms. Default value is 1 second.
+flush_period: 1000
+# The max number in one flush time.  Default value is 10000.
+flush_ceiling_num: 10000
+# Contains all files in the queue.
+queue_dir: satellite-mmap-queue
+# The max size of the input event. Default value is 20k.
+max_event_size: 20480
+`
+}
+
+func (q *Queue) Initialize() error {
+	// the size of each segment file should be a multiple of the page size.
+	pageSize := os.Getpagesize()
+	if q.SegmentSize%pageSize != 0 {
+		q.SegmentSize -= q.SegmentSize % pageSize
+	}
+	if q.SegmentSize/pageSize == 0 {
+		q.SegmentSize = 131072
+	}
+	// the minimum MaxInMemSegments value should be 4.
+	if q.MaxInMemSegments < 4 {
+		q.MaxInMemSegments = 4
+	}
+	// load metadata and override the reading or writing offset by the committed or watermark offset.
+	md, err := meta.NewMetaData(q.QueueDir, q.QueueCapacitySegments)
+	if err != nil {
+		return fmt.Errorf("error in creating the metadata: %v", err)
+	}
+	q.meta = md
+	cmID, cmOffset := md.GetCommittedOffset()
+	wmID, wmOffset := md.GetWatermarkOffset()
+	md.PutWritingOffset(wmID, wmOffset)
+	md.PutReadingOffset(cmID, cmOffset)
+	// keep the reading or writing segments in the memory.
+	q.segments = make([]*mmap.File, q.QueueCapacitySegments)
+	if _, err := q.GetSegment(cmID); err != nil {
+		return err
+	}
+	if _, err := q.GetSegment(wmID); err != nil {
+		return err
+	}
+	// init components
+	q.insufficientMemChannel = make(chan struct{})
+	q.sufficientMemChannel = make(chan struct{})
+	q.flushChannel = make(chan struct{})
+	q.ctx, q.cancel = context.WithCancel(context.Background())
+	// async supported processes.
+	q.showDownWg.Add(2)
+	go q.segmentSwapper()
+	go q.flush()
+	return nil
+}
+
+func (q *Queue) Push(e *protocol.Event) error {
+	data, err := proto.Marshal(e)

Review comment:
       mmap should only be used in disaster-recovery situations. The runtime queue can simply be implemented in a normal way.

##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,266 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/grandecola/mmap"
+
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+	"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int32  `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.
+	mmapCount              int32          // The number of the memory mapped files.
+	unflushedNum           int            // The unflushed number.
+	flushChannel           chan struct{}  // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
+	insufficientMemChannel chan struct{}  // Notify when memory is insufficient
+	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
+
+	// control components
+	ctx        context.Context    // Parent ctx
+	cancel     context.CancelFunc // Parent ctx cancel function
+	showDownWg sync.WaitGroup     // The shutdown wait group.
+
+}
+
+func (q *Queue) Name() string {
+	return "mmap-queue"
+}
+
+func (q *Queue) Description() string {
+	return "this is a mmap queue"
+}
+
+func (q *Queue) DefaultConfig() string {
+	return `
+# The size of each segment. Default value is 128K. The unit is Byte.
+segment_size: 131072
+# The max num of segments in memory. Default value is 10.
+max_in_mem_segments: 10
+# The capacity of Queue = segment_size * queue_capacity_segments.
+queue_capacity_segments: 4000
+# The period flush time. The unit is ms. Default value is 1 second.
+flush_period: 1000
+# The max number in one flush time.  Default value is 10000.
+flush_ceiling_num: 10000
+# Contains all files in the queue.
+queue_dir: satellite-mmap-queue
+# The max size of the input event. Default value is 20k.
+max_event_size: 20480
+`
+}
+
+func (q *Queue) Initialize() error {
+	// the size of each segment file should be a multiple of the page size.
+	pageSize := os.Getpagesize()
+	if q.SegmentSize%pageSize != 0 {
+		q.SegmentSize -= q.SegmentSize % pageSize
+	}
+	if q.SegmentSize/pageSize == 0 {
+		q.SegmentSize = 131072

Review comment:
       magic number




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org