You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/11/21 06:24:17 UTC

[GitHub] vongosling closed pull request #2: Make code be consistent with Golang Specfication

vongosling closed pull request #2: Make code be consistent with Golang Specfication
URL: https://github.com/apache/rocketmq-client-go/pull/2
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..485dee6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
diff --git a/README.md b/README.md
index 9b7deb9..26ed01e 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,7 @@
 ## RocketMQ Client Go [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
 
-* RocketMQ Go client is developed on top of [rocketmq-client-go](https://github.com/apache/rocketmq-client-go), which has been proven robust and widely adopted within Alibaba Group by many business units for more than three years.
+* This client uses cgo to call [rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), which has proven robust and has been widely adopted by many business units within Alibaba Group for more than three years.
+
 
 ----------
 ## Features
diff --git a/core/api.go b/core/api.go
new file mode 100644
index 0000000..92619f3
--- /dev/null
+++ b/core/api.go
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package rocketmq
+
+import "fmt"
+
+func Version() (version string) {
+	return GetVersion()
+}
+
+// NewProduer creates a new producer from the given config.
+func NewProduer(config *ProducerConfig) Producer {
+	return newDefaultProducer(config)
+}
+
+// ProducerConfig defines the settings used to create a producer.
+type ProducerConfig struct {
+	GroupID     string
+	NameServer  string
+	Credentials *SessionCredentials
+}
+
+func (config *ProducerConfig) String() string {
+	// Credentials are deliberately left out so secrets never end up in printed output.
+	return fmt.Sprintf("[groupId: %s, nameServer: %s]", config.GroupID, config.NameServer)
+}
+
+type Producer interface {
+	baseAPI
+	// SendMessageSync send a message with sync
+	SendMessageSync(msg *Message) SendResult
+
+	// SendMessageAsync send a message with async
+	SendMessageAsync(msg *Message)
+}
+
+// NewPushConsumer create a new consumer with config.
+func NewPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
+	return newPushConsumer(config)
+}
+
+// ConsumerConfig defines the settings used to create a push consumer.
+type ConsumerConfig struct {
+	GroupID             string
+	NameServer          string
+	ConsumerThreadCount int
+	MessageBatchMaxSize int
+	//ConsumerInstanceName int
+	Credentials *SessionCredentials
+}
+
+func (config *ConsumerConfig) String() string {
+	return fmt.Sprintf("[groupId: %s, nameServer: %s, consumerThreadCount: %d, messageBatchMaxSize: %d]",
+		config.GroupID, config.NameServer, config.ConsumerThreadCount, config.MessageBatchMaxSize)
+}
+
+type PushConsumer interface {
+	baseAPI
+	// Subscribe a new topic with specify filter expression and consume function.
+	Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
+}
+
+type SessionCredentials struct {
+	AccessKey string
+	SecretKey string
+	Channel   string
+}
+
+// String renders the credentials with the secret key masked so it cannot leak into printed output.
+func (session *SessionCredentials) String() string {
+	return fmt.Sprintf("[accessKey: %s, secretKey: ******, channel: %s]", session.AccessKey, session.Channel)
+}
+
+type SendResult struct {
+	Status SendStatus
+	MsgId  string
+	Offset int64
+}
+
+func (result SendResult) String() string {
+	return fmt.Sprintf("[status: %s, messageId: %s, offset: %d]", result.Status, result.MsgId, result.Offset)
+
+}
+
+type baseAPI interface {
+	Start() error
+	Shutdown() error
+}
diff --git a/src/client/client.go b/core/api_test.go
similarity index 78%
rename from src/client/client.go
rename to core/api_test.go
index c4a9a59..05aa6cf 100644
--- a/src/client/client.go
+++ b/core/api_test.go
@@ -14,17 +14,16 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package client
+package rocketmq
 
-func Version() (version string){
-    return GetVersion()
-}
+import (
+	"fmt"
+	"testing"
+)
 
