Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2020/12/16 08:00:37 UTC

[GitHub] [skywalking-satellite] EvanLjp opened a new pull request #10: mmap-queue-plugin

EvanLjp opened a new pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10


   This PR adds the mmap queue plugin described in the design doc. The design doc and benchmark test report are in skywalking-satellite/docs/plugins/queue/mmap. Please let me know if you have any suggestions on the segment swap algorithm.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546277885



##########
File path: plugins/queue/mmap/queue_opreation.go
##########
@@ -0,0 +1,237 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor

Review comment:
       It has been added. Please check it again.







[GitHub] [skywalking-satellite] wu-sheng commented on pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#issuecomment-748493107


   https://github.com/apache/skywalking-satellite/blob/3b944f1ecffaf3cc863e80839dce4f24af35efb7/LICENSE#L204
   
   Note: this is not accurate. It should be `Apache SkyWalking Satellite Subcomponents:`. The same issue appears in the next several lines.





[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546684267



##########
File path: .github/workflows/build-and-test.yaml
##########
@@ -52,11 +52,11 @@ jobs:
 
       - name: Test
         run: make test
+

Review comment:
       We need this in the CI to make sure the submodule matches the generated source code: https://github.com/apache/skywalking-cli/blob/b3b919d5c89d43aebfc6f25f147bf41db11950bf/.github/workflows/go.yml#L41-L48







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546252360



##########
File path: plugins/queue/mmap/queue_opreation.go
##########
@@ -0,0 +1,237 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor

Review comment:
       Two things:
   1. `opreation` -> `operation`? Seems to be a typo.
   2. License issue: I cross-checked this file against the following two files. They contain similar code, so I think we should remove the ASF header, because we don't own the original copyright. Let's keep its [MIT license](https://github.com/grandecola/bigqueue/blob/master/LICENSE) in the header and add comments about the original sources and why we needed to change the code.
   - https://github.com/grandecola/bigqueue/blob/master/write.go
   - https://github.com/grandecola/bigqueue/blob/master/read.go







[GitHub] [skywalking-satellite] kezhenxu94 commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r547179888



##########
File path: Makefile
##########
@@ -36,7 +37,7 @@ GO_BUILD_FLAGS = -v
 GO_BUILD_LDFLAGS = -X main.version=$(VERSION)
 GQL_GEN = $(GO_PATH)/bin/gqlgen
 
-PLATFORMS := windows linux darwin
+PLATFORMS := linux darwin

Review comment:
       > @EvanLjp I think this has not been resolved. Could you update the status?
   
   What's the resolution?







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546253587



##########
File path: plugins/queue/mmap/meta/meta.go
##########
@@ -0,0 +1,152 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package meta
+
+import (
+	"fmt"
+	"path/filepath"
+	"syscall"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+const (
+	metaSize    = 80
+	metaName    = "meta.dat"
+	metaVersion = 1
+)
+
+// Metadata only needs 80B to store the Metadata for the pipe. But for memory alignment,

Review comment:
       Same license issue here, https://github.com/apache/skywalking-satellite/blob/mmap-queue/plugins/queue/mmap/meta/meta.go
   
   Even the comments are the same. We must keep the original license.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548941198



##########
File path: plugins/queue/mmap/segment_operation.go
##########
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"strconv"
+	"sync/atomic"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns a memory mapped file at the segmentID position.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
+	index := q.GetIndex(segmentID)
+	if q.mmapCount >= q.MaxInMemSegments {
+		q.insufficientMemChannel <- struct{}{}
+		<-q.sufficientMemChannel
+	}
+	if err := q.mapSegment(segmentID); err != nil {

Review comment:
       segment stores the data 
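
For context on the quoted `GetSegment`: when `mmapCount` reaches `MaxInMemSegments`, the caller signals `insufficientMemChannel` and then blocks on `sufficientMemChannel` until the swapper has released memory. A minimal sketch of that two-channel handshake follows. The `pool` type, its fields, and the "free half of the slots" policy are hypothetical stand-ins for illustration, not the plugin's actual swapper.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for the queue's mapped-segment bookkeeping.
type pool struct {
	mu           sync.Mutex
	mapped       int
	maxMapped    int
	insufficient chan struct{} // requester -> swapper: no room left
	sufficient   chan struct{} // swapper -> requester: room is available again
}

func newPool(maxMapped int) *pool {
	return &pool{
		maxMapped:    maxMapped,
		insufficient: make(chan struct{}),
		sufficient:   make(chan struct{}),
	}
}

// acquire mirrors the shape of the quoted GetSegment: when the limit is
// reached it signals the swapper and blocks until capacity is reported.
func (p *pool) acquire() {
	p.mu.Lock()
	full := p.mapped >= p.maxMapped
	p.mu.Unlock()
	if full {
		p.insufficient <- struct{}{}
		<-p.sufficient
	}
	p.mu.Lock()
	p.mapped++
	p.mu.Unlock()
}

// swapper releases half of the slots each time it is signaled (assumed policy).
func (p *pool) swapper(stop <-chan struct{}) {
	for {
		select {
		case <-p.insufficient:
			p.mu.Lock()
			p.mapped -= p.maxMapped / 2
			p.mu.Unlock()
			p.sufficient <- struct{}{}
		case <-stop:
			return
		}
	}
}

func main() {
	p := newPool(4)
	stop := make(chan struct{})
	go p.swapper(stop)
	for i := 0; i < 10; i++ {
		p.acquire()
	}
	close(stop)
	fmt.Println("mapped segments:", p.mapped)
}
```

Because both channels are unbuffered, the requester cannot proceed until the swapper has finished one full release pass, which is what keeps the mapped count at or below the limit in this sketch.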







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r545725086



##########
File path: docs/plugins/queue/mmap/README.md
##########
@@ -0,0 +1,66 @@
+# Design
+The mmap-queue is a big, fast and persistent queue based on the memory mapped files. One mmap-queue has a directory to store the whole data. The Queue directory is made up with many segments and 1 meta file. 
+
+- Segment: Segment is the real data store center, that provides large-space storage and does not reduce read and write performance as much as possible by using mmap. And we will avoid deleting files by reusing them.
+- Meta: The purpose of meta is to find the data that the consumer needs.
+
+## Meta
+Metadata only needs 80B to store the Metadata for the pipe. But for memory alignment, it takes at least one memory page size, which is generally 4K.
+```
+[    8Bit   ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit  ]
+[metaVersion][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][capacity]
+[metaVersion][writing   offset][watermark offset][committed offset][reading   offset][capacity]
+
+```
+### Transforming
+
+![](https://skywalking.apache.org/blog/2020-11-25-skywalking-satellite-0.1.0-design/offset-convert.jpg)
+
+## Configuration
+[Configuration Params](../../../configuration/queue.md)
+
+## Segment
+Segments are a series of files of the same size. Each input data would cost `8Bit+Len(data)Bit` to store the raw bytes. The first 8Bit is equal to `Len(data)` for finding the ending position. 
+### Swapper
+For the performance and resources thinking, we define a page replacement policy.
+
+- Keep the reading and writing segments on the memory.
+- When the mmapcount is greater or equals to the max_in_mem_segments, we first scan the read scope and then scan the written scope to swap the segments to promise the reading or writing segments are always in memory.
+    - Read scope: [reading_offset - max_in_mem_segments,reading_offset - 1]
+    - Written scope: [writing_offset - max_in_mem_segments,writing_offset - 1]
+    - Each displacement operation guarantees at least `max_in_mem_segments/2-1` capacity available. Subtract operation to subtract the amount of memory that must always exist.
+
+## BenchmarkTest

Review comment:
       Yes. As you can see, the benchmark test runs over 30000 times, so it must trigger the memory swap.
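
For reference, the record layout in the quoted README (a length prefix followed by the raw bytes) can be sketched as below. This assumes the "8Bit" prefix means an 8-byte `uint64` length, which is what `uInt64Size = 8` in `queue_opreation.go` suggests. The little-endian byte order and the helper names `appendRecord` and `readRecord` are assumptions for illustration, and the sketch ignores records that span two segments.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// lenPrefixSize is the assumed size of the length prefix: one uint64.
const lenPrefixSize = 8

// appendRecord appends one length-prefixed record to buf and returns the result.
func appendRecord(buf, data []byte) []byte {
	var prefix [lenPrefixSize]byte
	binary.LittleEndian.PutUint64(prefix[:], uint64(len(data)))
	buf = append(buf, prefix[:]...)
	return append(buf, data...)
}

// readRecord reads the record starting at offset and returns its payload
// together with the offset of the next record.
func readRecord(buf []byte, offset int) ([]byte, int, error) {
	if offset < 0 || len(buf)-offset < lenPrefixSize {
		return nil, offset, errors.New("truncated length prefix")
	}
	n := binary.LittleEndian.Uint64(buf[offset : offset+lenPrefixSize])
	start := offset + lenPrefixSize
	if n > uint64(len(buf)-start) {
		return nil, offset, errors.New("truncated payload")
	}
	end := start + int(n)
	return buf[start:end], end, nil
}

func main() {
	var buf []byte
	buf = appendRecord(buf, []byte("hello"))
	buf = appendRecord(buf, []byte("mmap"))
	for offset := 0; offset < len(buf); {
		data, next, err := readRecord(buf, offset)
		if err != nil {
			fmt.Println("read error:", err)
			return
		}
		fmt.Printf("%q\n", data)
		offset = next
	}
}
```

The prefix lets a reader find the end of each record without a delimiter, so the payload can contain arbitrary bytes.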







[GitHub] [skywalking-satellite] surlymo commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
surlymo commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548937718



##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,266 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/grandecola/mmap"
+
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+	"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int32  `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.
+	mmapCount              int32          // The number of the memory mapped files.
+	unflushedNum           int            // The unflushed number.
+	flushChannel           chan struct{}  // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
+	insufficientMemChannel chan struct{}  // Notify when memory is insufficient
+	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
+
+	// control components
+	ctx        context.Context    // Parent ctx
+	cancel     context.CancelFunc // Parent ctx cancel function
+	showDownWg sync.WaitGroup     // The shutdown wait group.
+
+}
+
+func (q *Queue) Name() string {
+	return "mmap-queue"
+}
+
+func (q *Queue) Description() string {
+	return "this is a mmap queue"
+}
+
+func (q *Queue) DefaultConfig() string {
+	return `
+# The size of each segment. Default value is 128K. The unit is Byte.
+segment_size: 131072
+# The max num of segments in memory. Default value is 10.
+max_in_mem_segments: 10
+# The capacity of Queue = segment_size * queue_capacity_segments.
+queue_capacity_segments: 4000
+# The period flush time. The unit is ms. Default value is 1 second.
+flush_period: 1000
+# The max number in one flush time.  Default value is 10000.
+flush_ceiling_num: 10000
+# Contains all files in the queue.
+queue_dir: satellite-mmap-queue
+# The max size of the input event. Default value is 20k.
+max_event_size: 20480
+`
+}
+
+func (q *Queue) Initialize() error {
+	// the size of each segment file should be a multiple of the page size.
+	pageSize := os.Getpagesize()
+	if q.SegmentSize%pageSize != 0 {
+		q.SegmentSize -= q.SegmentSize % pageSize
+	}
+	if q.SegmentSize/pageSize == 0 {
+		q.SegmentSize = 131072
+	}
+	// the minimum MaxInMemSegments value should be 4.
+	if q.MaxInMemSegments < 4 {
+		q.MaxInMemSegments = 4
+	}
+	// load metadata and override the reading or writing offset by the committed or watermark offset.
+	md, err := meta.NewMetaData(q.QueueDir, q.QueueCapacitySegments)
+	if err != nil {
+		return fmt.Errorf("error in creating the metadata: %v", err)
+	}
+	q.meta = md
+	cmID, cmOffset := md.GetCommittedOffset()
+	wmID, wmOffset := md.GetWatermarkOffset()
+	md.PutWritingOffset(wmID, wmOffset)
+	md.PutReadingOffset(cmID, cmOffset)
+	// keep the reading or writing segments in the memory.
+	q.segments = make([]*mmap.File, q.QueueCapacitySegments)
+	if _, err := q.GetSegment(cmID); err != nil {
+		return err
+	}
+	if _, err := q.GetSegment(wmID); err != nil {
+		return err
+	}
+	// init components
+	q.insufficientMemChannel = make(chan struct{})
+	q.sufficientMemChannel = make(chan struct{})
+	q.flushChannel = make(chan struct{})
+	q.ctx, q.cancel = context.WithCancel(context.Background())
+	// async supported processes.
+	q.showDownWg.Add(2)
+	go q.segmentSwapper()
+	go q.flush()
+	return nil
+}
+
+func (q *Queue) Push(e *protocol.Event) error {
+	data, err := proto.Marshal(e)

Review comment:
       This can lower the complexity and make the system quicker.
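
As an aside on the quoted `Initialize`: the segment size is rounded down to a multiple of the page size, and falls back to 131072 bytes when the result would be smaller than one page. A minimal sketch of that rule as a standalone function, so it can be checked in isolation. The function name `normalizeSegmentSize` and the constant name are hypothetical; the arithmetic follows the quoted hunk.

```go
package main

import (
	"fmt"
	"os"
)

// defaultSegmentSize is the fallback used by the quoted Initialize (128 KiB).
const defaultSegmentSize = 131072

// normalizeSegmentSize rounds size down to a multiple of pageSize and
// returns the default when less than one whole page remains.
// pageSize must be positive, as os.Getpagesize always is.
func normalizeSegmentSize(size, pageSize int) int {
	size -= size % pageSize
	if size/pageSize == 0 {
		return defaultSegmentSize
	}
	return size
}

func main() {
	page := os.Getpagesize()
	for _, requested := range []int{100, 200000, 131072} {
		fmt.Printf("requested=%d page=%d normalized=%d\n",
			requested, page, normalizeSegmentSize(requested, page))
	}
}
```

Rounding down rather than up means a configured size is never exceeded, at the cost of silently shrinking values that are not page-aligned.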







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546681511



##########
File path: README.md
##########
@@ -7,7 +7,8 @@ Apache SkyWalking Satellite
 
 [![GitHub stars](https://img.shields.io/github/stars/apache/skywalking.svg?style=for-the-badge&label=Stars&logo=github)](https://github.com/apache/skywalking)
 [![Twitter Follow](https://img.shields.io/twitter/follow/asfskywalking.svg?style=for-the-badge&label=Follow&logo=twitter)](https://twitter.com/AsfSkyWalking)
-
+# Compile
+[How to compile the Satellite.](./docs/compile/compile.md)

Review comment:
       The compiling doc should come after download. People would not learn a project from reading code.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r547220703



##########
File path: dist/licenses/LICENSE-github-protobuf.txt
##########
@@ -0,0 +1,27 @@
+Copyright 2010 The Go Authors.  All rights reserved.

Review comment:
       I found that there are 2 protobuf libs, and their licenses do not seem to be the same, so I added 2 licenses. I am not very familiar with Proto and I'd like your advice.







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546306012



##########
File path: dist/LICENSE
##########
@@ -235,3 +235,4 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 	spf13 (viper) v1.7.1: https://github.com/spf13/viper MIT
 	urfave (cli) v2.3.0: https://github.com/urfave/cli MIT
 	grandecola (mmap) v0.6.0: https://github.com/grandecola/mmap MIT
+	grandecola (bigqueue) v0.4.0: https://github.com/grandecola/bigqueue MIT

Review comment:
       Not just this. This is the binary license; you also need to update the source code LICENSE.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r547180595



##########
File path: Makefile
##########
@@ -36,7 +37,7 @@ GO_BUILD_FLAGS = -v
 GO_BUILD_LDFLAGS = -X main.version=$(VERSION)
 GQL_GEN = $(GO_PATH)/bin/gqlgen
 
-PLATFORMS := windows linux darwin
+PLATFORMS := linux darwin

Review comment:
       https://github.com/apache/skywalking/issues/6046 







[GitHub] [skywalking-satellite] wu-sheng commented on pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#issuecomment-751313703


    If there is no more feedback today, I am going to merge this. We are looking forward to the first release, and the code will be polished from time to time.





[GitHub] [skywalking-satellite] kezhenxu94 commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r547182272



##########
File path: dist/licenses/LICENSE-github-protobuf.txt
##########
@@ -0,0 +1,27 @@
+Copyright 2010 The Go Authors.  All rights reserved.

Review comment:
       Should this file be removed? The filename seems to be wrong: `dist/licenses/LICENSE-github-protobuf.txt`







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546595103



##########
File path: Makefile
##########
@@ -36,7 +37,7 @@ GO_BUILD_FLAGS = -v
 GO_BUILD_LDFLAGS = -X main.version=$(VERSION)
 GQL_GEN = $(GO_PATH)/bin/gqlgen
 
-PLATFORMS := windows linux darwin
+PLATFORMS := linux darwin

Review comment:
       @EvanLjp I think this has not been resolved. Could you update the status?







[GitHub] [skywalking-satellite] wu-sheng commented on pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#issuecomment-748055641


   Could you make CI pass first?





[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r545700216



##########
File path: plugins/queue/mmap/queue_opreation.go
##########
@@ -0,0 +1,236 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+const uInt64Size = 8
+
+// flush control the flush operation by timer or counter.
+func (q *Queue) flush() {
+	defer q.showDownWg.Done()
+	ctx, cancel := context.WithCancel(q.ctx)
+	defer cancel()
+	for {
+		timeTicker := time.NewTicker(time.Duration(q.FlushPeriod) * time.Millisecond)
+		select {
+		case <-q.flushChannel:
+			q.doFlush()
+		case <-timeTicker.C:
+			q.doFlush()
+		case <-ctx.Done():
+			q.doFlush()
+			return
+		}
+	}
+}
+
+// doFlush flush the segment and meta files to the disk.
+func (q *Queue) doFlush() {
+	q.Lock()
+	defer q.Unlock()
+	for _, segment := range q.segments {
+		if segment == nil {
+			continue
+		}
+		if err := segment.Flush(syscall.MS_SYNC); err != nil {
+			log.Logger.Errorf("cannot flush segment file: %v", err)
+		}
+	}
+	wid, woffset := q.meta.GetWritingOffset()
+	q.meta.PutWatermarkOffset(wid, woffset)
+	if err := q.meta.Flush(); err != nil {
+		log.Logger.Errorf("cannot flush meta file: %v", err)
+	}
+}
+
+// push writes the data into the file system. It first writes the length of the data,
+// then the data itself. It means the whole data may not exist in the one segments.
+func (q *Queue) push(bytes []byte) error {

Review comment:
       ![image](https://user-images.githubusercontent.com/31562192/102597291-648e3900-4155-11eb-9949-1ce3c58218f7.png)
   
   I already added it before; please have a look again.







[GitHub] [skywalking-satellite] wu-sheng commented on pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#issuecomment-748867548


   > thx for recheck it again.
   
   @EvanLjp I still can't find the source code LICENSE change. Am I missing anything?





[GitHub] [skywalking-satellite] hanahmily commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546300310



##########
File path: go.mod
##########
@@ -3,8 +3,9 @@ module github.com/apache/skywalking-satellite
 go 1.14
 
 require (
+	github.com/google/go-cmp v0.5.4

Review comment:
       After running `make dep`, I can't reproduce it anymore. Weird.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546277965



##########
File path: Makefile
##########
@@ -36,7 +37,7 @@ GO_BUILD_FLAGS = -v
 GO_BUILD_LDFLAGS = -X main.version=$(VERSION)
 GQL_GEN = $(GO_PATH)/bin/gqlgen
 
-PLATFORMS := windows linux darwin
+PLATFORMS := linux darwin

Review comment:
       The Go mmap library is largely unsupported on Windows because of the underlying system calls.







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546682764



##########
File path: plugins/queue/mmap/branchmark_test.go
##########
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"fmt"
+	"os"
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+type benchmarkParam struct {
+	segmentSize      int
+	message          int // unit KB
+	maxInMemSegments int
+}
+
+var params = []benchmarkParam{
+	{segmentSize: 1024 * 128, message: 8, maxInMemSegments: 10},
+	// compare the influence of the segmentSize.
+	{segmentSize: 1024 * 256, message: 8, maxInMemSegments: 10},
+	// compare the influence of the maxInMemSegments.
+	{segmentSize: 1024 * 128, message: 8, maxInMemSegments: 20},
+	// compare the influence of the message size.
+	{segmentSize: 1024 * 128, message: 16, maxInMemSegments: 10},
+}
+
+func cleanBenchmarkQueue(b *testing.B, q api.Queue) {
+	if err := os.RemoveAll(q.(*Queue).QueueDir); err != nil {
+		b.Errorf("cannot remove test queue dir, %v", err)
+	}
+}
+
+func BenchmarkPush(b *testing.B) {

Review comment:
       I can't see that this has been fixed, but the comment has been resolved. Could you update it?







[GitHub] [skywalking-satellite] hanahmily commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r544746659



##########
File path: dist/licenses/LICENSE-go-cmp
##########
@@ -0,0 +1,27 @@
+Copyright (c) 2017 The Go Authors. All rights reserved.

Review comment:
       The file and others located at `dist/licenses` should be appended with the `.txt` suffix as indicated in `dist/LICENSE`.

##########
File path: docs/configuration/queue.md
##########
@@ -0,0 +1,11 @@
+# queue configuration
+
+|  Type   | Param  | DefaultValue| Meaning| 
+|  ----  | ----  |----  | ----  |
+| mmap-queue  | segment_size | 131072 | The size of each segment. The unit is Byte.

Review comment:
       The doc should mention that it has to be larger than the system memory page size.

##########
File path: go.mod
##########
@@ -3,8 +3,9 @@ module github.com/apache/skywalking-satellite
 go 1.14
 
 require (
+	github.com/google/go-cmp v0.5.4

Review comment:
       After running `go mod tidy`, I found that this file's content was updated. My preference is to run the tidy command before committing, in order to keep go.mod's content consistent. A better practice is to add a CI task that checks it, as https://github.com/apache/skywalking-swck/blob/master/.github/workflows/go.yml#L42 does.

##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"sync"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int    `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.

Review comment:
       In the flush goroutine, you opt for the mutex to make it visible to the flush operation. Could you use the `atomic` package instead to improve performance?

##########
File path: docs/configuration/queue.md
##########
@@ -0,0 +1,11 @@
+# queue configuration
+
+|  Type   | Param  | DefaultValue| Meaning| 
+|  ----  | ----  |----  | ----  |
+| mmap-queue  | segment_size | 131072 | The size of each segment. The unit is Byte.
+| mmap-queue  | max_in_mem_segments | 10 | The max num of segments in memory.

Review comment:
       Could you mention it could be more than 3?

##########
File path: docs/plugins/queue/mmap/README.md
##########
@@ -0,0 +1,66 @@
+# Design
+The mmap-queue is a big, fast and persistent queue based on the memory mapped files. One mmap-queue has a directory to store the whole data. The Queue directory is made up with many segments and 1 meta file. 
+
+- Segment: Segment is the real data store center, that provides large-space storage and does not reduce read and write performance as much as possible by using mmap. And we will avoid deleting files by reusing them.
+- Meta: The purpose of meta is to find the data that the consumer needs.
+
+## Meta
+Metadata only needs 80B to store the Metadata for the pipe. But for memory alignment, it takes at least one memory page size, which is generally 4K.
+```
+[    8Bit   ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit  ]
+[metaVersion][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][capacity]
+[metaVersion][writing   offset][watermark offset][committed offset][reading   offset][capacity]
+
+```
+### Transforming
+
+![](https://skywalking.apache.org/blog/2020-11-25-skywalking-satellite-0.1.0-design/offset-convert.jpg)
+
+## Configuration
+[Configuration Params](../../../configuration/queue.md)
+
+## Segment
+Segments are a series of files of the same size. Each input data would cost `8Bit+Len(data)Bit` to store the raw bytes. The first 8Bit is equal to `Len(data)` for finding the ending position. 
+### Swapper
+For the performance and resources thinking, we define a page replacement policy.
+
+- Keep the reading and writing segments on the memory.
+- When the mmapcount is greater or equals to the max_in_mem_segments, we first scan the read scope and then scan the written scope to swap the segments to promise the reading or writing segments are always in memory.
+    - Read scope: [reading_offset - max_in_mem_segments,reading_offset - 1]
+    - Written scope: [writing_offset - max_in_mem_segments,writing_offset - 1]
+    - Each displacement operation guarantees at least `max_in_mem_segments/2-1` capacity available. Subtract operation to subtract the amount of memory that must always exist.
+
+## BenchmarkTest

Review comment:
       The result looks great. Does it cover scenarios under memory pressure, i.e. ones where the swap is triggered?
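
       For reference, the read and written scopes from the Swapper section can be sketched as below. This is only an illustration under assumptions: `scope` and `swapScope` are hypothetical names, the offsets are treated as plain segment IDs, and clamping the lower bound at 0 is my assumption, not something the README states.

```go
package main

import "fmt"

// scope is an inclusive range of segment IDs (hypothetical type).
type scope struct{ from, to int64 }

// swapScope returns [current-maxInMemSegments, current-1], the range the
// README describes as the candidates for swapping out.
// The lower bound is clamped at 0, which is an assumption of this sketch.
// The boolean is false when there is no segment before the current one.
func swapScope(current, maxInMemSegments int64) (scope, bool) {
	if current <= 0 {
		return scope{}, false
	}
	from := current - maxInMemSegments
	if from < 0 {
		from = 0
	}
	return scope{from: from, to: current - 1}, true
}

func main() {
	read, _ := swapScope(25, 10)
	written, _ := swapScope(40, 10)
	fmt.Printf("read scope: [%d, %d]\n", read.from, read.to)          // read scope: [15, 24]
	fmt.Printf("written scope: [%d, %d]\n", written.from, written.to) // written scope: [30, 39]
}
```

       A benchmark that keeps the reading and writing segments far apart, with a small max_in_mem_segments, would exercise both scopes.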

##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"sync"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int    `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.
+	mmapCount              int            // The number of the memory mapped files.

Review comment:
       It's shared between different goroutines: the swapper reads and decrements it, while push takes care of incrementing it. How is this shared variable synchronized?
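
       One possible direction, as a sketch only: keep the counter behind `sync/atomic`. `segmentCounter` is a hypothetical stand-in for the `mmapCount` field, and the two goroutines merely simulate the push side and the swapper.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// segmentCounter is a hypothetical stand-in for Queue.mmapCount.
// All access goes through sync/atomic, so no mutex is needed for the counter.
type segmentCounter struct {
	count int32
}

func (c *segmentCounter) inc() int32  { return atomic.AddInt32(&c.count, 1) }
func (c *segmentCounter) dec() int32  { return atomic.AddInt32(&c.count, -1) }
func (c *segmentCounter) load() int32 { return atomic.LoadInt32(&c.count) }

func main() {
	var c segmentCounter
	var wg sync.WaitGroup

	// Simulated push side: maps 1000 segments.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			c.inc()
		}
	}()

	// Simulated swapper side: unmaps 400 segments.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 400; i++ {
			c.dec()
		}
	}()

	wg.Wait()
	fmt.Println(c.load()) // prints 600
}
```

       Note that this only makes the counter itself safe. A check-then-act sequence such as "if the count reaches max_in_mem_segments, then swap" still needs its own coordination.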

##########
File path: plugins/queue/mmap/meta/meta.go
##########
@@ -0,0 +1,153 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package meta
+
+import (
+	"fmt"
+	"syscall"
+
+	"path/filepath"

Review comment:
       Imports are not sorted







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548941072



##########
File path: plugins/queue/mmap/segment_operation.go
##########
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"strconv"
+	"sync/atomic"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns a memory mapped file at the segmentID position.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {

Review comment:
       It doesn't support concurrency.







[GitHub] [skywalking-satellite] EvanLjp merged pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp merged pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10


   





[GitHub] [skywalking-satellite] EvanLjp commented on pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#issuecomment-748519822


   @kezhenxu94 I don't know why the CI processes are not working. Could you help me?





[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546244391



##########
File path: plugins/queue/mmap/branchmark_test.go
##########
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"fmt"
+	"os"
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+type benchmarkParam struct {
+	segmentSize      int
+	message          int // unit KB
+	maxInMemSegments int
+}
+
+var params = []benchmarkParam{
+	{segmentSize: 1024 * 128, message: 8, maxInMemSegments: 10},
+	// compare the influence of the segmentSize.
+	{segmentSize: 1024 * 256, message: 8, maxInMemSegments: 10},
+	// compare the influence of the maxInMemSegments.
+	{segmentSize: 1024 * 128, message: 8, maxInMemSegments: 20},
+	// compare the influence of the message size.
+	{segmentSize: 1024 * 128, message: 16, maxInMemSegments: 10},
+}
+
+func cleanBenchmarkQueue(b *testing.B, q api.Queue) {
+	if err := os.RemoveAll(q.(*Queue).QueueDir); err != nil {
+		b.Errorf("cannot remove test queue dir, %v", err)
+	}
+}
+
+func BenchmarkPush(b *testing.B) {

Review comment:
       For benchmarks, SkyWalking usually posts the results together with the specifications of the machine.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r545745760



##########
File path: go.mod
##########
@@ -3,8 +3,9 @@ module github.com/apache/skywalking-satellite
 go 1.14
 
 require (
+	github.com/google/go-cmp v0.5.4

Review comment:
       ![image](https://user-images.githubusercontent.com/31562192/102605242-f7cc6c00-415f-11eb-8536-09648a90a4db.png)
   There seems to be something on my machine that is not the same as yours.







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546219629



##########
File path: .github/workflows/build-and-test.yaml
##########
@@ -50,8 +50,8 @@ jobs:
       - name: Lint
         run: make lint
 
-      - name: Check
-        run: make check
+      - name: Test
+          run: make test

Review comment:
       Did you break the CI control file?







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546307863



##########
File path: docs/plugins/queue/mmap/README.md
##########
@@ -0,0 +1,66 @@
+# Design
+The mmap-queue is a big, fast and persistent queue based on the memory mapped files. One mmap-queue has a directory to store the whole data. The Queue directory is made up with many segments and 1 meta file, which references the design of the [bigqueue](https://github.com/grandecola/bigqueue). 

Review comment:
       Please note, `reference` usually means taking the design idea, but here, we copied some of it.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546352636



##########
File path: Makefile
##########
@@ -67,7 +67,9 @@ test: clean lint
 
 .PHONY: license
 license: clean tools
-	$(GO_LICENSER) -d -exclude=protocol/gen-codes -licensor='Apache Software Foundation (ASF)' .
+	$(GO_LICENSER) -d  -exclude=plugins/queue/mmap \

Review comment:
       The exclude feature of the licenser only works at the directory level. Moving the file into a separate directory would cause a serious problem, namely a circular reference.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548941148



##########
File path: plugins/queue/mmap/segment_operation.go
##########
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"strconv"
+	"sync/atomic"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns a memory mapped file at the segmentID position.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
+	index := q.GetIndex(segmentID)
+	if q.mmapCount >= q.MaxInMemSegments {

Review comment:
       This method is for the push operation.







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546359397



##########
File path: Makefile
##########
@@ -67,7 +67,9 @@ test: clean lint
 
 .PHONY: license
 license: clean tools
-	$(GO_LICENSER) -d -exclude=protocol/gen-codes -licensor='Apache Software Foundation (ASF)' .
+	$(GO_LICENSER) -d  -exclude=plugins/queue/mmap \

Review comment:
       This is a typical case for decoupling the code. I think there are enough design patterns to fix this.
   Anyway, it would be better if the tool supported file-level exclusion.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r545723958



##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"sync"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int    `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.

Review comment:
       The flush runs in only one goroutine, so I removed the lock.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548944865



##########
File path: internal/satellite/module/sender/sender.go
##########
@@ -130,12 +131,12 @@ func (s *Sender) Shutdown() {
 func (s *Sender) consume(batch *buffer.BatchBuffer) {
 	log.Logger.Infof("sender module of %s namespace is flushing a new batch buffer."+
 		" the start offset is %s, and the size is %d", s.config.NamespaceName, batch.Last(), batch.Len())
-	var events = make(map[event.Type]event.BatchEvents)
+	var events = make(map[protocol.EventType]event.BatchEvents)
 	for i := 0; i < batch.Len(); i++ {
 		eventContext := batch.Buf()[i]
 		for _, e := range eventContext.Context {
-			if e.IsRemote() {
-				events[e.Type()] = append(events[e.Type()], e)
+			if e.Remote {

Review comment:
       use proto







[GitHub] [skywalking-satellite] hanahmily commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r544781893



##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"sync"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int    `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.
+	mmapCount              int            // The number of the memory mapped files.

Review comment:
       It's shared across goroutines: the swapper reads and decrements it, while push increments it. How is this shared variable synchronized?
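
One possible way to address the question above, as a minimal sketch and not the PR's actual fix: keep the count in an `int32` and touch it only through `sync/atomic`, so the swapper's decrement and the push path's increment cannot race. The type `segmentCounter`, its methods, and `simulate` are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// segmentCounter is a hypothetical stand-in for the mmapCount field.
// All access goes through sync/atomic, so no extra lock is needed
// for the counter itself.
type segmentCounter struct {
	n int32
}

func (c *segmentCounter) inc() int32  { return atomic.AddInt32(&c.n, 1) }
func (c *segmentCounter) dec() int32  { return atomic.AddInt32(&c.n, -1) }
func (c *segmentCounter) load() int32 { return atomic.LoadInt32(&c.n) }

// simulate runs one goroutine that maps segments (increments) and one
// that unmaps them (decrements), then returns the final count.
func simulate(maps, unmaps int) int32 {
	var c segmentCounter
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for i := 0; i < maps; i++ {
			c.inc()
		}
	}()
	go func() {
		defer wg.Done()
		for i := 0; i < unmaps; i++ {
			c.dec()
		}
	}()
	wg.Wait()
	return c.load()
}

func main() {
	fmt.Println(simulate(1000, 400)) // prints 600
}
```

Atomic operations only protect the counter itself. A check followed by an action (for example, "if the count is at the limit, then swap") is still a two-step sequence and would need a lock or a channel handshake if more than one goroutine can perform it.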







[GitHub] [skywalking-satellite] surlymo commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
surlymo commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548939173



##########
File path: plugins/queue/mmap/queue_opreation.go
##########
@@ -0,0 +1,167 @@
+// MIT License
+//
+// Copyright (c) 2018 Aman Mangal
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package mmap
+
+import (
+	"fmt"
+)
+
+// Because the design of the mmap-queue in Satellite references the design of the
+// bigqueue(https://github.com/grandecola/bigqueue), the queue operation file retains
+// the original author license.
+//
+// The reason why we references the source codes of bigqueue rather than using the lib
+// is the file queue in Satellite is like following.
+// 1. Only one consumer and publisher in the Satellite queue.
+// 2. Reusing files strategy is required to reduce the creation times in the Satellite queue.
+// 3. More complex OFFSET design is needed to ensure the final stability of data.
+
+const uInt64Size = 8
+
+// push writes the data into the file system. It first writes the length of the data,
+// then the data itself. It means the whole data may not exist in the one segments.
+func (q *Queue) push(bytes []byte) error {
+	if q.isFull() {
+		return fmt.Errorf("cannot push data when the queue is full")
+	}
+	id, offset := q.meta.GetWritingOffset()
+	id, offset, err := q.writeLength(len(bytes), id, offset)
+	if err != nil {
+		return err
+	}
+	id, offset, err = q.writeBytes(bytes, id, offset)
+	if err != nil {
+		return err
+	}
+	q.meta.PutWritingOffset(id, offset)
+	q.unflushedNum++
+	if q.unflushedNum == q.FlushCeilingNum {
+		q.flushChannel <- struct{}{}
+		q.unflushedNum = 0
+	}
+	return nil
+}
+
+// pop reads the data from the file system. It first reads the length of the data,
+// then the data itself. It means the whole data may not exist in the one segments.
+func (q *Queue) pop() (data []byte, rid, roffset int64, err error) {
+	if q.isEmpty() {
+		return nil, 0, 0, fmt.Errorf("cannot read data when the queue is empty")
+	}
+	id, offset := q.meta.GetReadingOffset()
+	id, offset, length, err := q.readLength(id, offset)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	bytes, id, offset, err := q.readBytes(id, offset, length)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	q.meta.PutReadingOffset(id, offset)
+	return bytes, id, offset, nil
+}
+
+// readBytes reads bytes into the memory mapped file.
+func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, newOffset int64, err error) {
+	counter := 0
+	res := make([]byte, length)
+	for {
+		segment, err := q.GetSegment(id)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		readBytes, err := segment.ReadAt(res[counter:], offset)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		counter += readBytes
+		offset += int64(readBytes)
+		if offset == int64(q.SegmentSize) {
+			id, offset = id+1, 0
+		}
+		if counter == length {
+			break
+		}
+	}
+	return res, id, offset, nil
+}
+
+// readLength reads the data length with 8 Bits spaces.
+func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int, err error) {
+	if offset+uInt64Size > int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	segment, err := q.GetSegment(id)
+	if err != nil {
+		return 0, 0, 0, err
+	}
+	num := segment.ReadUint64At(offset)
+	offset += uInt64Size
+	if offset == int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	return id, offset, int(num), nil
+}
+
+// writeLength write the data length with 8 Bits spaces.
+func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int64, err error) {
+	if offset+uInt64Size > int64(q.SegmentSize) {

Review comment:
       remainSegmentSize?
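
The condition under review can be read as "is the space remaining in the current segment too small for the 8-byte length prefix?". A minimal sketch of that rule with the remaining space named explicitly; `lengthPosition` and `remaining` are hypothetical names, not code from the PR, and the comparison is only an algebraic rewrite of `offset+uInt64Size > SegmentSize`.

```go
package main

import "fmt"

const uInt64Size = 8

// lengthPosition returns the segment ID and offset at which an 8-byte
// length prefix should be written. If fewer than 8 bytes remain in the
// current segment, the prefix moves to the start of the next segment.
func lengthPosition(id, offset, segmentSize int64) (int64, int64) {
	remaining := segmentSize - offset
	if remaining < uInt64Size {
		return id + 1, 0
	}
	return id, offset
}

func main() {
	id, off := lengthPosition(3, 120, 128) // exactly 8 bytes remain
	fmt.Println(id, off)                   // prints 3 120
	id, off = lengthPosition(3, 124, 128)  // only 4 bytes remain
	fmt.Println(id, off)                   // prints 4 0
}
```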

##########
File path: plugins/queue/mmap/segment_operation.go
##########
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"strconv"
+	"sync/atomic"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns a memory mapped file at the segmentID position.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
+	index := q.GetIndex(segmentID)
+	if q.mmapCount >= q.MaxInMemSegments {
+		q.insufficientMemChannel <- struct{}{}
+		<-q.sufficientMemChannel
+	}
+	if err := q.mapSegment(segmentID); err != nil {
+		return nil, err
+	}
+	if q.segments[index] != nil {
+		return q.segments[index], nil
+	}
+	return nil, fmt.Errorf("cannot get a memory mapped file at %d segment", segmentID)
+}
+
+// mapSegment load the segment file reference to the segments.
+func (q *Queue) mapSegment(segmentID int64) error {
+	index := q.GetIndex(segmentID)
+	if q.segments[index] != nil {
+		return nil
+	}
+	filePath := path.Join(q.QueueDir, strconv.Itoa(index)+segment.FileSuffix)

Review comment:
       Is there a file capacity threshold management mechanism?

##########
File path: plugins/queue/mmap/queue_opreation.go
##########
@@ -0,0 +1,167 @@
+// MIT License
+//
+// Copyright (c) 2018 Aman Mangal
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package mmap
+
+import (
+	"fmt"
+)
+
+// Because the design of the mmap-queue in Satellite references the design of the
+// bigqueue(https://github.com/grandecola/bigqueue), the queue operation file retains
+// the original author license.
+//
+// The reason why we references the source codes of bigqueue rather than using the lib
+// is the file queue in Satellite is like following.
+// 1. Only one consumer and publisher in the Satellite queue.
+// 2. Reusing files strategy is required to reduce the creation times in the Satellite queue.
+// 3. More complex OFFSET design is needed to ensure the final stability of data.
+
+const uInt64Size = 8
+
+// push writes the data into the file system. It first writes the length of the data,
+// then the data itself. It means the whole data may not exist in the one segments.
+func (q *Queue) push(bytes []byte) error {
+	if q.isFull() {
+		return fmt.Errorf("cannot push data when the queue is full")
+	}
+	id, offset := q.meta.GetWritingOffset()
+	id, offset, err := q.writeLength(len(bytes), id, offset)
+	if err != nil {
+		return err
+	}
+	id, offset, err = q.writeBytes(bytes, id, offset)
+	if err != nil {
+		return err
+	}
+	q.meta.PutWritingOffset(id, offset)
+	q.unflushedNum++
+	if q.unflushedNum == q.FlushCeilingNum {
+		q.flushChannel <- struct{}{}
+		q.unflushedNum = 0
+	}
+	return nil
+}
+
+// pop reads the data from the file system. It first reads the length of the data,
+// then the data itself. It means the whole data may not exist in the one segments.
+func (q *Queue) pop() (data []byte, rid, roffset int64, err error) {
+	if q.isEmpty() {
+		return nil, 0, 0, fmt.Errorf("cannot read data when the queue is empty")
+	}
+	id, offset := q.meta.GetReadingOffset()
+	id, offset, length, err := q.readLength(id, offset)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	bytes, id, offset, err := q.readBytes(id, offset, length)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	q.meta.PutReadingOffset(id, offset)
+	return bytes, id, offset, nil
+}
+
+// readBytes reads bytes into the memory mapped file.
+func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, newOffset int64, err error) {
+	counter := 0
+	res := make([]byte, length)
+	for {
+		segment, err := q.GetSegment(id)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		readBytes, err := segment.ReadAt(res[counter:], offset)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		counter += readBytes
+		offset += int64(readBytes)
+		if offset == int64(q.SegmentSize) {
+			id, offset = id+1, 0
+		}
+		if counter == length {
+			break
+		}
+	}
+	return res, id, offset, nil
+}
+
+// readLength reads the data length with 8 Bits spaces.
+func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int, err error) {
+	if offset+uInt64Size > int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	segment, err := q.GetSegment(id)
+	if err != nil {
+		return 0, 0, 0, err
+	}
+	num := segment.ReadUint64At(offset)
+	offset += uInt64Size
+	if offset == int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	return id, offset, int(num), nil
+}
+
+// writeLength write the data length with 8 Bits spaces.
+func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int64, err error) {

Review comment:
       How is the case handled where a single request's bytes are larger than a segment, or larger than the space remaining in the current segment? Please show the code.
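
As a rough illustration of the case being asked about, the sketch below writes a payload that spans several fixed-size segments and reads it back, in the same style as the `readBytes` loop quoted above. It uses plain in-memory byte slices instead of memory mapped files, does not wrap around to reuse segments, and `writeSpanning`, `readSpanning`, and `newSegments` are hypothetical names, so treat it as an assumption about the approach and not as the PR's `writeBytes`.

```go
package main

import (
	"errors"
	"fmt"
)

// writeSpanning copies data into fixed-size segments starting at
// (id, offset) and rolls over to the next segment whenever the current
// one is full. It returns the position just past the written data.
func writeSpanning(segments [][]byte, id, offset int, data []byte) (int, int, error) {
	written := 0
	for written < len(data) {
		if id >= len(segments) {
			return 0, 0, errors.New("not enough segments for the payload")
		}
		n := copy(segments[id][offset:], data[written:])
		written += n
		offset += n
		if offset == len(segments[id]) {
			id, offset = id+1, 0
		}
	}
	return id, offset, nil
}

// readSpanning is the mirror image: it collects length bytes starting
// at (id, offset), crossing segment boundaries as needed.
func readSpanning(segments [][]byte, id, offset, length int) ([]byte, int, int, error) {
	out := make([]byte, length)
	read := 0
	for read < length {
		if id >= len(segments) {
			return nil, 0, 0, errors.New("payload runs past the last segment")
		}
		n := copy(out[read:], segments[id][offset:])
		read += n
		offset += n
		if offset == len(segments[id]) {
			id, offset = id+1, 0
		}
	}
	return out, id, offset, nil
}

// newSegments allocates count segments of size bytes each.
func newSegments(count, size int) [][]byte {
	segs := make([][]byte, count)
	for i := range segs {
		segs[i] = make([]byte, size)
	}
	return segs
}

func main() {
	segs := newSegments(3, 4)
	id, off, err := writeSpanning(segs, 0, 2, []byte("abcdefg"))
	fmt.Println(id, off, err) // prints 2 1 <nil>
	data, _, _, _ := readSpanning(segs, 0, 2, 7)
	fmt.Println(string(data)) // prints abcdefg
}
```

Here a 7-byte payload starting at offset 2 of a 4-byte segment lands in three segments (2 + 4 + 1 bytes). A payload that does not fit in the available segments returns an error instead of being truncated.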

##########
File path: plugins/queue/mmap/segment_operation.go
##########
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"strconv"
+	"sync/atomic"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns a memory mapped file at the segmentID position.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
+	index := q.GetIndex(segmentID)
+	if q.mmapCount >= q.MaxInMemSegments {
+		q.insufficientMemChannel <- struct{}{}
+		<-q.sufficientMemChannel
+	}
+	if err := q.mapSegment(segmentID); err != nil {

Review comment:
       Why call mapSegment here?

##########
File path: plugins/queue/mmap/segment_operation.go
##########
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"strconv"
+	"sync/atomic"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns a memory mapped file at the segmentID position.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
+	index := q.GetIndex(segmentID)
+	if q.mmapCount >= q.MaxInMemSegments {

Review comment:
       Why execute it here? Why not just use a timer to trigger it?
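
To make the two options in this question concrete, here is a minimal sketch of a swapper loop that reacts to both a timer and an on-demand signal. It is modeled loosely on the `insufficientMemChannel` / `sufficientMemChannel` pair in the quoted `Queue` struct, but `swapLoop`, `countOnDemandSwaps`, and the `swap` callback are hypothetical names, and this is not presented as the author's answer.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// swapLoop frees memory either when the ticker fires or when a caller
// signals on demand. Each on-demand request is acknowledged on done so
// the caller can block until the swap has finished.
func swapLoop(ctx context.Context, period time.Duration, demand <-chan struct{}, done chan<- struct{}, swap func()) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			swap()
		case <-demand:
			swap()
			done <- struct{}{}
		case <-ctx.Done():
			return
		}
	}
}

// countOnDemandSwaps sends n on-demand requests and returns how many
// swaps ran. The period is long enough that the ticker does not fire
// during the call, so only the on-demand path is exercised.
func countOnDemandSwaps(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	demand := make(chan struct{})
	done := make(chan struct{})
	swaps := 0
	go swapLoop(ctx, time.Hour, demand, done, func() { swaps++ })
	for i := 0; i < n; i++ {
		demand <- struct{}{}
		<-done // wait for the acknowledgement before continuing
	}
	return swaps
}

func main() {
	fmt.Println(countOnDemandSwaps(3)) // prints 3
}
```

A timer alone bounds how stale the mapped set can get, but a caller that hits the in-memory limit between ticks would have to wait for the next tick. The on-demand path lets that caller block only for as long as one swap takes.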

##########
File path: plugins/queue/mmap/queue_opreation.go
##########
@@ -0,0 +1,167 @@
+// MIT License
+//
+// Copyright (c) 2018 Aman Mangal
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package mmap
+
+import (
+	"fmt"
+)
+
+// Because the design of the mmap-queue in Satellite references the design of the
+// bigqueue(https://github.com/grandecola/bigqueue), the queue operation file retains
+// the original author license.
+//
+// The reason why we references the source codes of bigqueue rather than using the lib
+// is the file queue in Satellite is like following.
+// 1. Only one consumer and publisher in the Satellite queue.
+// 2. Reusing files strategy is required to reduce the creation times in the Satellite queue.
+// 3. More complex OFFSET design is needed to ensure the final stability of data.
+
+const uInt64Size = 8
+
+// push writes the data into the file system. It first writes the length of the data,
+// then the data itself. It means the whole data may not exist in the one segments.
+func (q *Queue) push(bytes []byte) error {
+	if q.isFull() {
+		return fmt.Errorf("cannot push data when the queue is full")
+	}
+	id, offset := q.meta.GetWritingOffset()
+	id, offset, err := q.writeLength(len(bytes), id, offset)
+	if err != nil {
+		return err
+	}
+	id, offset, err = q.writeBytes(bytes, id, offset)
+	if err != nil {
+		return err
+	}
+	q.meta.PutWritingOffset(id, offset)
+	q.unflushedNum++
+	if q.unflushedNum == q.FlushCeilingNum {
+		q.flushChannel <- struct{}{}
+		q.unflushedNum = 0
+	}
+	return nil
+}
+
+// pop reads the data from the file system. It first reads the length of the data,
+// then the data itself. It means the whole data may not exist in the one segments.
+func (q *Queue) pop() (data []byte, rid, roffset int64, err error) {
+	if q.isEmpty() {
+		return nil, 0, 0, fmt.Errorf("cannot read data when the queue is empty")
+	}
+	id, offset := q.meta.GetReadingOffset()
+	id, offset, length, err := q.readLength(id, offset)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	bytes, id, offset, err := q.readBytes(id, offset, length)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	q.meta.PutReadingOffset(id, offset)
+	return bytes, id, offset, nil
+}
+
+// readBytes reads bytes into the memory mapped file.
+func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, newOffset int64, err error) {
+	counter := 0
+	res := make([]byte, length)
+	for {
+		segment, err := q.GetSegment(id)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		readBytes, err := segment.ReadAt(res[counter:], offset)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		counter += readBytes
+		offset += int64(readBytes)
+		if offset == int64(q.SegmentSize) {
+			id, offset = id+1, 0
+		}
+		if counter == length {
+			break
+		}
+	}
+	return res, id, offset, nil
+}
+
+// readLength reads the data length with 8 Bits spaces.
+func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int, err error) {
+	if offset+uInt64Size > int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	segment, err := q.GetSegment(id)
+	if err != nil {
+		return 0, 0, 0, err
+	}
+	num := segment.ReadUint64At(offset)
+	offset += uInt64Size
+	if offset == int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	return id, offset, int(num), nil
+}
+
+// writeLength write the data length with 8 Bits spaces.
+func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int64, err error) {
+	if offset+uInt64Size > int64(q.SegmentSize) {

Review comment:
       change the name if yes

##########
File path: plugins/queue/mmap/segment_operation.go
##########
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"strconv"
+	"sync/atomic"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns a memory mapped file at the segmentID position.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {

Review comment:
       Does the queue support concurrent access? Would any problems occur in that situation?







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546300667



##########
File path: Makefile
##########
@@ -36,7 +37,7 @@ GO_BUILD_FLAGS = -v
 GO_BUILD_LDFLAGS = -X main.version=$(VERSION)
 GQL_GEN = $(GO_PATH)/bin/gqlgen
 
-PLATFORMS := windows linux darwin
+PLATFORMS := linux darwin

Review comment:
       Does that block compiling and running if we don't activate mmap? AFAIK, mmap is only one queue option. We could add documentation saying that on Windows only the memory queue or other implementations are supported. I don't like the idea of excluding Windows directly.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546359667



##########
File path: Makefile
##########
@@ -67,7 +67,9 @@ test: clean lint
 
 .PHONY: license
 license: clean tools
-	$(GO_LICENSER) -d -exclude=protocol/gen-codes -licensor='Apache Software Foundation (ASF)' .
+	$(GO_LICENSER) -d  -exclude=plugins/queue/mmap \

Review comment:
       I will try my best to solve this problem







[GitHub] [skywalking-satellite] EvanLjp commented on pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#issuecomment-749345962


   > CI fails, generating codes check can't pass.
   
   go.mod and go.sum may differ between machines because of the indirect libs; using grep eliminates this effect.





[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r545682816



##########
File path: go.mod
##########
@@ -3,8 +3,9 @@ module github.com/apache/skywalking-satellite
 go 1.14
 
 require (
+	github.com/google/go-cmp v0.5.4

Review comment:
       ![image](https://user-images.githubusercontent.com/31562192/102595676-f34d8680-4152-11eb-9289-697681f0027e.png)
   I don't see that the file has been updated.







[GitHub] [skywalking-satellite] wu-sheng commented on pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#issuecomment-748549058


   The root LICENSE, aka the source code license, has not been updated.





[GitHub] [skywalking-satellite] nic-chen commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
nic-chen commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r544913104



##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"sync"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int    `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.
+	mmapCount              int            // The number of the memory mapped files.
+	unflushedNum           int            // The unflushed number.
+	flushChannel           chan struct{}  // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
+	insufficientMemChannel chan struct{}  // Notify when memory is insufficient
+	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
+
+	// control components
+	ctx        context.Context    // Parent ctx
+	cancel     context.CancelFunc // Parent ctx cancel function
+	showDownWg sync.WaitGroup     // The shutdown wait group.
+
+	bufPool *sync.Pool
+
+	encoder *Encoder
+	decoder *Decoder
+}
+
+func (q *Queue) Name() string {
+	return "mmap-queue"
+}
+
+func (q *Queue) Description() string {
+	return "this is a mmap queue"
+}
+
+func (q *Queue) DefaultConfig() string {
+	return `
+# The size of each segment. Default value is 128K. The unit is Byte.
+segment_size: 131072
+# The max num of segments in memory. Default value is 10.
+max_in_mem_segments: 10
+# The capacity of Queue = segment_size * queue_capacity_segments.
+queue_capacity_segments: 4000
+# The period flush time. The unit is ms. Default value is 1 second.
+flush_period: 1000
+# The max number in one flush time.  Default value is 10000.
+flush_ceiling_num: 10000
+# Contains all files in the queue.
+queue_dir: satellite-mmap-queue
+# The max size of the input event. Default value is 20k.
+max_event_size: 20480
+`
+}
+
+func (q *Queue) Initialize() error {
+	q.encoder = NewEncoder()
+	q.decoder = NewDecoder()
+
+	q.bufPool = &sync.Pool{New: func() interface{} {
+		return new(bytes.Buffer)
+	}}
+	// the size of each segment file should be a multiple of the page size.
+	pageSize := os.Getpagesize()
+	if q.SegmentSize%pageSize != 0 {
+		q.SegmentSize -= q.SegmentSize % pageSize
+	}
+	if q.SegmentSize/pageSize == 0 {
+		q.SegmentSize = 131072
+	}
+	// the minimum MaxInMemSegments value should be 4.
+	if q.MaxInMemSegments < 4 {
+		q.MaxInMemSegments = 4
+	}
+	// load metadata and override the reading or writing offset by the committed or watermark offset.
+	md, err := meta.NewMetaData(q.QueueDir, q.QueueCapacitySegments)
+	if err != nil {
+		return fmt.Errorf("error in creating the metadata: %v", err)
+	}
+	q.meta = md
+	cmID, cmOffset := md.GetCommittedOffset()
+	wmID, wmOffset := md.GetWatermarkOffset()
+	md.PutWritingOffset(wmID, wmOffset)
+	md.PutReadingOffset(cmID, cmOffset)
+	// keep the reading or writing segments in the memory.
+	q.segments = make([]*mmap.File, q.QueueCapacitySegments)
+	if _, err := q.GetSegment(cmID); err != nil {
+		return err
+	}
+	if _, err := q.GetSegment(wmID); err != nil {
+		return err
+	}
+	// init components
+	q.insufficientMemChannel = make(chan struct{})
+	q.sufficientMemChannel = make(chan struct{})
+	q.flushChannel = make(chan struct{})
+	q.ctx, q.cancel = context.WithCancel(context.Background())
+	// async supported processes.
+	q.showDownWg.Add(2)
+	go q.segmentSwapper()
+	go q.flush()
+	return nil
+}
+
+func (q *Queue) Push(e *event.Event) error {

Review comment:
       Missing test cases
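
One candidate for such a test is the segment-size normalization at the top of `Initialize`. The sketch below pulls that arithmetic into a standalone function so it can be checked without touching the file system. The function name `normalizeSegmentSize` and its parameters are hypothetical and not part of the PR; only the rounding rule and the 131072 fallback come from the diff above.

```go
package main

import (
	"fmt"
	"os"
)

// defaultSegmentSize mirrors the fallback value used in Initialize.
const defaultSegmentSize = 131072

// normalizeSegmentSize (hypothetical helper) rounds size down to a multiple
// of pageSize and returns fallback when less than one full page remains.
func normalizeSegmentSize(size, pageSize, fallback int) int {
	size -= size % pageSize
	if size/pageSize == 0 {
		return fallback
	}
	return size
}

func main() {
	pageSize := os.Getpagesize()
	for _, size := range []int{0, 100, pageSize, pageSize + 1, 3*pageSize - 1} {
		fmt.Printf("%d -> %d\n", size, normalizeSegmentSize(size, pageSize, defaultSegmentSize))
	}
}
```

A table-driven test could pass a fixed page size (for example 4096) instead of `os.Getpagesize()` so that the expected values do not depend on the machine running CI.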

##########
File path: plugins/queue/mmap/queue_opreation.go
##########
@@ -0,0 +1,236 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+const uInt64Size = 8
+
+// flush control the flush operation by timer or counter.
+func (q *Queue) flush() {
+	defer q.showDownWg.Done()
+	ctx, cancel := context.WithCancel(q.ctx)
+	defer cancel()
+	for {
+		timeTicker := time.NewTicker(time.Duration(q.FlushPeriod) * time.Millisecond)
+		select {
+		case <-q.flushChannel:
+			q.doFlush()
+		case <-timeTicker.C:
+			q.doFlush()
+		case <-ctx.Done():
+			q.doFlush()
+			return
+		}
+	}
+}
+
+// doFlush flush the segment and meta files to the disk.
+func (q *Queue) doFlush() {
+	q.Lock()
+	defer q.Unlock()
+	for _, segment := range q.segments {
+		if segment == nil {
+			continue
+		}
+		if err := segment.Flush(syscall.MS_SYNC); err != nil {
+			log.Logger.Errorf("cannot flush segment file: %v", err)
+		}
+	}
+	wid, woffset := q.meta.GetWritingOffset()
+	q.meta.PutWatermarkOffset(wid, woffset)
+	if err := q.meta.Flush(); err != nil {
+		log.Logger.Errorf("cannot flush meta file: %v", err)
+	}
+}
+
+// push writes the data into the file system. It first writes the length of the data,
+// then the data itself. It means the whole data may not exist in the one segments.
+func (q *Queue) push(bytes []byte) error {

Review comment:
       Missing test cases
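
The doc comment on `push` describes a length-prefixed record that may continue into the next segment. A minimal sketch of that idea, useful as a starting point for a test, is shown below. It uses plain byte slices instead of mmap files, and the `queue`, `cursor`, and `walk` names are hypothetical. The big-endian header is an assumption, since the diff does not show the byte order, and capacity checks are left out.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const uInt64Size = 8 // same name as the constant in the diff above

// cursor points at one byte position: a segment index plus an offset inside it.
type cursor struct{ segID, offset int }

// queue is a hypothetical in-memory stand-in for the mmap-backed segments.
type queue struct {
	segments [][]byte
	w, r     cursor
}

func newQueue(segmentCount, segmentSize int) *queue {
	q := &queue{segments: make([][]byte, segmentCount)}
	for i := range q.segments {
		q.segments[i] = make([]byte, segmentSize)
	}
	return q
}

// walk visits n bytes starting at c, one chunk per segment, and advances c.
func (q *queue) walk(c *cursor, n int, visit func(chunk []byte)) {
	for n > 0 {
		seg := q.segments[c.segID]
		end := c.offset + n
		if end > len(seg) {
			end = len(seg)
		}
		visit(seg[c.offset:end])
		n -= end - c.offset
		c.offset = end
		if c.offset == len(seg) { // segment exhausted: continue in the next one
			c.segID = (c.segID + 1) % len(q.segments)
			c.offset = 0
		}
	}
}

func (q *queue) write(p []byte) {
	q.walk(&q.w, len(p), func(chunk []byte) {
		copy(chunk, p)
		p = p[len(chunk):]
	})
}

func (q *queue) read(n int) []byte {
	out := make([]byte, 0, n)
	q.walk(&q.r, n, func(chunk []byte) { out = append(out, chunk...) })
	return out
}

// push writes an 8-byte length header followed by the payload.
func (q *queue) push(data []byte) {
	var header [uInt64Size]byte
	binary.BigEndian.PutUint64(header[:], uint64(len(data)))
	q.write(header[:])
	q.write(data)
}

// pop reads the header first, then exactly that many payload bytes.
func (q *queue) pop() []byte {
	size := binary.BigEndian.Uint64(q.read(uInt64Size))
	return q.read(int(size))
}

func main() {
	q := newQueue(4, 16)
	q.push([]byte("spans more than one segment"))
	fmt.Printf("%s\n", q.pop())
}
```

With 16-byte segments, the 8-byte header plus a payload longer than 8 bytes always crosses a segment boundary, which is the case a test for `push` would most want to exercise.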

##########
File path: plugins/queue/mmap/serializer.go
##########
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+
+	"encoding/gob"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+)
+
+// Decoder used in pop operation for reusing gob.Decoder and buf.
+type Decoder struct {
+	buf     *bytes.Buffer
+	decoder *gob.Decoder
+}
+
+// Encoder used in push operation for reusing gob.Decoder and buf.
+type Encoder struct {
+	buf     *bytes.Buffer
+	encoder *gob.Encoder
+}
+
+func NewDecoder() *Decoder {
+	buf := new(bytes.Buffer)
+	return &Decoder{
+		buf:     buf,
+		decoder: gob.NewDecoder(buf),
+	}
+}
+
+func NewEncoder() *Encoder {
+	buf := new(bytes.Buffer)
+	return &Encoder{
+		buf:     buf,
+		encoder: gob.NewEncoder(buf),
+	}
+}
+
+func (d *Decoder) deserialize(b []byte) (*event.Event, error) {
+	defer d.buf.Reset()
+	d.buf.Write(b)
+	e := &event.Event{}
+	err := d.decoder.Decode(e)
+	if err != nil {
+		return nil, err
+	}
+	return e, nil
+}
+
+func (e *Encoder) serialize(data *event.Event) ([]byte, error) {

Review comment:
       Test cases are missing for some Go files.
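
A round-trip check is the most direct test for a serializer. The sketch below is one possible shape for it, under stated assumptions: `Event` is a hypothetical stand-in for `event.Event`, and, unlike the diff above, it creates a fresh `gob.Encoder` and `gob.Decoder` per message. According to the `encoding/gob` documentation, an encoder transmits type information once per stream, so per-message encoders keep every stored record decodable on its own, at the cost of repeating the type description in each record.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Event is a hypothetical stand-in for event.Event from the PR.
type Event struct {
	Name string
	Data []byte
}

// serialize encodes one event as a standalone gob stream.
func serialize(e *Event) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(e); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// deserialize decodes one standalone gob stream back into an event.
func deserialize(b []byte) (*Event, error) {
	e := &Event{}
	if err := gob.NewDecoder(bytes.NewReader(b)).Decode(e); err != nil {
		return nil, err
	}
	return e, nil
}

func main() {
	for _, name := range []string{"first", "second"} {
		b, err := serialize(&Event{Name: name, Data: []byte{1, 2, 3}})
		if err != nil {
			fmt.Println("serialize failed:", err)
			return
		}
		e, err := deserialize(b)
		if err != nil {
			fmt.Println("deserialize failed:", err)
			return
		}
		fmt.Println(e.Name, len(e.Data))
	}
}
```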







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546254109



##########
File path: docs/plugins/queue/mmap/README.md
##########
@@ -0,0 +1,66 @@
+# Design
+The mmap-queue is a big, fast and persistent queue based on the memory mapped files. One mmap-queue has a directory to store the whole data. The Queue directory is made up with many segments and 1 meta file. 
+
+- Segment: Segment is the real data store center, that provides large-space storage and does not reduce read and write performance as much as possible by using mmap. And we will avoid deleting files by reusing them.
+- Meta: The purpose of meta is to find the data that the consumer needs.
+
+## Meta
+Metadata only needs 80B to store the Metadata for the pipe. But for memory alignment, it takes at least one memory page size, which is generally 4K.

Review comment:
       Please highlight that this is not our design, and state the original source very clearly.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546302880



##########
File path: .github/workflows/build-and-test.yaml
##########
@@ -51,7 +51,8 @@ jobs:
         run: make lint
 
       - name: Test
-        run: make test
+          run: make test

Review comment:
       Thanks for your help.







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546305964



##########
File path: Makefile
##########
@@ -67,7 +67,9 @@ test: clean lint
 
 .PHONY: license
 license: clean tools
-	$(GO_LICENSER) -d -exclude=protocol/gen-codes -licensor='Apache Software Foundation (ASF)' .
+	$(GO_LICENSER) -d  -exclude=plugins/queue/mmap \

Review comment:
       You should not exclude the whole mmap folder. I believe not all of the files are from a third party. Please use an explicit list of file names.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r545701435



##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"sync"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int    `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.
+	mmapCount              int            // The number of the memory mapped files.
+	unflushedNum           int            // The unflushed number.
+	flushChannel           chan struct{}  // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
+	insufficientMemChannel chan struct{}  // Notify when memory is insufficient
+	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
+
+	// control components
+	ctx        context.Context    // Parent ctx
+	cancel     context.CancelFunc // Parent ctx cancel function
+	showDownWg sync.WaitGroup     // The shutdown wait group.
+
+	bufPool *sync.Pool
+
+	encoder *Encoder
+	decoder *Decoder
+}
+
+func (q *Queue) Name() string {
+	return "mmap-queue"
+}
+
+func (q *Queue) Description() string {
+	return "this is a mmap queue"
+}
+
+func (q *Queue) DefaultConfig() string {
+	return `
+# The size of each segment. Default value is 128K. The unit is Byte.
+segment_size: 131072
+# The max num of segments in memory. Default value is 10.
+max_in_mem_segments: 10
+# The capacity of Queue = segment_size * queue_capacity_segments.
+queue_capacity_segments: 4000
+# The period flush time. The unit is ms. Default value is 1 second.
+flush_period: 1000
+# The max number in one flush time.  Default value is 10000.
+flush_ceiling_num: 10000
+# Contains all files in the queue.
+queue_dir: satellite-mmap-queue
+# The max size of the input event. Default value is 20k.
+max_event_size: 20480
+`
+}
+
+func (q *Queue) Initialize() error {
+	q.encoder = NewEncoder()
+	q.decoder = NewDecoder()
+
+	q.bufPool = &sync.Pool{New: func() interface{} {
+		return new(bytes.Buffer)
+	}}
+	// the size of each segment file should be a multiple of the page size.
+	pageSize := os.Getpagesize()
+	if q.SegmentSize%pageSize != 0 {
+		q.SegmentSize -= q.SegmentSize % pageSize
+	}
+	if q.SegmentSize/pageSize == 0 {
+		q.SegmentSize = 131072
+	}
+	// the minimum MaxInMemSegments value should be 4.
+	if q.MaxInMemSegments < 4 {
+		q.MaxInMemSegments = 4
+	}
+	// load metadata and override the reading or writing offset by the committed or watermark offset.
+	md, err := meta.NewMetaData(q.QueueDir, q.QueueCapacitySegments)
+	if err != nil {
+		return fmt.Errorf("error in creating the metadata: %v", err)
+	}
+	q.meta = md
+	cmID, cmOffset := md.GetCommittedOffset()
+	wmID, wmOffset := md.GetWatermarkOffset()
+	md.PutWritingOffset(wmID, wmOffset)
+	md.PutReadingOffset(cmID, cmOffset)
+	// keep the reading or writing segments in the memory.
+	q.segments = make([]*mmap.File, q.QueueCapacitySegments)
+	if _, err := q.GetSegment(cmID); err != nil {
+		return err
+	}
+	if _, err := q.GetSegment(wmID); err != nil {
+		return err
+	}
+	// init components
+	q.insufficientMemChannel = make(chan struct{})
+	q.sufficientMemChannel = make(chan struct{})
+	q.flushChannel = make(chan struct{})
+	q.ctx, q.cancel = context.WithCancel(context.Background())
+	// async supported processes.
+	q.showDownWg.Add(2)
+	go q.segmentSwapper()
+	go q.flush()
+	return nil
+}
+
+func (q *Queue) Push(e *event.Event) error {

Review comment:
       As I understand it, the tests already cover this fully.







[GitHub] [skywalking-satellite] kezhenxu94 commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546301894



##########
File path: .github/workflows/build-and-test.yaml
##########
@@ -51,7 +51,8 @@ jobs:
         run: make lint
 
       - name: Test
-        run: make test
+          run: make test

Review comment:
       Here is the reason: the YAML indentation is wrong.
   ```suggestion
           run: make test
   ```







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r545795857



##########
File path: .github/workflows/build-and-test.yaml
##########
@@ -53,8 +53,6 @@ jobs:
       - name: Check
         run: make check
 
-      - name: Test
-        run: make test

Review comment:
       Why delete this?







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r545725679



##########
File path: plugins/queue/mmap/serializer.go
##########
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+
+	"encoding/gob"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+)
+
+// Decoder used in pop operation for reusing gob.Decoder and buf.
+type Decoder struct {
+	buf     *bytes.Buffer
+	decoder *gob.Decoder
+}
+
+// Encoder used in push operation for reusing gob.Decoder and buf.
+type Encoder struct {
+	buf     *bytes.Buffer
+	encoder *gob.Encoder
+}
+
+func NewDecoder() *Decoder {
+	buf := new(bytes.Buffer)
+	return &Decoder{
+		buf:     buf,
+		decoder: gob.NewDecoder(buf),
+	}
+}
+
+func NewEncoder() *Encoder {
+	buf := new(bytes.Buffer)
+	return &Encoder{
+		buf:     buf,
+		encoder: gob.NewEncoder(buf),
+	}
+}
+
+func (d *Decoder) deserialize(b []byte) (*event.Event, error) {
+	defer d.buf.Reset()
+	d.buf.Write(b)
+	e := &event.Event{}
+	err := d.decoder.Decode(e)
+	if err != nil {
+		return nil, err
+	}
+	return e, nil
+}
+
+func (e *Encoder) serialize(data *event.Event) ([]byte, error) {

Review comment:
       I may change the structure of the event. Please let me think about it again.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548941486



##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,266 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/grandecola/mmap"
+
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+	"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int32  `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.
+	mmapCount              int32          // The number of the memory mapped files.
+	unflushedNum           int            // The unflushed number.
+	flushChannel           chan struct{}  // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
+	insufficientMemChannel chan struct{}  // Notify when memory is insufficient
+	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
+
+	// control components
+	ctx        context.Context    // Parent ctx
+	cancel     context.CancelFunc // Parent ctx cancel function
+	showDownWg sync.WaitGroup     // The shutdown wait group.
+
+}
+
+func (q *Queue) Name() string {
+	return "mmap-queue"
+}
+
+func (q *Queue) Description() string {
+	return "this is a mmap queue"
+}
+
+func (q *Queue) DefaultConfig() string {
+	return `
+# The size of each segment. Default value is 128K. The unit is Byte.
+segment_size: 131072
+# The max num of segments in memory. Default value is 10.
+max_in_mem_segments: 10
+# The capacity of Queue = segment_size * queue_capacity_segments.
+queue_capacity_segments: 4000
+# The period flush time. The unit is ms. Default value is 1 second.
+flush_period: 1000
+# The max number in one flush time.  Default value is 10000.
+flush_ceiling_num: 10000
+# Contains all files in the queue.
+queue_dir: satellite-mmap-queue
+# The max size of the input event. Default value is 20k.
+max_event_size: 20480
+`
+}
+
+func (q *Queue) Initialize() error {
+	// the size of each segment file should be a multiple of the page size.
+	pageSize := os.Getpagesize()
+	if q.SegmentSize%pageSize != 0 {
+		q.SegmentSize -= q.SegmentSize % pageSize
+	}
+	if q.SegmentSize/pageSize == 0 {
+		q.SegmentSize = 131072
+	}
+	// the minimum MaxInMemSegments value should be 4.
+	if q.MaxInMemSegments < 4 {
+		q.MaxInMemSegments = 4
+	}
+	// load metadata and override the reading or writing offset by the committed or watermark offset.
+	md, err := meta.NewMetaData(q.QueueDir, q.QueueCapacitySegments)
+	if err != nil {
+		return fmt.Errorf("error in creating the metadata: %v", err)
+	}
+	q.meta = md
+	cmID, cmOffset := md.GetCommittedOffset()
+	wmID, wmOffset := md.GetWatermarkOffset()
+	md.PutWritingOffset(wmID, wmOffset)
+	md.PutReadingOffset(cmID, cmOffset)
+	// keep the reading or writing segments in the memory.
+	q.segments = make([]*mmap.File, q.QueueCapacitySegments)
+	if _, err := q.GetSegment(cmID); err != nil {
+		return err
+	}
+	if _, err := q.GetSegment(wmID); err != nil {
+		return err
+	}
+	// init components
+	q.insufficientMemChannel = make(chan struct{})
+	q.sufficientMemChannel = make(chan struct{})
+	q.flushChannel = make(chan struct{})
+	q.ctx, q.cancel = context.WithCancel(context.Background())
+	// async supported processes.
+	q.showDownWg.Add(2)
+	go q.segmentSwapper()
+	go q.flush()
+	return nil
+}
+
+func (q *Queue) Push(e *protocol.Event) error {
+	data, err := proto.Marshal(e)

Review comment:
       Preserve sequentiality







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546353190



##########
File path: Makefile
##########
@@ -67,7 +67,9 @@ test: clean lint
 
 .PHONY: license
 license: clean tools
-	$(GO_LICENSER) -d -exclude=protocol/gen-codes -licensor='Apache Software Foundation (ASF)' .
+	$(GO_LICENSER) -d  -exclude=plugins/queue/mmap \

Review comment:
       Why does the folder matter for source code? Please be clear. You risk the project missing license headers in the source release.







[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546253770



##########
File path: Makefile
##########
@@ -36,7 +37,7 @@ GO_BUILD_FLAGS = -v
 GO_BUILD_LDFLAGS = -X main.version=$(VERSION)
 GQL_GEN = $(GO_PATH)/bin/gqlgen
 
-PLATFORMS := windows linux darwin
+PLATFORMS := linux darwin

Review comment:
       Why no Windows? Is Windows not supported because of the queue implementation?







[GitHub] [skywalking-satellite] wu-sheng commented on pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#issuecomment-749282042


   CI fails; the generated-code check does not pass.





[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548937399



##########
File path: dist/licenses/LICENSE-github-protobuf.txt
##########
@@ -0,0 +1,27 @@
+Copyright 2010 The Go Authors.  All rights reserved.

Review comment:
       What is using them? Is there a chance to remove one of them later?







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546274258



##########
File path: Makefile
##########
@@ -36,7 +37,7 @@ GO_BUILD_FLAGS = -v
 GO_BUILD_LDFLAGS = -X main.version=$(VERSION)
 GQL_GEN = $(GO_PATH)/bin/gqlgen
 
-PLATFORMS := windows linux darwin
+PLATFORMS := linux darwin

Review comment:
       Yes, mmap is not supported on Windows; I found many problems with the Go mmap libraries.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r547180853



##########
File path: Makefile
##########
@@ -36,7 +37,7 @@ GO_BUILD_FLAGS = -v
 GO_BUILD_LDFLAGS = -X main.version=$(VERSION)
 GQL_GEN = $(GO_PATH)/bin/gqlgen
 
-PLATFORMS := windows linux darwin
+PLATFORMS := linux darwin

Review comment:
       Windows will be supported through the memory queue.
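
One way to read this exchange is that the queue plugin would be chosen per platform. The sketch below illustrates that reading only. `defaultQueuePlugin` and the name `memory-queue` are hypothetical; `mmap-queue` is the name returned by `Queue.Name()` in this PR, and `linux` and `darwin` are the platforms kept in the Makefile change.

```go
package main

import (
	"fmt"
	"runtime"
)

// defaultQueuePlugin (hypothetical) picks a queue plugin name for a GOOS value.
// The mmap queue is used only where the PR builds it; everything else falls
// back to an in-memory queue.
func defaultQueuePlugin(goos string) string {
	switch goos {
	case "linux", "darwin":
		return "mmap-queue"
	default:
		return "memory-queue" // assumed name, not taken from the PR
	}
}

func main() {
	fmt.Println(runtime.GOOS, "->", defaultQueuePlugin(runtime.GOOS))
}
```

A build constraint on the mmap package files would be an alternative that moves the decision to compile time instead of run time.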







[GitHub] [skywalking-satellite] surlymo commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
surlymo commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548934392



##########
File path: plugins/queue/mmap/meta/meta.go
##########
@@ -0,0 +1,152 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package meta
+
+import (
+	"fmt"
+	"path/filepath"
+	"syscall"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+const (
+	metaSize    = 80
+	metaName    = "meta.dat"
+	metaVersion = 1
+)
+
+// Metadata only needs 80B to store the Metadata for the pipe. But for memory alignment,
+// it takes at least one memory page size, which is generally 4K.
+//
+// [    8Bit   ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit  ]
+// [metaVersion][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][capacity]
+// [metaVersion][writing   offset][watermark offset][committed offset][reading   offset][capacity]
+type Metadata struct {
+	metaFile *mmap.File
+	name     string
+	size     int
+	capacity int
+}
+
+// NewMetaData read or create a Metadata with supported metaVersion
+func NewMetaData(metaDir string, capacity int) (*Metadata, error) {
+	path := filepath.Join(metaDir, metaName)
+	metaFile, err := segment.NewSegment(path, metaSize)
+	if err != nil {
+		return nil, fmt.Errorf("error in creating the Metadata memory mapped file: %v", err)
+	}
+
+	m := &Metadata{
+		metaFile: metaFile,
+		name:     metaName,
+		size:     metaSize,
+		capacity: capacity,
+	}
+
+	v := m.GetVersion()
+	if v != 0 && v != metaVersion {
+		return nil, fmt.Errorf("metadata metaVersion is not matching, the Metadata metaVersion is %d", v)
+	}
+	c := m.GetCapacity()
+	if c != 0 && c != capacity {
+		return nil, fmt.Errorf("metadata capacity is not equal to the old capacity, the old capacity is %d", c)
+	}
+	m.PutVersion(metaVersion)
+	m.PutCapacity(int64(capacity))
+	return m, nil
+}
+
+// GetVersion returns the meta version.
+func (m *Metadata) GetVersion() int {
+	return int(m.metaFile.ReadUint64At(0))
+}
+
+// PutVersion put the version into the memory mapped file.
+func (m *Metadata) PutVersion(version int64) {
+	m.metaFile.WriteUint64At(uint64(version), 0)
+}
+
+// GetWritingOffset returns the writing offset, which contains the segment ID and the offset of the segment.
+func (m *Metadata) GetWritingOffset() (segmentID, offset int64) {
+	return int64(m.metaFile.ReadUint64At(8)), int64(m.metaFile.ReadUint64At(16))
+}
+
+// PutWritingOffset put the segment ID and the offset of the segment into the writing offset.
+func (m *Metadata) PutWritingOffset(segmentID, offset int64) {
+	m.metaFile.WriteUint64At(uint64(segmentID), 8)

Review comment:
       magic number
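
One way to address the magic-number remark is to name each field position in the 80-byte layout described in the struct comment. The sketch below is only an illustration: the constant names, the in-memory `meta` type, and the little-endian encoding are assumptions, not code from the PR, which calls `WriteUint64At` with the literals 8 and 16.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Hypothetical names; the PR itself uses the literals 0, 8, 16, ...
const (
	fieldSize = 8 // every metadata field is one uint64

	versionPos         = 0 * fieldSize
	writingIDPos       = 1 * fieldSize
	writingOffsetPos   = 2 * fieldSize
	watermarkIDPos     = 3 * fieldSize
	watermarkOffsetPos = 4 * fieldSize
	committedIDPos     = 5 * fieldSize
	committedOffsetPos = 6 * fieldSize
	readingIDPos       = 7 * fieldSize
	readingOffsetPos   = 8 * fieldSize
	capacityPos        = 9 * fieldSize

	metaSize = 10 * fieldSize // 80 bytes, as in the PR
)

// meta is an in-memory stand-in for the memory mapped meta file.
type meta struct{ buf [metaSize]byte }

// putWritingOffset stores the segment ID and the offset at their named positions.
func (m *meta) putWritingOffset(segmentID, offset int64) {
	binary.LittleEndian.PutUint64(m.buf[writingIDPos:], uint64(segmentID))
	binary.LittleEndian.PutUint64(m.buf[writingOffsetPos:], uint64(offset))
}

// getWritingOffset reads the two fields back from the same positions.
func (m *meta) getWritingOffset() (segmentID, offset int64) {
	return int64(binary.LittleEndian.Uint64(m.buf[writingIDPos:])),
		int64(binary.LittleEndian.Uint64(m.buf[writingOffsetPos:]))
}

func main() {
	m := &meta{}
	m.putWritingOffset(3, 4096)
	id, off := m.getWritingOffset()
	fmt.Println(id, off, metaSize)
}
```

With named positions, the getters and setters read the same way as the layout comment, and a change to the layout only touches the constant block.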

##########
File path: internal/satellite/module/sender/sender.go
##########
@@ -130,12 +131,12 @@ func (s *Sender) Shutdown() {
 func (s *Sender) consume(batch *buffer.BatchBuffer) {
 	log.Logger.Infof("sender module of %s namespace is flushing a new batch buffer."+
 		" the start offset is %s, and the size is %d", s.config.NamespaceName, batch.Last(), batch.Len())
-	var events = make(map[event.Type]event.BatchEvents)
+	var events = make(map[protocol.EventType]event.BatchEvents)
 	for i := 0; i < batch.Len(); i++ {
 		eventContext := batch.Buf()[i]
 		for _, e := range eventContext.Context {
-			if e.IsRemote() {
-				events[e.Type()] = append(events[e.Type()], e)
+			if e.Remote {

Review comment:
       why replace the name?

##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,266 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/grandecola/mmap"
+
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+	"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int32  `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.
+	mmapCount              int32          // The number of the memory mapped files.
+	unflushedNum           int            // The unflushed number.
+	flushChannel           chan struct{}  // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
+	insufficientMemChannel chan struct{}  // Notify when memory is insufficient
+	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
+
+	// control components
+	ctx        context.Context    // Parent ctx
+	cancel     context.CancelFunc // Parent ctx cancel function
+	showDownWg sync.WaitGroup     // The shutdown wait group.
+
+}
+
+func (q *Queue) Name() string {
+	return "mmap-queue"
+}
+
+func (q *Queue) Description() string {
+	return "this is a mmap queue"
+}
+
+func (q *Queue) DefaultConfig() string {
+	return `
+# The size of each segment. Default value is 128K. The unit is Byte.
+segment_size: 131072
+# The max num of segments in memory. Default value is 10.
+max_in_mem_segments: 10
+# The capacity of Queue = segment_size * queue_capacity_segments.
+queue_capacity_segments: 4000
+# The period flush time. The unit is ms. Default value is 1 second.
+flush_period: 1000
+# The max number in one flush time.  Default value is 10000.
+flush_ceiling_num: 10000
+# Contains all files in the queue.
+queue_dir: satellite-mmap-queue
+# The max size of the input event. Default value is 20k.
+max_event_size: 20480
+`
+}
+
+func (q *Queue) Initialize() error {
+	// the size of each segment file should be a multiple of the page size.
+	pageSize := os.Getpagesize()
+	if q.SegmentSize%pageSize != 0 {
+		q.SegmentSize -= q.SegmentSize % pageSize
+	}
+	if q.SegmentSize/pageSize == 0 {
+		q.SegmentSize = 131072
+	}
+	// the minimum MaxInMemSegments value should be 4.
+	if q.MaxInMemSegments < 4 {
+		q.MaxInMemSegments = 4
+	}
+	// load metadata and override the reading or writing offset by the committed or watermark offset.
+	md, err := meta.NewMetaData(q.QueueDir, q.QueueCapacitySegments)
+	if err != nil {
+		return fmt.Errorf("error in creating the metadata: %v", err)
+	}
+	q.meta = md
+	cmID, cmOffset := md.GetCommittedOffset()
+	wmID, wmOffset := md.GetWatermarkOffset()
+	md.PutWritingOffset(wmID, wmOffset)
+	md.PutReadingOffset(cmID, cmOffset)
+	// keep the reading or writing segments in the memory.
+	q.segments = make([]*mmap.File, q.QueueCapacitySegments)
+	if _, err := q.GetSegment(cmID); err != nil {
+		return err
+	}
+	if _, err := q.GetSegment(wmID); err != nil {
+		return err
+	}
+	// init components
+	q.insufficientMemChannel = make(chan struct{})
+	q.sufficientMemChannel = make(chan struct{})
+	q.flushChannel = make(chan struct{})
+	q.ctx, q.cancel = context.WithCancel(context.Background())
+	// async supported processes.
+	q.showDownWg.Add(2)
+	go q.segmentSwapper()
+	go q.flush()
+	return nil
+}
+
+func (q *Queue) Push(e *protocol.Event) error {
+	data, err := proto.Marshal(e)

Review comment:
       mmap should only be used in disaster-recovery situations. The runtime queue can simply be implemented in the normal way.
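
To make the remark concrete, here is a minimal sketch of what a "normal" in-memory runtime queue could look like, assuming a buffered channel is acceptable. The type and function names (`memQueue`, `newMemQueue`) are hypothetical and do not come from the PR or from the Satellite queue API.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errFull  = errors.New("queue is full")
	errEmpty = errors.New("queue is empty")
)

// memQueue is a hypothetical bounded in-memory queue backed by a buffered channel.
type memQueue struct{ ch chan []byte }

func newMemQueue(capacity int) *memQueue {
	return &memQueue{ch: make(chan []byte, capacity)}
}

// Push enqueues data without blocking and fails when the buffer is full.
func (q *memQueue) Push(data []byte) error {
	select {
	case q.ch <- data:
		return nil
	default:
		return errFull
	}
}

// Pop dequeues data without blocking and fails when the buffer is empty.
func (q *memQueue) Pop() ([]byte, error) {
	select {
	case d := <-q.ch:
		return d, nil
	default:
		return nil, errEmpty
	}
}

func main() {
	q := newMemQueue(2)
	_ = q.Push([]byte("event"))
	d, err := q.Pop()
	fmt.Println(string(d), err)
}
```

Such a queue loses its contents on a crash, which is the trade-off the comment points at: persistence through mmap would only be needed on the recovery path.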

##########
File path: plugins/queue/mmap/queue.go
##########
@@ -0,0 +1,266 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/grandecola/mmap"
+
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+	"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int32  `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.
+	mmapCount              int32          // The number of the memory mapped files.
+	unflushedNum           int            // The unflushed number.
+	flushChannel           chan struct{}  // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
+	insufficientMemChannel chan struct{}  // Notify when memory is insufficient
+	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
+
+	// control components
+	ctx        context.Context    // Parent ctx
+	cancel     context.CancelFunc // Parent ctx cancel function
+	showDownWg sync.WaitGroup     // The shutdown wait group.
+
+}
+
+func (q *Queue) Name() string {
+	return "mmap-queue"
+}
+
+func (q *Queue) Description() string {
+	return "this is a mmap queue"
+}
+
+func (q *Queue) DefaultConfig() string {
+	return `
+# The size of each segment. Default value is 128K. The unit is Byte.
+segment_size: 131072
+# The max num of segments in memory. Default value is 10.
+max_in_mem_segments: 10
+# The capacity of Queue = segment_size * queue_capacity_segments.
+queue_capacity_segments: 4000
+# The period flush time. The unit is ms. Default value is 1 second.
+flush_period: 1000
+# The max number in one flush time.  Default value is 10000.
+flush_ceiling_num: 10000
+# Contains all files in the queue.
+queue_dir: satellite-mmap-queue
+# The max size of the input event. Default value is 20k.
+max_event_size: 20480
+`
+}
+
+func (q *Queue) Initialize() error {
+	// the size of each segment file should be a multiple of the page size.
+	pageSize := os.Getpagesize()
+	if q.SegmentSize%pageSize != 0 {
+		q.SegmentSize -= q.SegmentSize % pageSize
+	}
+	if q.SegmentSize/pageSize == 0 {
+		q.SegmentSize = 131072

Review comment:
       magic number
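
A possible shape for the fix, sketched under assumptions: `defaultSegmentSize` and `alignSegmentSize` are hypothetical names, and the helper only mirrors the rounding logic visible in `Initialize` above.

```go
package main

import (
	"fmt"
	"os"
)

// defaultSegmentSize names the literal 131072 (128K); the name is hypothetical.
const defaultSegmentSize = 128 * 1024

// alignSegmentSize rounds size down to a multiple of pageSize and falls back
// to the default when the result would be smaller than one page.
func alignSegmentSize(size, pageSize int) int {
	aligned := size - size%pageSize
	if aligned == 0 {
		return defaultSegmentSize
	}
	return aligned
}

func main() {
	fmt.Println(alignSegmentSize(10000, os.Getpagesize()))
}
```

The same constant could also be used to build the `segment_size` value in `DefaultConfig`, so the default is stated in one place.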







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548941349



##########
File path: plugins/queue/mmap/queue_opreation.go
##########
@@ -0,0 +1,167 @@
+// MIT License
+//
+// Copyright (c) 2018 Aman Mangal
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package mmap
+
+import (
+	"fmt"
+)
+
+// Because the design of the mmap-queue in Satellite references the design of
+// bigqueue (https://github.com/grandecola/bigqueue), the queue operation file retains
+// the original author's license.
+//
+// We adapt the source code of bigqueue rather than using the library because the
+// file queue in Satellite has the following requirements:
+// 1. There is only one consumer and one publisher in the Satellite queue.
+// 2. A file-reuse strategy is required to reduce the number of file creations in the Satellite queue.
+// 3. A more complex OFFSET design is needed to ensure the final stability of the data.
+
+const uInt64Size = 8
+
+// push writes the data into the file system. It first writes the length of the data,
+// then the data itself. A single record may therefore span more than one segment.
+func (q *Queue) push(bytes []byte) error {
+	if q.isFull() {
+		return fmt.Errorf("cannot push data when the queue is full")
+	}
+	id, offset := q.meta.GetWritingOffset()
+	id, offset, err := q.writeLength(len(bytes), id, offset)
+	if err != nil {
+		return err
+	}
+	id, offset, err = q.writeBytes(bytes, id, offset)
+	if err != nil {
+		return err
+	}
+	q.meta.PutWritingOffset(id, offset)
+	q.unflushedNum++
+	if q.unflushedNum == q.FlushCeilingNum {
+		q.flushChannel <- struct{}{}
+		q.unflushedNum = 0
+	}
+	return nil
+}
+
+// pop reads the data from the file system. It first reads the length of the data,
+// then the data itself. A single record may therefore span more than one segment.
+func (q *Queue) pop() (data []byte, rid, roffset int64, err error) {
+	if q.isEmpty() {
+		return nil, 0, 0, fmt.Errorf("cannot read data when the queue is empty")
+	}
+	id, offset := q.meta.GetReadingOffset()
+	id, offset, length, err := q.readLength(id, offset)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	bytes, id, offset, err := q.readBytes(id, offset, length)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	q.meta.PutReadingOffset(id, offset)
+	return bytes, id, offset, nil
+}
+
+// readBytes reads bytes from the memory mapped file.
+func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, newOffset int64, err error) {
+	counter := 0
+	res := make([]byte, length)
+	for {
+		segment, err := q.GetSegment(id)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		readBytes, err := segment.ReadAt(res[counter:], offset)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		counter += readBytes
+		offset += int64(readBytes)
+		if offset == int64(q.SegmentSize) {
+			id, offset = id+1, 0
+		}
+		if counter == length {
+			break
+		}
+	}
+	return res, id, offset, nil
+}
+
+// readLength reads the data length, which is stored in 8 bytes.
+func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int, err error) {
+	if offset+uInt64Size > int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	segment, err := q.GetSegment(id)
+	if err != nil {
+		return 0, 0, 0, err
+	}
+	num := segment.ReadUint64At(offset)
+	offset += uInt64Size
+	if offset == int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	return id, offset, int(num), nil
+}
+
+// writeLength writes the data length into 8 bytes.
+func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int64, err error) {
+	if offset+uInt64Size > int64(q.SegmentSize) {

Review comment:
       The method is for storing the data size rather than the segment size.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r545682816



##########
File path: go.mod
##########
@@ -3,8 +3,9 @@ module github.com/apache/skywalking-satellite
 go 1.14
 
 require (
+	github.com/google/go-cmp v0.5.4

Review comment:
       ![image](https://user-images.githubusercontent.com/31562192/102595676-f34d8680-4152-11eb-9289-697681f0027e.png)
   I don't see that the file has been updated.







[GitHub] [skywalking-satellite] EvanLjp commented on pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#issuecomment-748857431


   @wu-sheng thanks for rechecking it.





[GitHub] [skywalking-satellite] wu-sheng commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546307373



##########
File path: docs/plugins/queue/mmap/README.md
##########
@@ -0,0 +1,66 @@
+# Design
+The mmap-queue is a big, fast and persistent queue based on the memory mapped files. One mmap-queue has a directory to store the whole data. The Queue directory is made up with many segments and 1 meta file, which references the design of the [bigqueue](https://github.com/grandecola/bigqueue). 

Review comment:
       ```suggestion
   The mmap-queue is a big, fast, and persistent queue based on memory-mapped files. One mmap-queue has a directory to store the whole data. The queue directory is made up of many segments and 1 metafile. This was originally implemented by the [bigqueue](https://github.com/grandecola/bigqueue) project; we changed it a little to fit the Satellite project requirements.
   ```







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r546356841



##########
File path: Makefile
##########
@@ -67,7 +67,9 @@ test: clean lint
 
 .PHONY: license
 license: clean tools
-	$(GO_LICENSER) -d -exclude=protocol/gen-codes -licensor='Apache Software Foundation (ASF)' .
+	$(GO_LICENSER) -d  -exclude=plugins/queue/mmap \

Review comment:
       For example, there are 2 affected files: queue.go and queue_opreation.go. If queue_opreation.go were moved to bigqueue/queue_opreation.go, a circular import would occur: `queue.go` in the mmap package imports the bigqueue package, while some operations in the bigqueue package also depend on the mmap package.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548941168



##########
File path: plugins/queue/mmap/queue_opreation.go
##########
@@ -0,0 +1,167 @@
+// MIT License
+//
+// Copyright (c) 2018 Aman Mangal
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package mmap
+
+import (
+	"fmt"
+)
+
+// Because the design of the mmap-queue in Satellite references the design of
+// bigqueue (https://github.com/grandecola/bigqueue), the queue operation file retains
+// the original author's license.
+//
+// We adapt the source code of bigqueue rather than using the library because the
+// file queue in Satellite has the following requirements:
+// 1. There is only one consumer and one publisher in the Satellite queue.
+// 2. A file-reuse strategy is required to reduce the number of file creations in the Satellite queue.
+// 3. A more complex OFFSET design is needed to ensure the final stability of the data.
+
+const uInt64Size = 8
+
+// push writes the data into the file system. It first writes the length of the data,
+// then the data itself. A single record may therefore span more than one segment.
+func (q *Queue) push(bytes []byte) error {
+	if q.isFull() {
+		return fmt.Errorf("cannot push data when the queue is full")
+	}
+	id, offset := q.meta.GetWritingOffset()
+	id, offset, err := q.writeLength(len(bytes), id, offset)
+	if err != nil {
+		return err
+	}
+	id, offset, err = q.writeBytes(bytes, id, offset)
+	if err != nil {
+		return err
+	}
+	q.meta.PutWritingOffset(id, offset)
+	q.unflushedNum++
+	if q.unflushedNum == q.FlushCeilingNum {
+		q.flushChannel <- struct{}{}
+		q.unflushedNum = 0
+	}
+	return nil
+}
+
+// pop reads the data from the file system. It first reads the length of the data,
+// then the data itself. A single record may therefore span more than one segment.
+func (q *Queue) pop() (data []byte, rid, roffset int64, err error) {
+	if q.isEmpty() {
+		return nil, 0, 0, fmt.Errorf("cannot read data when the queue is empty")
+	}
+	id, offset := q.meta.GetReadingOffset()
+	id, offset, length, err := q.readLength(id, offset)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	bytes, id, offset, err := q.readBytes(id, offset, length)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	q.meta.PutReadingOffset(id, offset)
+	return bytes, id, offset, nil
+}
+
+// readBytes reads bytes from the memory mapped file.
+func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, newOffset int64, err error) {
+	counter := 0
+	res := make([]byte, length)
+	for {
+		segment, err := q.GetSegment(id)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		readBytes, err := segment.ReadAt(res[counter:], offset)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		counter += readBytes
+		offset += int64(readBytes)
+		if offset == int64(q.SegmentSize) {
+			id, offset = id+1, 0
+		}
+		if counter == length {
+			break
+		}
+	}
+	return res, id, offset, nil
+}
+
+// readLength reads the data length, which is stored in 8 bytes.
+func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int, err error) {
+	if offset+uInt64Size > int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	segment, err := q.GetSegment(id)
+	if err != nil {
+		return 0, 0, 0, err
+	}
+	num := segment.ReadUint64At(offset)
+	offset += uInt64Size
+	if offset == int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	return id, offset, int(num), nil
+}
+
+// writeLength writes the data length into 8 bytes.
+func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int64, err error) {

Review comment:
       Please see writeBytes.







[GitHub] [skywalking-satellite] EvanLjp commented on a change in pull request #10: mmap-queue-plugin

Posted by GitBox <gi...@apache.org>.
EvanLjp commented on a change in pull request #10:
URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548941256



##########
File path: plugins/queue/mmap/segment_operation.go
##########
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"strconv"
+	"sync/atomic"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns a memory mapped file at the segmentID position.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
+	index := q.GetIndex(segmentID)
+	if q.mmapCount >= q.MaxInMemSegments {
+		q.insufficientMemChannel <- struct{}{}
+		<-q.sufficientMemChannel
+	}
+	if err := q.mapSegment(segmentID); err != nil {
+		return nil, err
+	}
+	if q.segments[index] != nil {
+		return q.segments[index], nil
+	}
+	return nil, fmt.Errorf("cannot get a memory mapped file at %d segment", segmentID)
+}
+
+// mapSegment load the segment file reference to the segments.
+func (q *Queue) mapSegment(segmentID int64) error {
+	index := q.GetIndex(segmentID)
+	if q.segments[index] != nil {
+		return nil
+	}
+	filePath := path.Join(q.QueueDir, strconv.Itoa(index)+segment.FileSuffix)

Review comment:
       meta has a field called capacity



