You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/12 12:50:47 UTC

[rocketmq-client-go] branch native updated: ring buffer (#40)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 1a860c8  ring buffer (#40)
1a860c8 is described below

commit 1a860c8f1e6f60da7f1444242d3e039bc432170d
Author: askuy <as...@foxmail.com>
AuthorDate: Tue Mar 12 20:50:43 2019 +0800

    ring buffer (#40)
---
 utils/ring_buffer.go      | 187 +++++++++++++++++++++++++++++++++++++++-------
 utils/ring_buffer_test.go | 126 +++++++++++++++++++++++++++++++
 2 files changed, 288 insertions(+), 25 deletions(-)

diff --git a/utils/ring_buffer.go b/utils/ring_buffer.go
index 66a1842..9400f53 100644
--- a/utils/ring_buffer.go
+++ b/utils/ring_buffer.go
@@ -17,48 +17,185 @@
 
 package utils
 
-import "sync"
-
-type RingBuffer struct {
-	buf      []byte
-	writePos int
-	readPos  int
-	cap      int
-	rwMutex  sync.RWMutex
-	exitCh   chan interface{}
+import (
+	"runtime"
+	"time"
+	"sync/atomic"
+)
+
+// 1. Must be able to grow dynamically.
+// 2. Shrinking is optional, depending on the situation.
+// 3. Read needs to block.
+// 4. Must be thread-safe.
+type RingNodesBuffer struct {
+	writePos uint64
+	readPos  uint64
+	mask  uint64
+
+	nodes nodes
+}
+
+type node struct {
+	position uint64
+	buf     []byte
 }
 
-func NewRingBuffer(cap int) *RingBuffer {
-	rb := &RingBuffer{buf: make([]byte, cap), cap: cap}
-	go rb.resize()
+type nodes []*node
+
+// roundUp takes a uint64 greater than 0 and rounds it up to the next
+// power of 2.
+func roundUp(v uint64) uint64 {
+	v--
+	v |= v >> 1
+	v |= v >> 2
+	v |= v >> 4
+	v |= v >> 8
+	v |= v >> 16
+	v |= v >> 32
+	v++
+	return v
+}
+
+func (rb *RingNodesBuffer) init(size uint64) {
+	size = roundUp(size)
+	rb.nodes = make(nodes, size)
+	for i := uint64(0); i < size; i++ {
+		rb.nodes[i] = &node{position: i}
+	}
+	rb.mask = size - 1 // so we don't have to do this with every put/get operation
+}
+
+func NewRingNodesBuffer(cap uint64) *RingNodesBuffer {
+	rb := &RingNodesBuffer{}
+	rb.init(cap)
+	//go rb.resize()
 	return rb
 }
 
-func (r *RingBuffer) Write(b []byte) error {
-	// TODO
+func (r *RingNodesBuffer) Write(b []byte) error {
+	var n *node
+	var dif uint64
+	pos := atomic.LoadUint64(&r.writePos)
+	i := 0
+L:
+	for {
+		// pos 16 seq 1.     0001 0000    00001111   0001 1111
+		n = r.nodes[pos&r.mask]
+		seq := atomic.LoadUint64(&n.position)
+		switch dif = seq - pos; {
+		case dif == 0:
+			if atomic.CompareAndSwapUint64(&r.writePos, pos, pos+1) {
+				break L
+			}
+		default:
+			pos = atomic.LoadUint64(&r.writePos)
+		}
+		if i == 10000 {
+			runtime.Gosched() // free up the cpu before the next iteration
+			i = 0
+		} else {
+			i++
+		}
+	}
+
+	n.buf = b
+	atomic.StoreUint64(&n.position, pos+1)
 	return nil
 }
 
-func (r *RingBuffer) Read(p []byte) (n int, err error) {
-
-	if r.Size() >= len(p) {
-		copy(p, r.buf[r.readPos:r.readPos+len(p)])
-		r.readPos += len(p)
+// Read returns the stored data directly.
+func (r *RingNodesBuffer) Read(timeout time.Duration) (data []byte, err error) {
+	var (
+		node     *node
+		pos   = atomic.LoadUint64(&r.readPos)
+		start time.Time
+		dif uint64
+	)
+	if timeout > 0 {
+		start = time.Now()
+	}
+	i := 0
+L:
+	for {
+		node = r.nodes[pos&r.mask]
+		seq := atomic.LoadUint64(&node.position)
+		switch dif = seq - (pos + 1); {
+		case dif == 0:
+			if atomic.CompareAndSwapUint64(&r.readPos, pos, pos+1) {
+				break L
+			}
+		default:
+			pos = atomic.LoadUint64(&r.readPos)
+		}
+		if timeout > 0 && time.Since(start) >= timeout {
+			return
+		}
+		if i == 10000 {
+			runtime.Gosched() // free up the cpu before the next iteration
+			i = 0
+		} else {
+			i++
+		}
+	}
+	data = node.buf
+	atomic.StoreUint64(&node.position, pos+r.mask+1)
+	return
+}
 
+// ReadBySize is for callers that know the size: pass in a buffer to receive the data.
+func (r *RingNodesBuffer) ReadBySize(data []byte,timeout time.Duration) (n int, err error) {
+	var (
+		node     *node
+		pos   = atomic.LoadUint64(&r.readPos)
+		start time.Time
+		dif uint64
+	)
+	i := 0
+	if timeout > 0 {
+		start = time.Now()
+	}
+L:
+	for {
+		node = r.nodes[pos&r.mask]
+		seq := atomic.LoadUint64(&node.position)
+		switch dif = seq - (pos + 1); {
+		case dif == 0:
+			if atomic.CompareAndSwapUint64(&r.readPos, pos, pos+1) {
+				break L
+			}
+		default:
+			pos = atomic.LoadUint64(&r.readPos)
+		}
+		if timeout > 0 && time.Since(start) >= timeout {
+			return
+		}
+		if i == 10000 {
+			runtime.Gosched() // free up the cpu before the next iteration
+			i = 0
+		} else {
+			i++
+		}
 	}
+	n = copy(data,node.buf)
+	atomic.StoreUint64(&node.position, pos+r.mask+1)
+	return
+}
+
+
+func (r *RingNodesBuffer) Size() uint64 {
+	return atomic.LoadUint64(&r.writePos) - atomic.LoadUint64(&r.readPos)
 
-	// TODO waiting data...
-	return 0, err
 }
 
-func (r *RingBuffer) Size() int {
-	return r.writePos - r.readPos
+// Cap returns the capacity of this ring buffer.
+func (rb *RingNodesBuffer) Cap() uint64 {
+	return uint64(len(rb.nodes))
 }
 
-func (r *RingBuffer) Destroy() {
+func (r *RingNodesBuffer) Destroy() {
 
 }
 
-func (r *RingBuffer) resize() {
+func (r *RingNodesBuffer) resize() {
 	// TODO
 }
diff --git a/utils/ring_buffer_test.go b/utils/ring_buffer_test.go
new file mode 100644
index 0000000..a119f21
--- /dev/null
+++ b/utils/ring_buffer_test.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package utils
+
+import (
+	"time"
+	"testing"
+	"github.com/stretchr/testify/assert"
+	"sync"
+	"strconv"
+	"fmt"
+)
+
+func TestRingRead(t *testing.T)  {
+	rb := NewRingNodesBuffer(5)
+	assert.Equal(t, uint64(8), rb.Cap())
+
+	err := rb.Write([]byte("hello"))
+	if !assert.Nil(t, err) {
+		return
+	}
+	data, err := rb.Read(1*time.Second)
+	if !assert.Nil(t, err) {
+		return
+	}
+
+	assert.Equal(t, "hello", string(data))
+}
+
+
+func TestRingReadBySize(t *testing.T)  {
+	rb := NewRingNodesBuffer(5)
+	assert.Equal(t, uint64(8), rb.Cap())
+
+	err := rb.Write([]byte("hello"))
+	if !assert.Nil(t, err) {
+		return
+	}
+	sink := make([]byte, 5)
+	n, err := rb.ReadBySize(sink,1*time.Second)
+	if !assert.Nil(t, err) {
+		return
+	}
+
+	assert.Equal(t, 5, n)
+	assert.Equal(t, "hello", string(sink))
+}
+
+func BenchmarkRingReadBufferMPMC(b *testing.B) {
+	q := NewRingNodesBuffer(uint64(b.N * 100))
+	var wg sync.WaitGroup
+	wg.Add(100)
+	b.ResetTimer()
+	b.ReportAllocs()
+
+
+	for i := 0; i < 100; i++ {
+		go func() {
+			for i := 0; i < b.N; i++ {
+				q.Write([]byte(strconv.Itoa(i)))
+			}
+		}()
+	}
+
+	for i := 0; i < 100; i++ {
+		go func() {
+			for i := 0; i < b.N; i++ {
+				_ = len(strconv.Itoa(i))
+				var p []byte
+				p,_ = q.Read(1*time.Second)
+				fmt.Sprintf("%v",p)
+
+			}
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+}
+
+
+
+func BenchmarkRingBySizeBufferMPMC(b *testing.B) {
+	q := NewRingNodesBuffer(uint64(b.N * 100))
+	var wg sync.WaitGroup
+	wg.Add(100)
+	b.ResetTimer()
+	b.ReportAllocs()
+
+
+	for i := 0; i < 100; i++ {
+		go func() {
+			for i := 0; i < b.N; i++ {
+				q.Write([]byte(strconv.Itoa(i)))
+			}
+		}()
+	}
+
+	for i := 0; i < 100; i++ {
+		go func() {
+			for i := 0; i < b.N; i++ {
+				p := make([]byte,len(strconv.Itoa(i)))
+				q.ReadBySize(p,1*time.Second)
+				fmt.Sprintf("%v",p)
+			}
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+}
\ No newline at end of file