You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:40 UTC

[pulsar-client-go] 03/38: Use string for result errors

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 0c2da14b1242290507109a1be7f5b1149aea26a5
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Mar 28 09:21:42 2019 -0700

    Use string for result errors
---
 pulsar/client.go                  |  2 +-
 pulsar/error.go                   | 94 ++++++++++++++++++++++++---------------
 pulsar/impl_client.go             | 21 +++++----
 pulsar/impl_client_test.go        | 14 ++++++
 pulsar/impl_partition_producer.go | 42 ++++++++---------
 pulsar/impl_producer.go           |  8 ++--
 pulsar/message.go                 |  8 ++--
 7 files changed, 117 insertions(+), 72 deletions(-)

diff --git a/pulsar/client.go b/pulsar/client.go
index 21db637..e68f09c 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -28,7 +28,7 @@ func NewClient(options ClientOptions) (Client, error) {
 }
 
 // Opaque interface that represents the authentication credentials
-type Authentication interface {}
+type Authentication interface{}
 
 // Create new Authentication provider with specified auth token
 func NewAuthenticationToken(token string) Authentication {
diff --git a/pulsar/error.go b/pulsar/error.go
index 929b250..24e1536 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -18,45 +18,48 @@
 //
 
 package pulsar
+
 import "C"
 import "fmt"
 
 type Result int
 
 const (
-	UnknownError                          Result = 1  // Unknown error happened on broker
-	InvalidConfiguration                  Result = 2  // Invalid configuration
-	TimeoutError                          Result = 3  // Operation timed out
-	LookupError                           Result = 4  // Broker lookup failed
-	ConnectError                          Result = 5  // Failed to connect to broker
-	ReadError                             Result = 6  // Failed to read from socket
-	AuthenticationError                   Result = 7  // Authentication failed on broker
-	AuthorizationError                    Result = 8  // Client is not authorized to create producer/consumer
-	ErrorGettingAuthenticationData        Result = 9  // Client cannot find authorization data
-	BrokerMetadataError                   Result = 10 // Broker failed in updating metadata
-	BrokerPersistenceError                Result = 11 // Broker failed to persist entry
-	ChecksumError                         Result = 12 // Corrupt message checksum failure
-	ConsumerBusy                          Result = 13 // Exclusive consumer is already connected
-	NotConnectedError                     Result = 14 // Producer/Consumer is not currently connected to broker
-	AlreadyClosedError                    Result = 15 // Producer/Consumer is already closed and not accepting any operation
-	InvalidMessage                        Result = 16 // Error in publishing an already used message
-	ConsumerNotInitialized                Result = 17 // Consumer is not initialized
-	ProducerNotInitialized                Result = 18 // Producer is not initialized
-	TooManyLookupRequestException         Result = 19 // Too Many concurrent LookupRequest
-	InvalidTopicName                      Result = 20 // Invalid topic name
-	InvalidUrl                            Result = 21 // Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
-	ServiceUnitNotReady                   Result = 22 // Service Unit unloaded between client did lookup and producer/consumer got created
-	OperationNotSupported                 Result = 23
-	ProducerBlockedQuotaExceededError     Result = 24 // Producer is blocked
-	ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
-	ProducerQueueIsFull                   Result = 26 // Producer queue is full
-	MessageTooBig                         Result = 27 // Trying to send a messages exceeding the max size
-	TopicNotFound                         Result = 28 // Topic not found
-	SubscriptionNotFound                  Result = 29 // Subscription not found
-	ConsumerNotFound                      Result = 30 // Consumer not found
-	UnsupportedVersionError               Result = 31 // Error when an older client/version doesn't support a required feature
-	TopicTerminated                       Result = 32 // Topic was already terminated
-	CryptoError                           Result = 33 // Error when crypto operation fails
+	ResultOk                   = iota // No errors
+	ResultUnknownError                // Unknown error happened on broker
+	ResultInvalidConfiguration        // Invalid configuration
+	ResultTimeoutError                // Operation timed out
+	ResultLookupError                 // Broker lookup failed
+	ResultInvalidTopicName            // Invalid topic name
+	ResultConnectError                // Failed to connect to broker
+
+	//ReadError                      Result = 6  // Failed to read from socket
+	//AuthenticationError            Result = 7  // Authentication failed on broker
+	//AuthorizationError             Result = 8  // Client is not authorized to create producer/consumer
+	//ErrorGettingAuthenticationData Result = 9  // Client cannot find authorization data
+	//BrokerMetadataError            Result = 10 // Broker failed in updating metadata
+	//BrokerPersistenceError         Result = 11 // Broker failed to persist entry
+	//ChecksumError                  Result = 12 // Corrupt message checksum failure
+	//ConsumerBusy                   Result = 13 // Exclusive consumer is already connected
+	//NotConnectedError              Result = 14 // Producer/Consumer is not currently connected to broker
+	//AlreadyClosedError             Result = 15 // Producer/Consumer is already closed and not accepting any operation
+	//InvalidMessage                 Result = 16 // Error in publishing an already used message
+	//ConsumerNotInitialized         Result = 17 // Consumer is not initialized
+	//ProducerNotInitialized         Result = 18 // Producer is not initialized
+	//TooManyLookupRequestException  Result = 19 // Too Many concurrent LookupRequest
+	//InvalidUrl                            Result = 21 // Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
+	//ServiceUnitNotReady                   Result = 22 // Service Unit unloaded between client did lookup and producer/consumer got created
+	//OperationNotSupported                 Result = 23
+	//ProducerBlockedQuotaExceededError     Result = 24 // Producer is blocked
+	//ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
+	//ProducerQueueIsFull                   Result = 26 // Producer queue is full
+	//MessageTooBig                         Result = 27 // Trying to send a messages exceeding the max size
+	//TopicNotFound                         Result = 28 // Topic not found
+	//SubscriptionNotFound                  Result = 29 // Subscription not found
+	//ConsumerNotFound                      Result = 30 // Consumer not found
+	//UnsupportedVersionError               Result = 31 // Error when an older client/version doesn't support a required feature
+	//TopicTerminated                       Result = 32 // Topic was already terminated
+	//CryptoError                           Result = 33 // Error when crypto operation fails
 )
 
 type Error struct {
@@ -74,7 +77,28 @@ func (e *Error) Error() string {
 
 func newError(result Result, msg string) error {
 	return &Error{
-		msg:    fmt.Sprintf("%s: %d", msg, result),
+		msg:    fmt.Sprintf("%s: %s", msg, getResultStr(result)),
 		result: result,
 	}
-}
\ No newline at end of file
+}
+
+func getResultStr(r Result) string {
+	switch r {
+	case ResultOk:
+		return "OK"
+	case ResultUnknownError:
+		return "Unknown error"
+	case ResultInvalidConfiguration:
+		return "InvalidConfiguration"
+	case ResultTimeoutError:
+		return "TimeoutError"
+	case ResultLookupError:
+		return "LookupError"
+	case ResultInvalidTopicName:
+		return "InvalidTopicName"
+	case ResultConnectError:
+		return "ConnectError"
+	default:
+		return fmt.Sprintf("Result(%d)", r)
+	}
+}
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index e143601..4a7047f 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -20,17 +20,17 @@ type client struct {
 
 func newClient(options ClientOptions) (Client, error) {
 	if options.URL == "" {
-		return nil, newError(InvalidConfiguration, "URL is required for client")
+		return nil, newError(ResultInvalidConfiguration, "URL is required for client")
 	}
 
 	url, err := url.Parse(options.URL)
 	if err != nil {
 		log.WithError(err).Error("Failed to parse service URL")
-		return nil, newError(InvalidConfiguration, "Invalid service URL")
+		return nil, newError(ResultInvalidConfiguration, "Invalid service URL")
 	}
 
 	if url.Scheme != "pulsar" {
-		return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
+		return nil, newError(ResultInvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
 	}
 
 	c := &client{
@@ -77,14 +77,19 @@ func (client *client) TopicPartitions(topic string) ([]string, error) {
 
 	r := res.Response.PartitionMetadataResponse
 	if r.Error != nil {
-		return nil, newError(LookupError, r.GetError().String())
+		return nil, newError(ResultLookupError, r.GetError().String())
 	}
 
-	partitions := make([]string, r.GetPartitions())
-	for i := 0; i < int(r.GetPartitions()); i++ {
-		partitions[i] = fmt.Sprintf("%s-partition-%d", topic, i)
+	if r.GetPartitions() > 0 {
+		partitions := make([]string, r.GetPartitions())
+		for i := 0; i < int(r.GetPartitions()); i++ {
+			partitions[i] = fmt.Sprintf("%s-partition-%d", topic, i)
+		}
+		return partitions, nil
+	} else {
+		// Non-partitioned topic
+		return []string{topicName.Name}, nil
 	}
-	return partitions, nil
 }
 
 func (client *client) Close() error {
diff --git a/pulsar/impl_client_test.go b/pulsar/impl_client_test.go
new file mode 100644
index 0000000..99f0c78
--- /dev/null
+++ b/pulsar/impl_client_test.go
@@ -0,0 +1,14 @@
+package pulsar
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestClient(t *testing.T) {
+	client, err := NewClient(ClientOptions{})
+	assert.Nil(t, client)
+	assert.NotNil(t, err)
+	assert.Equal(t, Result(ResultInvalidConfiguration), err.(*Error).Result())
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index bb96a27..d035ad1 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -3,8 +3,8 @@ package pulsar
 import (
 	"context"
 	log "github.com/sirupsen/logrus"
-	"pulsar-client-go-native/pulsar/impl"
-	pb "pulsar-client-go-native/pulsar/pulsar_proto"
+	//"pulsar-client-go-native/pulsar/impl"
+	//pb "pulsar-client-go-native/pulsar/pulsar_proto"
 	"sync"
 )
 
@@ -33,25 +33,25 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 }
 
 func (p *partitionProducer) grabCnx() error {
-	lr, err := p.client.lookupService.Lookup(p.topic)
-	if err != nil {
-		p.log.WithError(err).Warn("Failed to lookup topic")
-		return err
-	}
-
-	id := p.client.rpcClient.NewRequestId()
-	p.client.rpcClient.Request(lr.LogicalAddr.Host, lr.PhysicalAddr.Host, id, pb.BaseCommand_PRODUCER, *pb.CommandProducer{
-
-	})
-
-	var cnx impl.Connection
-	cnx, err = p.client.cnxPool.GetConnection(lr.LogicalAddr.Host, lr.PhysicalAddr.Host)
-	if err != nil {
-		p.log.WithError(err).Warn("Failed to get connection")
-		return err
-	}
-
-	cnx.
+	//lr, err := p.client.lookupService.Lookup(p.topic)
+	//if err != nil {
+	//	p.log.WithError(err).Warn("Failed to lookup topic")
+	//	return err
+	//}
+
+	//id := p.client.rpcClient.NewRequestId()
+	//p.client.rpcClient.Request(lr.LogicalAddr.Host, lr.PhysicalAddr.Host, id, pb.BaseCommand_PRODUCER, *pb.CommandProducer{
+	//
+	//})
+	//
+	//var cnx impl.Connection
+	//cnx, err = p.client.cnxPool.GetConnection(lr.LogicalAddr.Host, lr.PhysicalAddr.Host)
+	//if err != nil {
+	//	p.log.WithError(err).Warn("Failed to get connection")
+	//	return err
+	//}
+
+	//cnx.
 
 	return nil
 }
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index f2332ea..587b09d 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -2,6 +2,7 @@ package pulsar
 
 import (
 	"context"
+	"fmt"
 	"pulsar-client-go-native/pulsar/impl"
 	"sync"
 )
@@ -14,7 +15,7 @@ type producer struct {
 
 func newProducer(client *client, options ProducerOptions) (*producer, error) {
 	if options.Topic == "" {
-		return nil, newError(InvalidTopicName, "Topic name is required for producer")
+		return nil, newError(ResultInvalidTopicName, "Topic name is required for producer")
 	}
 
 	p := &producer{
@@ -47,7 +48,8 @@ func newProducer(client *client, options ProducerOptions) (*producer, error) {
 	for i := 0; i < numPartitions; i++ {
 		partition := i
 		go func() {
-			prod, err := newPartitionProducer(client, &options)
+			partitionName := fmt.Sprintf("%s-partition-%d", options.Topic, partition)
+			prod, err := newPartitionProducer(client, partitionName, &options)
 			c <- ProducerError{partition, prod, err}
 		}()
 	}
@@ -62,7 +64,7 @@ func newProducer(client *client, options ProducerOptions) (*producer, error) {
 		// Since there were some failures, cleanup all the partitions that succeeded in creating the producers
 		for _, producer := range p.producers {
 			if producer != nil {
-				_ := producer.Close()
+				_ = producer.Close()
 			}
 		}
 		return nil, err
diff --git a/pulsar/message.go b/pulsar/message.go
index b829b96..ad61704 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -82,9 +82,9 @@ func DeserializeMessageID(data []byte) MessageID {
 }
 
 var (
-	// MessageID that points to the earliest message avaialable in a topic
-	//EarliestMessage MessageID = earliestMessageID()
+// MessageID that points to the earliest message avaialable in a topic
+//EarliestMessage MessageID = earliestMessageID()
 
-	// MessageID that points to the latest message
-	//LatestMessage MessageID = latestMessageID()
+// MessageID that points to the latest message
+//LatestMessage MessageID = latestMessageID()
 )