You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/07/29 03:28:23 UTC

[pulsar-client-go] branch master updated: Fix gofmt and golint issues

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 4512b67  Fix gofmt and golint issues
     new 7a9aa2b  Merge pull request #25 from wolfstudy/xiaolong/golint-code-format
4512b67 is described below

commit 4512b67b069e3fb13dd7e3f7a9cbdebcbc11bd26
Author: xiaolong.ran <ra...@gmail.com>
AuthorDate: Fri Jul 26 16:25:27 2019 +0800

    Fix gofmt and golint issues
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
 perf/perf-consumer.go                  |  3 +--
 perf/perf-producer.go                  |  3 +--
 perf/pulsar-perf-go.go                 |  2 --
 pkg/auth/disabled.go                   |  3 +--
 pkg/auth/provider.go                   | 11 +++++---
 pkg/auth/tls.go                        |  4 +--
 pkg/auth/token.go                      |  5 ++--
 pkg/compression/compression.go         | 10 +++++--
 pkg/compression/compression_test.go    |  2 --
 pkg/compression/lz4.go                 |  3 +--
 pkg/compression/noop.go                |  4 +--
 pkg/compression/zlib.go                |  3 +--
 pkg/compression/zstd.go                |  3 +--
 pkg/compression/zstd_native.go         |  2 --
 pulsar/client.go                       |  2 --
 pulsar/consumer.go                     | 33 ++++++++++-------------
 pulsar/error.go                        | 25 ++++++++++-------
 pulsar/impl_client.go                  |  2 --
 pulsar/impl_client_test.go             |  2 --
 pulsar/impl_message.go                 |  2 --
 pulsar/impl_message_test.go            |  2 --
 pulsar/impl_partition_producer.go      |  2 --
 pulsar/impl_producer.go                |  2 --
 pulsar/internal/backoff.go             |  4 +--
 pulsar/internal/batch_builder.go       | 16 ++++++++---
 pulsar/internal/buffer.go              | 12 ++++++---
 pulsar/internal/buffer_test.go         |  2 --
 pulsar/internal/checksum.go            |  5 ++--
 pulsar/internal/closable.go            |  2 --
 pulsar/internal/commands.go            |  5 ++--
 pulsar/internal/commands_test.go       |  2 --
 pulsar/internal/connection.go          |  5 ++--
 pulsar/internal/connection_pool.go     |  5 ++--
 pulsar/internal/connection_reader.go   |  2 --
 pulsar/internal/default_router.go      |  5 ++--
 pulsar/internal/default_router_test.go |  2 --
 pulsar/internal/hash.go                |  4 +--
 pulsar/internal/hash_test.go           |  2 --
 pulsar/internal/lookup_service.go      |  7 +++--
 pulsar/internal/lookup_service_test.go |  2 --
 pulsar/internal/rpc_client.go          |  2 --
 pulsar/internal/topic_name.go          |  4 +--
 pulsar/internal/topic_name_test.go     |  2 --
 pulsar/internal/utils.go               |  5 ++--
 pulsar/message.go                      | 32 +++++++++++-----------
 pulsar/producer.go                     | 49 +++++++++++++++++-----------------
 pulsar/producer_test.go                |  2 --
 pulsar/reader.go                       | 25 +++++++++--------
 pulsar/test_helper.go                  |  2 --
 util/blocking_queue.go                 | 19 ++++++-------
 util/blocking_queue_test.go            |  2 --
 util/semaphore.go                      | 17 ++++++++++--
 52 files changed, 184 insertions(+), 189 deletions(-)

diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index e09e875..a5f98f1 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package main
 
