You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/12/18 08:32:50 UTC

[GitHub] dongeforever closed pull request #13: Implementing SendMessageOneway

dongeforever closed pull request #13: Implementing SendMessageOneway
URL: https://github.com/apache/rocketmq-client-go/pull/13
 
 
   

This pull request was merged from a forked repository. Because GitHub
does not show the original diff of a fork's pull request once it has
been merged, the diff is reproduced below for the sake of provenance:

diff --git a/core/api.go b/core/api.go
index 8ad6af0..58c1465 100644
--- a/core/api.go
+++ b/core/api.go
@@ -32,6 +32,22 @@ type clientConfig struct {
 	LogC             *LogConfig
 }
 
+func (config *clientConfig) string() string {
+	// For security, don't print Credentials.
+	str := ""
+	str = strJoin(str, "GroupId", config.GroupID)
+	str = strJoin(str, "NameServer", config.NameServer)
+	str = strJoin(str, "NameServerDomain", config.NameServerDomain)
+	str = strJoin(str, "GroupName", config.GroupName)
+	str = strJoin(str, "InstanceName", config.InstanceName)
+
+	if config.LogC != nil {
+		str = strJoin(str, "LogConfig", config.LogC.String())
+	}
+
+	return str
+}
+
 // NewProducer create a new producer with config
 func NewProducer(config *ProducerConfig) (Producer, error) {
 	return newDefaultProducer(config)
@@ -46,11 +62,21 @@ type ProducerConfig struct {
 }
 
 func (config *ProducerConfig) String() string {
-	// For security, don't print Credentials default.
-	return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s, InstanceName: %s, NameServer: %s, "+
-		"SendMsgTimeout: %d, CompressLevel: %d, MaxMessageSize: %d, ]", config.NameServer, config.GroupID,
-		config.NameServerDomain, config.GroupName, config.InstanceName, config.SendMsgTimeout, config.CompressLevel,
-		config.MaxMessageSize)
+	str := "ProducerConfig=[" + config.clientConfig.string()
+
+	if config.SendMsgTimeout > 0 {
+		str = strJoin(str, "SendMsgTimeout", config.SendMsgTimeout)
+	}
+
+	if config.CompressLevel > 0 {
+		str = strJoin(str, "CompressLevel", config.CompressLevel)
+	}
+
+	if config.MaxMessageSize > 0 {
+		str = strJoin(str, "MaxMessageSize", config.MaxMessageSize)
+	}
+
+	return str + "]"
 }
 
 type Producer interface {
@@ -61,8 +87,8 @@ type Producer interface {
 	// SendMessageOrderly send the message orderly
 	SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) SendResult
 
-	// SendMessageAsync send a message with async
-	SendMessageAsync(msg *Message)
+	// SendMessageOneway send a message with oneway
+	SendMessageOneway(msg *Message)
 }
 
 // NewPushConsumer create a new consumer with config.
@@ -70,6 +96,24 @@ func NewPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 	return newPushConsumer(config)
 }
 
