You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by "hanahmily (via GitHub)" <gi...@apache.org> on 2023/04/03 14:26:01 UTC

[GitHub] [skywalking-banyandb] hanahmily commented on a diff in pull request #261: Implement Write-ahead Logging

hanahmily commented on code in PR #261:
URL: https://github.com/apache/skywalking-banyandb/pull/261#discussion_r1155916253


##########
pkg/wal/wal.go:
##########
@@ -43,18 +128,450 @@ type WAL interface {
 	// Write a logging entity.
 	// It will return immediately when the data is written in the buffer,
 	// The returned function will be called when the entity is flushed on the persistent storage.
-	Write(seriesID common.SeriesID, timestamp time.Time, data []byte) (func(), error)
+	Write(seriesID []byte, timestamp time.Time, data []byte) error
 	// Read specified segment by SegmentID.
-	Read(segmentID SegmentID) (*Segment, error)
+	Read(segmentID SegmentID) (Segment, error)
 	// ReadAllSegments reads all segments sorted by their creation time in ascending order.
-	ReadAllSegments() ([]*Segment, error)
+	ReadAllSegments() ([]Segment, error)
 	// Rotate closes the open segment and opens a new one, returning the closed segment details.
-	Rotate() (*Segment, error)
+	Rotate() (Segment, error)
 	// Delete the specified segment.
 	Delete(segmentID SegmentID) error
+	// Close all of segments and stop WAL work.
+	Close() error
 }
 
 // New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
