You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/12 12:50:47 UTC
[rocketmq-client-go] branch native updated: ring buffer (#40)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 1a860c8 ring buffer (#40)
1a860c8 is described below
commit 1a860c8f1e6f60da7f1444242d3e039bc432170d
Author: askuy <as...@foxmail.com>
AuthorDate: Tue Mar 12 20:50:43 2019 +0800
ring buffer (#40)
---
utils/ring_buffer.go | 187 +++++++++++++++++++++++++++++++++++++++-------
utils/ring_buffer_test.go | 126 +++++++++++++++++++++++++++++++
2 files changed, 288 insertions(+), 25 deletions(-)
diff --git a/utils/ring_buffer.go b/utils/ring_buffer.go
index 66a1842..9400f53 100644
--- a/utils/ring_buffer.go
+++ b/utils/ring_buffer.go
@@ -17,48 +17,185 @@
package utils
-import "sync"
-
-type RingBuffer struct {
- buf []byte
- writePos int
- readPos int
- cap int
- rwMutex sync.RWMutex
- exitCh chan interface{}
+import (
+ "runtime"
+ "time"
+ "sync/atomic"
+)
+
+// 1.需要能够动态扩容
+// 2.缩容看情况
+// 3.read的时候需要block
+// 4.线程安全
+type RingNodesBuffer struct {
+ writePos uint64
+ readPos uint64
+ mask uint64
+
+ nodes nodes
+}
+
+type node struct {
+ position uint64
+ buf []byte
}
-func NewRingBuffer(cap int) *RingBuffer {
- rb := &RingBuffer{buf: make([]byte, cap), cap: cap}
- go rb.resize()
+type nodes []*node
+
+// roundUp takes a uint64 greater than 0 and rounds it up to the next
+// power of 2.
+func roundUp(v uint64) uint64 {
+ v--
+ v |= v >> 1
+ v |= v >> 2
+ v |= v >> 4
+ v |= v >> 8
+ v |= v >> 16
+ v |= v >> 32
+ v++
+ return v
+}
+
+func (rb *RingNodesBuffer) init(size uint64) {
+ size = roundUp(size)
+ rb.nodes = make(nodes, size)
+ for i := uint64(0); i < size; i++ {
+ rb.nodes[i] = &node{position: i}
+ }
+ rb.mask = size - 1 // so we don't have to do this with every put/get operation
+}
+
+func NewRingNodesBuffer(cap uint64) *RingNodesBuffer {
+ rb := &RingNodesBuffer{}
+ rb.init(cap)
+ //go rb.resize()
return rb
}
-func (r *RingBuffer) Write(b []byte) error {
- // TODO
+func (r *RingNodesBuffer) Write(b []byte) error {
+ var n *node
+ var dif uint64
+ pos := atomic.LoadUint64(&r.writePos)
+ i := 0
+L:
+ for {
+ // pos 16 seq 1. 0001 0000 00001111 0001 1111
+ n = r.nodes[pos&r.mask]
+ seq := atomic.LoadUint64(&n.position)
+ switch dif = seq - pos; {
+ case dif == 0:
+ if atomic.CompareAndSwapUint64(&r.writePos, pos, pos+1) {
+ break L
+ }
+ default:
+ pos = atomic.LoadUint64(&r.writePos)
+ }
+ if i == 10000 {
+ runtime.Gosched() // free up the cpu before the next iteration
+ i = 0
+ } else {
+ i++
+ }
+ }
+
+ n.buf = b
+ atomic.StoreUint64(&n.position, pos+1)
return nil
}
-func (r *RingBuffer) Read(p []byte) (n int, err error) {
-
- if r.Size() >= len(p) {
- copy(p, r.buf[r.readPos:r.readPos+len(p)])
- r.readPos += len(p)
+// 直接返回数据
+func (r *RingNodesBuffer) Read(timeout time.Duration) (data []byte, err error) {
+ var (
+ node *node
+ pos = atomic.LoadUint64(&r.readPos)
+ start time.Time
+ dif uint64
+ )
+ if timeout > 0 {
+ start = time.Now()
+ }
+ i := 0
+L:
+ for {
+ node = r.nodes[pos&r.mask]
+ seq := atomic.LoadUint64(&node.position)
+ switch dif = seq - (pos + 1); {
+ case dif == 0:
+ if atomic.CompareAndSwapUint64(&r.readPos, pos, pos+1) {
+ break L
+ }
+ default:
+ pos = atomic.LoadUint64(&r.readPos)
+ }
+ if timeout > 0 && time.Since(start) >= timeout {
+ return
+ }
+ if i == 10000 {
+ runtime.Gosched() // free up the cpu before the next iteration
+ i = 0
+ } else {
+ i++
+ }
+ }
+ data = node.buf
+ atomic.StoreUint64(&node.position, pos+r.mask+1)
+ return
+}
+// 知道大小,传进去解析
+func (r *RingNodesBuffer) ReadBySize(data []byte,timeout time.Duration) (n int, err error) {
+ var (
+ node *node
+ pos = atomic.LoadUint64(&r.readPos)
+ start time.Time
+ dif uint64
+ )
+ i := 0
+ if timeout > 0 {
+ start = time.Now()
+ }
+L:
+ for {
+ node = r.nodes[pos&r.mask]
+ seq := atomic.LoadUint64(&node.position)
+ switch dif = seq - (pos + 1); {
+ case dif == 0:
+ if atomic.CompareAndSwapUint64(&r.readPos, pos, pos+1) {
+ break L
+ }
+ default:
+ pos = atomic.LoadUint64(&r.readPos)
+ }
+ if timeout > 0 && time.Since(start) >= timeout {
+ return
+ }
+ if i == 10000 {
+ runtime.Gosched() // free up the cpu before the next iteration
+ i = 0
+ } else {
+ i++
+ }
}
+ n = copy(data,node.buf)
+ atomic.StoreUint64(&node.position, pos+r.mask+1)
+ return
+}
+
+
+func (r *RingNodesBuffer) Size() uint64 {
+ return atomic.LoadUint64(&r.writePos) - atomic.LoadUint64(&r.readPos)
- // TODO waiting data...
- return 0, err
}
-func (r *RingBuffer) Size() int {
- return r.writePos - r.readPos
+// Cap returns the capacity of this ring buffer.
+func (rb *RingNodesBuffer) Cap() uint64 {
+ return uint64(len(rb.nodes))
}
-func (r *RingBuffer) Destroy() {
+func (r *RingNodesBuffer) Destroy() {
}
-func (r *RingBuffer) resize() {
+func (r *RingNodesBuffer) resize() {
// TODO
}
diff --git a/utils/ring_buffer_test.go b/utils/ring_buffer_test.go
new file mode 100644
index 0000000..a119f21
--- /dev/null
+++ b/utils/ring_buffer_test.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "time"
+ "testing"
+ "github.com/stretchr/testify/assert"
+ "sync"
+ "strconv"
+ "fmt"
+)
+
+func TestRingRead(t *testing.T) {
+ rb := NewRingNodesBuffer(5)
+ assert.Equal(t, uint64(8), rb.Cap())
+
+ err := rb.Write([]byte("hello"))
+ if !assert.Nil(t, err) {
+ return
+ }
+ data, err := rb.Read(1*time.Second)
+ if !assert.Nil(t, err) {
+ return
+ }
+
+ assert.Equal(t, "hello", string(data))
+}
+
+
+func TestRingReadBySize(t *testing.T) {
+ rb := NewRingNodesBuffer(5)
+ assert.Equal(t, uint64(8), rb.Cap())
+
+ err := rb.Write([]byte("hello"))
+ if !assert.Nil(t, err) {
+ return
+ }
+ sink := make([]byte, 5)
+ n, err := rb.ReadBySize(sink,1*time.Second)
+ if !assert.Nil(t, err) {
+ return
+ }
+
+ assert.Equal(t, 5, n)
+ assert.Equal(t, "hello", string(sink))
+}
+
+func BenchmarkRingReadBufferMPMC(b *testing.B) {
+ q := NewRingNodesBuffer(uint64(b.N * 100))
+ var wg sync.WaitGroup
+ wg.Add(100)
+ b.ResetTimer()
+ b.ReportAllocs()
+
+
+ for i := 0; i < 100; i++ {
+ go func() {
+ for i := 0; i < b.N; i++ {
+ q.Write([]byte(strconv.Itoa(i)))
+ }
+ }()
+ }
+
+ for i := 0; i < 100; i++ {
+ go func() {
+ for i := 0; i < b.N; i++ {
+ _ = len(strconv.Itoa(i))
+ var p []byte
+ p,_ = q.Read(1*time.Second)
+ fmt.Sprintf("%v",p)
+
+ }
+ wg.Done()
+ }()
+ }
+
+ wg.Wait()
+}
+
+
+
+func BenchmarkRingBySizeBufferMPMC(b *testing.B) {
+ q := NewRingNodesBuffer(uint64(b.N * 100))
+ var wg sync.WaitGroup
+ wg.Add(100)
+ b.ResetTimer()
+ b.ReportAllocs()
+
+
+ for i := 0; i < 100; i++ {
+ go func() {
+ for i := 0; i < b.N; i++ {
+ q.Write([]byte(strconv.Itoa(i)))
+ }
+ }()
+ }
+
+ for i := 0; i < 100; i++ {
+ go func() {
+ for i := 0; i < b.N; i++ {
+ p := make([]byte,len(strconv.Itoa(i)))
+ q.ReadBySize(p,1*time.Second)
+ fmt.Sprintf("%v",p)
+ }
+ wg.Done()
+ }()
+ }
+
+ wg.Wait()
+}
\ No newline at end of file