You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2021/03/16 12:18:28 UTC

[rocketmq-client-go] branch master updated: fit(produce): support message compression (#608)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f5c1f61  fit(produce): support message compression (#608)
f5c1f61 is described below

commit f5c1f6161d9e7678576aa16fcda6db48e6465c6c
Author: shenhui0509 <sh...@outlook.com>
AuthorDate: Tue Mar 16 20:18:21 2021 +0800

    fit(produce): support message compression (#608)
---
 internal/utils/{helper.go => compression.go} | 50 +++++++++++++-
 internal/utils/compression_test.go           | 97 ++++++++++++++++++++++++++++
 internal/utils/helper_test.go                | 37 -----------
 internal/utils/net.go                        |  4 ++
 primitive/message.go                         |  5 ++
 producer/option.go                           | 34 ++++++++--
 producer/producer.go                         | 22 +++++++
 7 files changed, 202 insertions(+), 47 deletions(-)

diff --git a/internal/utils/helper.go b/internal/utils/compression.go
similarity index 50%
rename from internal/utils/helper.go
rename to internal/utils/compression.go
index ccadd3f..4ad2ced 100644
--- a/internal/utils/helper.go
+++ b/internal/utils/compression.go
@@ -20,12 +20,56 @@ package utils
 import (
 	"bytes"
 	"compress/zlib"
+	"errors"
 	"io/ioutil"
-	"net"
+	"sync"
 )
 
-func GetAddressByBytes(data []byte) string {
-	return net.IPv4(data[0], data[1], data[2], data[3]).String()
+var zlibWriterPools []sync.Pool
+
+var bufPool = sync.Pool{
+	New: func() interface{} {
+		return &bytes.Buffer{}
+	},
+}
+
+func init() {
+	zlibWriterPools = make([]sync.Pool, zlib.BestCompression)
+	for i := 0; i < zlib.BestCompression; i++ {
+		compressLevel := i
+		zlibWriterPools[i] = sync.Pool{
+			New: func() interface{} {
+				z, _ := zlib.NewWriterLevel(nil, compressLevel+1)
+				return z
+			},
+		}
+	}
+}
+
+func Compress(raw []byte, compressLevel int) ([]byte, error) {
+	if compressLevel < zlib.BestSpeed || compressLevel > zlib.BestCompression {
+		return nil, errors.New("unsupported compress level")
+	}
+
+	buf := bufPool.Get().(*bytes.Buffer)
+	defer bufPool.Put(buf)
+	writerPool := zlibWriterPools[compressLevel-1]
+	writer := writerPool.Get().(*zlib.Writer)
+	defer writerPool.Put(writer)
+	buf.Reset()
+	writer.Reset(buf)
+	_, e := writer.Write(raw)
+	if e != nil {
+		return nil, e
+	}
+
+	e = writer.Close()
+	if e != nil {
+		return nil, e
+	}
+	result := make([]byte, buf.Len())
+	buf.Read(result)
+	return result, nil
 }
 
 func UnCompress(data []byte) []byte {
diff --git a/internal/utils/compression_test.go b/internal/utils/compression_test.go
new file mode 100644
index 0000000..4d10893
--- /dev/null
+++ b/internal/utils/compression_test.go
@@ -0,0 +1,97 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+	"bytes"
+	"compress/zlib"
+	"encoding/json"
+	"fmt"
+	"math/rand"
+	"testing"
+)
+
+func TestUnCompress(t *testing.T) {
+	var b bytes.Buffer
+	var oriStr string = "hello, go"
+	zr := zlib.NewWriter(&b)
+	zr.Write([]byte(oriStr))
+	zr.Close()
+
+	retBytes := UnCompress(b.Bytes())
+	if string(retBytes) != oriStr {
+		t.Errorf("UnCompress was incorrect, got %s, want: %s .", retBytes, []byte(oriStr))
+	}
+}
+
+func TestCompress(t *testing.T) {
+	raw := []byte("The quick brown fox jumps over the lazy dog")
+	for i := zlib.BestSpeed; i <= zlib.BestCompression; i++ {
+		compressed, e := Compress(raw, i)
+		if e != nil {
+			t.Errorf("Compress data:%s returns error: %v", string(raw), e)
+			return
+		}
+		decompressed := UnCompress(compressed)
+		if string(decompressed) != string(raw) {
+			t.Errorf("data is corrupt, got: %s, want: %s", string(decompressed), string(raw))
+		}
+	}
+}
+
+func testCase(data []byte, level int, t *testing.T) {
+	compressed, e := Compress(data, level)
+	if e != nil {
+		t.Errorf("Compress data:%v returns error: %v", data, e)
+	}
+	decompressed := UnCompress(compressed)
+	if string(data) != string(decompressed) {
+		t.Errorf("data is corrupt, got: %s, want: %s", string(decompressed), string(data))
+	}
+}
+
+func generateRandTestData(n int) []byte {
+	data := make([]byte, n)
+	rand.Read(data)
+	return data
+}
+
+func generateJsonString(n int) []byte {
+	x := make(map[string]string)
+	for i := 0; i < n; i++ {
+		k := fmt.Sprintf("compression_key_%d", i)
+		v := fmt.Sprintf("compression_value_%d", i)
+		x[k] = v
+	}
+	data, _ := json.Marshal(x)
+	return data
+}
+
+func TestCompressThreadSafe(t *testing.T) {
+	for i := 0; i < 100; i++ {
+		data := generateRandTestData(i * 100)
+		level := i%zlib.BestCompression + 1
+		go testCase(data, level, t)
+	}
+
+	for i := 0; i < 100; i++ {
+		data := generateJsonString(i * 100)
+		level := i%zlib.BestCompression + 1
+		go testCase(data, level, t)
+	}
+}
diff --git a/internal/utils/helper_test.go b/internal/utils/helper_test.go
deleted file mode 100644
index 837c5ab..0000000
--- a/internal/utils/helper_test.go
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package utils
-
-import (
-	"bytes"
-	"compress/zlib"
-	"testing"
-)
-
-func TestUnCompress(t *testing.T) {
-	var b bytes.Buffer
-	var oriStr string = "hello, go"
-	zr := zlib.NewWriter(&b)
-	zr.Write([]byte(oriStr))
-	zr.Close()
-
-	retBytes := UnCompress(b.Bytes())
-	if string(retBytes) != oriStr {
-		t.Errorf("UnCompress was incorrect, got %s, want: %s .", retBytes, []byte(oriStr))
-	}
-}
diff --git a/internal/utils/net.go b/internal/utils/net.go
index c9444a9..0dfcff8 100644
--- a/internal/utils/net.go
+++ b/internal/utils/net.go
@@ -59,3 +59,7 @@ func FakeIP() []byte {
 	buf.WriteString(strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10))
 	return buf.Bytes()[4:8]
 }
+
+func GetAddressByBytes(data []byte) string {
+	return net.IPv4(data[0], data[1], data[2], data[3]).String()
+}
diff --git a/primitive/message.go b/primitive/message.go
index 6a84477..fd7e9c6 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -67,6 +67,7 @@ type Message struct {
 	Flag          int32
 	TransactionId string
 	Batch         bool
+	Compress      bool
 	// Queue is the queue that messages will be sent to. the value must be set if want to custom the queue of message,
 	// just ignore if not.
 	Queue *MessageQueue
@@ -498,6 +499,10 @@ func ClearCompressedFlag(flag int) int {
 	return flag & (^CompressedFlag)
 }
 
+func SetCompressedFlag(flag int) int {
+	return flag | CompressedFlag
+}
+
 var (
 	counter        int16 = 0
 	startTimestamp int64 = 0
diff --git a/producer/option.go b/producer/option.go
index bacef7b..5839402 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -26,12 +26,14 @@ import (
 
 func defaultProducerOptions() producerOptions {
 	opts := producerOptions{
-		ClientOptions:         internal.DefaultClientOptions(),
-		Selector:              NewRoundRobinQueueSelector(),
-		SendMsgTimeout:        3 * time.Second,
-		DefaultTopicQueueNums: 4,
-		CreateTopicKey:        "TBW102",
-		Resolver:              primitive.NewHttpResolver("DEFAULT"),
+		ClientOptions:              internal.DefaultClientOptions(),
+		Selector:                   NewRoundRobinQueueSelector(),
+		SendMsgTimeout:             3 * time.Second,
+		DefaultTopicQueueNums:      4,
+		CreateTopicKey:             "TBW102",
+		Resolver:                   primitive.NewHttpResolver("DEFAULT"),
+		CompressMsgBodyOverHowmuch: 4096,
+		CompressLevel:              5,
 	}
 	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
 	return opts
@@ -44,7 +46,9 @@ type producerOptions struct {
 	DefaultTopicQueueNums int
 	CreateTopicKey        string // "TBW102" Will be created at broker when isAutoCreateTopicEnable. when topic is not created,
 	// and broker open isAutoCreateTopicEnable, topic will use "TBW102" config to create topic
-	Resolver primitive.NsResolver
+	Resolver                   primitive.NsResolver
+	CompressMsgBodyOverHowmuch int
+	CompressLevel              int
 }
 
 type Option func(*producerOptions)
@@ -142,3 +146,19 @@ func WithNameServerDomain(nameServerUrl string) Option {
 		opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
 	}
 }
+
+// WithCompressMsgBodyOverHowmuch sets the body size threshold in bytes; bodies shorter than it are sent uncompressed
+func WithCompressMsgBodyOverHowmuch(threshold int) Option {
+	return func(opts *producerOptions) {
+		opts.CompressMsgBodyOverHowmuch = threshold
+	}
+}
+
+// WithCompressLevel sets the zlib compress level (1~9); with any other level compression fails and the body is sent uncompressed
+// 1 stands for best speed
+// 9 stands for best compression ratio
+func WithCompressLevel(level int) Option {
+	return func(opts *producerOptions) {
+		opts.CompressLevel = level
+	}
+}
diff --git a/producer/producer.go b/producer/producer.go
index 7b33960..65e39c2 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -302,12 +302,34 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
 	return err
 }
 
+func (p *defaultProducer) tryCompressMsg(msg *primitive.Message) bool {
+	if msg.Compress {
+		return true
+	}
+	if msg.Batch {
+		return false
+	}
+	if len(msg.Body) < p.options.CompressMsgBodyOverHowmuch {
+		return false
+	}
+	compressedBody, e := utils.Compress(msg.Body, p.options.CompressLevel)
+	if e != nil {
+		return false
+	}
+	msg.Body = compressedBody
+	msg.Compress = true
+	return true
+}
+
 func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
 	msg *primitive.Message) *remote.RemotingCommand {
 	if !msg.Batch && msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex) == "" {
 		msg.WithProperty(primitive.PropertyUniqueClientMessageIdKeyIndex, primitive.CreateUniqID())
 	}
 	sysFlag := 0
+	if p.tryCompressMsg(msg) {
+		sysFlag = primitive.SetCompressedFlag(sysFlag)
+	}
 	v := msg.GetProperty(primitive.PropertyTransactionPrepared)
 	if v != "" {
 		tranMsg, err := strconv.ParseBool(v)