-	return nil, nil
+func New(path string, options *Options) (WAL, error) {
+	//  Check configuration options.
+	if options == nil {
+		options = DefaultOptions
+	}
+	if options.FileSize <= 0 {
+		options.FileSize = DefaultOptions.FileSize
+	}
+	if options.BufferSize <= 0 {
+		options.BufferSize = DefaultOptions.BufferSize
+	}
+
+	// Initial WAL path.
+	path, err := filepath.Abs(path)
+	if err != nil {
+		return nil, errors.Wrap(err, "Can not get absolute path")
+	}
+	log := &Log{path: path, options: *options}
+	if err := os.MkdirAll(path, os.ModePerm); err != nil {

Review Comment:
   `FileMode` should be `0700` 



##########
pkg/wal/wal.go:
##########
@@ -19,20 +19,105 @@
 package wal
 
 import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
 	"time"
 
+	"github.com/golang/snappy"
+	"github.com/pkg/errors"
+
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+	segmentNamePrefix   = "seg"
+	segmentNameSuffix   = ".wal"
+	batchWriteLength    = 8
+	entryLength         = 8
+	seriesIDLength      = 8
+	countLength         = 4
+	timestampLength     = 8
+	binaryLength        = 2
+	flushSuccessFlag    = "success"
+	flushFailFlag       = "fail"
+	bufferBatchInterval = 500
+	parseTimeStr        = "2006-01-02 15:04:05"
 )
 
 // SegmentID identities a segment in a WAL.
 type SegmentID uint64
 
+// Log implements the WAL interface.
+type Log struct {

Review Comment:
   `Log` should be unexported. 



##########
pkg/wal/wal.go:
##########
@@ -43,18 +128,450 @@ type WAL interface {
 	// Write a logging entity.
 	// It will return immediately when the data is written in the buffer,
 	// The returned function will be called when the entity is flushed on the persistent storage.
-	Write(seriesID common.SeriesID, timestamp time.Time, data []byte) (func(), error)
+	Write(seriesID []byte, timestamp time.Time, data []byte) error
 	// Read specified segment by SegmentID.
-	Read(segmentID SegmentID) (*Segment, error)
+	Read(segmentID SegmentID) (Segment, error)
 	// ReadAllSegments reads all segments sorted by their creation time in ascending order.
-	ReadAllSegments() ([]*Segment, error)
+	ReadAllSegments() ([]Segment, error)
 	// Rotate closes the open segment and opens a new one, returning the closed segment details.
-	Rotate() (*Segment, error)
+	Rotate() (Segment, error)
 	// Delete the specified segment.
 	Delete(segmentID SegmentID) error
+	// Close all of segments and stop WAL work.
+	Close() error
 }
 
 // New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
-	return nil, nil
+func New(path string, options *Options) (WAL, error) {
+	//  Check configuration options.
+	if options == nil {
+		options = DefaultOptions
+	}
+	if options.FileSize <= 0 {
+		options.FileSize = DefaultOptions.FileSize
+	}
+	if options.BufferSize <= 0 {
+		options.BufferSize = DefaultOptions.BufferSize
+	}
+
+	// Initial WAL path.
+	path, err := filepath.Abs(path)
+	if err != nil {
+		return nil, errors.Wrap(err, "Can not get absolute path")
+	}
+	log := &Log{path: path, options: *options}
+	if err := os.MkdirAll(path, os.ModePerm); err != nil {
+		return nil, err
+	}
+
+	if err := log.load(); err != nil {
+		return nil, err
+	}
+	log.runFlushTask()
+	return log, nil
+}
+
+func (log *Log) runFlushTask() {
+	go func() {
+		bufferSize := 0
+		for {
+			timer := time.NewTimer(bufferBatchInterval * time.Millisecond)
+			select {
+			case request := <-log.writeChannel:
+				bufferSize += seriesIDLength + timestampLength + len(request.data)

Review Comment:
   The length of `seriesID` is not fixed. 



##########
pkg/wal/wal.go:
##########
@@ -43,18 +128,450 @@ type WAL interface {
 	// Write a logging entity.
 	// It will return immediately when the data is written in the buffer,
 	// The returned function will be called when the entity is flushed on the persistent storage.
-	Write(seriesID common.SeriesID, timestamp time.Time, data []byte) (func(), error)
+	Write(seriesID []byte, timestamp time.Time, data []byte) error
 	// Read specified segment by SegmentID.
-	Read(segmentID SegmentID) (*Segment, error)
+	Read(segmentID SegmentID) (Segment, error)
 	// ReadAllSegments reads all segments sorted by their creation time in ascending order.
-	ReadAllSegments() ([]*Segment, error)
+	ReadAllSegments() ([]Segment, error)
 	// Rotate closes the open segment and opens a new one, returning the closed segment details.
-	Rotate() (*Segment, error)
+	Rotate() (Segment, error)
 	// Delete the specified segment.
 	Delete(segmentID SegmentID) error
+	// Close all of segments and stop WAL work.
+	Close() error
 }
 
 // New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
-	return nil, nil
+func New(path string, options *Options) (WAL, error) {
+	//  Check configuration options.
+	if options == nil {
+		options = DefaultOptions
+	}
+	if options.FileSize <= 0 {
+		options.FileSize = DefaultOptions.FileSize
+	}
+	if options.BufferSize <= 0 {
+		options.BufferSize = DefaultOptions.BufferSize
+	}
+
+	// Initial WAL path.
+	path, err := filepath.Abs(path)
+	if err != nil {
+		return nil, errors.Wrap(err, "Can not get absolute path")
+	}
+	log := &Log{path: path, options: *options}
+	if err := os.MkdirAll(path, os.ModePerm); err != nil {
+		return nil, err
+	}
+
+	if err := log.load(); err != nil {
+		return nil, err
+	}
+	log.runFlushTask()
+	return log, nil
+}
+
+func (log *Log) runFlushTask() {
+	go func() {
+		bufferSize := 0
+		for {
+			timer := time.NewTimer(bufferBatchInterval * time.Millisecond)
+			select {
+			case request := <-log.writeChannel:
+				bufferSize += seriesIDLength + timestampLength + len(request.data)
+				err := log.buffer.write(request)
+				if err != nil {
+					log.l.Error().Err(err).Msg("Fail to write to buffer")
+				}
+				if bufferSize > log.options.BufferSize {
+					// Clone buffer,avoiding to block write.
+					buf := log.buffer
+					// Clear buffer to receive Log request.
+					log.newBuffer()
+					if log.closeFlag {

Review Comment:
   You can't achieve synchronization through a local variable. The safe way to share data between goroutines is to use a channel or a context.



##########
pkg/wal/wal.go:
##########
@@ -43,18 +128,450 @@ type WAL interface {
 	// Write a logging entity.
 	// It will return immediately when the data is written in the buffer,
 	// The returned function will be called when the entity is flushed on the persistent storage.
-	Write(seriesID common.SeriesID, timestamp time.Time, data []byte) (func(), error)
+	Write(seriesID []byte, timestamp time.Time, data []byte) error

Review Comment:
   Why did you drop the returned function that notifies the caller of the flushed event?



##########
pkg/wal/wal.go:
##########
@@ -43,18 +128,450 @@ type WAL interface {
 	// Write a logging entity.
 	// It will return immediately when the data is written in the buffer,
 	// The returned function will be called when the entity is flushed on the persistent storage.
-	Write(seriesID common.SeriesID, timestamp time.Time, data []byte) (func(), error)
+	Write(seriesID []byte, timestamp time.Time, data []byte) error
 	// Read specified segment by SegmentID.
-	Read(segmentID SegmentID) (*Segment, error)
+	Read(segmentID SegmentID) (Segment, error)
 	// ReadAllSegments reads all segments sorted by their creation time in ascending order.
-	ReadAllSegments() ([]*Segment, error)
+	ReadAllSegments() ([]Segment, error)
 	// Rotate closes the open segment and opens a new one, returning the closed segment details.
-	Rotate() (*Segment, error)
+	Rotate() (Segment, error)
 	// Delete the specified segment.
 	Delete(segmentID SegmentID) error
+	// Close all of segments and stop WAL work.
+	Close() error
 }
 
 // New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
-	return nil, nil
+func New(path string, options *Options) (WAL, error) {
+	//  Check configuration options.
+	if options == nil {
+		options = DefaultOptions
+	}
+	if options.FileSize <= 0 {
+		options.FileSize = DefaultOptions.FileSize
+	}
+	if options.BufferSize <= 0 {
+		options.BufferSize = DefaultOptions.BufferSize
+	}
+
+	// Initial WAL path.
+	path, err := filepath.Abs(path)
+	if err != nil {
+		return nil, errors.Wrap(err, "Can not get absolute path")
+	}
+	log := &Log{path: path, options: *options}
+	if err := os.MkdirAll(path, os.ModePerm); err != nil {
+		return nil, err
+	}
+
+	if err := log.load(); err != nil {
+		return nil, err
+	}
+	log.runFlushTask()
+	return log, nil
+}
+
+func (log *Log) runFlushTask() {
+	go func() {
+		bufferSize := 0
+		for {
+			timer := time.NewTimer(bufferBatchInterval * time.Millisecond)
+			select {
+			case request := <-log.writeChannel:
+				bufferSize += seriesIDLength + timestampLength + len(request.data)
+				err := log.buffer.write(request)
+				if err != nil {
+					log.l.Error().Err(err).Msg("Fail to write to buffer")
+				}
+				if bufferSize > log.options.BufferSize {
+					// Clone buffer,avoiding to block write.
+					buf := log.buffer
+					// Clear buffer to receive Log request.
+					log.newBuffer()
+					if log.closeFlag {
+						return
+					}
+					log.asyncBatchflush(buf, log.flushChannel)
+				}
+			case <-timer.C:
+				buf := log.buffer
+				log.newBuffer()
+				if log.closeFlag {
+					return
+				}
+				log.asyncBatchflush(buf, log.flushChannel)
+			}
+		}
+	}()
+}
+
+func (log *Log) asyncBatchflush(buffer buffer, flushCh chan string) {
+	go func() {
+		// Convert to byte.
+		var err error
+		bytesBuffer := bytes.NewBuffer([]byte{})
+		err = binary.Write(bytesBuffer, binary.LittleEndian, int64(log.buffer.size))
+		if err != nil {
+			log.l.Error().Err(err).Msg("EntryLength fail to convert to byte")
+		}
+		for seriesID, timestamp := range buffer.timestampMap {
+			count := 0
+			var timeBytes []byte
+			for _, timestamp := range timestamp {
+				time := []byte(timestamp.String())
+				count += len(time)
+				timeBytes = append(timeBytes, time...)
+			}
+			count /= timestampLength
+			entryLength := seriesIDLength + count + len(timeBytes) + binaryLength + len(buffer.valueMap[seriesID])
+			err = binary.Write(bytesBuffer, binary.LittleEndian, int64(entryLength))
+			if err != nil {
+				log.l.Error().Err(err).Msg("EntryLength fail to convert to byte")
+			}
+			err = binary.Write(bytesBuffer, binary.LittleEndian, uint64(seriesID))
+			if err != nil {
+				log.l.Error().Err(err).Msg("SeriesID fail to convert to byte")
+			}
+			err = binary.Write(bytesBuffer, binary.LittleEndian, int32(count))
+			if err != nil {
+				log.l.Error().Err(err).Msg("Count fail to convert to byte")
+			}
+			err = binary.Write(bytesBuffer, binary.LittleEndian, timeBytes)
+			if err != nil {
+				log.l.Error().Err(err).Msg("Timestamp fail to convert to byte")
+			}
+			err = binary.Write(bytesBuffer, binary.LittleEndian, int16(len(buffer.valueMap[seriesID])))
+			if err != nil {
+				log.l.Error().Err(err).Msg("Binary Length fail to convert to byte")
+			}
+			err = binary.Write(bytesBuffer, binary.LittleEndian, buffer.valueMap[seriesID])
+			if err != nil {
+				log.l.Error().Err(err).Msg("Value fail to convert to byte")
+			}
+		}
+
+		// Compression and flush.
+		compressionData := snappy.Encode(nil, bytesBuffer.Bytes())

Review Comment:
   Instead of compressing the entire segment, it is recommended to separately encode/compress timestamps and values.
   
   The `pkg/encoding/xor` package offers methods for encoding uint64. These can be used to encode/decode the UNIX epoch (nanosecond) of a timestamp.
   
   For quick compression of values, snappy can be utilized.
   
   By following this approach, there is no need for `bytesBuffer` during flushing. Instead, data can be written directly to `workSegment.file`.



##########
pkg/wal/wal_test.go:
##########
@@ -0,0 +1,191 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package version can be used to implement embedding versioning details from
+// git branches and tags into the binary importing this package.
+package wal_test
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"os"
+	"path/filepath"
+	"sync"
+	"time"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	"github.com/onsi/gomega/gleak"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
+	"github.com/apache/skywalking-banyandb/pkg/wal"
+)
+
+var _ = ginkgo.Describe("WAL", func() {
+	var (
+		log     wal.WAL
+		options *wal.Options
+		goods   []gleak.Goroutine
+	)
+	ginkgo.BeforeEach(func() {
+		options = &wal.Options{
+			Compression: true,
+			FileSize:    67108864, // 20MB
+			BufferSize:  16,       // 16B
+		}
+		goods = gleak.Goroutines()
+	})
+	ginkgo.AfterEach(func() {
+		gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+	})
+
+	ginkgo.Context("Write and Read", func() {
+		ginkgo.BeforeEach(func() {
+			var err error
+			log, err = wal.New("test", options)
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+		})
+
+		ginkgo.AfterEach(func() {
+			path, err := filepath.Abs("test")
+			os.RemoveAll(path)
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+			log.Close()
+		})
+
+		ginkgo.It("should write and read data correctly", func() {
+			var err error
+			var wg sync.WaitGroup
+			wg.Add(100)
+			for i := 0; i < 100; i++ {
+				go func(idx int) {
+					defer wg.Done()
+					id := int64(idx)
+					bytesBuffer := bytes.NewBuffer([]byte{})
+					binary.Write(bytesBuffer, binary.LittleEndian, &id)
+					seriesID := bytesBuffer.Bytes()
+					timestamp := time.Now()
+					value := []byte(fmt.Sprintf("value-%d", idx))
+					err = log.Write(seriesID, timestamp, value)
+					gomega.Expect(err).ToNot(gomega.HaveOccurred())
+				}(i)
+			}
+			wg.Wait()
+
+			err = log.Close()
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+			log, err = wal.New("test", options)
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+
+			segments, err := log.ReadAllSegments()
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+			for _, segment := range segments {
+				entries := segment.GetLogEntries()
+				for index, entity := range entries {
+					seriesID := entity.GetSeriesID()
+					gomega.Expect(seriesID == common.SeriesID(index)).To(gomega.BeTrue())
+					value := entity.GetBinary()
+					gomega.Expect(bytes.Equal(value, []byte(fmt.Sprintf("value-%d", index)))).To(gomega.BeTrue())
+				}
+			}
+		})
+	})
+
+	ginkgo.Context("Rotate", func() {
+		ginkgo.BeforeEach(func() {
+			var err error
+			log, err = wal.New("test", options)
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+		})
+
+		ginkgo.AfterEach(func() {
+			path, err := filepath.Abs("test")
+			os.RemoveAll(path)
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+			log.Close()
+		})
+
+		ginkgo.It("should rotate correctly", func() {
+			var err error
+			var wg sync.WaitGroup
+			wg.Add(100)
+			for i := 0; i < 100; i++ {
+				go func(idx int) {
+					defer wg.Done()
+					id := int64(idx)
+					bytesBuffer := bytes.NewBuffer([]byte{})
+					binary.Write(bytesBuffer, binary.LittleEndian, &id)
+					seriesID := bytesBuffer.Bytes()
+					timestamp := time.Now()
+					value := []byte(fmt.Sprintf("value-%d", idx))
+					err = log.Write(seriesID, timestamp, value)
+					gomega.Expect(err).ToNot(gomega.HaveOccurred())
+				}(i)
+			}
+			wg.Wait()
+
+			segment, err := log.Rotate()

Review Comment:
   Could you verify entries in the segment?



##########
pkg/wal/wal.go:
##########
@@ -43,18 +128,450 @@ type WAL interface {
 	// Write a logging entity.
 	// It will return immediately when the data is written in the buffer,
 	// The returned function will be called when the entity is flushed on the persistent storage.
-	Write(seriesID common.SeriesID, timestamp time.Time, data []byte) (func(), error)
+	Write(seriesID []byte, timestamp time.Time, data []byte) error
 	// Read specified segment by SegmentID.
-	Read(segmentID SegmentID) (*Segment, error)
+	Read(segmentID SegmentID) (Segment, error)
 	// ReadAllSegments reads all segments sorted by their creation time in ascending order.
-	ReadAllSegments() ([]*Segment, error)
+	ReadAllSegments() ([]Segment, error)
 	// Rotate closes the open segment and opens a new one, returning the closed segment details.
-	Rotate() (*Segment, error)
+	Rotate() (Segment, error)
 	// Delete the specified segment.
 	Delete(segmentID SegmentID) error
+	// Close all of segments and stop WAL work.
+	Close() error
 }
 
 // New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
-	return nil, nil
+func New(path string, options *Options) (WAL, error) {
+	//  Check configuration options.
+	if options == nil {
+		options = DefaultOptions
+	}
+	if options.FileSize <= 0 {
+		options.FileSize = DefaultOptions.FileSize
+	}
+	if options.BufferSize <= 0 {
+		options.BufferSize = DefaultOptions.BufferSize
+	}
+
+	// Initial WAL path.
+	path, err := filepath.Abs(path)
+	if err != nil {
+		return nil, errors.Wrap(err, "Can not get absolute path")
+	}
+	log := &Log{path: path, options: *options}
+	if err := os.MkdirAll(path, os.ModePerm); err != nil {
+		return nil, err
+	}
+
+	if err := log.load(); err != nil {
+		return nil, err
+	}
+	log.runFlushTask()
+	return log, nil
+}
+
+func (log *Log) runFlushTask() {
+	go func() {
+		bufferSize := 0
+		for {
+			timer := time.NewTimer(bufferBatchInterval * time.Millisecond)
+			select {
+			case request := <-log.writeChannel:
+				bufferSize += seriesIDLength + timestampLength + len(request.data)
+				err := log.buffer.write(request)
+				if err != nil {
+					log.l.Error().Err(err).Msg("Fail to write to buffer")
+				}
+				if bufferSize > log.options.BufferSize {
+					// Clone buffer,avoiding to block write.
+					buf := log.buffer
+					// Clear buffer to receive Log request.
+					log.newBuffer()
+					if log.closeFlag {
+						return
+					}
+					log.asyncBatchflush(buf, log.flushChannel)
+				}
+			case <-timer.C:
+				buf := log.buffer
+				log.newBuffer()
+				if log.closeFlag {
+					return
+				}
+				log.asyncBatchflush(buf, log.flushChannel)
+			}
+		}
+	}()
+}
+
+func (log *Log) asyncBatchflush(buffer buffer, flushCh chan string) {

Review Comment:
   
   ```suggestion
   func (log *Log) asyncBatchFlush(buffer buffer, flushCh chan string) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org