+type MessageModel int
+
+const (
+	BroadCasting = MessageModel(1)
+	Clustering   = MessageModel(2)
+)
+
+func (mode MessageModel) String() string {
+	switch mode {
+	case BroadCasting:
+		return "BroadCasting"
+	case Clustering:
+		return "Clustering"
+	default:
+		return "Unknown"
+	}
+}
+
 // PushConsumerConfig define a new consumer.
 type PushConsumerConfig struct {
 	clientConfig
@@ -79,9 +123,22 @@ type PushConsumerConfig struct {
 }
 
 func (config *PushConsumerConfig) String() string {
-	return fmt.Sprintf("[GroupID: %s, NameServer: %s, NameServerDomain: %s, InstanceName: %s, "+
-		"ThreadCount: %d, MessageBatchMaxSize: %d, Model: %v ]", config.NameServer, config.GroupID,
-		config.NameServerDomain, config.InstanceName, config.ThreadCount, config.MessageBatchMaxSize, config.Model)
+	// For security, don't print Credentials.
+	str := "PushConsumerConfig=[" + config.clientConfig.string()
+
+	if config.ThreadCount > 0 {
+		str = strJoin(str, "ThreadCount", config.ThreadCount)
+	}
+
+	if config.MessageBatchMaxSize > 0 {
+		str = strJoin(str, "MessageBatchMaxSize", config.MessageBatchMaxSize)
+	}
+
+	if config.Model != 0 {
+		str = strJoin(str, "MessageModel", config.Model.String())
+	}
+
+	return str + "]"
 }
 
 type PushConsumer interface {
diff --git a/core/api_test.go b/core/api_test.go
index 05aa6cf..fc507f0 100644
--- a/core/api_test.go
+++ b/core/api_test.go
@@ -17,13 +17,61 @@
 package rocketmq
 
 import (
-	"fmt"
+	"github.com/stretchr/testify/assert"
 	"testing"
 )
 
-func TestVersion(test *testing.T) {
-	fmt.Println("-----TestGetVersion Start----")
-	version := Version()
-	fmt.Println(version)
-	fmt.Println("-----TestGetVersion Finish----")
+func TestProducerConfig_String(t *testing.T) {
+	pConfig := ProducerConfig{}
+	pConfig.GroupID = "testGroup"
+	pConfig.NameServer = "localhost:9876"
+	pConfig.NameServerDomain = "domain1"
+	pConfig.GroupName = "producerGroupName"
+	pConfig.InstanceName = "testProducer"
+	pConfig.LogC = &LogConfig{
+		Path:     "/rocketmq/log",
+		FileNum:  16,
+		FileSize: 1 << 20,
+		Level:    LogLevelDebug}
+	pConfig.SendMsgTimeout = 30
+	pConfig.CompressLevel = 4
+	pConfig.MaxMessageSize = 1024
+
+	expect := "ProducerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: NameServerDomain, " +
+		"GroupId: testGroup, InstanceName: testProducer, " +
+		"LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, S" +
+		"endMsgTimeout: 30, CompressLevel: 4, MaxMessageSize: 1024, ]"
+	assert.Equal(t, expect, pConfig.String())
+}
+
+func TestPushConsumerConfig_String(t *testing.T) {
+	pcConfig := PushConsumerConfig{}
+	pcConfig.GroupID = "testGroup"
+	pcConfig.NameServer = "localhost:9876"
+	pcConfig.GroupName = "consumerGroupName"
+	pcConfig.InstanceName = "testPushConsumer"
+	pcConfig.LogC = &LogConfig{
+		Path:     "/rocketmq/log",
+		FileNum:  16,
+		FileSize: 1 << 20,
+		Level:    LogLevelDebug}
+	pcConfig.ThreadCount = 4
+	expect := "PushConsumerConfig=[GroupId: testGroup, NameServer: localhost:9876, " +
+		"GroupName: consumerGroupName, InstanceName: testPushConsumer, " +
+		"LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, ThreadCount: 4, ]"
+	assert.Equal(t, expect, pcConfig.String())
+
+	pcConfig.NameServerDomain = "domain1"
+	expect = "PushConsumerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, " +
+		"GroupName: consumerGroupName, InstanceName: testPushConsumer, " +
+		"LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, ThreadCount: 4, ]"
+	assert.Equal(t, expect, pcConfig.String())
+
+	pcConfig.MessageBatchMaxSize = 32
+	pcConfig.Model = Clustering
+	expect = "PushConsumerConfig=[GroupId: testGroup, NameServer: localhost:9876, NameServerDomain: domain1, " +
+		"GroupName: consumerGroupName, InstanceName: testPushConsumer, " +
+		"LogConfig: {Path:/rocketmq/log FileNum:16 FileSize:1048576 Level:Debug}, ThreadCount: 4," +
+		" MessageBatchMaxSize: 32, MessageModel: Clustering, ]"
+	assert.Equal(t, expect, pcConfig.String())
 }
diff --git a/core/log_test.go b/core/log_test.go
index 8c4a449..e2246f0 100644
--- a/core/log_test.go
+++ b/core/log_test.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
 package rocketmq
 
 import (
diff --git a/core/producer.go b/core/producer.go
index 77ed63b..f402589 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -90,8 +90,7 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
 		code = int(C.SetProducerNameServerAddress(cproduer, cs))
 		C.free(unsafe.Pointer(cs))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set NameServerAddress error, code is: %d"+
-				"please check cpp logs for details", code))
+			return nil, fmt.Errorf("producer Set NameServerAddress error, code is: %d", code)
 		}
 	}
 
