You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/10/08 04:05:13 UTC
[pulsar-client-go] branch master updated: (refactor)clean code (#70)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 702405f (refactor)clean code (#70)
702405f is described below
commit 702405fa17a05882bdb08a64bc20319e1d516a78
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Tue Oct 8 12:05:09 2019 +0800
(refactor)clean code (#70)
Change-Id: I480f5b8bb580887543cde66824b2f5704f8fa6e8
- use goimports to sort and group imports
- use switch-case instead of if-else for newClient in pulsar/impl_client.go
---
pulsar/consumer_test.go | 1 +
pulsar/impl_client.go | 7 ++++---
pulsar/internal/connection.go | 12 +++++++-----
3 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index cda9506..e6f4d52 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -850,6 +850,7 @@ func TestConsumer_Flow(t *testing.T) {
SubscriptionName: "sub-1",
ReceiverQueueSize: 4,
})
+ assert.Nil(t, err)
for msgNum := 0; msgNum < 100; msgNum++ {
if err := producer.Send(ctx, &ProducerMessage{
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index aeda1c0..7077155 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -54,15 +54,16 @@ func newClient(options ClientOptions) (Client, error) {
}
var tlsConfig *internal.TLSOptions
- if url.Scheme == "pulsar" {
+ switch url.Scheme {
+ case "pulsar":
tlsConfig = nil
- } else if url.Scheme == "pulsar+ssl" {
+ case "pulsar+ssl":
tlsConfig = &internal.TLSOptions{
AllowInsecureConnection: options.TLSAllowInsecureConnection,
TrustCertsFilePath: options.TLSTrustCertsFilePath,
ValidateHostname: options.TLSValidateHostname,
}
- } else {
+ default:
return nil, newError(ResultInvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 8084017..fe43f79 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -28,11 +28,12 @@ import (
"sync/atomic"
"time"
+ "github.com/golang/protobuf/proto"
+ log "github.com/sirupsen/logrus"
+
"github.com/apache/pulsar-client-go/pkg/auth"
"github.com/apache/pulsar-client-go/pkg/pb"
"github.com/apache/pulsar-client-go/util"
- "github.com/golang/protobuf/proto"
- log "github.com/sirupsen/logrus"
)
type TLSOptions struct {
@@ -356,6 +357,9 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
case pb.BaseCommand_MESSAGE:
err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
+ if err != nil {
+ c.Close()
+ }
case pb.BaseCommand_PING:
c.handlePing()
case pb.BaseCommand_PONG:
@@ -364,9 +368,7 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
default:
- if err != nil {
- c.log.Errorf("Received invalid command type: %s", cmd.Type)
- }
+ c.log.Errorf("Received invalid command type: %s", cmd.Type)
c.Close()
}
}