You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2021/03/16 12:18:28 UTC
[rocketmq-client-go] branch master updated: fit(produce): support
message compression (#608)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f5c1f61 fit(produce): support message compression (#608)
f5c1f61 is described below
commit f5c1f6161d9e7678576aa16fcda6db48e6465c6c
Author: shenhui0509 <sh...@outlook.com>
AuthorDate: Tue Mar 16 20:18:21 2021 +0800
fit(produce): support message compression (#608)
---
internal/utils/{helper.go => compression.go} | 50 +++++++++++++-
internal/utils/compression_test.go | 97 ++++++++++++++++++++++++++++
internal/utils/helper_test.go | 37 -----------
internal/utils/net.go | 4 ++
primitive/message.go | 5 ++
producer/option.go | 34 ++++++++--
producer/producer.go | 22 +++++++
7 files changed, 202 insertions(+), 47 deletions(-)
diff --git a/internal/utils/helper.go b/internal/utils/compression.go
similarity index 50%
rename from internal/utils/helper.go
rename to internal/utils/compression.go
index ccadd3f..4ad2ced 100644
--- a/internal/utils/helper.go
+++ b/internal/utils/compression.go
@@ -20,12 +20,56 @@ package utils
import (
"bytes"
"compress/zlib"
+ "errors"
"io/ioutil"
- "net"
+ "sync"
)
-func GetAddressByBytes(data []byte) string {
- return net.IPv4(data[0], data[1], data[2], data[3]).String()
+var zlibWriterPools []sync.Pool
+
+var bufPool = sync.Pool{
+ New: func() interface{} {
+ return &bytes.Buffer{}
+ },
+}
+
+func init() {
+ zlibWriterPools = make([]sync.Pool, zlib.BestCompression)
+ for i := 0; i < zlib.BestCompression; i++ {
+ compressLevel := i
+ zlibWriterPools[i] = sync.Pool{
+ New: func() interface{} {
+ z, _ := zlib.NewWriterLevel(nil, compressLevel+1)
+ return z
+ },
+ }
+ }
+}
+
+func Compress(raw []byte, compressLevel int) ([]byte, error) {
+ if compressLevel < zlib.BestSpeed || compressLevel > zlib.BestCompression {
+ return nil, errors.New("unsupported compress level")
+ }
+
+ buf := bufPool.Get().(*bytes.Buffer)
+ defer bufPool.Put(buf)
+ writerPool := zlibWriterPools[compressLevel-1]
+ writer := writerPool.Get().(*zlib.Writer)
+ defer writerPool.Put(writer)
+ buf.Reset()
+ writer.Reset(buf)
+ _, e := writer.Write(raw)
+ if e != nil {
+ return nil, e
+ }
+
+ e = writer.Close()
+ if e != nil {
+ return nil, e
+ }
+ result := make([]byte, buf.Len())
+ buf.Read(result)
+ return result, nil
}
func UnCompress(data []byte) []byte {
diff --git a/internal/utils/compression_test.go b/internal/utils/compression_test.go
new file mode 100644
index 0000000..4d10893
--- /dev/null
+++ b/internal/utils/compression_test.go
@@ -0,0 +1,97 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+ "bytes"
+ "compress/zlib"
+ "encoding/json"
+ "fmt"
+ "math/rand"
+ "testing"
+)
+
+func TestUnCompress(t *testing.T) {
+ var b bytes.Buffer
+ var oriStr string = "hello, go"
+ zr := zlib.NewWriter(&b)
+ zr.Write([]byte(oriStr))
+ zr.Close()
+
+ retBytes := UnCompress(b.Bytes())
+ if string(retBytes) != oriStr {
+ t.Errorf("UnCompress was incorrect, got %s, want: %s .", retBytes, []byte(oriStr))
+ }
+}
+
+func TestCompress(t *testing.T) {
+ raw := []byte("The quick brown fox jumps over the lazy dog")
+ for i := zlib.BestSpeed; i <= zlib.BestCompression; i++ {
+ compressed, e := Compress(raw, i)
+ if e != nil {
+ t.Errorf("Compress data:%s returns error: %v", string(raw), e)
+ return
+ }
+ decompressed := UnCompress(compressed)
+ if string(decompressed) != string(raw) {
+ t.Errorf("data is corrupt, got: %s, want: %s", string(decompressed), string(raw))
+ }
+ }
+}
+
+func testCase(data []byte, level int, t *testing.T) {
+ compressed, e := Compress(data, level)
+ if e != nil {
+ t.Errorf("Compress data:%v returns error: %v", data, e)
+ }
+ decompressed := UnCompress(compressed)
+ if string(data) != string(decompressed) {
+ t.Errorf("data is corrupt, got: %s, want: %s", string(decompressed), string(data))
+ }
+}
+
+func generateRandTestData(n int) []byte {
+ data := make([]byte, n)
+ rand.Read(data)
+ return data
+}
+
+func generateJsonString(n int) []byte {
+ x := make(map[string]string)
+ for i := 0; i < n; i++ {
+ k := fmt.Sprintf("compression_key_%d", i)
+ v := fmt.Sprintf("compression_value_%d", i)
+ x[k] = v
+ }
+ data, _ := json.Marshal(x)
+ return data
+}
+
+func TestCompressThreadSafe(t *testing.T) {
+ for i := 0; i < 100; i++ {
+ data := generateRandTestData(i * 100)
+ level := i%zlib.BestCompression + 1
+ go testCase(data, level, t)
+ }
+
+ for i := 0; i < 100; i++ {
+ data := generateJsonString(i * 100)
+ level := i%zlib.BestCompression + 1
+ go testCase(data, level, t)
+ }
+}
diff --git a/internal/utils/helper_test.go b/internal/utils/helper_test.go
deleted file mode 100644
index 837c5ab..0000000
--- a/internal/utils/helper_test.go
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package utils
-
-import (
- "bytes"
- "compress/zlib"
- "testing"
-)
-
-func TestUnCompress(t *testing.T) {
- var b bytes.Buffer
- var oriStr string = "hello, go"
- zr := zlib.NewWriter(&b)
- zr.Write([]byte(oriStr))
- zr.Close()
-
- retBytes := UnCompress(b.Bytes())
- if string(retBytes) != oriStr {
- t.Errorf("UnCompress was incorrect, got %s, want: %s .", retBytes, []byte(oriStr))
- }
-}
diff --git a/internal/utils/net.go b/internal/utils/net.go
index c9444a9..0dfcff8 100644
--- a/internal/utils/net.go
+++ b/internal/utils/net.go
@@ -59,3 +59,7 @@ func FakeIP() []byte {
buf.WriteString(strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10))
return buf.Bytes()[4:8]
}
+
+func GetAddressByBytes(data []byte) string {
+ return net.IPv4(data[0], data[1], data[2], data[3]).String()
+}
diff --git a/primitive/message.go b/primitive/message.go
index 6a84477..fd7e9c6 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -67,6 +67,7 @@ type Message struct {
Flag int32
TransactionId string
Batch bool
+ Compress bool
// Queue is the queue that the message will be sent to. Set it only if you want to choose the queue yourself;
// otherwise leave it unset.
Queue *MessageQueue
@@ -498,6 +499,10 @@ func ClearCompressedFlag(flag int) int {
return flag & (^CompressedFlag)
}
+func SetCompressedFlag(flag int) int {
+ return flag | CompressedFlag
+}
+
var (
counter int16 = 0
startTimestamp int64 = 0
diff --git a/producer/option.go b/producer/option.go
index bacef7b..5839402 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -26,12 +26,14 @@ import (
func defaultProducerOptions() producerOptions {
opts := producerOptions{
- ClientOptions: internal.DefaultClientOptions(),
- Selector: NewRoundRobinQueueSelector(),
- SendMsgTimeout: 3 * time.Second,
- DefaultTopicQueueNums: 4,
- CreateTopicKey: "TBW102",
- Resolver: primitive.NewHttpResolver("DEFAULT"),
+ ClientOptions: internal.DefaultClientOptions(),
+ Selector: NewRoundRobinQueueSelector(),
+ SendMsgTimeout: 3 * time.Second,
+ DefaultTopicQueueNums: 4,
+ CreateTopicKey: "TBW102",
+ Resolver: primitive.NewHttpResolver("DEFAULT"),
+ CompressMsgBodyOverHowmuch: 4096,
+ CompressLevel: 5,
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
@@ -44,7 +46,9 @@ type producerOptions struct {
DefaultTopicQueueNums int
CreateTopicKey string // "TBW102" Will be created at broker when isAutoCreateTopicEnable. when topic is not created,
// and broker open isAutoCreateTopicEnable, topic will use "TBW102" config to create topic
- Resolver primitive.NsResolver
+ Resolver primitive.NsResolver
+ CompressMsgBodyOverHowmuch int
+ CompressLevel int
}
type Option func(*producerOptions)
@@ -142,3 +146,19 @@ func WithNameServerDomain(nameServerUrl string) Option {
opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
}
}
+
+// WithCompressMsgBodyOverHowmuch sets the body size in bytes at or above which a non-batch message body is compressed
+func WithCompressMsgBodyOverHowmuch(threshold int) Option {
+ return func(opts *producerOptions) {
+ opts.CompressMsgBodyOverHowmuch = threshold
+ }
+}
+
+// WithCompressLevel sets the zlib compress level (1~9):
+// 1 stands for best speed, 9 stands for best compression ratio.
+// Levels outside 1~9 are rejected by utils.Compress, so the body is sent uncompressed.
+func WithCompressLevel(level int) Option {
+ return func(opts *producerOptions) {
+ opts.CompressLevel = level
+ }
+}
diff --git a/producer/producer.go b/producer/producer.go
index 7b33960..65e39c2 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -302,12 +302,34 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
return err
}
+func (p *defaultProducer) tryCompressMsg(msg *primitive.Message) bool {
+ if msg.Compress {
+ return true
+ }
+ if msg.Batch {
+ return false
+ }
+ if len(msg.Body) < p.options.CompressMsgBodyOverHowmuch {
+ return false
+ }
+ compressedBody, e := utils.Compress(msg.Body, p.options.CompressLevel)
+ if e != nil {
+ return false
+ }
+ msg.Body = compressedBody
+ msg.Compress = true
+ return true
+}
+
func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
msg *primitive.Message) *remote.RemotingCommand {
if !msg.Batch && msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex) == "" {
msg.WithProperty(primitive.PropertyUniqueClientMessageIdKeyIndex, primitive.CreateUniqID())
}
sysFlag := 0
+ if p.tryCompressMsg(msg) {
+ sysFlag = primitive.SetCompressedFlag(sysFlag)
+ }
v := msg.GetProperty(primitive.PropertyTransactionPrepared)
if v != "" {
tranMsg, err := strconv.ParseBool(v)