@@ -100,8 +99,7 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
 		code = int(C.SetProducerNameServerDomain(cproduer, cs))
 		C.free(unsafe.Pointer(cs))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set NameServerDomain error, code is: %d"+
-				"please check cpp logs for details", code))
+			return nil, fmt.Errorf("producer Set NameServerDomain error, code is: %d", code)
 		}
 	}
 
@@ -110,8 +108,7 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
 		code = int(C.SetProducerInstanceName(cproduer, cs))
 		C.free(unsafe.Pointer(cs))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set InstanceName error, code is: %d"+
-				"please check cpp logs for details", code))
+			return nil, fmt.Errorf("producer Set InstanceName error, code is: %d", code)
 		}
 	}
 
@@ -125,7 +122,7 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
 		C.free(unsafe.Pointer(sk))
 		C.free(unsafe.Pointer(ch))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set Credentials error, code is: %d", code))
+			return nil, fmt.Errorf("producer Set Credentials error, code is: %d", code)
 		}
 	}
 
@@ -134,38 +131,38 @@ func newDefaultProducer(config *ProducerConfig) (*defaultProducer, error) {
 		code = int(C.SetProducerLogPath(cproduer, cs))
 		C.free(unsafe.Pointer(cs))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set LogPath error, code is: %d", code))
+			return nil, fmt.Errorf("producer Set LogPath error, code is: %d", code)
 		}
 
 		code = int(C.SetProducerLogFileNumAndSize(cproduer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set FileNumAndSize error, code is: %d", code))
+			return nil, fmt.Errorf("producer Set FileNumAndSize error, code is: %d", code)
 		}
 
 		code = int(C.SetProducerLogLevel(cproduer, C.CLogLevel(config.LogC.Level)))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set LogLevel error, code is: %d", code))
+			return nil, fmt.Errorf("producer Set LogLevel error, code is: %d", code)
 		}
 	}
 
 	if config.SendMsgTimeout > 0 {
 		code = int(C.SetProducerSendMsgTimeout(cproduer, C.int(config.SendMsgTimeout)))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set SendMsgTimeout error, code is: %d", code))
+			return nil, fmt.Errorf("producer Set SendMsgTimeout error, code is: %d", code)
 		}
 	}
 
 	if config.CompressLevel > 0 {
 		code = int(C.SetProducerCompressLevel(cproduer, C.int(config.CompressLevel)))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set CompressLevel error, code is: %d", code))
+			return nil, fmt.Errorf("producer Set CompressLevel error, code is: %d", code)
 		}
 	}
 
 	if config.MaxMessageSize > 0 {
 		code = int(C.SetProducerMaxMessageSize(cproduer, C.int(config.MaxMessageSize)))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set MaxMessageSize error, code is: %d", code))
+			return nil, fmt.Errorf("producer Set MaxMessageSize error, code is: %d", code)
 		}
 	}
 
@@ -186,7 +183,7 @@ func (p *defaultProducer) String() string {
 func (p *defaultProducer) Start() error {
 	code := int(C.StartProducer(p.cproduer))
 	if code != 0 {
-		return errors.New(fmt.Sprintf("start producer error, error code is: %d", code))
+		return fmt.Errorf("start producer error, error code is: %d", code)
 	}
 	return nil
 }
@@ -247,6 +244,14 @@ func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueue
 	}
 }
 
