You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/07/17 05:53:00 UTC
[rocketmq-client-go] branch native updated: [ISSUE #104] Support
ACL (#117)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new a133666 [ISSUE #104] Support ACL (#117)
a133666 is described below
commit a133666071467b72ec6c6021372d8377486dd3f0
Author: wenfeng <sx...@gmail.com>
AuthorDate: Wed Jul 17 13:52:56 2019 +0800
[ISSUE #104] Support ACL (#117)
* move utils to internal package
* add ACL
* fix options
* fix PR
---
consumer/consumer.go | 4 +-
consumer/offset_store.go | 2 +-
consumer/option.go | 14 ++--
consumer/pull_consumer.go | 2 +-
consumer/statistics.go | 1 -
consumer/strategy.go | 2 +-
utils/errors.go => errors.go | 4 +-
examples/consumer/acl/main.go | 55 ++++++++++++++
examples/consumer/pull/main.go | 6 +-
examples/producer/acl/main.go | 63 ++++++++++++++++
internal/client.go | 17 +++--
internal/remote/codec.go | 11 +--
internal/remote/future.go | 2 +-
internal/remote/interceptor.go | 83 ++++++++++++++++++++++
.../math.go => internal/remote/interceptor_test.go | 21 +++---
internal/remote/remote_client.go | 43 +++++++++++
internal/remote/remote_client_test.go | 19 +++--
internal/route.go | 2 +-
internal/route_test.go | 2 +-
{utils => internal/utils}/errors.go | 2 +-
{utils => internal/utils}/files.go | 0
{utils => internal/utils}/fun.go | 0
{utils => internal/utils}/helper.go | 0
{utils => internal/utils}/helper_test.go | 0
{utils => internal/utils}/math.go | 0
{utils => internal/utils}/messagesysflag.go | 0
{utils => internal/utils}/net.go | 0
{utils => internal/utils}/net_test.go | 0
{utils => internal/utils}/ring_buffer.go | 0
{utils => internal/utils}/ring_buffer_test.go | 0
{utils => internal/utils}/string.go | 0
{utils => internal/utils}/string_test.go | 0
utils/math.go => primitive/auth.go | 18 ++---
primitive/message.go | 2 +-
primitive/result.go | 3 +-
producer/option.go | 14 ++--
producer/producer.go | 7 +-
37 files changed, 318 insertions(+), 81 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 393e094..7f80802 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -29,9 +29,9 @@ import (
"github.com/apache/rocketmq-client-go/internal"
"github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/internal/utils"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
- "github.com/apache/rocketmq-client-go/utils"
"github.com/pkg/errors"
"github.com/tidwall/gjson"
)
@@ -879,7 +879,7 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, er
return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
}
-func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) (int64) {
+func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) int64 {
return dc.storage.read(mq, _ReadMemoryThenStore)
}
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 45d80f2..8bc5b5a 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -28,9 +28,9 @@ import (
"github.com/apache/rocketmq-client-go/internal"
"github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/internal/utils"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
- "github.com/apache/rocketmq-client-go/utils"
)
type readType int
diff --git a/consumer/option.go b/consumer/option.go
index 0973797..f967e1e 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -175,20 +175,12 @@ func WithNameServer(nameServers []string) Option {
}
}
-// WithACL on/off ACL
func WithVIPChannel(enable bool) Option {
return func(opts *consumerOptions) {
opts.VIPChannelEnabled = enable
}
}
-// WithACL on/off ACL
-func WithACL(enable bool) Option {
- return func(opts *consumerOptions) {
- opts.ACLEnabled = enable
- }
-}
-
// WithRetry return a Option that specifies the retry times when send failed.
// TODO: use retry middleware instead
func WithRetry(retries int) Option {
@@ -196,3 +188,9 @@ func WithRetry(retries int) Option {
opts.RetryTimes = retries
}
}
+
+func WithCredentials(c primitive.Credentials) Option {
+ return func(options *consumerOptions) {
+ options.ClientOptions.Credentials = c
+ }
+}
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 19e2369..742cf16 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -24,9 +24,9 @@ import (
"sync/atomic"
"github.com/apache/rocketmq-client-go/internal"
+ "github.com/apache/rocketmq-client-go/internal/utils"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
- "github.com/apache/rocketmq-client-go/utils"
"github.com/pkg/errors"
)
diff --git a/consumer/statistics.go b/consumer/statistics.go
index adc7b38..f01364a 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -188,7 +188,6 @@ func (sis *statsItemSet) init() {
}()
}
-
func (sis *statsItemSet) samplingInSeconds() {
sis.statsItemTable.Range(func(key, value interface{}) bool {
si := value.(*statsItem)
diff --git a/consumer/strategy.go b/consumer/strategy.go
index 2990d90..c5b6a25 100644
--- a/consumer/strategy.go
+++ b/consumer/strategy.go
@@ -18,9 +18,9 @@ limitations under the License.
package consumer
import (
+ "github.com/apache/rocketmq-client-go/internal/utils"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
- "github.com/apache/rocketmq-client-go/utils"
)
// Strategy Algorithm for message allocating between consumers
diff --git a/utils/errors.go b/errors.go
similarity index 98%
copy from utils/errors.go
copy to errors.go
index 507d7bb..6b98475 100644
--- a/utils/errors.go
+++ b/errors.go
@@ -15,14 +15,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package utils
+package rocketmq
import (
"github.com/apache/rocketmq-client-go/rlog"
"github.com/pkg/errors"
)
-var(
+var (
// ErrRequestTimeout for request timeout error
ErrRequestTimeout = errors.New("request timeout")
diff --git a/examples/consumer/acl/main.go b/examples/consumer/acl/main.go
new file mode 100644
index 0000000..4582cde
--- /dev/null
+++ b/examples/consumer/acl/main.go
@@ -0,0 +1,55 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/apache/rocketmq-client-go"
+ "github.com/apache/rocketmq-client-go/consumer"
+ "github.com/apache/rocketmq-client-go/primitive"
+)
+
+func main() {
+ c, _ := rocketmq.NewPushConsumer(
+ consumer.WithGroupName("testGroup"),
+ consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+ consumer.WithCredentials(primitive.Credentials{
+ AccessKey: "RocketMQ",
+ SecretKey: "12345678",
+ }),
+ )
+ err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
+ msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+ fmt.Printf("subscribe callback: %v \n", msgs)
+ return consumer.ConsumeSuccess, nil
+ })
+ if err != nil {
+ fmt.Println(err.Error())
+ }
+ // Note: start after subscribe
+ err = c.Start()
+ if err != nil {
+ fmt.Println(err.Error())
+ os.Exit(-1)
+ }
+ time.Sleep(time.Hour)
+}
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
index 4076450..c2915f9 100644
--- a/examples/consumer/pull/main.go
+++ b/examples/consumer/pull/main.go
@@ -23,14 +23,14 @@ import (
"time"
"github.com/apache/rocketmq-client-go/consumer"
+ "github.com/apache/rocketmq-client-go/internal/utils"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
- "github.com/apache/rocketmq-client-go/utils"
)
func main() {
c, err := consumer.NewPullConsumer(consumer.WithGroupName("testGroup"), consumer.WithNameServer([]string{"127.0.0.1:9876"}))
- if err != nil{
+ if err != nil {
rlog.Fatal("fail to new pullConsumer: ", err)
}
c.Start()
@@ -48,7 +48,7 @@ func main() {
if err != nil {
if err == utils.ErrRequestTimeout {
fmt.Printf("timeout \n")
- time.Sleep(1 *time.Second)
+ time.Sleep(1 * time.Second)
continue
}
fmt.Printf("unexpectable err: %v \n", err)
diff --git a/examples/producer/acl/main.go b/examples/producer/acl/main.go
new file mode 100644
index 0000000..cee23db
--- /dev/null
+++ b/examples/producer/acl/main.go
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package main implements a producer with user custom interceptor.
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "strconv"
+
+ "github.com/apache/rocketmq-client-go"
+ "github.com/apache/rocketmq-client-go/primitive"
+ "github.com/apache/rocketmq-client-go/producer"
+)
+
+func main() {
+ p, _ := rocketmq.NewProducer(
+ producer.WithNameServer([]string{"127.0.0.1:9876"}),
+ producer.WithRetry(2),
+ producer.WithCredentials(primitive.Credentials{
+ AccessKey: "RocketMQ",
+ SecretKey: "12345678",
+ }),
+ )
+ err := p.Start()
+ if err != nil {
+ fmt.Printf("start producer error: %s", err.Error())
+ os.Exit(1)
+ }
+ for i := 0; i < 100000; i++ {
+ res, err := p.SendSync(context.Background(), &primitive.Message{
+ Topic: "test",
+ Body: []byte("Hello RocketMQ Go Client!"),
+ Properties: map[string]string{"order": strconv.Itoa(i)},
+ })
+
+ if err != nil {
+ fmt.Printf("send message error: %s\n", err)
+ } else {
+ fmt.Printf("send message success: result=%s\n", res.String())
+ }
+ }
+ err = p.Shutdown()
+ if err != nil {
+ fmt.Printf("shundown producer error: %s", err.Error())
+ }
+}
diff --git a/internal/client.go b/internal/client.go
index c826154..34870b3 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -29,9 +29,9 @@ import (
"time"
"github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/internal/utils"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
- "github.com/apache/rocketmq-client-go/utils"
)
const (
@@ -104,6 +104,7 @@ type ClientOptions struct {
ACLEnabled bool
RetryTimes int
Interceptors []primitive.Interceptor
+ Credentials primitive.Credentials
}
func (opt *ClientOptions) ChangeInstanceNameToPID() {
@@ -140,7 +141,7 @@ type RMQClient interface {
SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
msgs []*primitive.Message) (*primitive.SendResult, error)
- ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message)
+ ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) error
RegisterConsumer(group string, consumer InnerConsumer) error
UnregisterConsumer(group string)
@@ -190,6 +191,9 @@ func (c *rmqClient) Start() {
c.close = false
c.once.Do(func() {
// TODO fetchNameServerAddr
+ if !c.option.Credentials.IsEmpty() {
+ c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
+ }
go func() {}()
// schedule update route info
@@ -385,7 +389,7 @@ func (c *rmqClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, r
return nil, err
}
-func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
+func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) error {
var status primitive.SendStatus
switch cmd.Code {
case ResFlushDiskTimeout:
@@ -397,7 +401,8 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
case ResSuccess:
status = primitive.SendOK
default:
- // TODO process unknown code
+ status = primitive.SendUnknownError
+ return errors.New(cmd.Remark)
}
msgIDs := make([]string, 0)
@@ -427,7 +432,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
//TransactionID: sendResponse.TransactionId,
resp.RegionID = regionId
resp.TraceOn = trace != "" && trace != _TranceOff
-
+ return nil
}
// PullMessage with sync
@@ -600,7 +605,7 @@ func encodeMessages(message []*primitive.Message) []byte {
index := 0
for index < len(message) {
buffer.Write(message[index].Body)
- index ++
+ index++
}
return buffer.Bytes()
}
diff --git a/internal/remote/codec.go b/internal/remote/codec.go
index 434678e..3093514 100644
--- a/internal/remote/codec.go
+++ b/internal/remote/codec.go
@@ -58,11 +58,12 @@ type CustomHeader interface {
func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingCommand {
cmd := &RemotingCommand{
- Code: code,
- Version: _Version,
- Opaque: atomic.AddInt32(&opaque, 1),
- Body: body,
- Language: _LanguageCode,
+ Code: code,
+ Version: _Version,
+ Opaque: atomic.AddInt32(&opaque, 1),
+ Body: body,
+ Language: _LanguageCode,
+ ExtFields: make(map[string]string),
}
if header != nil {
diff --git a/internal/remote/future.go b/internal/remote/future.go
index 5a1c724..8690644 100644
--- a/internal/remote/future.go
+++ b/internal/remote/future.go
@@ -21,7 +21,7 @@ import (
"sync"
"time"
- "github.com/apache/rocketmq-client-go/utils"
+ "github.com/apache/rocketmq-client-go/internal/utils"
)
// ResponseFuture
diff --git a/internal/remote/interceptor.go b/internal/remote/interceptor.go
new file mode 100644
index 0000000..0197f27
--- /dev/null
+++ b/internal/remote/interceptor.go
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remote
+
+import (
+ "context"
+ "crypto/hmac"
+ "crypto/sha1"
+ "encoding/base64"
+ "hash"
+ "sort"
+ "strings"
+
+ "github.com/apache/rocketmq-client-go/primitive"
+)
+
+const (
+ signature = "Signature"
+ accessKey = "AccessKey"
+ securityToken = "SecurityToken"
+ keyFile = "KEY_FILE"
+ // System.getProperty("rocketmq.client.keyFile", System.getProperty("user.home") + File.separator + "key");
+)
+
+func ACLInterceptor(credentials primitive.Credentials) primitive.Interceptor {
+ return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
+ cmd := req.(*RemotingCommand)
+ m := make(map[string]string)
+ order := make([]string, 1)
+ m[accessKey] = credentials.AccessKey
+ order[0] = accessKey
+ if credentials.SecurityToken != "" {
+ m[securityToken] = credentials.SecurityToken
+ }
+ for k, v := range cmd.ExtFields {
+ m[k] = v
+ order = append(order, k)
+ }
+ sort.Slice(order, func(i, j int) bool {
+ return strings.Compare(order[i], order[j]) < 0
+ })
+ content := ""
+ for idx := range order {
+ content += m[order[idx]]
+ }
+ buf := make([]byte, len(content)+len(cmd.Body))
+ copy(buf, []byte(content))
+ copy(buf[len(content):], cmd.Body)
+
+ cmd.ExtFields[signature] = calculateSignature(buf, []byte(credentials.SecretKey))
+ cmd.ExtFields[accessKey] = credentials.AccessKey
+
+ // The SecurityToken value is unnecessary, user can choose this one.
+ if credentials.SecurityToken != "" {
+ cmd.ExtFields[securityToken] = credentials.SecurityToken
+ }
+ err := next(ctx, req, reply)
+ return err
+ }
+}
+
+func calculateSignature(data, sk []byte) string {
+ mac := hmac.New(func() hash.Hash {
+ return sha1.New()
+ }, sk)
+ mac.Write(data)
+ return base64.StdEncoding.EncodeToString(mac.Sum(nil))
+}
diff --git a/utils/math.go b/internal/remote/interceptor_test.go
similarity index 74%
copy from utils/math.go
copy to internal/remote/interceptor_test.go
index 816631e..c2cc6ca 100644
--- a/utils/math.go
+++ b/internal/remote/interceptor_test.go
@@ -15,18 +15,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package utils
+package remote
-func AbsInt(i int) int {
- if i >= 0 {
- return i
- }
- return -i
-}
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
-func MinInt(a, b int) int {
- if a < b {
- return a
- }
- return b
+func Test_CalculateSignature(t *testing.T) {
+ assert.Equal(t, "tAb/54Rwwcq+pbH8Loi7FWX4QSQ=",
+ calculateSignature([]byte("Hello RocketMQ Client ACL Feature"), []byte("adiaushdiaushd")))
}
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 8e32216..297aaa3 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -19,12 +19,15 @@ package remote
import (
"bufio"
"bytes"
+ "context"
"encoding/binary"
"io"
"net"
"sync"
"time"
+ "github.com/apache/rocketmq-client-go/primitive"
+
"github.com/apache/rocketmq-client-go/rlog"
)
@@ -40,6 +43,7 @@ type RemotingClient struct {
option TcpOption
processors map[int16]ClientRequestFunc
connectionLocker sync.Mutex
+ interceptor primitive.Interceptor
}
func NewRemotingClient() *RemotingClient {
@@ -192,6 +196,18 @@ func (c *RemotingClient) createScanner(r io.Reader) *bufio.Scanner {
}
func (c *RemotingClient) sendRequest(conn net.Conn, request *RemotingCommand) error {
+ var err error
+ if c.interceptor != nil {
+ err = c.interceptor(context.Background(), request, nil, func(ctx context.Context, req, reply interface{}) error {
+ return c.doRequest(conn, request)
+ })
+ } else {
+ err = c.doRequest(conn, request)
+ }
+ return err
+}
+
+func (c *RemotingClient) doRequest(conn net.Conn, request *RemotingCommand) error {
content, err := encode(request)
if err != nil {
return err
@@ -226,3 +242,30 @@ func (c *RemotingClient) ShutDown() {
return true
})
}
+
+func (c *RemotingClient) RegisterInterceptor(interceptors ...primitive.Interceptor) {
+ if len(interceptors) == 0 {
+ return
+ }
+ idx := 0
+ if c.interceptor == nil {
+ c.interceptor = interceptors[0]
+ idx = 1
+ }
+ for ; idx < len(interceptors); idx++ {
+ c.interceptor = func(ctx context.Context, req, reply interface{}, invoker primitive.Invoker) error {
+ return interceptors[0](ctx, req, reply, getChainedInterceptor(interceptors, idx, invoker))
+ }
+ }
+}
+
+// TODO
+// getChainedInterceptor recursively generate the chained invoker.
+func getChainedInterceptor(interceptors []primitive.Interceptor, cur int, finalInvoker primitive.Invoker) primitive.Invoker {
+ if cur == len(interceptors)-1 {
+ return finalInvoker
+ }
+ return func(ctx context.Context, req, reply interface{}) error {
+ return interceptors[cur+1](ctx, req, reply, getChainedInterceptor(interceptors, cur+1, finalInvoker))
+ }
+}
diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go
index 4efb990..06dcedc 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -19,6 +19,7 @@ package remote
import (
"bytes"
"errors"
+ "github.com/apache/rocketmq-client-go/internal/utils"
"math/rand"
"net"
"reflect"
@@ -81,7 +82,7 @@ func TestResponseFutureTimeout(t *testing.T) {
}
func TestResponseFutureIsTimeout(t *testing.T) {
- future := NewResponseFuture(10, 500 * time.Millisecond, nil)
+ future := NewResponseFuture(10, 500*time.Millisecond, nil)
if future.isTimeout() != false {
t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", false, future.isTimeout())
}
@@ -93,12 +94,12 @@ func TestResponseFutureIsTimeout(t *testing.T) {
}
func TestResponseFutureWaitResponse(t *testing.T) {
- future := NewResponseFuture(10, 500 * time.Millisecond, nil)
- if _, err := future.waitResponse(); err != ErrRequestTimeout {
+ future := NewResponseFuture(10, 500*time.Millisecond, nil)
+ if _, err := future.waitResponse(); err != utils.ErrRequestTimeout {
t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
- ErrRequestTimeout, err)
+ utils.ErrRequestTimeout, err)
}
- future = NewResponseFuture(10, 500 * time.Millisecond, nil)
+ future = NewResponseFuture(10, 500*time.Millisecond, nil)
responseError := errors.New("response error")
go func() {
time.Sleep(100 * time.Millisecond)
@@ -109,7 +110,7 @@ func TestResponseFutureWaitResponse(t *testing.T) {
t.Errorf("wrong ResponseFuture waitResponse. want=%v. got=%v",
responseError, err)
}
- future = NewResponseFuture(10, 500 * time.Millisecond, nil)
+ future = NewResponseFuture(10, 500*time.Millisecond, nil)
responseRemotingCommand := NewRemotingCommand(202, nil, nil)
go func() {
time.Sleep(100 * time.Millisecond)
@@ -219,7 +220,7 @@ func TestInvokeAsync(t *testing.T) {
cnt := 50
wg.Add(cnt)
client := NewRemotingClient()
- for i:=0; i < cnt; i++ {
+ for i := 0; i < cnt; i++ {
go func(index int) {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
t.Logf("[Send: %d] asychronous message", index)
@@ -291,7 +292,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
err := client.InvokeAsync(":3000", clientSendRemtingCommand,
time.Duration(1000), func(r *ResponseFuture) {
assert.NotNil(t, r.Err)
- assert.Equal(t, ErrRequestTimeout, r.Err)
+ assert.Equal(t, utils.ErrRequestTimeout, r.Err)
wg.Done()
})
assert.Nil(t, err, "failed to invokeSync.")
@@ -361,5 +362,3 @@ func TestInvokeOneWay(t *testing.T) {
}
wg.Done()
}
-
-
diff --git a/internal/route.go b/internal/route.go
index dbb39c1..94fd5c0 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -29,9 +29,9 @@ import (
"time"
"github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/internal/utils"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
- "github.com/apache/rocketmq-client-go/utils"
"github.com/tidwall/gjson"
)
diff --git a/internal/route_test.go b/internal/route_test.go
index ee95755..9c3241e 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -132,4 +132,4 @@ func TestFindBrokerAddressInSubscribe(t *testing.T) {
}
})
})
-}
\ No newline at end of file
+}
diff --git a/utils/errors.go b/internal/utils/errors.go
similarity index 99%
rename from utils/errors.go
rename to internal/utils/errors.go
index 507d7bb..1d83d58 100644
--- a/utils/errors.go
+++ b/internal/utils/errors.go
@@ -22,7 +22,7 @@ import (
"github.com/pkg/errors"
)
-var(
+var (
// ErrRequestTimeout for request timeout error
ErrRequestTimeout = errors.New("request timeout")
diff --git a/utils/files.go b/internal/utils/files.go
similarity index 100%
rename from utils/files.go
rename to internal/utils/files.go
diff --git a/utils/fun.go b/internal/utils/fun.go
similarity index 100%
rename from utils/fun.go
rename to internal/utils/fun.go
diff --git a/utils/helper.go b/internal/utils/helper.go
similarity index 100%
rename from utils/helper.go
rename to internal/utils/helper.go
diff --git a/utils/helper_test.go b/internal/utils/helper_test.go
similarity index 100%
rename from utils/helper_test.go
rename to internal/utils/helper_test.go
diff --git a/utils/math.go b/internal/utils/math.go
similarity index 100%
copy from utils/math.go
copy to internal/utils/math.go
diff --git a/utils/messagesysflag.go b/internal/utils/messagesysflag.go
similarity index 100%
rename from utils/messagesysflag.go
rename to internal/utils/messagesysflag.go
diff --git a/utils/net.go b/internal/utils/net.go
similarity index 100%
rename from utils/net.go
rename to internal/utils/net.go
diff --git a/utils/net_test.go b/internal/utils/net_test.go
similarity index 100%
rename from utils/net_test.go
rename to internal/utils/net_test.go
diff --git a/utils/ring_buffer.go b/internal/utils/ring_buffer.go
similarity index 100%
rename from utils/ring_buffer.go
rename to internal/utils/ring_buffer.go
diff --git a/utils/ring_buffer_test.go b/internal/utils/ring_buffer_test.go
similarity index 100%
rename from utils/ring_buffer_test.go
rename to internal/utils/ring_buffer_test.go
diff --git a/utils/string.go b/internal/utils/string.go
similarity index 100%
rename from utils/string.go
rename to internal/utils/string.go
diff --git a/utils/string_test.go b/internal/utils/string_test.go
similarity index 100%
rename from utils/string_test.go
rename to internal/utils/string_test.go
diff --git a/utils/math.go b/primitive/auth.go
similarity index 79%
rename from utils/math.go
rename to primitive/auth.go
index 816631e..772bc4d 100644
--- a/utils/math.go
+++ b/primitive/auth.go
@@ -15,18 +15,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package utils
+package primitive
-func AbsInt(i int) int {
- if i >= 0 {
- return i
- }
- return -i
+type Credentials struct {
+ AccessKey string
+ SecretKey string
+ SecurityToken string
}
-func MinInt(a, b int) int {
- if a < b {
- return a
- }
- return b
+func (c Credentials) IsEmpty() bool {
+ return c.AccessKey == "" || c.SecretKey == ""
}
diff --git a/primitive/message.go b/primitive/message.go
index 52ab2bf..f191c92 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -20,7 +20,7 @@ package primitive
import (
"fmt"
- "github.com/apache/rocketmq-client-go/utils"
+ "github.com/apache/rocketmq-client-go/internal/utils"
)
const (
diff --git a/primitive/result.go b/primitive/result.go
index f734043..f21cc2e 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -22,7 +22,7 @@ import (
"encoding/binary"
"fmt"
- "github.com/apache/rocketmq-client-go/utils"
+ "github.com/apache/rocketmq-client-go/internal/utils"
)
// SendStatus of message
@@ -33,6 +33,7 @@ const (
SendFlushDiskTimeout
SendFlushSlaveTimeout
SendSlaveNotAvailable
+ SendUnknownError
FlagCompressed = 0x1
MsgIdLength = 8 + 8
diff --git a/producer/option.go b/producer/option.go
index 7b6c18b..ad6c98e 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -57,20 +57,12 @@ func WithNameServer(nameServers []string) Option {
}
}
-// WithACL on/off ACL
func WithVIPChannel(enable bool) Option {
return func(opts *producerOptions) {
opts.VIPChannelEnabled = enable
}
}
-// WithACL on/off ACL
-func WithACL(enable bool) Option {
- return func(opts *producerOptions) {
- opts.ACLEnabled = enable
- }
-}
-
// WithRetry return a Option that specifies the retry times when send failed.
// TODO: use retry middleware instead
func WithRetry(retries int) Option {
@@ -90,3 +82,9 @@ func WithQueueSelector(s QueueSelector) Option {
options.Selector = s
}
}
+
+func WithCredentials(c primitive.Credentials) Option {
+ return func(options *producerOptions) {
+ options.ClientOptions.Credentials = c
+ }
+}
diff --git a/producer/producer.go b/producer/producer.go
index ab501de..12a72b6 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -139,8 +139,8 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message)
return resp, err
}
- p.sendSync(ctx, msg, resp)
- return resp, nil
+ err := p.sendSync(ctx, msg, resp)
+ return resp, err
}
func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, resp *primitive.SendResult) error {
@@ -168,8 +168,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
err = _err
continue
}
- p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
- return nil
+ return p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
}
return err
}