You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:40 UTC
[pulsar-client-go] 03/38: Use string for result errors
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 0c2da14b1242290507109a1be7f5b1149aea26a5
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Mar 28 09:21:42 2019 -0700
Use string for result errors
---
pulsar/client.go | 2 +-
pulsar/error.go | 94 ++++++++++++++++++++++++---------------
pulsar/impl_client.go | 21 +++++----
pulsar/impl_client_test.go | 14 ++++++
pulsar/impl_partition_producer.go | 42 ++++++++---------
pulsar/impl_producer.go | 8 ++--
pulsar/message.go | 8 ++--
7 files changed, 117 insertions(+), 72 deletions(-)
diff --git a/pulsar/client.go b/pulsar/client.go
index 21db637..e68f09c 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -28,7 +28,7 @@ func NewClient(options ClientOptions) (Client, error) {
}
// Opaque interface that represents the authentication credentials
-type Authentication interface {}
+type Authentication interface{}
// Create new Authentication provider with specified auth token
func NewAuthenticationToken(token string) Authentication {
diff --git a/pulsar/error.go b/pulsar/error.go
index 929b250..24e1536 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -18,45 +18,48 @@
//
package pulsar
+
import "C"
import "fmt"
type Result int
const (
- UnknownError Result = 1 // Unknown error happened on broker
- InvalidConfiguration Result = 2 // Invalid configuration
- TimeoutError Result = 3 // Operation timed out
- LookupError Result = 4 // Broker lookup failed
- ConnectError Result = 5 // Failed to connect to broker
- ReadError Result = 6 // Failed to read from socket
- AuthenticationError Result = 7 // Authentication failed on broker
- AuthorizationError Result = 8 // Client is not authorized to create producer/consumer
- ErrorGettingAuthenticationData Result = 9 // Client cannot find authorization data
- BrokerMetadataError Result = 10 // Broker failed in updating metadata
- BrokerPersistenceError Result = 11 // Broker failed to persist entry
- ChecksumError Result = 12 // Corrupt message checksum failure
- ConsumerBusy Result = 13 // Exclusive consumer is already connected
- NotConnectedError Result = 14 // Producer/Consumer is not currently connected to broker
- AlreadyClosedError Result = 15 // Producer/Consumer is already closed and not accepting any operation
- InvalidMessage Result = 16 // Error in publishing an already used message
- ConsumerNotInitialized Result = 17 // Consumer is not initialized
- ProducerNotInitialized Result = 18 // Producer is not initialized
- TooManyLookupRequestException Result = 19 // Too Many concurrent LookupRequest
- InvalidTopicName Result = 20 // Invalid topic name
- InvalidUrl Result = 21 // Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
- ServiceUnitNotReady Result = 22 // Service Unit unloaded between client did lookup and producer/consumer got created
- OperationNotSupported Result = 23
- ProducerBlockedQuotaExceededError Result = 24 // Producer is blocked
- ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
- ProducerQueueIsFull Result = 26 // Producer queue is full
- MessageTooBig Result = 27 // Trying to send a message exceeding the max size
- TopicNotFound Result = 28 // Topic not found
- SubscriptionNotFound Result = 29 // Subscription not found
- ConsumerNotFound Result = 30 // Consumer not found
- UnsupportedVersionError Result = 31 // Error when an older client/version doesn't support a required feature
- TopicTerminated Result = 32 // Topic was already terminated
- CryptoError Result = 33 // Error when crypto operation fails
+ ResultOk = iota // No errors
+ ResultUnknownError // Unknown error happened on broker
+ ResultInvalidConfiguration // Invalid configuration
+ ResultTimeoutError // Operation timed out
+ ResultLookupError // Broker lookup failed
+ ResultInvalidTopicName // Invalid topic name
+ ResultConnectError // Failed to connect to broker
+
+ //ReadError Result = 6 // Failed to read from socket
+ //AuthenticationError Result = 7 // Authentication failed on broker
+ //AuthorizationError Result = 8 // Client is not authorized to create producer/consumer
+ //ErrorGettingAuthenticationData Result = 9 // Client cannot find authorization data
+ //BrokerMetadataError Result = 10 // Broker failed in updating metadata
+ //BrokerPersistenceError Result = 11 // Broker failed to persist entry
+ //ChecksumError Result = 12 // Corrupt message checksum failure
+ //ConsumerBusy Result = 13 // Exclusive consumer is already connected
+ //NotConnectedError Result = 14 // Producer/Consumer is not currently connected to broker
+ //AlreadyClosedError Result = 15 // Producer/Consumer is already closed and not accepting any operation
+ //InvalidMessage Result = 16 // Error in publishing an already used message
+ //ConsumerNotInitialized Result = 17 // Consumer is not initialized
+ //ProducerNotInitialized Result = 18 // Producer is not initialized
+ //TooManyLookupRequestException Result = 19 // Too Many concurrent LookupRequest
+ //InvalidUrl Result = 21 // Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
+ //ServiceUnitNotReady Result = 22 // Service Unit unloaded between client did lookup and producer/consumer got created
+ //OperationNotSupported Result = 23
+ //ProducerBlockedQuotaExceededError Result = 24 // Producer is blocked
+ //ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
+ //ProducerQueueIsFull Result = 26 // Producer queue is full
+ //MessageTooBig Result = 27 // Trying to send a message exceeding the max size
+ //TopicNotFound Result = 28 // Topic not found
+ //SubscriptionNotFound Result = 29 // Subscription not found
+ //ConsumerNotFound Result = 30 // Consumer not found
+ //UnsupportedVersionError Result = 31 // Error when an older client/version doesn't support a required feature
+ //TopicTerminated Result = 32 // Topic was already terminated
+ //CryptoError Result = 33 // Error when crypto operation fails
)
type Error struct {
@@ -74,7 +77,28 @@ func (e *Error) Error() string {
func newError(result Result, msg string) error {
return &Error{
- msg: fmt.Sprintf("%s: %d", msg, result),
+ msg: fmt.Sprintf("%s: %s", msg, getResultStr(result)),
result: result,
}
-}
\ No newline at end of file
+}
+
+func getResultStr(r Result) string {
+ switch r {
+ case ResultOk:
+ return "OK"
+ case ResultUnknownError:
+ return "Unknown error"
+ case ResultInvalidConfiguration:
+ return "InvalidConfiguration"
+ case ResultTimeoutError:
+ return "TimeoutError"
+ case ResultLookupError:
+ return "LookupError"
+ case ResultInvalidTopicName:
+ return "InvalidTopicName"
+ case ResultConnectError:
+ return "ConnectError"
+ default:
+ return fmt.Sprintf("Result(%d)", r)
+ }
+}
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index e143601..4a7047f 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -20,17 +20,17 @@ type client struct {
func newClient(options ClientOptions) (Client, error) {
if options.URL == "" {
- return nil, newError(InvalidConfiguration, "URL is required for client")
+ return nil, newError(ResultInvalidConfiguration, "URL is required for client")
}
url, err := url.Parse(options.URL)
if err != nil {
log.WithError(err).Error("Failed to parse service URL")
- return nil, newError(InvalidConfiguration, "Invalid service URL")
+ return nil, newError(ResultInvalidConfiguration, "Invalid service URL")
}
if url.Scheme != "pulsar" {
- return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
+ return nil, newError(ResultInvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
}
c := &client{
@@ -77,14 +77,19 @@ func (client *client) TopicPartitions(topic string) ([]string, error) {
r := res.Response.PartitionMetadataResponse
if r.Error != nil {
- return nil, newError(LookupError, r.GetError().String())
+ return nil, newError(ResultLookupError, r.GetError().String())
}
- partitions := make([]string, r.GetPartitions())
- for i := 0; i < int(r.GetPartitions()); i++ {
- partitions[i] = fmt.Sprintf("%s-partition-%d", topic, i)
+ if r.GetPartitions() > 0 {
+ partitions := make([]string, r.GetPartitions())
+ for i := 0; i < int(r.GetPartitions()); i++ {
+ partitions[i] = fmt.Sprintf("%s-partition-%d", topic, i)
+ }
+ return partitions, nil
+ } else {
+ // Non-partitioned topic
+ return []string{topicName.Name}, nil
}
- return partitions, nil
}
func (client *client) Close() error {
diff --git a/pulsar/impl_client_test.go b/pulsar/impl_client_test.go
new file mode 100644
index 0000000..99f0c78
--- /dev/null
+++ b/pulsar/impl_client_test.go
@@ -0,0 +1,14 @@
+package pulsar
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestClient(t *testing.T) {
+ client, err := NewClient(ClientOptions{})
+ assert.Nil(t, client)
+ assert.NotNil(t, err)
+ assert.Equal(t, Result(ResultInvalidConfiguration), err.(*Error).Result())
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index bb96a27..d035ad1 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -3,8 +3,8 @@ package pulsar
import (
"context"
log "github.com/sirupsen/logrus"
- "pulsar-client-go-native/pulsar/impl"
- pb "pulsar-client-go-native/pulsar/pulsar_proto"
+ //"pulsar-client-go-native/pulsar/impl"
+ //pb "pulsar-client-go-native/pulsar/pulsar_proto"
"sync"
)
@@ -33,25 +33,25 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
}
func (p *partitionProducer) grabCnx() error {
- lr, err := p.client.lookupService.Lookup(p.topic)
- if err != nil {
- p.log.WithError(err).Warn("Failed to lookup topic")
- return err
- }
-
- id := p.client.rpcClient.NewRequestId()
- p.client.rpcClient.Request(lr.LogicalAddr.Host, lr.PhysicalAddr.Host, id, pb.BaseCommand_PRODUCER, *pb.CommandProducer{
-
- })
-
- var cnx impl.Connection
- cnx, err = p.client.cnxPool.GetConnection(lr.LogicalAddr.Host, lr.PhysicalAddr.Host)
- if err != nil {
- p.log.WithError(err).Warn("Failed to get connection")
- return err
- }
-
- cnx.
+ //lr, err := p.client.lookupService.Lookup(p.topic)
+ //if err != nil {
+ // p.log.WithError(err).Warn("Failed to lookup topic")
+ // return err
+ //}
+
+ //id := p.client.rpcClient.NewRequestId()
+ //p.client.rpcClient.Request(lr.LogicalAddr.Host, lr.PhysicalAddr.Host, id, pb.BaseCommand_PRODUCER, *pb.CommandProducer{
+ //
+ //})
+ //
+ //var cnx impl.Connection
+ //cnx, err = p.client.cnxPool.GetConnection(lr.LogicalAddr.Host, lr.PhysicalAddr.Host)
+ //if err != nil {
+ // p.log.WithError(err).Warn("Failed to get connection")
+ // return err
+ //}
+
+ //cnx.
return nil
}
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index f2332ea..587b09d 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -2,6 +2,7 @@ package pulsar
import (
"context"
+ "fmt"
"pulsar-client-go-native/pulsar/impl"
"sync"
)
@@ -14,7 +15,7 @@ type producer struct {
func newProducer(client *client, options ProducerOptions) (*producer, error) {
if options.Topic == "" {
- return nil, newError(InvalidTopicName, "Topic name is required for producer")
+ return nil, newError(ResultInvalidTopicName, "Topic name is required for producer")
}
p := &producer{
@@ -47,7 +48,8 @@ func newProducer(client *client, options ProducerOptions) (*producer, error) {
for i := 0; i < numPartitions; i++ {
partition := i
go func() {
- prod, err := newPartitionProducer(client, &options)
+ partitionName := fmt.Sprintf("%s-partition-%d", options.Topic, partition)
+ prod, err := newPartitionProducer(client, partitionName, &options)
c <- ProducerError{partition, prod, err}
}()
}
@@ -62,7 +64,7 @@ func newProducer(client *client, options ProducerOptions) (*producer, error) {
// Since there were some failures, cleanup all the partitions that succeeded in creating the producers
for _, producer := range p.producers {
if producer != nil {
- _ := producer.Close()
+ _ = producer.Close()
}
}
return nil, err
diff --git a/pulsar/message.go b/pulsar/message.go
index b829b96..ad61704 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -82,9 +82,9 @@ func DeserializeMessageID(data []byte) MessageID {
}
var (
- // MessageID that points to the earliest message available in a topic
- //EarliestMessage MessageID = earliestMessageID()
+// MessageID that points to the earliest message available in a topic
+//EarliestMessage MessageID = earliestMessageID()
- // MessageID that points to the latest message
- //LatestMessage MessageID = latestMessageID()
+// MessageID that points to the latest message
+//LatestMessage MessageID = latestMessageID()
)