-func (p *defaultProducer) SendMessageAsync(msg *Message) {
-	// TODO
+func (p *defaultProducer) SendMessageOneway(msg *Message) {
+	cmsg := goMsgToC(msg)
+	defer C.DestroyMessage(cmsg)
+
+	code := int(C.SendMessageOneway(p.cproduer, cmsg))
+	if code != 0 {
+		log.Warnf("send message with oneway error, error code is: %d", code)
+	} else {
+		log.Debugf("Send Message: %s with oneway success.", msg.String())
+	}
 }
diff --git a/core/push_consumer.go b/core/push_consumer.go
index 5f63b16..c39dc09 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -33,13 +33,11 @@ import "C"
 import (
 	"fmt"
 	"github.com/pkg/errors"
-	"github.com/prometheus/common/log"
+	log "github.com/sirupsen/logrus"
 	"sync"
 	"unsafe"
 )
 
-type MessageModel C.CMessageModel
-
 type ConsumeStatus int
 
 const (
@@ -100,7 +98,7 @@ func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 		code = int(C.SetPushConsumerNameServerAddress(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf(fmt.Sprintf("PushConsumer Set NameServerAddress error, code is: %d", code)))
+			return nil, fmt.Errorf("PushConsumer Set NameServerAddress error, code is: %d", code)
 		}
 	}
 
@@ -109,7 +107,7 @@ func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 		code = int(C.SetPushConsumerNameServerDomain(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("PushConsumer Set NameServerDomain error, code is: %d", code))
+			return nil, fmt.Errorf("PushConsumer Set NameServerDomain error, code is: %d", code)
 		}
 	}
 
@@ -118,8 +116,8 @@ func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 		code = int(C.SetPushConsumerInstanceName(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("PushConsumer Set InstanceName error, code is: %d, "+
-				"please check cpp logs for details", code))
+			return nil, fmt.Errorf("PushConsumer Set InstanceName error, code is: %d, "+
+				"please check cpp logs for details", code)
 		}
 	}
 
@@ -132,53 +130,63 @@ func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 		C.free(unsafe.Pointer(sk))
 		C.free(unsafe.Pointer(ch))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("PushConsumer Set Credentials error, code is: %d", int(code)))
+			return nil, fmt.Errorf("PushConsumer Set Credentials error, code is: %d", int(code))
 		}
 	}
 
 	if config.LogC != nil {
 		cs = C.CString(config.LogC.Path)
-		code = int(C.SetProducerLogPath(cconsumer, cs))
+		code = int(C.SetPushConsumerLogPath(cconsumer, cs))
 		C.free(unsafe.Pointer(cs))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set LogPath error, code is: %d", code))
+			return nil, fmt.Errorf("PushConsumer Set LogPath error, code is: %d", code)
 		}
 
-		code = int(C.SetProducerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
+		code = int(C.SetPushConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set FileNumAndSize error, code is: %d", code))
+			return nil, fmt.Errorf("PushConsumer Set FileNumAndSize error, code is: %d", code)
 		}
 
-		code = int(C.SetProducerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
+		code = int(C.SetPushConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("Producer Set LogLevel error, code is: %d", code))
+			return nil, fmt.Errorf("PushConsumer Set LogLevel error, code is: %d", code)
 		}
 	}
 
 	if config.ThreadCount > 0 {
 		code = int(C.SetPushConsumerThreadCount(cconsumer, C.int(config.ThreadCount)))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("PushConsumer Set ThreadCount error, code is: %d", int(code)))
+			return nil, fmt.Errorf("PushConsumer Set ThreadCount error, code is: %d", int(code))
 		}
 	}
 
 	if config.MessageBatchMaxSize > 0 {
 		code = int(C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize)))
 		if code != 0 {
-			return nil, errors.New(fmt.Sprintf("PushConsumer Set MessageBatchMaxSize error, code is: %d", int(code)))
+			return nil, fmt.Errorf("PushConsumer Set MessageBatchMaxSize error, code is: %d", int(code))
 		}
 	}
 