-type Message interface {
-}
-type MessageExt interface {
-}
-type Producer interface {
+func TestVersion(test *testing.T) {
+	fmt.Println("-----TestGetVersion Start----")
+	version := Version()
+	fmt.Println(version)
+	fmt.Println("-----TestGetVersion Finish----")
 }
-type PushConsumer interface {
-}
\ No newline at end of file
diff --git a/src/test/messageExt_test.go b/core/cfuns.go
similarity index 57%
rename from src/test/messageExt_test.go
rename to core/cfuns.go
index a241c28..7fe4ffe 100644
--- a/src/test/messageExt_test.go
+++ b/core/cfuns.go
@@ -14,17 +14,31 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package client_test
+package rocketmq
 
+/*
+#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
+#include "rocketmq/CMessageExt.h"
+#include "rocketmq/CPushConsumer.h"
+*/
+import "C"
 import (
-    "fmt"
-    "testing"
-    "../client"
+	"sync"
 )
 
-func TestGetMessageTopic(test *testing.T){
-    fmt.Println("-----TestGetMessageTopic Start----")
-    msg := client.CreateMessage("testTopic")
-    client.DestroyMessage(msg)
-    fmt.Println("-----TestGetMessageTopic Finish----")
+var pushConsumerMap sync.Map
+
+//export consumeMessageCallback
+func consumeMessageCallback(cconsumer *C.CPushConsumer, msg *C.CMessageExt) C.int {
+	consumer, exist := pushConsumerMap.Load(cconsumer)
+	if !exist {
+		return C.int(ReConsumeLater)
+	}
+
+	msgExt := cmsgExtToGo(msg)
+	cfunc, exist := consumer.(*defaultPushConsumer).funcsMap.Load(msgExt.Topic)
+	if !exist {
+		return C.int(ReConsumeLater)
+	}
+	return C.int(cfunc.(func(msg *MessageExt) ConsumeStatus)(msgExt))
 }
diff --git a/core/message.go b/core/message.go
new file mode 100644
index 0000000..98dc6cb
--- /dev/null
+++ b/core/message.go
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package rocketmq
+
+//#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
+//#include "rocketmq/CMessage.h"
+//#include "rocketmq/CMessageExt.h"
+import "C"
+import "fmt"
+
+type Message struct {
+	Topic string
+	Keys  string
+	// improve: maybe []byte is better.
+	Body string
+}
+
+func (msg *Message) String() string {
+	return fmt.Sprintf("[topic: %s, keys: %s, body: %s]", msg.Topic, msg.Keys, msg.Body)
+}
+
+type MessageExt struct {
+	Message
+	MessageID string
+	Tags      string
+	// improve: is there is a method convert c++ map to go variable?
+	cmsgExt *C.struct_CMessageExt
+	//Properties  string
+}
+
+func (msgExt *MessageExt) String() string {
+	return fmt.Sprintf("[messageId: %s, %s, Tags: %s]", msgExt.MessageID, msgExt.Message.String(), msgExt.Tags) // Message.String has a pointer receiver, so call it explicitly.
+}
+
+func (msgExt *MessageExt) GetProperty(key string) string {
+	return C.GoString(C.GetMessageProperty(msgExt.cmsgExt, C.CString(key)))
+}
+
+func cmsgToGo(cmsg *C.struct_CMessage) *Message {
+	defer C.DestroyMessage(cmsg)
+	gomsg := &Message{}
+
+	return gomsg
+}
+
+func goMsgToC(gomsg *Message) *C.struct_CMessage {
+	var cmsg = C.CreateMessage(C.CString(gomsg.Topic))
+
+	// int(C.SetMessageKeys(msg.(*C.struct_CMessage),C.CString(keys)))
+	C.SetMessageKeys(cmsg, C.CString(gomsg.Keys))
+
+	// int(C.SetMessageBody(msg.(*C.struct_CMessage),C.CString(body)))
+	C.SetMessageBody(cmsg, C.CString(gomsg.Body))
+	return cmsg
+}
+
+//
+func cmsgExtToGo(cmsg *C.struct_CMessageExt) *MessageExt {
+	// Keep the C handle: GetProperty reads msgExt.cmsgExt, which would otherwise stay nil.
+	// NOTE(review): the handle is presumably only valid while the consume callback runs — confirm.
+	gomsg := &MessageExt{cmsgExt: cmsg}
+	gomsg.Topic = C.GoString(C.GetMessageTopic(cmsg))
+	gomsg.Body = C.GoString(C.GetMessageBody(cmsg))
+	gomsg.Keys = C.GoString(C.GetMessageKeys(cmsg))
+	gomsg.Tags = C.GoString(C.GetMessageTags(cmsg))
+	gomsg.MessageID = C.GoString(C.GetMessageId(cmsg))
+
+	return gomsg
+}
+
+//
+//func goMsgExtToC(gomsg *MessageExt) *C.struct_CMessageExt {
+//	var cmsg = C.CreateMessage(C.CString(gomsg.Topic))
+//
+//	// int(C.SetMessageKeys(msg.(*C.struct_CMessage),C.CString(keys)))
+//	C.SetMessageKeys(cmsg, C.CString(gomsg.Keys))
+//
+//	// int(C.SetMessageBody(msg.(*C.struct_CMessage),C.CString(body)))
+//	C.SetMessageBody(cmsg, C.CString(gomsg.Body))
+//	return cmsg
+//}
diff --git a/src/test/client_test.go b/core/message_test.go
similarity index 76%
rename from src/test/client_test.go
rename to core/message_test.go
index 3b12421..1d1f2a8 100644
--- a/src/test/client_test.go
+++ b/core/message_test.go
@@ -14,17 +14,15 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package client_test
+package rocketmq
 
 import (
-    "fmt"
-    "testing"
-    "../client"
+	"testing"
 )
 
-func TestVersion(test *testing.T){
-    fmt.Println("-----TestGetVersion Start----")
-    version := client.Version();
-    fmt.Println(version)
-    fmt.Println("-----TestGetVersion Finish----")
+func TestGetMessageTopic(test *testing.T) {
+	//fmt.Println("-----TestGetMessageTopic Start----")
+	//msg := rocketmq.CreateMessage("testTopic")
+	//rocketmq.DestroyMessage(msg)
+	//fmt.Println("-----TestGetMessageTopic Finish----")
 }
diff --git a/core/producer.go b/core/producer.go
new file mode 100644
index 0000000..3d7e022
--- /dev/null
+++ b/core/producer.go
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package rocketmq
+
+//#cgo LDFLAGS: -L/usr/local/lib/ -lrocketmq
+//#include "rocketmq/CMessage.h"
+//#include "rocketmq/CProducer.h"
+//#include "rocketmq/CSendResult.h"
+import "C"
+import "fmt"
+
+type SendStatus int
+
+const (
+	SendOK                = SendStatus(C.E_SEND_OK)
+	SendFlushDiskTimeout  = SendStatus(C.E_SEND_FLUSH_DISK_TIMEOUT)
+	SendFlushSlaveTimeout = SendStatus(C.E_SEND_FLUSH_SLAVE_TIMEOUT)
+	SendSlaveNotAvailable = SendStatus(C.E_SEND_SLAVE_NOT_AVAILABLE)
+)
+
+func (status SendStatus) String() string {
+	switch status {
+	case SendOK:
+		return "SendOK"
+	case SendFlushDiskTimeout:
+		return "SendFlushDiskTimeout"
+	case SendFlushSlaveTimeout:
+		return "SendFlushSlaveTimeout"
+	case SendSlaveNotAvailable:
+		return "SendSlaveNotAvailable"
+	default:
+		return "Unknown"
+	}
+}
+
+func newDefaultProducer(config *ProducerConfig) *defaultProducer {
+	producer := &defaultProducer{config: config}
+	producer.cproduer = C.CreateProducer(C.CString(config.GroupID))
+	code := int(C.SetProducerNameServerAddress(producer.cproduer, C.CString(producer.config.NameServer)))
+	if config.Credentials != nil {
+		ret := C.SetProducerSessionCredentials(producer.cproduer,
+			C.CString(config.Credentials.AccessKey),
+			C.CString(config.Credentials.SecretKey),
+			C.CString(config.Credentials.Channel))
+		code = int(ret)
+	}
+	switch code {
+
+	}
+	return producer
+}
+
+type defaultProducer struct {
+	config   *ProducerConfig
+	cproduer *C.struct_CProducer
+}
+
+func (p *defaultProducer) String() string {
+	return p.config.String()
+}
+
+// Start starts the native producer; a non-zero status (assumed: 0 means OK — confirm) becomes an error.
+func (p *defaultProducer) Start() error {
+	if code := int(C.StartProducer(p.cproduer)); code != 0 {
+		return fmt.Errorf("start producer: native status code %d", code)
+	}
+	return nil
+}
+
+// Shutdown stops the native producer; a non-zero status (assumed: 0 means OK — confirm) becomes an error.
+func (p *defaultProducer) Shutdown() error {
+	// The handle is destroyed on every path, so Shutdown should not be called a second time.
+	defer C.DestroyProducer(p.cproduer)
+	if code := int(C.ShutdownProducer(p.cproduer)); code != 0 {
+		return fmt.Errorf("shutdown producer: native status code %d", code)
+	}
+	return nil
+}
+
+func (p *defaultProducer) SendMessageSync(msg *Message) SendResult {
+	cmsg := goMsgToC(msg)
+	defer C.DestroyMessage(cmsg)
+
+	var sr C.struct__SendResult_
+	C.SendMessageSync(p.cproduer, cmsg, &sr)
+
+	result := SendResult{}
+	result.Status = SendStatus(sr.sendStatus)
+	result.MsgId = C.GoString(&sr.msgId[0])
+	result.Offset = int64(sr.offset)
+	return result
+}
+
+func (p *defaultProducer) SendMessageAsync(msg *Message) {
+	// TODO
+}
diff --git a/core/producer_test.go b/core/producer_test.go
new file mode 100644
index 0000000..04c007b
--- /dev/null
+++ b/core/producer_test.go
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package rocketmq
+
+//func TestCreateMessage(test *testing.T){
+//    fmt.Println("-----TestCreateMessage Start----")
+//    rocketmq.CreateMessage("testTopic")
+//    fmt.Println("-----TestCreateMessage Finish----")
+//}
+//
+//func TestDestroyMessage(test *testing.T){
+//    fmt.Println("-----TestCreateMessage Start----")
+//    msg := rocketmq.CreateMessage("testTopic")
+//    rocketmq.DestroyMessage(msg)
+//    fmt.Println("-----TestCreateMessage Finish----")
+//}
+//func TestSetMessageKeys(test *testing.T){
+//    fmt.Println("-----TestSetMessageKeys Start----")
+//    msg := rocketmq.CreateMessage("testTopic")
+//    len := rocketmq.SetMessageKeys(msg,"testKey")
+//    fmt.Println("Len:",len)
+//    rocketmq.DestroyMessage(msg)
+//    fmt.Println("-----TestCreateMessage Finish----")
+//}
+//func TestCreateProducer(test *testing.T){
+//    fmt.Println("-----TestCreateProducer Start----")
+//    rocketmq.CreateProducer("testGroupId")
+//    fmt.Println("-----TestCreateProducer Finish----")
+//}
diff --git a/core/push_consumer.go b/core/push_consumer.go
new file mode 100644
index 0000000..e431d95
--- /dev/null
+++ b/core/push_consumer.go
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package rocketmq
+
+/*
+#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
+#include "rocketmq/CMessageExt.h"
+#include "rocketmq/CPushConsumer.h"
+#include "stdio.h"
+
+extern int consumeMessageCallback(CPushConsumer *consumer, CMessageExt *msg);
+
+int callback_cgo(CPushConsumer *consumer, CMessageExt *msg) {
+	return consumeMessageCallback(consumer, msg);
+}
+*/
+import "C"
+
+import (
+	"fmt"
+	"sync"
+	"unsafe"
+)
+
+type ConsumeStatus int
+
+const (
+	ConsumeSuccess = ConsumeStatus(C.E_CONSUME_SUCCESS)
+	ReConsumeLater = ConsumeStatus(C.E_RECONSUME_LATER)
+)
+
+func (status ConsumeStatus) String() string {
+	switch status {
+	case ConsumeSuccess:
+		return "ConsumeSuccess"
+	case ReConsumeLater:
+		return "ReConsumeLater"
+	default:
+		return "Unknown"
+	}
+}
+
+type defaultPushConsumer struct {
+	config    *ConsumerConfig
+	cconsumer *C.struct_CPushConsumer
+	funcsMap  sync.Map
+}
+
+func (c *defaultPushConsumer) String() string {
+	topics := ""
+	c.funcsMap.Range(func(key, value interface{}) bool {
+		topics += key.(string) + ", "
+		return true
+	})
+	return fmt.Sprintf("[%s, subcribed topics: [%s]]", c.config, topics)
+}
+
+// newPushConsumer creates the native push consumer, applies config to it and registers the cgo callback.
+func newPushConsumer(config *ConsumerConfig) (PushConsumer, error) {
+	cconsumer := C.CreatePushConsumer(C.CString(config.GroupID))
+	C.SetPushConsumerNameServerAddress(cconsumer, C.CString(config.NameServer))
+	C.SetPushConsumerThreadCount(cconsumer, C.int(config.ConsumerThreadCount))
+	C.SetPushConsumerMessageBatchMaxSize(cconsumer, C.int(config.MessageBatchMaxSize))
+	C.RegisterMessageCallback(cconsumer, (C.MessageCallBack)(unsafe.Pointer(C.callback_cgo)))
+	if config.Credentials != nil {
+		C.SetPushConsumerSessionCredentials(cconsumer,
+			C.CString(config.Credentials.AccessKey),
+			C.CString(config.Credentials.SecretKey),
+			C.CString(config.Credentials.Channel))
+	}
+	// Register the wrapper so consumeMessageCallback can look it up by its C handle.
+	consumer := &defaultPushConsumer{config: config, cconsumer: cconsumer}
+	pushConsumerMap.Store(cconsumer, consumer)
+	return consumer, nil
+}
+
+func (c *defaultPushConsumer) Start() error {
+	C.StartPushConsumer(c.cconsumer)
+	return nil
+}
+
+func (c *defaultPushConsumer) Shutdown() error {
+	C.ShutdownPushConsumer(c.cconsumer)
+	C.DestroyPushConsumer(c.cconsumer)
+	return nil
+}
+
+func (c *defaultPushConsumer) Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error {
+	_, exist := c.funcsMap.Load(topic)
+	if exist {
+		return nil
+	}
+	err := C.Subscribe(c.cconsumer, C.CString(topic), C.CString(expression))
+	fmt.Println("err:", err)
+	c.funcsMap.Store(topic, consumeFunc)
+	fmt.Printf("subscribe topic[%s] with expression[%s] successfully. \n", topic, expression)
+	return nil
+}
diff --git a/src/sample/sample_common.go b/core/push_consumer_test.go
similarity index 70%
rename from src/sample/sample_common.go
rename to core/push_consumer_test.go
index 461199b..22c1444 100644
--- a/src/sample/sample_common.go
+++ b/core/push_consumer_test.go
@@ -14,12 +14,15 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
+package rocketmq
 
-package main
-
-import "fmt"
-import "../client"
-
-func SampleVersion() {
-	fmt.Println("Version :", client.GetVersion())
-}
\ No newline at end of file
+//import "fmt"
+//import "testing"
+//import "../client"
+//
+//func TestCreatePushConsumer(test *testing.T){
+//    fmt.Println("-----TestCreateProducer Start----")
+//    consumer := rocketmq.CreatePushConsumer("testGroupId")
+//    rocketmq.DestroyPushConsumer(consumer)
+//    fmt.Println("-----TestCreateProducer Finish----")
+//}
diff --git a/src/client/version.go b/core/version.go
similarity index 84%
rename from src/client/version.go
rename to core/version.go
index 2137825..2466671 100644
--- a/src/client/version.go
+++ b/core/version.go
@@ -14,9 +14,10 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package client
+package rocketmq
 
-var GO_CLIENT_VERSION  = "Go Client V1.0.0, BuildTime:2018.10.30"
-func GetVersion() (version string){
-    return GO_CLIENT_VERSION
+var GO_CLIENT_VERSION = "Go Client V1.0.0, BuildTime:2018.10.30"
+
+func GetVersion() (version string) {
+	return GO_CLIENT_VERSION
 }
diff --git a/src/test/version_test.go b/core/version_test.go
similarity index 73%
rename from src/test/version_test.go
rename to core/version_test.go
index 59f35ba..85f43c1 100644
--- a/src/test/version_test.go
+++ b/core/version_test.go
@@ -14,17 +14,17 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package client_test
+package rocketmq
 
-import (
-    "fmt"
-    "testing"
-    "../client"
-)
-
-func TestGetVersion(test *testing.T){
-    fmt.Println("-----TestGetVersion Start----")
-    version := client.GetVersion();
-    fmt.Println(version)
-    fmt.Println("-----TestGetVersion Finish----")
-}
+//import (
+//    "fmt"
+//    "testing"
+//    "../client"
+//)
+//
+//func TestGetVersion(test *testing.T){
+//    fmt.Println("-----TestGetVersion Start----")
+//    version := rocketmq.GetVersion();
+//    fmt.Println(version)
+//    fmt.Println("-----TestGetVersion Finish----")
+//}
diff --git a/src/sample/main.go b/examples/producer.go
similarity index 58%
rename from src/sample/main.go
rename to examples/producer.go
index eb868b8..a6c801c 100644
--- a/src/sample/main.go
+++ b/examples/producer.go
@@ -16,13 +16,28 @@
  */
 
 package main
- 
-import "fmt"
+
+import (
+	"fmt"
+	"github.com/apache/rocketmq-client-go/core"
+	"time"
+)
 
 func main() {
-    fmt.Println("Start Sample Main Function......")
-    SampleVersion()
-    SampleSendMessage()
-    SamplePushConsumeMessage()
-    fmt.Scan()
+	SendMessage()
+}
+
+func SendMessage() {
+	producer := rocketmq.NewProduer(&rocketmq.ProducerConfig{GroupID: "testGroup", NameServer: "localhost:9876"})
+	producer.Start()
+	// Shutdown also destroys the native producer, so it must run exactly once (deferred here).
+	defer producer.Shutdown()
+
+	fmt.Printf("Producer: %s started... \n", producer)
+	for i := 0; i < 100; i++ {
+		msg := fmt.Sprintf("Hello RocketMQ-%d", i)
+		result := producer.SendMessageSync(&rocketmq.Message{Topic: "test", Body: msg})
+		fmt.Printf("send message: %s result: %s\n", msg, result)
+	}
+	time.Sleep(10 * time.Second)
 }
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
new file mode 100644
index 0000000..e65613b
--- /dev/null
+++ b/examples/push_consumer.go
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import (
+	"fmt"
+	"github.com/apache/rocketmq-client-go/core"
+	"time"
+)
+
+func main() {
+	PushConsumeMessage()
+}
+
+func PushConsumeMessage() {
+	fmt.Println("Start Receiving Messages...")
+	consumer, _ := rocketmq.NewPushConsumer(&rocketmq.ConsumerConfig{GroupID: "testGroupId", NameServer: "localhost:9876",
+		ConsumerThreadCount: 2, MessageBatchMaxSize: 16})
+
+	// MUST subscribe topic before consumer started.
+	consumer.Subscribe("test", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
+		fmt.Printf("A message received: \"%s\" \n", msg.Body)
+		return rocketmq.ConsumeSuccess
+	})
+
+	consumer.Start()
+	defer consumer.Shutdown()
+	fmt.Printf("consumer: %s started...\n", consumer)
+	time.Sleep(10 * time.Minute)
+}
diff --git a/src/client/consume.go b/src/client/consume.go
deleted file mode 100644
index 66aa2a6..0000000
--- a/src/client/consume.go
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package client
-
-//#cgo CFLAGS: -I/usr/local/include/rocketmq
-//#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
-//#include "CMessageExt.h"
-//#include "CPushConsumer.h"
-import "C"
-
-//export ConsumeMessageCallback
-func ConsumeMessageCallback(consumer *C.struct_CPushConsumer,msg *C.struct_CMessageExt) (C.int) {
-	return C.int(ConsumeMessageInner(consumer,msg))
-}
diff --git a/src/client/consumestatus.go b/src/client/consumestatus.go
deleted file mode 100644
index 55fd015..0000000
--- a/src/client/consumestatus.go
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package client
-
-type ConsumeStatus int
-
-const (
-    ConsumeSuccess ConsumeStatus = iota // value --> 0
-    ReConsumeLater              // value --> 1
-)
-func (status ConsumeStatus) String() string {
-    switch status {
-    case ConsumeSuccess:
-        return "ConsumeSuccess"
-    case ReConsumeLater:
-        return "ReConsumeLater"
-    default:
-        return "Unknown"
-    }
-}
\ No newline at end of file
diff --git a/src/client/messageExt.go b/src/client/messageExt.go
deleted file mode 100644
index bccea21..0000000
--- a/src/client/messageExt.go
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package client
-
-//#cgo CFLAGS: -I/usr/local/include/rocketmq
-//#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
-//#include "CMessageExt.h"
-import "C"
-
-//type MessageExt C.struct_CMessageExt
-
-func GetMessageTopic(msg MessageExt)(topic string){
-    topic = C.GoString(C.GetMessageTopic(msg.(*C.struct_CMessageExt)))
-    return
-}
-func GetMessageTags(msg MessageExt)(tags string){
-	tags = C.GoString(C.GetMessageTags(msg.(*C.struct_CMessageExt)))
-	return
-}
-func GetMessageKeys(msg MessageExt)(keys string){
-	keys = C.GoString(C.GetMessageKeys(msg.(*C.struct_CMessageExt)))
-	return
-}
-func GetMessageBody(msg MessageExt)(body string){
-	body = C.GoString(C.GetMessageBody(msg.(*C.struct_CMessageExt)))
-	return
-}
-func GetMessageProperty(msg MessageExt,key string)(value string){
-	value = C.GoString(C.GetMessageProperty(msg.(*C.struct_CMessageExt),C.CString(key)))
-	return
-}
-func GetMessageId(msg MessageExt)(msgId string){
-	msgId = C.GoString(C.GetMessageId(msg.(*C.struct_CMessageExt)))
-	return
-}
diff --git a/src/client/producer.go b/src/client/producer.go
deleted file mode 100644
index 95b2d66..0000000
--- a/src/client/producer.go
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package client
-
-//#cgo CFLAGS: -I/usr/local/include/rocketmq
-//#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
-//#include "CMessage.h"
-//#include "CProducer.h"
-import "C"
-
-//type Producer C.struct_CProducer
-//type Message C.struct_CMessage
-//type SendResult C.struct_CSendResult
-//type PushConsumer C.struct_CPushConsumer
-//type MessageExt C.struct_CMessageExt
-
-func CreateMessage(topic string)(msg Message){
-    msg = C.CreateMessage(C.CString(topic))
-    return msg;
-}
-func DestroyMessage(msg Message){
-    C.DestroyMessage(msg.(*C.struct_CMessage))
-}
-func SetMessageKeys(msg Message,keys string)(int){
-    return int(C.SetMessageKeys(msg.(*C.struct_CMessage),C.CString(keys)))
-}
-func SetMessageBody(msg Message,body string)(int){
-	return int(C.SetMessageBody(msg.(*C.struct_CMessage),C.CString(body)))
-}
-func CreateProducer(groupId string)(producer Producer){
-    producer = C.CreateProducer(C.CString(groupId))
-    return producer;
-}
-func DestroyProducer(producer Producer){
-	C.DestroyProducer(producer.(*C.struct_CProducer))
-}
-func StartProducer(producer Producer)(int){
-	return int(C.StartProducer(producer.(*C.struct_CProducer)))
-}
-func ShutdownProducer(producer Producer)(int){
-	return int(C.ShutdownProducer(producer.(*C.struct_CProducer)))
-}
-func SetProducerNameServerAddress(producer Producer, nameServer string)(int){
-	return int(C.SetProducerNameServerAddress(producer.(*C.struct_CProducer),C.CString(nameServer)))
-}
-func SetProducerSessionCredentials(producer Producer, accessKey string, secretKey string, channel string) (int) {
-	ret := C.SetProducerSessionCredentials(producer.(*C.struct_CProducer),
-		C.CString(accessKey),
-		C.CString(secretKey),
-		C.CString(channel))
-	return int(ret)
-}
-func SendMessageSync(producer Producer, msg Message)(sendResult SendResult){
-	var sr C.struct__SendResult_
-	C.SendMessageSync(producer.(*C.struct_CProducer),msg.(*C.struct_CMessage),&sr)
-	sendResult.Status = SendStatus(sr.sendStatus)
-	sendResult.MsgId = C.GoString(&sr.msgId[0])
-	sendResult.Offset = int64(sr.offset)
-	return sendResult
-}
diff --git a/src/client/pushconsumer.go b/src/client/pushconsumer.go
deleted file mode 100644
index 9192862..0000000
--- a/src/client/pushconsumer.go
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package client
-
-//#cgo CFLAGS: -I/usr/local/include/rocketmq
-//#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
-//#include "CMessageExt.h"
-//#include "CPushConsumer.h"
-//extern int ConsumeMessageCallback(CPushConsumer *consumer,CMessageExt *msg);
-//int ConsumerMessageCallBackInner(CPushConsumer *consumer, CMessageExt *msg) {
-//return ConsumeMessageCallback(consumer,msg);
-//}
-//int SetConsumerMessageCallBackInner(CPushConsumer *consumer) {
-//return RegisterMessageCallback(consumer,ConsumerMessageCallBackInner);
-//}
-import "C"
-import "fmt"
-
-//type PushConsumer C.struct_CPushConsumer
-//type MessageExt C.struct_CMessageExt
-type Callback func(msg MessageExt) ConsumeStatus
-
-func CreatePushConsumer(groupId string) (consumer PushConsumer) {
-	consumer = C.CreatePushConsumer(C.CString(groupId))
-	return consumer;
-}
-func DestroyPushConsumer(consumer PushConsumer) {
-	consumer = C.DestroyPushConsumer(consumer.(*C.struct_CPushConsumer))
-	return
-}
-func StartPushConsumer(consumer PushConsumer) int {
-	return int(C.StartPushConsumer(consumer.(*C.struct_CPushConsumer)))
-}
-func ShutdownPushConsumer(consumer PushConsumer) int {
-	return int(C.ShutdownPushConsumer(consumer.(*C.struct_CPushConsumer)))
-}
-func SetPushConsumerGroupID(consumer PushConsumer, groupId string) (int) {
-	return int(C.SetPushConsumerGroupID(consumer.(*C.struct_CPushConsumer), C.CString(groupId)))
-}
-func SetPushConsumerNameServerAddress(consumer PushConsumer, name string) (int) {
-	return int(C.SetPushConsumerNameServerAddress(consumer.(*C.struct_CPushConsumer), C.CString(name)))
-}
-func SetPushConsumerThreadCount(consumer PushConsumer, count int) (int) {
-	return int(C.SetPushConsumerThreadCount(consumer.(*C.struct_CPushConsumer), C.int(count)))
-}
-func SetPushConsumerMessageBatchMaxSize(consumer PushConsumer, size int) (int) {
-	return int(C.SetPushConsumerMessageBatchMaxSize(consumer.(*C.struct_CPushConsumer), C.int(size)))
-}
-func SetPushConsumerInstanceName(consumer PushConsumer, name string) (int) {
-	return int(C.SetPushConsumerInstanceName(consumer.(*C.struct_CPushConsumer), C.CString(name)))
-}
-func SetPushConsumerSessionCredentials(consumer PushConsumer, accessKey string, secretKey string, channel string) (int) {
-	ret := C.SetPushConsumerSessionCredentials(consumer.(*C.struct_CPushConsumer),
-		C.CString(accessKey),
-		C.CString(secretKey),
-		C.CString(channel))
-	return int(ret)
-}
-
-func Subscribe(consumer PushConsumer, topic string, expression string) (int) {
-	return int(C.Subscribe(consumer.(*C.struct_CPushConsumer), C.CString(topic), C.CString(expression)))
-}
-
-var CallBackMap map[PushConsumer]Callback = map[PushConsumer]Callback{}
-
-func RegisterMessageCallback(consumer PushConsumer, callback Callback) (int) {
-	CallBackMap[consumer] = callback
-	ret := C.SetConsumerMessageCallBackInner(consumer.(*C.struct_CPushConsumer))
-	return int(ret)
-}
-func ConsumeMessageInner(consumer PushConsumer, msg MessageExt) (ConsumeStatus) {
-	fmt.Println("ConsumeMessageInner")
-	callback,ok := CallBackMap[consumer]
-	if ok {
-		return callback(msg)
-	}
-	return ReConsumeLater
-}
diff --git a/src/client/sendresult.go b/src/client/sendresult.go
deleted file mode 100644
index 2dc0c73..0000000
--- a/src/client/sendresult.go
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package client
-
-type SendStatus int
-
-const (
-    SendOK SendStatus = iota // value --> 0
-    SendFlushDiskTimeout              // value --> 1
-    SendFlushSlaveTimeout            // value --> 2
-    SendSlaveNotAvailable          // value --> 3
-)
-func (status SendStatus) String() string {
-    switch status {
-    case SendOK:
-        return "SendOK"
-    case SendFlushDiskTimeout:
-        return "SendFlushDiskTimeout"
-    case SendFlushSlaveTimeout:
-        return "SendFlushSlaveTimeout"
-    case SendSlaveNotAvailable:
-        return "SendSlaveNotAvailable"
-    default:
-        return "Unknown"
-    }
-}
-type SendResult struct {
-    Status SendStatus
-    MsgId string
-    Offset int64
-}
\ No newline at end of file
diff --git a/src/sample/sample_producer.go b/src/sample/sample_producer.go
deleted file mode 100644
index 336812c..0000000
--- a/src/sample/sample_producer.go
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import "fmt"
-import "../client"
-
-func SampleSendMessage() {
-	fmt.Println("Start Send Message..")
-	namesvr := "172.17.0.2:9876"
-	topic := "T_TestTopic"
-	keys := "testKeys"
-	body := "testBody"
-	//Create Producer
-	producer := client.CreateProducer("testGroupId")
-	fmt.Println("Create Producer")
-	client.SetProducerNameServerAddress(producer, namesvr)
-	fmt.Println("Set Producer Nameserver:", namesvr)
-	client.StartProducer(producer)
-	fmt.Println("Start Producer")
-
-	for i := 1; i <= 10; i++ {
-		//Create Message
-		msg := client.CreateMessage(topic)
-		fmt.Println("Create Message, Topic:", topic)
-		client.SetMessageKeys(msg, keys)
-		fmt.Println("Set Message Keys:", keys)
-		client.SetMessageBody(msg, body)
-		fmt.Println("Set Message body:", body)
-
-		sendresult := client.SendMessageSync(producer, msg)
-		fmt.Println("Send Message OK")
-		fmt.Println("SendStatus:", sendresult.Status)
-		fmt.Println("SendResult:", sendresult)
-		client.DestroyMessage(msg)
-	}
-	client.ShutdownProducer(producer)
-	client.DestroyProducer(producer)
-}
diff --git a/src/sample/sample_pushconsumer.go b/src/sample/sample_pushconsumer.go
deleted file mode 100644
index e70dd75..0000000
--- a/src/sample/sample_pushconsumer.go
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import "fmt"
-import "../client"
-
-func SampleConsumeMessage(msg client.MessageExt) (client.ConsumeStatus) {
-	fmt.Println("ConsumeMessageInSample")
-	fmt.Println("Message topic",client.GetMessageTopic(msg))
-	fmt.Println("MessageId",client.GetMessageId(msg))
-	return client.ConsumeSuccess
-}
-
-func SamplePushConsumeMessage() {
-	fmt.Println("Start Send Message..")
-	namesvr := "172.17.0.2:9876"
-	topic := "T_TestTopic"
-	expression := "*"
-	//Create Producer
-	consumer := client.CreatePushConsumer("testGroupId")
-	fmt.Println("Create Push Consumer")
-	client.SetPushConsumerNameServerAddress(consumer, namesvr)
-	fmt.Println("Set Push Consumer Nameserver:", namesvr)
-
-	client.Subscribe(consumer, topic, expression)
-	fmt.Println("Set Push Consumer Subscribe,Topic:", topic," Exp:", expression)
-
-	client.RegisterMessageCallback(consumer,SampleConsumeMessage)
-	client.StartPushConsumer(consumer)
-	fmt.Println("Start Push Consumer")
-	fmt.Scan()
-	select{}
-	client.ShutdownPushConsumer(consumer)
-	client.DestroyPushConsumer(consumer)
-}
diff --git a/src/test/producer_test.go b/src/test/producer_test.go
deleted file mode 100644
index 66e2caf..0000000
--- a/src/test/producer_test.go
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package client_test
-
-import "fmt"
-import "testing"
-import "../client"
-
-func TestCreateMessage(test *testing.T){
-    fmt.Println("-----TestCreateMessage Start----")
-    client.CreateMessage("testTopic")
-    fmt.Println("-----TestCreateMessage Finish----")
-}
-func TestDestroyMessage(test *testing.T){
-    fmt.Println("-----TestCreateMessage Start----")
-    msg := client.CreateMessage("testTopic")
-    client.DestroyMessage(msg)
-    fmt.Println("-----TestCreateMessage Finish----")
-}
-func TestSetMessageKeys(test *testing.T){
-    fmt.Println("-----TestSetMessageKeys Start----")
-    msg := client.CreateMessage("testTopic")
-    len := client.SetMessageKeys(msg,"testKey")
-    fmt.Println("Len:",len)
-    client.DestroyMessage(msg)
-    fmt.Println("-----TestCreateMessage Finish----")
-}
-func TestCreateProducer(test *testing.T){
-    fmt.Println("-----TestCreateProducer Start----")
-    client.CreateProducer("testGroupId")
-    fmt.Println("-----TestCreateProducer Finish----")
-}
diff --git a/src/test/pushconsumer_test.go b/src/test/pushconsumer_test.go
deleted file mode 100644
index 7083713..0000000
--- a/src/test/pushconsumer_test.go
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package client_test
-
-import "fmt"
-import "testing"
-import "../client"
-
-func TestCreatePushConsumer(test *testing.T){
-    fmt.Println("-----TestCreateProducer Start----")
-    consumer := client.CreatePushConsumer("testGroupId")
-    client.DestroyPushConsumer(consumer)
-    fmt.Println("-----TestCreateProducer Finish----")
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services