You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/07/29 03:28:23 UTC
[pulsar-client-go] branch master updated: Fix gofmt and golint
issues
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 4512b67 Fix gofmt and golint issues
new 7a9aa2b Merge pull request #25 from wolfstudy/xiaolong/golint-code-format
4512b67 is described below
commit 4512b67b069e3fb13dd7e3f7a9cbdebcbc11bd26
Author: xiaolong.ran <ra...@gmail.com>
AuthorDate: Fri Jul 26 16:25:27 2019 +0800
Fix gofmt and golint issues
Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
perf/perf-consumer.go | 3 +--
perf/perf-producer.go | 3 +--
perf/pulsar-perf-go.go | 2 --
pkg/auth/disabled.go | 3 +--
pkg/auth/provider.go | 11 +++++---
pkg/auth/tls.go | 4 +--
pkg/auth/token.go | 5 ++--
pkg/compression/compression.go | 10 +++++--
pkg/compression/compression_test.go | 2 --
pkg/compression/lz4.go | 3 +--
pkg/compression/noop.go | 4 +--
pkg/compression/zlib.go | 3 +--
pkg/compression/zstd.go | 3 +--
pkg/compression/zstd_native.go | 2 --
pulsar/client.go | 2 --
pulsar/consumer.go | 33 ++++++++++-------------
pulsar/error.go | 25 ++++++++++-------
pulsar/impl_client.go | 2 --
pulsar/impl_client_test.go | 2 --
pulsar/impl_message.go | 2 --
pulsar/impl_message_test.go | 2 --
pulsar/impl_partition_producer.go | 2 --
pulsar/impl_producer.go | 2 --
pulsar/internal/backoff.go | 4 +--
pulsar/internal/batch_builder.go | 16 ++++++++---
pulsar/internal/buffer.go | 12 ++++++---
pulsar/internal/buffer_test.go | 2 --
pulsar/internal/checksum.go | 5 ++--
pulsar/internal/closable.go | 2 --
pulsar/internal/commands.go | 5 ++--
pulsar/internal/commands_test.go | 2 --
pulsar/internal/connection.go | 5 ++--
pulsar/internal/connection_pool.go | 5 ++--
pulsar/internal/connection_reader.go | 2 --
pulsar/internal/default_router.go | 5 ++--
pulsar/internal/default_router_test.go | 2 --
pulsar/internal/hash.go | 4 +--
pulsar/internal/hash_test.go | 2 --
pulsar/internal/lookup_service.go | 7 +++--
pulsar/internal/lookup_service_test.go | 2 --
pulsar/internal/rpc_client.go | 2 --
pulsar/internal/topic_name.go | 4 +--
pulsar/internal/topic_name_test.go | 2 --
pulsar/internal/utils.go | 5 ++--
pulsar/message.go | 32 +++++++++++-----------
pulsar/producer.go | 49 +++++++++++++++++-----------------
pulsar/producer_test.go | 2 --
pulsar/reader.go | 25 +++++++++--------
pulsar/test_helper.go | 2 --
util/blocking_queue.go | 19 ++++++-------
util/blocking_queue_test.go | 2 --
util/semaphore.go | 17 ++++++++++--
52 files changed, 184 insertions(+), 189 deletions(-)
diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index e09e875..a5f98f1 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package main
@@ -31,6 +29,7 @@ import (
log "github.com/sirupsen/logrus"
)
+// ConsumeArgs define the parameters required by consume
type ConsumeArgs struct {
Topic string
SubscriptionName string
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index f656638..2728d6e 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package main
@@ -32,6 +30,7 @@ import (
log "github.com/sirupsen/logrus"
)
+// ProduceArgs define the parameters required by produce
type ProduceArgs struct {
Topic string
Rate int
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index 858d9f6..06ac059 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package main
diff --git a/pkg/auth/disabled.go b/pkg/auth/disabled.go
index 346dc78..e23bb35 100644
--- a/pkg/auth/disabled.go
+++ b/pkg/auth/disabled.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package auth
@@ -23,6 +21,7 @@ import "crypto/tls"
type disabled struct{}
+// NewAuthDisabled return a interface of Provider
func NewAuthDisabled() Provider {
return &disabled{}
}
diff --git a/pkg/auth/provider.go b/pkg/auth/provider.go
index fda78a3..8868df5 100644
--- a/pkg/auth/provider.go
+++ b/pkg/auth/provider.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package auth
@@ -27,20 +25,27 @@ import (
"github.com/pkg/errors"
)
+// Provider is a interface of authentication providers.
type Provider interface {
+ // Init the authentication provider.
Init() error
+ // Name func returns the identifier for this authentication method.
Name() string
// return a client certificate chain, or nil if the data are not available
GetTLSCertificate() (*tls.Certificate, error)
- //
+ // GetData returns the authentication data identifying this client that will be sent to the broker.
GetData() ([]byte, error)
io.Closer
}
+// NewProvider get/create an authentication data provider which provides the data
+// that this client will be sent to the broker.
+// Some authentication method need to auth between each client channel. So it need
+// the broker, who it will talk to.
func NewProvider(name string, params string) (Provider, error) {
m := parseParams(params)
diff --git a/pkg/auth/tls.go b/pkg/auth/tls.go
index f31ecc0..f04b755 100644
--- a/pkg/auth/tls.go
+++ b/pkg/auth/tls.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package auth
@@ -26,6 +24,7 @@ type tlsAuthProvider struct {
privateKeyPath string
}
+// NewAuthenticationTLSWithParams initialize the authentication provider with map param.
func NewAuthenticationTLSWithParams(params map[string]string) Provider {
return NewAuthenticationTLS(
params["tlsCertFile"],
@@ -33,6 +32,7 @@ func NewAuthenticationTLSWithParams(params map[string]string) Provider {
)
}
+// NewAuthenticationTLS initialize the authentication provider
func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Provider {
return &tlsAuthProvider{
certificatePath: certificatePath,
diff --git a/pkg/auth/token.go b/pkg/auth/token.go
index 4e2b8a4..c89eeaf 100644
--- a/pkg/auth/token.go
+++ b/pkg/auth/token.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package auth
@@ -31,6 +29,7 @@ type tokenAuthProvider struct {
tokenSupplier func() (string, error)
}
+// NewAuthenticationTokenWithParams return a interface of Provider with string map.
func NewAuthenticationTokenWithParams(params map[string]string) (Provider, error) {
if params["token"] != "" {
return NewAuthenticationToken(params["token"]), nil
@@ -41,6 +40,7 @@ func NewAuthenticationTokenWithParams(params map[string]string) (Provider, error
}
}
+// NewAuthenticationToken return a interface of Provider with a string token.
func NewAuthenticationToken(token string) Provider {
return &tokenAuthProvider{
tokenSupplier: func() (string, error) {
@@ -52,6 +52,7 @@ func NewAuthenticationToken(token string) Provider {
}
}
+// NewAuthenticationTokenFromFile return a interface of a Provider with a string token file path.
func NewAuthenticationTokenFromFile(tokenFilePath string) Provider {
return &tokenAuthProvider{
tokenSupplier: func() (string, error) {
diff --git a/pkg/compression/compression.go b/pkg/compression/compression.go
index e9eac6d..ac52092 100644
--- a/pkg/compression/compression.go
+++ b/pkg/compression/compression.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,15 +14,22 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package compression
+// Provider is a interface of compression providers
type Provider interface {
+ // CanCompress checks if the compression method is available under the current version.
CanCompress() bool
+ // Compress a []byte, the param is a []byte with the uncompressed content.
+ // The reader/writer indexes will not be modified. The return is a []byte
+ // with the compressed content.
Compress(data []byte) []byte
+ // Decompress a []byte. The buffer needs to have been compressed with the matching Encoder.
+ // The compressedData is compressed content, originalSize is the size of the original content.
+ // The return were the result will be passed, if err is nil, the buffer was decompressed, no nil otherwise.
Decompress(compressedData []byte, originalSize int) ([]byte, error)
}
diff --git a/pkg/compression/compression_test.go b/pkg/compression/compression_test.go
index 8261d0c..98d4509 100644
--- a/pkg/compression/compression_test.go
+++ b/pkg/compression/compression_test.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package compression
diff --git a/pkg/compression/lz4.go b/pkg/compression/lz4.go
index 5489f8c..449c205 100644
--- a/pkg/compression/lz4.go
+++ b/pkg/compression/lz4.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package compression
@@ -26,6 +24,7 @@ import (
type lz4Provider struct {
}
+// NewLz4Provider return a interface of Provider.
func NewLz4Provider() Provider {
return &lz4Provider{}
}
diff --git a/pkg/compression/noop.go b/pkg/compression/noop.go
index 8dec533..0d64123 100644
--- a/pkg/compression/noop.go
+++ b/pkg/compression/noop.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,16 +14,17 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package compression
type noopProvider struct{}
+// NewNoopProvider returns a Provider interface
func NewNoopProvider() Provider {
return &noopProvider{}
}
+// CanCompress always returns true, in the case of noopProvider, noopProvider means no compression.
func (noopProvider) CanCompress() bool {
return true
}
diff --git a/pkg/compression/zlib.go b/pkg/compression/zlib.go
index 7e0d123..f1c1b91 100644
--- a/pkg/compression/zlib.go
+++ b/pkg/compression/zlib.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package compression
@@ -26,6 +24,7 @@ import (
type zlibProvider struct{}
+// NewZLibProvider returns a Provider interface
func NewZLibProvider() Provider {
return &zlibProvider{}
}
diff --git a/pkg/compression/zstd.go b/pkg/compression/zstd.go
index 3a3365c..529020c 100644
--- a/pkg/compression/zstd.go
+++ b/pkg/compression/zstd.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
// +build cgo
@@ -28,6 +26,7 @@ import (
zstd "github.com/valyala/gozstd"
)
+// NewZStdProvider returns a Provider interface.
func NewZStdProvider() Provider {
return newCGoZStdProvider()
}
diff --git a/pkg/compression/zstd_native.go b/pkg/compression/zstd_native.go
index 9cf1124..fa3649e 100644
--- a/pkg/compression/zstd_native.go
+++ b/pkg/compression/zstd_native.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
// +build !cgo
diff --git a/pulsar/client.go b/pulsar/client.go
index 9147939..e69a9b2 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index b32e5fc..d45253d 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
@@ -30,18 +28,18 @@ type ConsumerMessage struct {
Message
}
-// Types of subscription supported by Pulsar
+// SubscriptionType of subscription supported by Pulsar
type SubscriptionType int
const (
- // There can be only 1 consumer on the same topic with the same subscription name
+ // Exclusive there can be only 1 consumer on the same topic with the same subscription name
Exclusive SubscriptionType = iota
- // Multiple consumer will be able to use the same subscription name and the messages will be dispatched according to
+ // Shared subscription mode, multiple consumer will be able to use the same subscription name and the messages will be dispatched according to
// a round-robin rotation between the connected consumers
Shared
- // Multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.
+ // Failover subscription mode, multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.
// If that consumer disconnects, one of the other connected consumers will start receiving messages.
Failover
)
@@ -56,7 +54,7 @@ const (
Earliest
)
-// ConsumerBuilder is used to configure and create instances of Consumer
+// ConsumerOptions is used to configure and create instances of Consumer
type ConsumerOptions struct {
// Specify the topic this consumer will subscribe on.
// Either a topic, a list of topics or a topics pattern are required when subscribing
@@ -122,28 +120,28 @@ type ConsumerOptions struct {
ReadCompacted bool
}
-// An interface that abstracts behavior of Pulsar's consumer
+// Consumer is an interface that abstracts behavior of Pulsar's consumer
type Consumer interface {
- // Get the topic for the consumer
+ // Topic get the topic for the consumer
Topic() string
- // Get a subscription for the consumer
+ // Subscription get a subscription for the consumer
Subscription() string
// Unsubscribe the consumer
Unsubscribe() error
- // Receives a single message.
+ // Receive a single message.
// This calls blocks until a message is available.
Receive(context.Context) (Message, error)
// Ack the consumption of a single message
Ack(Message) error
- // Ack the consumption of a single message, identified by its MessageID
+ // AckID the consumption of a single message, identified by its MessageID
AckID(MessageID) error
- // Ack the reception of all the messages in the stream up to (and including) the provided message.
+ // AckCumulative the reception of all the messages in the stream up to (and including) the provided message.
// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
// re-delivered to this consumer.
//
@@ -152,26 +150,23 @@ type Consumer interface {
// It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
AckCumulative(Message) error
- // Ack the reception of all the messages in the stream up to (and including) the provided message.
+ // AckCumulativeID the reception of all the messages in the stream up to (and including) the provided message.
// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
// re-delivered to this consumer.
- //
// Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
- //
// It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
AckCumulativeID(MessageID) error
// Close the consumer and stop the broker to push more messages
Close() error
- // Reset the subscription associated with this consumer to a specific message id.
+ // Seek reset the subscription associated with this consumer to a specific message id.
// The message id can either be a specific message or represent the first or last messages in the topic.
- //
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
// seek() on the individual partitions.
Seek(msgID MessageID) error
- // Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
+ // RedeliverUnackedMessages redeliver all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
// active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
// breaks, the messages are redelivered after reconnect.
diff --git a/pulsar/error.go b/pulsar/error.go
index 38b9d64..8b73a95 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,22 +14,29 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
import "fmt"
+// Result used to represent pulsar processing is an alias of type int.
type Result int
const (
- ResultOk = iota // No errors
- ResultUnknownError // Unknown error happened on broker
- ResultInvalidConfiguration // Invalid configuration
- ResultTimeoutError // Operation timed out
- ResultLookupError // Broker lookup failed
- ResultInvalidTopicName // Invalid topic name
- ResultConnectError // Failed to connect to broker
+ // ResultOk means no errors
+ ResultOk = iota
+ // ResultUnknownError means unknown error happened on broker
+ ResultUnknownError
+ // ResultInvalidConfiguration means invalid configuration
+ ResultInvalidConfiguration
+ // ResultTimeoutError means operation timed out
+ ResultTimeoutError
+ // ResultLookupError means broker lookup failed
+ ResultLookupError
+ // ResultInvalidTopicName means invalid topic name
+ ResultInvalidTopicName
+ // ResultConnectError means failed to connect to broker
+ ResultConnectError
//ReadError Result = 6 // Failed to read from socket
//AuthenticationError Result = 7 // Authentication failed on broker
@@ -61,6 +67,7 @@ const (
//CryptoError Result = 33 // Error when crypto operation fails
)
+// Error implement error interface, composed of two parts: msg and result.
type Error struct {
msg string
result Result
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index a03a41c..b73080b 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
diff --git a/pulsar/impl_client_test.go b/pulsar/impl_client_test.go
index 63fa6ba..ec3110a 100644
--- a/pulsar/impl_client_test.go
+++ b/pulsar/impl_client_test.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 710913e..a7b1207 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index bfbfde4..bd48495 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index bd112da..ce8a2a9 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index 3999ff1..497a18c 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
diff --git a/pulsar/internal/backoff.go b/pulsar/internal/backoff.go
index 94a185e..c558f58 100644
--- a/pulsar/internal/backoff.go
+++ b/pulsar/internal/backoff.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
@@ -23,6 +21,7 @@ import (
"time"
)
+// Backoff
type Backoff struct {
backoff time.Duration
}
@@ -32,6 +31,7 @@ const (
maxBackoff = 60 * time.Second
)
+// Next
func (b *Backoff) Next() time.Duration {
// Double the delay each time
b.backoff += b.backoff
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index b44989f..53035ed 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
@@ -31,11 +29,17 @@ import (
)
const (
- MaxMessageSize = 5 * 1024 * 1024
- MaxBatchSize = 128 * 1024
+ // MaxMessageSize limit message size for transfer
+ MaxMessageSize = 5 * 1024 * 1024
+ // MaxBatchSize will be the largest size for a batch sent from this particular producer.
+ // This is used as a baseline to allocate a new buffer that can hold the entire batch
+ // without needing costly re-allocations.
+ MaxBatchSize = 128 * 1024
+ // DefaultMaxMessagesPerBatch init default num of entries in per batch.
DefaultMaxMessagesPerBatch = 1000
)
+// BatchBuilder wraps the objects needed to build a batch.
type BatchBuilder struct {
buffer Buffer
@@ -55,6 +59,7 @@ type BatchBuilder struct {
compressionProvider compression.Provider
}
+// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,
compressionType pb.CompressionType) (*BatchBuilder, error) {
if maxMessages == 0 {
@@ -88,6 +93,7 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,
return bb, nil
}
+// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch
func (bb *BatchBuilder) IsFull() bool {
return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > MaxBatchSize
}
@@ -97,6 +103,7 @@ func (bb *BatchBuilder) hasSpace(payload []byte) bool {
return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > MaxBatchSize
}
+// Add will add single message to batch.
func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint64, payload []byte,
callback interface{}, replicateTo []string) bool {
if replicateTo != nil && bb.numMessages != 0 {
@@ -135,6 +142,7 @@ func (bb *BatchBuilder) reset() {
bb.msgMetadata.ReplicateTo = nil
}
+// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks []interface{}) {
log.Debug("BatchBuilder flush: messages: ", bb.numMessages)
if bb.numMessages == 0 {
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index cadf041..98d7116 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
@@ -23,11 +21,15 @@ import (
"encoding/binary"
)
+// Buffer is a variable-sized buffer of bytes with Read and Write methods.
+// The zero value for Buffer is an empty buffer ready to use.
type Buffer interface {
ReadableBytes() uint32
WritableBytes() uint32
+ // Capacity returns the capacity of the buffer's underlying byte slice,
+ // that is, the total space allocated for the buffer's data.
Capacity() uint32
IsWritable() bool
@@ -40,10 +42,10 @@ type Buffer interface {
WritableSlice() []byte
- // Advance the writer index when data was written in a slice
+ // WrittenBytes advance the writer index when data was written in a slice
WrittenBytes(size uint32)
- // Copy the available portion of data at the beginning of the buffer
+ // MoveToFront copy the available portion of data at the beginning of the buffer
MoveToFront()
ReadUint16() uint16
@@ -62,6 +64,7 @@ type Buffer interface {
Resize(newSize uint32)
+ // Clear will clear the current buffer data.
Clear()
}
@@ -72,6 +75,7 @@ type buffer struct {
writerIdx uint32
}
+// NewBuffer creates and initializes a new Buffer using buf as its initial contents.
func NewBuffer(size int) Buffer {
return &buffer{
data: make([]byte, size),
diff --git a/pulsar/internal/buffer_test.go b/pulsar/internal/buffer_test.go
index 72a9ee8..8edc60c 100644
--- a/pulsar/internal/buffer_test.go
+++ b/pulsar/internal/buffer_test.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
diff --git a/pulsar/internal/checksum.go b/pulsar/internal/checksum.go
index 9716724..11d8d4f 100644
--- a/pulsar/internal/checksum.go
+++ b/pulsar/internal/checksum.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,14 +14,16 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
import "hash/crc32"
+// crc32cTable holds the precomputed crc32 hash table
+// used by Pulsar (crc32c)
var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
+// Crc32cCheckSum handles computing the checksum.
func Crc32cCheckSum(data []byte) uint32 {
return crc32.Checksum(data, crc32cTable)
}
diff --git a/pulsar/internal/closable.go b/pulsar/internal/closable.go
index 77801a4..7906c89 100644
--- a/pulsar/internal/closable.go
+++ b/pulsar/internal/closable.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index d528eb4..05ba409 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
@@ -26,6 +24,7 @@ import (
log "github.com/sirupsen/logrus"
)
+// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
const MaxFrameSize = 5 * 1024 * 1024
const magicCrc32c uint16 = 0x0e01
@@ -115,6 +114,7 @@ func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageM
wb.PutUint32(checksum, checksumIdx)
}
+// ConvertFromStringMap convert a string map to a KeyValue []byte
func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
list := make([]*pb.KeyValue, len(m))
@@ -131,6 +131,7 @@ func ConvertFromStringMap(m map[string]string) []*pb.KeyValue {
return list
}
+// ConvertToStringMap convert a KeyValue []byte to string map
func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string {
m := make(map[string]string)
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index 3ea96a7..3ae35c9 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 8baef2a..7d5ee74 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
@@ -47,11 +45,14 @@ type TLSOptions struct {
// a consumer) that can register itself to get notified
// when the connection is closed.
type ConnectionListener interface {
+ // ReceivedSendReceipt receive and process the return value of the send command.
ReceivedSendReceipt(response *pb.CommandSendReceipt)
+ // ConnectionClosed close the TCP connection.
ConnectionClosed()
}
+// Connection is a interface of client cnx.
type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
WriteData(data []byte)
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index b140ee7..4802e23 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
@@ -28,7 +26,9 @@ import (
log "github.com/sirupsen/logrus"
)
+// ConnectionPool is a interface of connection pool.
type ConnectionPool interface {
+ // GetConnection get a connection from ConnectionPool.
GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)
// Close all the connections in the pool
@@ -41,6 +41,7 @@ type connectionPool struct {
auth auth.Provider
}
+// NewConnectionPool init connection pool.
func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider) ConnectionPool {
return &connectionPool{
tlsOptions: tlsOptions,
diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go
index ad09c2c..746db5c 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
diff --git a/pulsar/internal/default_router.go b/pulsar/internal/default_router.go
index cb391b9..6626351 100644
--- a/pulsar/internal/default_router.go
+++ b/pulsar/internal/default_router.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
@@ -33,12 +31,15 @@ type defaultRouter struct {
type Clock func() uint64
+// NewSystemClock init system clock and return current time.
func NewSystemClock() Clock {
return func() uint64 {
return uint64(time.Now().UnixNano())
}
}
+// NewDefaultRouter set the message routing mode for the partitioned producer.
+// Default routing mode is round-robin routing.
func NewDefaultRouter(clock Clock, hashFunc func(string) uint32, maxBatchingDelay time.Duration) func(string, uint32) int {
state := &defaultRouter{
clock: clock,
diff --git a/pulsar/internal/default_router_test.go b/pulsar/internal/default_router_test.go
index 0c389f2..7dc3e0b 100644
--- a/pulsar/internal/default_router_test.go
+++ b/pulsar/internal/default_router_test.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
diff --git a/pulsar/internal/hash.go b/pulsar/internal/hash.go
index 5555888..0ac8590 100644
--- a/pulsar/internal/hash.go
+++ b/pulsar/internal/hash.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,12 +14,12 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
import "github.com/spaolacci/murmur3"
+// JavaStringHash and Java String.hashCode() equivalent
func JavaStringHash(s string) uint32 {
var h uint32
for i, size := 0, len(s); i < size; i++ {
@@ -30,6 +29,7 @@ func JavaStringHash(s string) uint32 {
return h
}
+// Murmur3_32Hash use Murmur3 hashing function
func Murmur3_32Hash(s string) uint32 {
h := murmur3.New32()
_, err := h.Write([]byte(s))
diff --git a/pulsar/internal/hash_test.go b/pulsar/internal/hash_test.go
index 7a601f6..2f9bc19 100644
--- a/pulsar/internal/hash_test.go
+++ b/pulsar/internal/hash_test.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index e257591..d5da829 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
@@ -30,12 +28,16 @@ import (
log "github.com/sirupsen/logrus"
)
+// LookupResult encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr.
type LookupResult struct {
LogicalAddr *url.URL
PhysicalAddr *url.URL
}
+// LookupService is a interface of lookup service.
type LookupService interface {
+ // Lookup perform a lookup for the given topic, confirm the location of the broker
+ // where the topic is located, and return the LookupResult.
Lookup(topic string) (*LookupResult, error)
}
@@ -44,6 +46,7 @@ type lookupService struct {
serviceURL *url.URL
}
+// NewLookupService init a lookup service struct and return an object of LookupService.
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL) LookupService {
return &lookupService{
rpcClient: rpcClient,
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index e1fae0c..8baddf3 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 6b9d338..8234dc8 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
diff --git a/pulsar/internal/topic_name.go b/pulsar/internal/topic_name.go
index 9b912d0..1cefeb7 100644
--- a/pulsar/internal/topic_name.go
+++ b/pulsar/internal/topic_name.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
@@ -26,6 +24,7 @@ import (
"strings"
)
+// TopicName abstract a struct contained in a Topic
type TopicName struct {
Name string
Namespace string
@@ -38,6 +37,7 @@ const (
partitionedTopicSuffix = "-partition-"
)
+// ParseTopicName parse the given topic name and return TopicName.
func ParseTopicName(topic string) (*TopicName, error) {
// The topic name can be in two different forms, one is fully qualified topic name,
// the other one is short topic name
diff --git a/pulsar/internal/topic_name_test.go b/pulsar/internal/topic_name_test.go
index 93e7fd9..bd57334 100644
--- a/pulsar/internal/topic_name_test.go
+++ b/pulsar/internal/topic_name_test.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
diff --git a/pulsar/internal/utils.go b/pulsar/internal/utils.go
index 30837d6..f0d5fb3 100644
--- a/pulsar/internal/utils.go
+++ b/pulsar/internal/utils.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package internal
@@ -24,11 +22,12 @@ import (
"time"
)
+// TimestampMillis return a time unix nano.
func TimestampMillis(t time.Time) uint64 {
return uint64(t.UnixNano()) / uint64(time.Millisecond)
}
-// Perform atomic read and update
+// GetAndAdd perform atomic read and update
func GetAndAdd(n *uint64, diff uint64) uint64 {
for {
v := *n
diff --git a/pulsar/message.go b/pulsar/message.go
index 2d03a31..1862c18 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,66 +14,67 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
import "time"
+// ProducerMessage abstraction used in Pulsar producer
type ProducerMessage struct {
// Payload for the message
Payload []byte
- // Sets the key of the message for routing policy
+ // Key sets the key of the message for routing policy
Key string
- // Attach application defined properties on the message
+ // Properties attach application defined properties on the message
Properties map[string]string
- // Set the event time for a given message
+ // EventTime set the event time for a given message
EventTime *time.Time
- // Override the replication clusters for this message.
+ // ReplicationClusters override the replication clusters for this message.
ReplicationClusters []string
- // Set the sequence id to assign to the current message
+ // SequenceID set the sequence id to assign to the current message
SequenceID *int64
}
+// Message abstraction used in Pulsar
type Message interface {
- // Get the topic from which this message originated from
+ // Topic get the topic from which this message originated from
Topic() string
+ // Properties are application defined key/value pairs that will be attached to the message.
// Return the properties attached to the message.
- // Properties are application defined key/value pairs that will be attached to the message
Properties() map[string]string
- // Get the payload of the message
+ // Payload get the payload of the message
Payload() []byte
- // Get the unique message ID associated with this message.
+ // ID get the unique message ID associated with this message.
// The message id can be used to univocally refer to a message without having the keep the entire payload in memory.
ID() MessageID
- // Get the publish time of this message. The publish time is the timestamp that a client publish the message.
+ // PublishTime get the publish time of this message. The publish time is the timestamp that a client publish the message.
PublishTime() time.Time
- // Get the event time associated with this message. It is typically set by the applications via
+ // EventTime get the event time associated with this message. It is typically set by the applications via
// `ProducerMessage.EventTime`.
// If there isn't any event time associated with this event, it will be nil.
EventTime() *time.Time
- // Get the key of the message, if any
+ // Key get the key of the message, if any
Key() string
}
-// Identifier for a particular message
+// MessageID identifier for a particular message
type MessageID interface {
// Serialize the message id into a sequence of bytes that can be stored somewhere else
Serialize() []byte
}
-// Reconstruct a MessageID object from its serialized representation
+// DeserializeMessageID reconstruct a MessageID object from its serialized representation
func DeserializeMessageID(data []byte) (MessageID, error) {
return deserializeMessageID(data)
}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 2a248cb..c77a047 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
@@ -27,8 +25,10 @@ import (
type HashingScheme int
const (
- JavaStringHash HashingScheme = iota // Java String.hashCode() equivalent
- Murmur3_32Hash // Use Murmur3 hashing function
+ // JavaStringHash and Java String.hashCode() equivalent
+ JavaStringHash HashingScheme = iota
+ // Murmur3_32Hash use Murmur3 hashing function
+ Murmur3_32Hash
)
type CompressionType int
@@ -40,17 +40,18 @@ const (
ZSTD
)
+// TopicMetadata is a interface of topic metadata
type TopicMetadata interface {
- // Get the number of partitions for the specific topic
+ // NumPartitions get the number of partitions for the specific topic
NumPartitions() uint32
}
type ProducerOptions struct {
- // Specify the topic this producer will be publishing on.
+ // Topic specify the topic this producer will be publishing on.
// This argument is required when constructing the producer.
Topic string
- // Specify a name for the producer
+ // Name specify a name for the producer
// If not assigned, the system will generate a globally unique name which can be access with
// Producer.ProducerName().
// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
@@ -58,32 +59,32 @@ type ProducerOptions struct {
// a topic.
Name string
- // Attach a set of application defined properties to the producer
+ // Properties attach a set of application defined properties to the producer
// This properties will be visible in the topic stats
Properties map[string]string
- // Set the send timeout (default: 30 seconds)
+ // SendTimeout set the send timeout (default: 30 seconds)
// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
// deduplication feature.
SendTimeout time.Duration
- // Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+ // MaxPendingMessages set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
// When the queue is full, by default, all calls to Producer.send() and Producer.sendAsync() will fail
// unless `BlockIfQueueFull` is set to true. Use BlockIfQueueFull(boolean) to change the blocking behavior.
MaxPendingMessages int
- // Set the number of max pending messages across all the partitions
+ // MaxPendingMessagesAcrossPartitions set the number of max pending messages across all the partitions
// This setting will be used to lower the max pending messages for each partition
// `MaxPendingMessages(int)`, if the total exceeds the configured value.
MaxPendingMessagesAcrossPartitions int
- // Set whether the `Producer.Send()` and `Producer.sendAsync()` operations should block when the outgoing
+ // BlockIfQueueFull set whether the `Producer.Send()` and `Producer.sendAsync()` operations should block when the outgoing
// message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
// `ProducerQueueIsFullError` when there is no space left in pending queue.
BlockIfQueueFull bool
- // Change the `HashingScheme` used to chose the partition on where to publish a particular message.
+ // HashingScheme change the `HashingScheme` used to chose the partition on where to publish a particular message.
// Standard hashing functions available are:
//
// - `JavaStringHash` : Java String.hashCode() equivalent
@@ -93,7 +94,7 @@ type ProducerOptions struct {
// Default is `JavaStringHash`.
HashingScheme
- // Set the compression type for the producer.
+ // CompressionType set the compression type for the producer.
// By default, message payloads are not compressed. Supported compression types are:
// - LZ4
// - ZLIB
@@ -103,38 +104,36 @@ type ProducerOptions struct {
// release in order to be able to receive messages compressed with ZSTD.
CompressionType
- // Set a custom message routing policy by passing an implementation of MessageRouter
+ // MessageRouter set a custom message routing policy by passing an implementation of MessageRouter
// The router is a function that given a particular message and the topic metadata, returns the
// partition index where the message should be routed to
MessageRouter func(Message, TopicMetadata) int
- // Control whether automatic batching of messages is enabled for the producer. By default batching
+ // DisableBatching control whether automatic batching of messages is enabled for the producer. By default batching
// is enabled.
- //
// When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
// messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
// contents.
- //
// When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
// Setting `DisableBatching: true` will make the producer to send messages individually
DisableBatching bool
- // Set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
+ // BatchingMaxPublishDelay set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
// enabled. If set to a non zero value, messages will be queued until this time interval or until
BatchingMaxPublishDelay time.Duration
- // Set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
+ // BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
// messages will be queued until this threshold is reached or batch interval has elapsed
BatchingMaxMessages uint
}
-// The producer is used to publish messages on a topic
+// Producer is used to publish messages on a topic
type Producer interface {
- // return the topic to which producer is publishing to
+ // Topic return the topic to which producer is publishing to
Topic() string
- // return the producer name which could have been assigned by the system or specified by the client
+ // Name return the producer name which could have been assigned by the system or specified by the client
Name() string
// Send a message
@@ -143,12 +142,12 @@ type Producer interface {
// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
Send(context.Context, *ProducerMessage) error
- // Send a message in asynchronous mode
+ // SendAsync a message in asynchronous mode
// The callback will report back the message being published and
// the eventual error in publishing
SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))
- // Get the last sequence id that was published by this producer.
+ // LastSequenceID get the last sequence id that was published by this producer.
// This represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that
// was published and acknowledged by the broker.
// After recreating a producer with the same producer name, this will return the last message that was
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 5ef6d2c..0b17b37 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 5592630..3e088b4 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,26 +14,27 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
import "context"
+// ReaderMessage package Reader and Message as a struct to use
type ReaderMessage struct {
Reader
Message
}
+// ReaderOptions abstraction Reader options to use.
type ReaderOptions struct {
- // Specify the topic this consumer will subscribe on.
+ // Topic specify the topic this consumer will subscribe on.
// This argument is required when constructing the reader.
Topic string
- // Set the reader name.
+ // Name set the reader name.
Name string
- // The initial reader positioning is done by specifying a message id. The options are:
+ // StartMessageID initial reader positioning is done by specifying a message id. The options are:
// * `pulsar.EarliestMessage` : Start reading from the earliest message available in the topic
// * `pulsar.LatestMessage` : Start reading from the end topic, only getting messages published after the
// reader was created
@@ -43,19 +43,18 @@ type ReaderOptions struct {
// messageID
StartMessageID MessageID
- // Sets a `MessageChannel` for the consumer
+ // MessageChannel sets a `MessageChannel` for the consumer
// When a message is received, it will be pushed to the channel for consumption
MessageChannel chan ReaderMessage
- // Sets the size of the consumer receive queue.
+ // ReceiverQueueSize sets the size of the consumer receive queue.
// The consumer receive queue controls how many messages can be accumulated by the Reader before the
// application calls Reader.readNext(). Using a higher value could potentially increase the consumer
// throughput at the expense of bigger memory utilization.
- //
// Default value is {@code 1000} messages and should be good for most use cases.
ReceiverQueueSize int
- // Set the subscription role prefix. The default prefix is "reader".
+ // SubscriptionRolePrefix set the subscription role prefix. The default prefix is "reader".
SubscriptionRolePrefix string
// If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog
@@ -68,15 +67,15 @@ type ReaderOptions struct {
ReadCompacted bool
}
-// A Reader can be used to scan through all the messages currently available in a topic.
+// Reader can be used to scan through all the messages currently available in a topic.
type Reader interface {
- // The topic from which this reader is reading from
+ // Topic from which this reader is reading from
Topic() string
- // Read the next message in the topic, blocking until a message is available
+ // Next read the next message in the topic, blocking until a message is available
Next(context.Context) (Message, error)
- // Check if there is any message available to read from the current position
+ // HasNext check if there is any message available to read from the current position
HasNext() (bool, error)
// Close the reader and stop the broker to push more messages
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index a12a989..c58c239 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package pulsar
diff --git a/util/blocking_queue.go b/util/blocking_queue.go
index 90b1b54..224baac 100644
--- a/util/blocking_queue.go
+++ b/util/blocking_queue.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package util
@@ -25,29 +23,31 @@ import (
log "github.com/sirupsen/logrus"
)
+// BlockingQueue is a interface of block queue
type BlockingQueue interface {
- // Enqueue one item, block if the queue is full
+ // Put enqueue one item, block if the queue is full
Put(item interface{})
- // Dequeue one item, block until it's available
+ // Take dequeue one item, block until it's available
Take() interface{}
- // Dequeue one item, return nil if queue is empty
+ // Poll dequeue one item, return nil if queue is empty
Poll() interface{}
- // Return the first item without dequeing, return nil if queue is empty
+ // Peek return the first item without dequeing, return nil if queue is empty
Peek() interface{}
- // Return last item in queue without dequeing, return nil if queue is empty
+ // PeekLast return last item in queue without dequeing, return nil if queue is empty
PeekLast() interface{}
- // Return the current size of the queue
+ // Size return the current size of the queue
Size() int
- // Return an iterator for the queue
+ // Iterator return an iterator for the queue
Iterator() BlockingQueueIterator
}
+// BlockingQueueIterator abstract a interface of block queue iterator.
type BlockingQueueIterator interface {
HasNext() bool
Next() interface{}
@@ -71,6 +71,7 @@ type blockingQueueIterator struct {
toRead int
}
+// NewBlockingQueue init block queue and returns a BlockingQueue
func NewBlockingQueue(maxSize int) BlockingQueue {
bq := &blockingQueue{
items: make([]interface{}, maxSize),
diff --git a/util/blocking_queue_test.go b/util/blocking_queue_test.go
index f8cb3fa..f6cbaa0 100644
--- a/util/blocking_queue_test.go
+++ b/util/blocking_queue_test.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package util
diff --git a/util/semaphore.go b/util/semaphore.go
index 45fa7f4..74e4d16 100644
--- a/util/semaphore.go
+++ b/util/semaphore.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -15,16 +14,30 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
package util
+// Semaphore is a channel of bool, used to receive a bool type semaphore.
type Semaphore chan bool
+// Acquire a permit from this semaphore, blocking until one is available.
+
+// Acquire a permit, if one is available and returns immediately,
+// reducing the number of available permits by one.
func (s Semaphore) Acquire() {
s <- true
}
+// Release a permit, returning it to the semaphore.
+
+// Release a permit, increasing the number of available permits by
+// one. If any threads are trying to acquire a permit, then one is
+// selected and given the permit that was just released. That thread
+// is (re)enabled for thread scheduling purposes.
+// There is no requirement that a thread that releases a permit must
+// have acquired that permit by calling Acquire().
+// Correct usage of a semaphore is established by programming convention
+// in the application.
func (s Semaphore) Release() {
<-s
}