-	code = int(C.SetPushConsumerMessageModel(cconsumer, (C.CMessageModel)(config.Model)))
+	if config.Model != 0 {
+		var mode C.CMessageModel
+		switch config.Model {
+		case BroadCasting:
+			mode = C.BROADCASTING
+		case Clustering:
+			mode = C.CLUSTERING
+		}
+		code = int(C.SetPushConsumerMessageModel(cconsumer, mode))
+
+		if code != 0 {
+			return nil, fmt.Errorf("PushConsumer Set ConsumerMessageModel error, code is: %d", int(code))
+		}
 
-	if code != 0 {
-		return nil, errors.New(fmt.Sprintf("PushConsumer Set ConsumerMessageModel error, code is: %d", int(code)))
 	}
 
 	code = int(C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo))))
 
 	if code != 0 {
-		return nil, errors.New(fmt.Sprintf("PushConsumer RegisterMessageCallback error, code is: %d", int(code)))
+		return nil, fmt.Errorf("PushConsumer RegisterMessageCallback error, code is: %d", int(code))
 	}
 
 	consumer.cconsumer = cconsumer
@@ -189,7 +197,7 @@ func newPushConsumer(config *PushConsumerConfig) (PushConsumer, error) {
 func (c *defaultPushConsumer) Start() error {
 	code := C.StartPushConsumer(c.cconsumer)
 	if code != 0 {
-		return errors.New(fmt.Sprintf("start PushConsumer error, code is: %d", int(code)))
+		return fmt.Errorf("start PushConsumer error, code is: %d", int(code))
 	}
 	return nil
 }
@@ -215,7 +223,7 @@ func (c *defaultPushConsumer) Subscribe(topic, expression string, consumeFunc fu
 	}
 	code := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
 	if code != 0 {
-		return errors.New(fmt.Sprintf("subscribe topic: %s failed, error code is: %d", topic, int(code)))
+		return fmt.Errorf("subscribe topic: %s failed, error code is: %d", topic, int(code))
 	}
 	c.funcsMap.Store(topic, consumeFunc)
 	log.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
diff --git a/core/queue_selector.go b/core/queue_selector.go
index 311c378..7bf1927 100644
--- a/core/queue_selector.go
+++ b/core/queue_selector.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
 package rocketmq
 
 import "C"
diff --git a/core/queue_selector_test.go b/core/queue_selector_test.go
index e9a68ee..74fff80 100644
--- a/core/queue_selector_test.go
+++ b/core/queue_selector_test.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
 package rocketmq
 
 import (
diff --git a/core/utils.go b/core/utils.go
new file mode 100644
index 0000000..e9f83f1
--- /dev/null
+++ b/core/utils.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package rocketmq
+
+import "fmt"
+
+func strJoin(str, key string, value interface{}) string {
+	if key == "" || value == "" {
+		return str
+	}
+
+	return str + key + ": " + fmt.Sprint(value) + ", "
+}
diff --git a/examples/orderproducer/producer.go b/examples/orderproducer/producer.go
index 5559d87..f3d70c7 100644
--- a/examples/orderproducer/producer.go
+++ b/examples/orderproducer/producer.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
 package main
 
 import (
diff --git a/examples/producer.go b/examples/producer.go
index 56ecee2..33eab07 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -20,13 +20,12 @@ package main
 import (
 	"fmt"
 	"github.com/apache/rocketmq-client-go/core"
-	"time"
 )
 
 func main() {
 	cfg := &rocketmq.ProducerConfig{}
 	cfg.GroupID = "testGroup"
-	cfg.NameServer = "47.101.55.250:9876"
+	cfg.NameServer = "localhost:9876"
 	producer, err := rocketmq.NewProducer(cfg)
 	if err != nil {
 		fmt.Println("create Producer failed, error:", err)
@@ -39,8 +38,8 @@ func main() {
 	fmt.Printf("Producer: %s started... \n", producer)
 	for i := 0; i < 100; i++ {
 		msg := fmt.Sprintf("Hello RocketMQ-%d", i)
-		result := producer.SendMessageSync(&rocketmq.Message{Topic: "wwf1", Body: msg})
+		result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
 		fmt.Println(fmt.Sprintf("send message: %s result: %s", msg, result))
 	}
-	time.Sleep(10 * time.Second)
+	fmt.Println("shutdown producer.")
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services