@@ -31,6 +29,7 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
+// ConsumeArgs defines the parameters required by consume
 type ConsumeArgs struct {
 	Topic             string
 	SubscriptionName  string
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index f656638..2728d6e 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package main
 
@@ -32,6 +30,7 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
+// ProduceArgs defines the parameters required by produce
 type ProduceArgs struct {
 	Topic              string
 	Rate               int
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index 858d9f6..06ac059 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package main
 
diff --git a/pkg/auth/disabled.go b/pkg/auth/disabled.go
index 346dc78..e23bb35 100644
--- a/pkg/auth/disabled.go
+++ b/pkg/auth/disabled.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package auth
 
@@ -23,6 +21,7 @@ import "crypto/tls"
 
 type disabled struct{}
 
+// NewAuthDisabled returns a Provider for use when authentication is disabled.
 func NewAuthDisabled() Provider {
 	return &disabled{}
 }
diff --git a/pkg/auth/provider.go b/pkg/auth/provider.go
index fda78a3..8868df5 100644
--- a/pkg/auth/provider.go
+++ b/pkg/auth/provider.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package auth
 
@@ -27,20 +25,27 @@ import (
 	"github.com/pkg/errors"
 )
 
+// Provider is an interface for authentication providers.
 type Provider interface {
+	// Init the authentication provider.
 	Init() error
 
+	// Name returns the identifier for this authentication method.
 	Name() string
 
 	// return a client certificate chain, or nil if the data are not available
 	GetTLSCertificate() (*tls.Certificate, error)
 
-	//
+	// GetData returns the authentication data identifying this client that will be sent to the broker.
 	GetData() ([]byte, error)
 
 	io.Closer
 }
 
+// NewProvider gets or creates an authentication data provider which provides the data
+// that this client will send to the broker.
+// Some authentication methods need to authenticate each client channel, so they need
+// to know which broker the client will talk to.
 func NewProvider(name string, params string) (Provider, error) {
 	m := parseParams(params)
 
diff --git a/pkg/auth/tls.go b/pkg/auth/tls.go
index f31ecc0..f04b755 100644
--- a/pkg/auth/tls.go
+++ b/pkg/auth/tls.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package auth
 
@@ -26,6 +24,7 @@ type tlsAuthProvider struct {
 	privateKeyPath  string
 }
 
+// NewAuthenticationTLSWithParams initializes the authentication provider from a map of parameters.
 func NewAuthenticationTLSWithParams(params map[string]string) Provider {
 	return NewAuthenticationTLS(
 		params["tlsCertFile"],
@@ -33,6 +32,7 @@ func NewAuthenticationTLSWithParams(params map[string]string) Provider {
 	)
 }
 
+// NewAuthenticationTLS initializes the authentication provider from a certificate path and a private key path.
 func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Provider {
 	return &tlsAuthProvider{
 		certificatePath: certificatePath,
diff --git a/pkg/auth/token.go b/pkg/auth/token.go
index 4e2b8a4..c89eeaf 100644
--- a/pkg/auth/token.go
+++ b/pkg/auth/token.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package auth
 
@@ -31,6 +29,7 @@ type tokenAuthProvider struct {
 	tokenSupplier func() (string, error)
 }
 
+// NewAuthenticationTokenWithParams returns a Provider configured from a map of string parameters.
 func NewAuthenticationTokenWithParams(params map[string]string) (Provider, error) {
 	if params["token"] != "" {
 		return NewAuthenticationToken(params["token"]), nil
@@ -41,6 +40,7 @@ func NewAuthenticationTokenWithParams(params map[string]string) (Provider, error
 	}
 }
 
+// NewAuthenticationToken returns a Provider for the given string token.
 func NewAuthenticationToken(token string) Provider {
 	return &tokenAuthProvider{
 		tokenSupplier: func() (string, error) {
@@ -52,6 +52,7 @@ func NewAuthenticationToken(token string) Provider {
 	}
 }
 
+// NewAuthenticationTokenFromFile returns a Provider for the given token file path.
 func NewAuthenticationTokenFromFile(tokenFilePath string) Provider {
 	return &tokenAuthProvider{
 		tokenSupplier: func() (string, error) {
diff --git a/pkg/compression/compression.go b/pkg/compression/compression.go
index e9eac6d..ac52092 100644
--- a/pkg/compression/compression.go
+++ b/pkg/compression/compression.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,15 +14,22 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package compression
 
+// Provider is an interface for compression providers
 type Provider interface {
+	// CanCompress checks if the compression method is available under the current version.
 	CanCompress() bool
 
+	// Compress a []byte, the param is a []byte with the uncompressed content.
+	// The reader/writer indexes will not be modified. The return is a []byte
+	// with the compressed content.
 	Compress(data []byte) []byte
 
+	// Decompress a []byte. The buffer needs to have been compressed with the matching Encoder.
+	// The compressedData is compressed content, originalSize is the size of the original content.
+	// It returns the decompressed content; err is nil if the buffer was decompressed and non-nil otherwise.
 	Decompress(compressedData []byte, originalSize int) ([]byte, error)
 }
 
diff --git a/pkg/compression/compression_test.go b/pkg/compression/compression_test.go
index 8261d0c..98d4509 100644
--- a/pkg/compression/compression_test.go
+++ b/pkg/compression/compression_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package compression
 
diff --git a/pkg/compression/lz4.go b/pkg/compression/lz4.go
index 5489f8c..449c205 100644
--- a/pkg/compression/lz4.go
+++ b/pkg/compression/lz4.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package compression
 
@@ -26,6 +24,7 @@ import (
 type lz4Provider struct {
 }
 
+// NewLz4Provider returns a Provider interface.
 func NewLz4Provider() Provider {
 	return &lz4Provider{}
 }
diff --git a/pkg/compression/noop.go b/pkg/compression/noop.go
index 8dec533..0d64123 100644
--- a/pkg/compression/noop.go
+++ b/pkg/compression/noop.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,16 +14,17 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package compression
 
 type noopProvider struct{}
 
+// NewNoopProvider returns a Provider interface
 func NewNoopProvider() Provider {
 	return &noopProvider{}
 }
 
+// CanCompress always returns true, in the case of noopProvider, noopProvider means no compression.
 func (noopProvider) CanCompress() bool {
 	return true
 }
diff --git a/pkg/compression/zlib.go b/pkg/compression/zlib.go
index 7e0d123..f1c1b91 100644
--- a/pkg/compression/zlib.go
+++ b/pkg/compression/zlib.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package compression
 
@@ -26,6 +24,7 @@ import (
 
 type zlibProvider struct{}
 
+// NewZLibProvider returns a Provider interface
 func NewZLibProvider() Provider {
 	return &zlibProvider{}
 }
diff --git a/pkg/compression/zstd.go b/pkg/compression/zstd.go
index 3a3365c..529020c 100644
--- a/pkg/compression/zstd.go
+++ b/pkg/compression/zstd.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 // +build cgo
 
@@ -28,6 +26,7 @@ import (
 	zstd "github.com/valyala/gozstd"
 )
 
+// NewZStdProvider returns a Provider interface.
 func NewZStdProvider() Provider {
 	return newCGoZStdProvider()
 }
diff --git a/pkg/compression/zstd_native.go b/pkg/compression/zstd_native.go
index 9cf1124..fa3649e 100644
--- a/pkg/compression/zstd_native.go
+++ b/pkg/compression/zstd_native.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 // +build !cgo
 
diff --git a/pulsar/client.go b/pulsar/client.go
index 9147939..e69a9b2 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index b32e5fc..d45253d 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
@@ -30,18 +28,18 @@ type ConsumerMessage struct {
 	Message
 }
 
-// Types of subscription supported by Pulsar
+// SubscriptionType is the type of subscription supported by Pulsar
 type SubscriptionType int
 
 const (
-	// There can be only 1 consumer on the same topic with the same subscription name
+	// Exclusive means there can be only 1 consumer on the same topic with the same subscription name
 	Exclusive SubscriptionType = iota
 
-	// Multiple consumer will be able to use the same subscription name and the messages will be dispatched according to
+	// Shared subscription mode, multiple consumer will be able to use the same subscription name and the messages will be dispatched according to
 	// a round-robin rotation between the connected consumers
 	Shared
 
-	// Multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.
+	// Failover subscription mode, multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.
 	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
 	Failover
 )
@@ -56,7 +54,7 @@ const (
 	Earliest
 )
 
-// ConsumerBuilder is used to configure and create instances of Consumer
+// ConsumerOptions is used to configure and create instances of Consumer
 type ConsumerOptions struct {
 	// Specify the topic this consumer will subscribe on.
 	// Either a topic, a list of topics or a topics pattern are required when subscribing
@@ -122,28 +120,28 @@ type ConsumerOptions struct {
 	ReadCompacted bool
 }
 
-// An interface that abstracts behavior of Pulsar's consumer
+// Consumer is an interface that abstracts behavior of Pulsar's consumer
 type Consumer interface {
-	// Get the topic for the consumer
+	// Topic returns the topic for the consumer
 	Topic() string
 
-	// Get a subscription for the consumer
+	// Subscription returns the subscription for the consumer
 	Subscription() string
 
 	// Unsubscribe the consumer
 	Unsubscribe() error
 
-	// Receives a single message.
+	// Receive a single message.
 	// This calls blocks until a message is available.
 	Receive(context.Context) (Message, error)
 
 	// Ack the consumption of a single message
 	Ack(Message) error
 
-	// Ack the consumption of a single message, identified by its MessageID
+	// AckID acks the consumption of a single message, identified by its MessageID
 	AckID(MessageID) error
 
-	// Ack the reception of all the messages in the stream up to (and including) the provided message.
+	// AckCumulative acks the reception of all the messages in the stream up to (and including) the provided message.
 	// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
 	// re-delivered to this consumer.
 	//
@@ -152,26 +150,23 @@ type Consumer interface {
 	// It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
 	AckCumulative(Message) error
 
-	// Ack the reception of all the messages in the stream up to (and including) the provided message.
+	// AckCumulativeID acks the reception of all the messages in the stream up to (and including) the provided message.
 	// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
 	// re-delivered to this consumer.
-	//
 	// Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
-	//
 	// It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
 	AckCumulativeID(MessageID) error
 
 	// Close the consumer and stop the broker to push more messages
 	Close() error
 
-	// Reset the subscription associated with this consumer to a specific message id.
+	// Seek resets the subscription associated with this consumer to a specific message id.
 	// The message id can either be a specific message or represent the first or last messages in the topic.
-	//
 	// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
 	//       seek() on the individual partitions.
 	Seek(msgID MessageID) error
 
-	// Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
+	// RedeliverUnackedMessages redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
 	// active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
 	// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
 	// breaks, the messages are redelivered after reconnect.
diff --git a/pulsar/error.go b/pulsar/error.go
index 38b9d64..8b73a95 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,22 +14,29 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
 import "fmt"
 
+// Result represents the outcome of a Pulsar operation; it is a distinct type whose underlying type is int.
 type Result int
 
 const (
-	ResultOk                   = iota // No errors
-	ResultUnknownError                // Unknown error happened on broker
-	ResultInvalidConfiguration        // Invalid configuration
-	ResultTimeoutError                // Operation timed out
-	ResultLookupError                 // Broker lookup failed
-	ResultInvalidTopicName            // Invalid topic name
-	ResultConnectError                // Failed to connect to broker
+	// ResultOk means no errors
+	ResultOk = iota
+	// ResultUnknownError means unknown error happened on broker
+	ResultUnknownError
+	// ResultInvalidConfiguration means invalid configuration
+	ResultInvalidConfiguration
+	// ResultTimeoutError means operation timed out
+	ResultTimeoutError
+	// ResultLookupError means broker lookup failed
+	ResultLookupError
+	// ResultInvalidTopicName means invalid topic name
+	ResultInvalidTopicName
+	// ResultConnectError means failed to connect to broker
+	ResultConnectError
 
 	//ReadError                      Result = 6  // Failed to read from socket
 	//AuthenticationError            Result = 7  // Authentication failed on broker
@@ -61,6 +67,7 @@ const (
 	//CryptoError                           Result = 33 // Error when crypto operation fails
 )
 
+// Error implements the error interface and is composed of two parts: msg and result.
 type Error struct {
 	msg    string
 	result Result
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index a03a41c..b73080b 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
diff --git a/pulsar/impl_client_test.go b/pulsar/impl_client_test.go
index 63fa6ba..ec3110a 100644
--- a/pulsar/impl_client_test.go
+++ b/pulsar/impl_client_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 710913e..a7b1207 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index bfbfde4..bd48495 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index bd112da..ce8a2a9 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index 3999ff1..497a18c 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
diff --git a/pulsar/internal/backoff.go b/pulsar/internal/backoff.go
index 94a185e..c558f58 100644
--- a/pulsar/internal/backoff.go
+++ b/pulsar/internal/backoff.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
@@ -23,6 +21,7 @@ import (
 	"time"
 )
 
+// Backoff holds the current backoff delay, which Next increases on each call.
 type Backoff struct {
 	backoff time.Duration
 }
@@ -32,6 +31,7 @@ const (
 	maxBackoff = 60 * time.Second
 )
 
+// Next returns the next backoff delay, doubling the current delay on each call.
 func (b *Backoff) Next() time.Duration {
 	// Double the delay each time
 	b.backoff += b.backoff
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index b44989f..53035ed 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
@@ -31,11 +29,17 @@ import (
 )
 
 const (
-	MaxMessageSize             = 5 * 1024 * 1024
-	MaxBatchSize               = 128 * 1024
+	// MaxMessageSize limit message size for transfer
+	MaxMessageSize = 5 * 1024 * 1024
+	// MaxBatchSize will be the largest size for a batch sent from this particular producer.
+	// This is used as a baseline to allocate a new buffer that can hold the entire batch
+	// without needing costly re-allocations.
+	MaxBatchSize = 128 * 1024
+	// DefaultMaxMessagesPerBatch is the default maximum number of messages per batch.
 	DefaultMaxMessagesPerBatch = 1000
 )
 
+// BatchBuilder wraps the objects needed to build a batch.
 type BatchBuilder struct {
 	buffer Buffer
 
@@ -55,6 +59,7 @@ type BatchBuilder struct {
 	compressionProvider compression.Provider
 }
 
+// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
 func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,
 	compressionType pb.CompressionType) (*BatchBuilder, error) {
 	if maxMessages == 0 {
@@ -88,6 +93,7 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,
 	return bb, nil
 }
 
+// IsFull reports whether the batch has reached its maximum number of messages or its buffered data exceeds MaxBatchSize.
 func (bb *BatchBuilder) IsFull() bool {
 	return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > MaxBatchSize
 }
@@ -97,6 +103,7 @@ func (bb *BatchBuilder) hasSpace(payload []byte) bool {
 	return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > MaxBatchSize
 }
 
+// Add will add single message to batch.
 func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint64, payload []byte,
 	callback interface{}, replicateTo []string) bool {
 	if replicateTo != nil && bb.numMessages != 0 {
@@ -135,6 +142,7 @@ func (bb *BatchBuilder) reset() {
 	bb.msgMetadata.ReplicateTo = nil
 }
 
+// Flush returns the data of the current batch along with its sequence ID and the callbacks of the batched messages.
 func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks []interface{}) {
 	log.Debug("BatchBuilder flush: messages: ", bb.numMessages)
 	if bb.numMessages == 0 {
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index cadf041..98d7116 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
@@ -23,11 +21,15 @@ import (
 	"encoding/binary"
 )
 
+// Buffer is a variable-sized buffer of bytes with Read and Write methods.
+// Buffer is an interface, so its zero value is nil; create one with NewBuffer.
 type Buffer interface {
 	ReadableBytes() uint32
 
 	WritableBytes() uint32
 
+	// Capacity returns the capacity of the buffer's underlying byte slice,
+	// that is, the total space allocated for the buffer's data.
 	Capacity() uint32
 
 	IsWritable() bool
@@ -40,10 +42,10 @@ type Buffer interface {
 
 	WritableSlice() []byte
 
-	// Advance the writer index when data was written in a slice
+	// WrittenBytes advances the writer index when data was written in a slice
 	WrittenBytes(size uint32)
 
-	// Copy the available portion of data at the beginning of the buffer
+	// MoveToFront copies the available portion of data to the beginning of the buffer
 	MoveToFront()
 
 	ReadUint16() uint16
@@ -62,6 +64,7 @@ type Buffer interface {
 
 	Resize(newSize uint32)
 
+	// Clear will clear the current buffer data.
 	Clear()
 }
 
@@ -72,6 +75,7 @@ type buffer struct {
 	writerIdx uint32
 }
 
+// NewBuffer creates and initializes a new Buffer backed by a byte slice of the given size.
 func NewBuffer(size int) Buffer {
 	return &buffer{
 		data:      make([]byte, size),
diff --git a/pulsar/internal/buffer_test.go b/pulsar/internal/buffer_test.go
index 72a9ee8..8edc60c 100644
--- a/pulsar/internal/buffer_test.go
+++ b/pulsar/internal/buffer_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
diff --git a/pulsar/internal/checksum.go b/pulsar/internal/checksum.go
index 9716724..11d8d4f 100644
--- a/pulsar/internal/checksum.go
+++ b/pulsar/internal/checksum.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,14 +14,16 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
 import "hash/crc32"
 
+// crc32cTable holds the precomputed crc32 hash table
+// used by Pulsar (crc32c)
 var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
 
+// Crc32cCheckSum handles computing the checksum.
 func Crc32cCheckSum(data []byte) uint32 {
 	return crc32.Checksum(data, crc32cTable)
 }
diff --git a/pulsar/internal/closable.go b/pulsar/internal/closable.go
index 77801a4..7906c89 100644
--- a/pulsar/internal/closable.go
+++ b/pulsar/internal/closable.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index d528eb4..05ba409 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
@@ -26,6 +24,7 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
+// MaxFrameSize limits the maximum size that Pulsar allows for messages to be sent.
 const MaxFrameSize = 5 * 1024 * 1024
 
 const magicCrc32c uint16 = 0x0e01
@@ -115,6 +114,7 @@ func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageM
 	wb.PutUint32(checksum, checksumIdx)
 }
 
+// ConvertFromStringMap converts a string map to a slice of KeyValue pointers.
 func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
 	list := make([]*pb.KeyValue, len(m))
 
@@ -131,6 +131,7 @@ func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
 	return list
 }
 
+// ConvertToStringMap converts a slice of KeyValue pointers to a string map.
 func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string {
 	m := make(map[string]string)
 
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index 3ea96a7..3ae35c9 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 8baef2a..7d5ee74 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
@@ -47,11 +45,14 @@ type TLSOptions struct {
 // a consumer) that can register itself to get notified
 // when the connection is closed.
 type ConnectionListener interface {
+	// ReceivedSendReceipt receives and processes the return value of the send command.
 	ReceivedSendReceipt(response *pb.CommandSendReceipt)
 
+	// ConnectionClosed is called to notify the listener that the connection was closed.
 	ConnectionClosed()
 }
 
+// Connection is an interface for a client connection (cnx).
 type Connection interface {
 	SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
 	WriteData(data []byte)
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index b140ee7..4802e23 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
@@ -28,7 +26,9 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
+// ConnectionPool is an interface of a connection pool.
 type ConnectionPool interface {
+	// GetConnection gets a connection from the ConnectionPool.
 	GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)
 
 	// Close all the connections in the pool
@@ -41,6 +41,7 @@ type connectionPool struct {
 	auth       auth.Provider
 }
 
+// NewConnectionPool init connection pool.
 func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider) ConnectionPool {
 	return &connectionPool{
 		tlsOptions: tlsOptions,
diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go
index ad09c2c..746db5c 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
diff --git a/pulsar/internal/default_router.go b/pulsar/internal/default_router.go
index cb391b9..6626351 100644
--- a/pulsar/internal/default_router.go
+++ b/pulsar/internal/default_router.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
@@ -33,12 +31,15 @@ type defaultRouter struct {
 
 type Clock func() uint64
 
+// NewSystemClock returns a Clock that reports the current time in Unix nanoseconds.
 func NewSystemClock() Clock {
 	return func() uint64 {
 		return uint64(time.Now().UnixNano())
 	}
 }
 
+// NewDefaultRouter set the message routing mode for the partitioned producer.
+// Default routing mode is round-robin routing.
 func NewDefaultRouter(clock Clock, hashFunc func(string) uint32, maxBatchingDelay time.Duration) func(string, uint32) int {
 	state := &defaultRouter{
 		clock:            clock,
diff --git a/pulsar/internal/default_router_test.go b/pulsar/internal/default_router_test.go
index 0c389f2..7dc3e0b 100644
--- a/pulsar/internal/default_router_test.go
+++ b/pulsar/internal/default_router_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
diff --git a/pulsar/internal/hash.go b/pulsar/internal/hash.go
index 5555888..0ac8590 100644
--- a/pulsar/internal/hash.go
+++ b/pulsar/internal/hash.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,12 +14,12 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
 import "github.com/spaolacci/murmur3"
 
+// JavaStringHash is the Java String.hashCode() equivalent.
 func JavaStringHash(s string) uint32 {
 	var h uint32
 	for i, size := 0, len(s); i < size; i++ {
@@ -30,6 +29,7 @@ func JavaStringHash(s string) uint32 {
 	return h
 }
 
+// Murmur3_32Hash uses the Murmur3 hashing function.
 func Murmur3_32Hash(s string) uint32 {
 	h := murmur3.New32()
 	_, err := h.Write([]byte(s))
diff --git a/pulsar/internal/hash_test.go b/pulsar/internal/hash_test.go
index 7a601f6..2f9bc19 100644
--- a/pulsar/internal/hash_test.go
+++ b/pulsar/internal/hash_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index e257591..d5da829 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
@@ -30,12 +28,16 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
+// LookupResult encapsulates the result of a lookup request, containing two parts: LogicalAddr, PhysicalAddr.
 type LookupResult struct {
 	LogicalAddr  *url.URL
 	PhysicalAddr *url.URL
 }
 
+// LookupService is an interface of a lookup service.
 type LookupService interface {
+	// Lookup perform a lookup for the given topic, confirm the location of the broker
+	// where the topic is located, and return the LookupResult.
 	Lookup(topic string) (*LookupResult, error)
 }
 
@@ -44,6 +46,7 @@ type lookupService struct {
 	serviceURL *url.URL
 }
 
+// NewLookupService init a lookup service struct and return an object of LookupService.
 func NewLookupService(rpcClient RPCClient, serviceURL *url.URL) LookupService {
 	return &lookupService{
 		rpcClient:  rpcClient,
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index e1fae0c..8baddf3 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 6b9d338..8234dc8 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
diff --git a/pulsar/internal/topic_name.go b/pulsar/internal/topic_name.go
index 9b912d0..1cefeb7 100644
--- a/pulsar/internal/topic_name.go
+++ b/pulsar/internal/topic_name.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
@@ -26,6 +24,7 @@ import (
 	"strings"
 )
 
+// TopicName holds the components of a parsed topic name.
 type TopicName struct {
 	Name      string
 	Namespace string
@@ -38,6 +37,7 @@ const (
 	partitionedTopicSuffix = "-partition-"
 )
 
+// ParseTopicName parse the given topic name and return TopicName.
 func ParseTopicName(topic string) (*TopicName, error) {
 	// The topic name can be in two different forms, one is fully qualified topic name,
 	// the other one is short topic name
diff --git a/pulsar/internal/topic_name_test.go b/pulsar/internal/topic_name_test.go
index 93e7fd9..bd57334 100644
--- a/pulsar/internal/topic_name_test.go
+++ b/pulsar/internal/topic_name_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
diff --git a/pulsar/internal/utils.go b/pulsar/internal/utils.go
index 30837d6..f0d5fb3 100644
--- a/pulsar/internal/utils.go
+++ b/pulsar/internal/utils.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package internal
 
@@ -24,11 +22,12 @@ import (
 	"time"
 )
 
+// TimestampMillis returns the given time as milliseconds since the Unix epoch.
 func TimestampMillis(t time.Time) uint64 {
 	return uint64(t.UnixNano()) / uint64(time.Millisecond)
 }
 
-// Perform atomic read and update
+// GetAndAdd perform atomic read and update
 func GetAndAdd(n *uint64, diff uint64) uint64 {
 	for {
 		v := *n
diff --git a/pulsar/message.go b/pulsar/message.go
index 2d03a31..1862c18 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,66 +14,67 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
 import "time"
 
+// ProducerMessage abstraction used in Pulsar producer
 type ProducerMessage struct {
 	// Payload for the message
 	Payload []byte
 
-	// Sets the key of the message for routing policy
+	// Key sets the key of the message for routing policy
 	Key string
 
-	// Attach application defined properties on the message
+	// Properties attach application defined properties on the message
 	Properties map[string]string
 
-	// Set the event time for a given message
+	// EventTime set the event time for a given message
 	EventTime *time.Time
 
-	// Override the replication clusters for this message.
+	// ReplicationClusters override the replication clusters for this message.
 	ReplicationClusters []string
 
-	// Set the sequence id to assign to the current message
+	// SequenceID set the sequence id to assign to the current message
 	SequenceID *int64
 }
 
+// Message abstraction used in Pulsar
 type Message interface {
-	// Get the topic from which this message originated from
+	// Topic get the topic from which this message originated from
 	Topic() string
 
+	// Properties are application defined key/value pairs that will be attached to the message.
 	// Return the properties attached to the message.
-	// Properties are application defined key/value pairs that will be attached to the message
 	Properties() map[string]string
 
-	// Get the payload of the message
+	// Payload get the payload of the message
 	Payload() []byte
 
-	// Get the unique message ID associated with this message.
+	// ID get the unique message ID associated with this message.
 	// The message id can be used to univocally refer to a message without having the keep the entire payload in memory.
 	ID() MessageID
 
-	// Get the publish time of this message. The publish time is the timestamp that a client publish the message.
+	// PublishTime get the publish time of this message. The publish time is the timestamp that a client publish the message.
 	PublishTime() time.Time
 
-	// Get the event time associated with this message. It is typically set by the applications via
+	// EventTime get the event time associated with this message. It is typically set by the applications via
 	// `ProducerMessage.EventTime`.
 	// If there isn't any event time associated with this event, it will be nil.
 	EventTime() *time.Time
 
-	// Get the key of the message, if any
+	// Key get the key of the message, if any
 	Key() string
 }
 
-// Identifier for a particular message
+// MessageID identifier for a particular message
 type MessageID interface {
 	// Serialize the message id into a sequence of bytes that can be stored somewhere else
 	Serialize() []byte
 }
 
-// Reconstruct a MessageID object from its serialized representation
+// DeserializeMessageID reconstruct a MessageID object from its serialized representation
 func DeserializeMessageID(data []byte) (MessageID, error) {
 	return deserializeMessageID(data)
 }
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 2a248cb..c77a047 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
@@ -27,8 +25,10 @@ import (
 type HashingScheme int
 
 const (
-	JavaStringHash HashingScheme = iota // Java String.hashCode() equivalent
-	Murmur3_32Hash                      // Use Murmur3 hashing function
+	// JavaStringHash is the Java String.hashCode() equivalent
+	JavaStringHash HashingScheme = iota
+	// Murmur3_32Hash uses the Murmur3 hashing function
+	Murmur3_32Hash
 )
 
 type CompressionType int
@@ -40,17 +40,18 @@ const (
 	ZSTD
 )
 
+// TopicMetadata is an interface of topic metadata
 type TopicMetadata interface {
-	// Get the number of partitions for the specific topic
+	// NumPartitions get the number of partitions for the specific topic
 	NumPartitions() uint32
 }
 
 type ProducerOptions struct {
-	// Specify the topic this producer will be publishing on.
+	// Topic specify the topic this producer will be publishing on.
 	// This argument is required when constructing the producer.
 	Topic string
 
-	// Specify a name for the producer
+	// Name specify a name for the producer
 	// If not assigned, the system will generate a globally unique name which can be access with
 	// Producer.ProducerName().
 	// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
@@ -58,32 +59,32 @@ type ProducerOptions struct {
 	// a topic.
 	Name string
 
-	// Attach a set of application defined properties to the producer
+	// Properties attach a set of application defined properties to the producer
 	// This properties will be visible in the topic stats
 	Properties map[string]string
 
-	// Set the send timeout (default: 30 seconds)
+	// SendTimeout set the send timeout (default: 30 seconds)
 	// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
 	// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
 	// deduplication feature.
 	SendTimeout time.Duration
 
-	// Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+	// MaxPendingMessages set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
 	// When the queue is full, by default, all calls to Producer.send() and Producer.sendAsync() will fail
 	// unless `BlockIfQueueFull` is set to true. Use BlockIfQueueFull(boolean) to change the blocking behavior.
 	MaxPendingMessages int
 
-	// Set the number of max pending messages across all the partitions
+	// MaxPendingMessagesAcrossPartitions set the number of max pending messages across all the partitions
 	// This setting will be used to lower the max pending messages for each partition
 	// `MaxPendingMessages(int)`, if the total exceeds the configured value.
 	MaxPendingMessagesAcrossPartitions int
 
-	// Set whether the `Producer.Send()` and `Producer.sendAsync()` operations should block when the outgoing
+	// BlockIfQueueFull set whether the `Producer.Send()` and `Producer.sendAsync()` operations should block when the outgoing
 	// message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
 	// `ProducerQueueIsFullError` when there is no space left in pending queue.
 	BlockIfQueueFull bool
 
-	// Change the `HashingScheme` used to chose the partition on where to publish a particular message.
+	// HashingScheme change the `HashingScheme` used to chose the partition on where to publish a particular message.
 	// Standard hashing functions available are:
 	//
 	//  - `JavaStringHash` : Java String.hashCode() equivalent
@@ -93,7 +94,7 @@ type ProducerOptions struct {
 	// Default is `JavaStringHash`.
 	HashingScheme
 
-	// Set the compression type for the producer.
+	// CompressionType set the compression type for the producer.
 	// By default, message payloads are not compressed. Supported compression types are:
 	//  - LZ4
 	//  - ZLIB
@@ -103,38 +104,36 @@ type ProducerOptions struct {
 	// release in order to be able to receive messages compressed with ZSTD.
 	CompressionType
 
-	// Set a custom message routing policy by passing an implementation of MessageRouter
+	// MessageRouter set a custom message routing policy by passing an implementation of MessageRouter
 	// The router is a function that given a particular message and the topic metadata, returns the
 	// partition index where the message should be routed to
 	MessageRouter func(Message, TopicMetadata) int
 
-	// Control whether automatic batching of messages is enabled for the producer. By default batching
+	// DisableBatching control whether automatic batching of messages is enabled for the producer. By default batching
 	// is enabled.
-	//
 	// When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
 	// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
 	// messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
 	// contents.
-	//
 	// When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
 	// Setting `DisableBatching: true` will make the producer to send messages individually
 	DisableBatching bool
 
-	// Set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
+	// BatchingMaxPublishDelay set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
 	// enabled. If set to a non zero value, messages will be queued until this time interval or until
 	BatchingMaxPublishDelay time.Duration
 
-	// Set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
+	// BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
 	// messages will be queued until this threshold is reached or batch interval has elapsed
 	BatchingMaxMessages uint
 }
 
-// The producer is used to publish messages on a topic
+// Producer is used to publish messages on a topic
 type Producer interface {
-	// return the topic to which producer is publishing to
+	// Topic return the topic to which producer is publishing to
 	Topic() string
 
-	// return the producer name which could have been assigned by the system or specified by the client
+	// Name return the producer name which could have been assigned by the system or specified by the client
 	Name() string
 
 	// Send a message
@@ -143,12 +142,12 @@ type Producer interface {
 	// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
 	Send(context.Context, *ProducerMessage) error
 
-	// Send a message in asynchronous mode
+	// SendAsync sends a message in asynchronous mode
 	// The callback will report back the message being published and
 	// the eventual error in publishing
 	SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))
 
-	// Get the last sequence id that was published by this producer.
+	// LastSequenceID get the last sequence id that was published by this producer.
 	// This represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that
 	// was published and acknowledged by the broker.
 	// After recreating a producer with the same producer name, this will return the last message that was
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 5ef6d2c..0b17b37 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 5592630..3e088b4 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,26 +14,27 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
 import "context"
 
+// ReaderMessage packages a Reader and a Message together in one struct
 type ReaderMessage struct {
 	Reader
 	Message
 }
 
+// ReaderOptions holds the options used to configure a Reader.
 type ReaderOptions struct {
-	// Specify the topic this consumer will subscribe on.
+	// Topic specify the topic this consumer will subscribe on.
 	// This argument is required when constructing the reader.
 	Topic string
 
-	// Set the reader name.
+	// Name set the reader name.
 	Name string
 
-	// The initial reader positioning is done by specifying a message id. The options are:
+	// StartMessageID initial reader positioning is done by specifying a message id. The options are:
 	//  * `pulsar.EarliestMessage` : Start reading from the earliest message available in the topic
 	//  * `pulsar.LatestMessage` : Start reading from the end topic, only getting messages published after the
 	//                           reader was created
@@ -43,19 +43,18 @@ type ReaderOptions struct {
 	//                  messageID
 	StartMessageID MessageID
 
-	// Sets a `MessageChannel` for the consumer
+	// MessageChannel sets a `MessageChannel` for the consumer
 	// When a message is received, it will be pushed to the channel for consumption
 	MessageChannel chan ReaderMessage
 
-	// Sets the size of the consumer receive queue.
+	// ReceiverQueueSize sets the size of the consumer receive queue.
 	// The consumer receive queue controls how many messages can be accumulated by the Reader before the
 	// application calls Reader.readNext(). Using a higher value could potentially increase the consumer
 	// throughput at the expense of bigger memory utilization.
-	//
 	// Default value is {@code 1000} messages and should be good for most use cases.
 	ReceiverQueueSize int
 
-	// Set the subscription role prefix. The default prefix is "reader".
+	// SubscriptionRolePrefix set the subscription role prefix. The default prefix is "reader".
 	SubscriptionRolePrefix string
 
 	// If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog
@@ -68,15 +67,15 @@ type ReaderOptions struct {
 	ReadCompacted bool
 }
 
-// A Reader can be used to scan through all the messages currently available in a topic.
+// Reader can be used to scan through all the messages currently available in a topic.
 type Reader interface {
-	// The topic from which this reader is reading from
+	// Topic from which this reader is reading from
 	Topic() string
 
-	// Read the next message in the topic, blocking until a message is available
+	// Next read the next message in the topic, blocking until a message is available
 	Next(context.Context) (Message, error)
 
-	// Check if there is any message available to read from the current position
+	// HasNext check if there is any message available to read from the current position
 	HasNext() (bool, error)
 
 	// Close the reader and stop the broker to push more messages
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index a12a989..c58c239 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
diff --git a/util/blocking_queue.go b/util/blocking_queue.go
index 90b1b54..224baac 100644
--- a/util/blocking_queue.go
+++ b/util/blocking_queue.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package util
 
@@ -25,29 +23,31 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
+// BlockingQueue is an interface for a blocking queue
 type BlockingQueue interface {
-	// Enqueue one item, block if the queue is full
+	// Put enqueues one item, blocking if the queue is full
 	Put(item interface{})
 
-	// Dequeue one item, block until it's available
+	// Take dequeues one item, blocking until one is available
 	Take() interface{}
 
-	// Dequeue one item, return nil if queue is empty
+	// Poll dequeues one item, returning nil if the queue is empty
 	Poll() interface{}
 
-	// Return the first item without dequeing, return nil if queue is empty
+	// Peek returns the first item without dequeuing, or nil if the queue is empty
 	Peek() interface{}
 
-	// Return last item in queue without dequeing, return nil if queue is empty
+	// PeekLast returns the last item in the queue without dequeuing, or nil if the queue is empty
 	PeekLast() interface{}
 
-	// Return the current size of the queue
+	// Size returns the current size of the queue
 	Size() int
 
-	// Return an iterator for the queue
+	// Iterator returns an iterator for the queue
 	Iterator() BlockingQueueIterator
 }
 
+// BlockingQueueIterator is an interface for iterating over a blocking queue.
 type BlockingQueueIterator interface {
 	HasNext() bool
 	Next() interface{}
@@ -71,6 +71,7 @@ type blockingQueueIterator struct {
 	toRead  int
 }
 
+// NewBlockingQueue initializes a blocking queue and returns it as a BlockingQueue
 func NewBlockingQueue(maxSize int) BlockingQueue {
 	bq := &blockingQueue{
 		items:   make([]interface{}, maxSize),
diff --git a/util/blocking_queue_test.go b/util/blocking_queue_test.go
index f8cb3fa..f6cbaa0 100644
--- a/util/blocking_queue_test.go
+++ b/util/blocking_queue_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package util
 
diff --git a/util/semaphore.go b/util/semaphore.go
index 45fa7f4..74e4d16 100644
--- a/util/semaphore.go
+++ b/util/semaphore.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,16 +14,30 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package util
 
+// Semaphore is a channel of bool used to implement a semaphore.
 type Semaphore chan bool
 
+// Acquire acquires a permit from this semaphore, blocking until one is
+// available.
+//
+// Each successful call reduces the number of available permits by one.
 func (s Semaphore) Acquire() {
 	s <- true
 }
 
+// Release releases a permit, returning it to the semaphore.
+//
+// Releasing a permit increases the number of available permits by
+// one.  If any goroutines are trying to acquire a permit, then one is
+// selected and given the permit that was just released.  That goroutine
+// is then unblocked and may proceed.
+// There is no requirement that a goroutine that releases a permit must
+// have acquired that permit by calling Acquire().
+// Correct usage of a semaphore is established by programming convention
+// in the application.
 func (s Semaphore) Release() {
 	<-s
 }