You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/02 02:53:30 UTC
[10/50] [abbrv] qpid-proton git commit: PROTON-1308: Go: Support Idle
Timeout setting in electron Transport
PROTON-1308: Go: Support Idle Timeout setting in electron Transport
electron.Heartbeat(time.Duration) returns a ConnectionOption that sets the
required heartbeat interval for the remote peer. See electron.Heartbeat
and electron.ConnectionSettings.Heartbeat for details.
NOTE: The term "Heartbeat" was chosen to avoid the ambiguous term
"idle-timeout". Heartbeat *always* refers to the max allowed duration
between *sending* frames. The frame receiver waits for 2*remote-heartbeat before
closing a connection.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ce80f9d7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ce80f9d7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ce80f9d7
Branch: refs/heads/go1
Commit: ce80f9d7250400cbe47bb5bf0ced3937ed829d92
Parents: e7b70d8
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Sep 28 13:00:22 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Sep 28 16:39:05 2016 -0400
----------------------------------------------------------------------
proton-c/bindings/go/genwrap.go | 1 +
.../src/qpid.apache.org/electron/auth_test.go | 6 +-
.../src/qpid.apache.org/electron/connection.go | 49 ++++++++++++----
.../qpid.apache.org/electron/electron_test.go | 61 ++++++++++++++++++++
.../go/src/qpid.apache.org/electron/handler.go | 2 +
.../go/src/qpid.apache.org/proton/engine.go | 38 +++++++++---
.../go/src/qpid.apache.org/proton/wrappers.go | 36 ++++++++----
.../src/qpid.apache.org/proton/wrappers_gen.go | 2 +-
8 files changed, 160 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/genwrap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/genwrap.go b/proton-c/bindings/go/genwrap.go
index f295a32..8a9af03 100644
--- a/proton-c/bindings/go/genwrap.go
+++ b/proton-c/bindings/go/genwrap.go
@@ -317,6 +317,7 @@ func mapType(ctype string) (g genType) {
case "C.pn_millis_t":
g.Gotype = "time.Duration"
g.ToGo = func(v string) string { return fmt.Sprintf("(time.Duration(%s) * time.Millisecond)", v) }
+ g.ToC = func(v string) string { return fmt.Sprintf("C.pn_millis_t(%s/time.Millisecond)", v) }
case "C.pn_timestamp_t":
g.Gotype = "time.Time"
g.ToC = func(v string) string { return fmt.Sprintf("pnTime(%s)", v) }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
index a090b78..73a9299 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
@@ -53,7 +53,7 @@ func TestAuthAnonymous(t *testing.T) {
[]ConnectionOption{User("fred"), VirtualHost("vhost"), SASLAllowInsecure(true)},
[]ConnectionOption{SASLAllowedMechs("ANONYMOUS"), SASLAllowInsecure(true)})
fatalIf(t, err)
- errorIf(t, checkEqual(connectionSettings{"anonymous", "vhost"}, got))
+ errorIf(t, checkEqual(connectionSettings{user: "anonymous", virtualHost: "vhost"}, got))
}
func TestAuthPlain(t *testing.T) {
@@ -62,7 +62,7 @@ func TestAuthPlain(t *testing.T) {
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))},
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
fatalIf(t, err)
- errorIf(t, checkEqual(connectionSettings{"fred@proton", ""}, got))
+ errorIf(t, checkEqual(connectionSettings{user: "fred@proton"}, got))
}
func TestAuthBadPass(t *testing.T) {
@@ -118,7 +118,7 @@ func configureSASL() error {
func TestMain(m *testing.M) {
status := m.Run()
if confDir != "" {
- os.RemoveAll(confDir)
+ _ = os.RemoveAll(confDir)
}
os.Exit(status)
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 00c08ad..7f3050f 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -45,6 +45,11 @@ type ConnectionSettings interface {
//
// Returns error if the connection fails to authenticate.
VirtualHost() string
+
+ // Heartbeat is the maximum delay between sending frames that the remote peer
+ // has requested of us. If the interval expires an empty "heartbeat" frame
+ // will be sent automatically to keep the connection open.
+ Heartbeat() time.Duration
}
// Connection is an AMQP connection, created by a Container.
@@ -88,10 +93,12 @@ type Connection interface {
type connectionSettings struct {
user, virtualHost string
+ heartbeat time.Duration
}
-func (c connectionSettings) User() string { return c.user }
-func (c connectionSettings) VirtualHost() string { return c.virtualHost }
+func (c connectionSettings) User() string { return c.user }
+func (c connectionSettings) VirtualHost() string { return c.virtualHost }
+func (c connectionSettings) Heartbeat() time.Duration { return c.heartbeat }
// ConnectionOption can be passed when creating a connection to configure various options
type ConnectionOption func(*connection)
@@ -165,7 +172,7 @@ type connection struct {
}
// NewConnection creates a connection with the given options.
-func NewConnection(conn net.Conn, setting ...ConnectionOption) (*connection, error) {
+func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) {
c := &connection{
conn: conn,
}
@@ -176,7 +183,7 @@ func NewConnection(conn net.Conn, setting ...ConnectionOption) (*connection, err
return nil, err
}
c.pConnection = c.engine.Connection()
- for _, set := range setting {
+ for _, set := range opts {
set(c)
}
if c.container == nil {
@@ -211,7 +218,7 @@ func (c *connection) Disconnect(err error) {
c.engine.Disconnect(err)
}
-func (c *connection) Session(setting ...SessionOption) (Session, error) {
+func (c *connection) Session(opts ...SessionOption) (Session, error) {
var s Session
err := c.engine.InjectWait(func() error {
if c.Error() != nil {
@@ -221,7 +228,7 @@ func (c *connection) Session(setting ...SessionOption) (Session, error) {
if err == nil {
pSession.Open()
if err == nil {
- s = newSession(c, pSession, setting...)
+ s = newSession(c, pSession, opts...)
}
}
return err
@@ -241,17 +248,17 @@ func (c *connection) DefaultSession() (s Session, err error) {
return c.defaultSession, err
}
-func (c *connection) Sender(setting ...LinkOption) (Sender, error) {
+func (c *connection) Sender(opts ...LinkOption) (Sender, error) {
if s, err := c.DefaultSession(); err == nil {
- return s.Sender(setting...)
+ return s.Sender(opts...)
} else {
return nil, err
}
}
-func (c *connection) Receiver(setting ...LinkOption) (Receiver, error) {
+func (c *connection) Receiver(opts ...LinkOption) (Receiver, error) {
if s, err := c.DefaultSession(); err == nil {
- return s.Receiver(setting...)
+ return s.Receiver(opts...)
} else {
return nil, err
}
@@ -288,11 +295,20 @@ func newIncomingConnection(c *connection) *IncomingConnection {
c: c}
}
-func (in *IncomingConnection) Accept() Endpoint {
+// AcceptConnection is like Accept() but takes ConnectionOptions.
+// For example, you can set the Heartbeat() for the accepted connection.
+func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection {
return in.accept(func() Endpoint {
+ for _, opt := range opts {
+ opt(in.c)
+ }
in.c.pConnection.Open()
return in.c
- })
+ }).(Connection)
+}
+
+func (in *IncomingConnection) Accept() Endpoint {
+ return in.AcceptConnection()
}
func sasl(c *connection) proton.SASL { return c.engine.Transport().SASL() }
@@ -325,6 +341,15 @@ func SASLAllowInsecure(b bool) ConnectionOption {
return func(c *connection) { sasl(c).SetAllowInsecureMechs(b) }
}
+// Heartbeat returns a ConnectionOption that requests the maximum delay
+// between sending frames for the remote peer. If we don't receive any frames
+// within 2*delay we will close the connection.
+//
+func Heartbeat(delay time.Duration) ConnectionOption {
+ // Proton-C divides the idle-timeout by 2 before sending, so compensate.
+ return func(c *connection) { c.engine.Transport().SetIdleTimeout(2 * delay) }
+}
+
// GlobalSASLConfigDir sets the SASL configuration directory for every
// Connection created in this process. If not called, the default is determined
// by your SASL installation.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
index aa37d57..294e952 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
@@ -483,3 +483,64 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
}
}
}
+
+func heartbeat(c Connection) time.Duration {
+ return c.(*connection).engine.Transport().RemoteIdleTimeout()
+}
+
+func TestHeartbeat(t *testing.T) {
+ client, server := newClientServerOpts(t,
+ []ConnectionOption{Heartbeat(102 * time.Millisecond)},
+ nil)
+ defer closeClientServer(client, server)
+
+ var serverHeartbeat time.Duration
+
+ go func() {
+ for in := range server.Incoming() {
+ switch in := in.(type) {
+ case *IncomingConnection:
+ serverHeartbeat = in.Heartbeat()
+ in.AcceptConnection(Heartbeat(101 * time.Millisecond))
+ default:
+ in.Accept()
+ }
+ }
+ }()
+
+ // Freeze the server to stop it sending heartbeats.
+ unfreeze := make(chan bool)
+ defer close(unfreeze)
+ freeze := func() error { return server.(*connection).engine.Inject(func() { <-unfreeze }) }
+
+ fatalIf(t, client.Sync())
+ errorIf(t, checkEqual(101*time.Millisecond, heartbeat(client.Connection())))
+ errorIf(t, checkEqual(102*time.Millisecond, serverHeartbeat))
+ errorIf(t, client.Connection().Error())
+
+ // Freeze the server for less than a heartbeat
+ fatalIf(t, freeze())
+ time.Sleep(50 * time.Millisecond)
+ unfreeze <- true
+ // Make sure server is still responding.
+ s, err := client.Sender()
+ errorIf(t, err)
+ errorIf(t, s.Sync())
+
+ // Freeze the server till the client times out the connection
+ fatalIf(t, freeze())
+ select {
+ case <-client.Done():
+ if amqp.ResourceLimitExceeded != client.Error().(amqp.Error).Name {
+ t.Error("bad timeout error:", client.Error())
+ }
+ case <-time.After(400 * time.Millisecond):
+ t.Error("connection failed to time out")
+ }
+
+ unfreeze <- true // Unfreeze the server
+ <-server.Done()
+ if amqp.ResourceLimitExceeded != server.Error().(amqp.Error).Name {
+ t.Error("bad timeout error:", server.Error())
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 2a426aa..588ba79 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -79,6 +79,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
}
case proton.MConnectionOpening:
+ h.connection.heartbeat = e.Transport().RemoteIdleTimeout()
if e.Connection().State().LocalUninit() { // Remotely opened
h.incoming(newIncomingConnection(h.connection))
}
@@ -137,6 +138,7 @@ func (h *handler) incoming(in Incoming) {
var err error
if h.connection.incoming != nil {
h.connection.incoming <- in
+ // Must block until accept/reject, subsequent events may use the incoming endpoint.
err = in.wait()
} else {
err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s",
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index 6439fc1..7ba6827 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -107,6 +107,7 @@ type Engine struct {
handlers []EventHandler // Handlers for proton events.
running chan struct{} // This channel will be closed when the goroutines are done.
closeOnce sync.Once
+ timer *time.Timer
}
const bufferSize = 4096
@@ -123,6 +124,7 @@ func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
conn: conn,
handlers: handlers,
running: make(chan struct{}),
+ timer: time.NewTimer(0),
}
if pnErr := C.pn_connection_engine_init(&eng.engine); pnErr != 0 {
return nil, fmt.Errorf("cannot setup engine: %s", PnErrorCode(pnErr))
@@ -237,17 +239,34 @@ func (eng *Engine) Disconnect(err error) {
<-eng.running
}
+// Let proton run timed activity and set up the next tick
+func (eng *Engine) tick() {
+ now := time.Now()
+ next := eng.Transport().Tick(now)
+ if !next.IsZero() {
+ eng.timer.Reset(next.Sub(now))
+ }
+}
+
func (eng *Engine) dispatch() bool {
+ var needTick bool // Set if we need to tick the transport.
for {
if cevent := C.pn_connection_engine_dispatch(&eng.engine); cevent != nil {
event := makeEvent(cevent, eng)
for _, h := range eng.handlers {
+ switch event.Type() {
+ case ETransport:
+ needTick = true
+ }
h.HandleEvent(event)
}
} else {
break
}
}
+ if needTick {
+ eng.tick()
+ }
return !bool(C.pn_connection_engine_finished(&eng.engine))
}
@@ -285,10 +304,10 @@ func (eng *Engine) Run() error {
if n > 0 {
readsOut <- rbuf[:n]
} else if err != nil {
- eng.inject <- func() {
+ _ = eng.Inject(func() {
eng.Transport().Condition().SetError(err)
C.pn_connection_engine_read_close(&eng.engine)
- }
+ })
return
}
}
@@ -305,10 +324,10 @@ func (eng *Engine) Run() error {
if n > 0 {
writesOut <- wbuf[:n]
} else if err != nil {
- eng.inject <- func() {
+ _ = eng.Inject(func() {
eng.Transport().Condition().SetError(err)
C.pn_connection_engine_write_close(&eng.engine)
- }
+ })
return
}
}
@@ -353,6 +372,9 @@ func (eng *Engine) Run() error {
if ok {
f()
}
+
+ case <-eng.timer.C:
+ eng.tick()
}
}
@@ -360,18 +382,16 @@ func (eng *Engine) Run() error {
eng.err.Set(eng.Transport().Condition().Error())
close(readsIn)
close(writesIn)
- _ = eng.conn.Close() // Make sure connection is closed
+ close(eng.running) // Signal goroutines have exited and Error is set, disable Inject()
+ _ = eng.conn.Close() // Close conn, force read/write goroutines to exit (they will Inject)
wait.Wait() // Wait for goroutines
- close(eng.running) // Signal goroutines have exited and Error is set.
-
- C.pn_connection_engine_final(&eng.engine)
-
for _, h := range eng.handlers {
switch h := h.(type) {
case cHandler:
C.pn_handler_free(h.pn)
}
}
+ C.pn_connection_engine_final(&eng.engine)
return eng.err.Get()
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index fa3e850..1dee743 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -291,6 +291,10 @@ func (s Session) Receiver(name string) Link {
return Link{C.pn_receiver(s.pn, cname)}
}
+func (t Transport) String() string {
+ return fmt.Sprintf("(Transport)(%p)", t.CPtr())
+}
+
// Unique (per process) string identifier for a connection, useful for debugging.
func (c Connection) String() string {
// Use the transport address to match the default transport logs from PN_TRACE.
@@ -374,19 +378,31 @@ func (c Connection) Session() (Session, error) {
}
// pnTime converts Go time.Time to Proton millisecond Unix time.
-func pnTime(t time.Time) C.pn_timestamp_t {
- secs := t.Unix()
- // Note: sub-second accuracy is not guaraunteed if the Unix time in
- // nanoseconds cannot be represented by an int64 (sometime around year 2260)
- msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond)
- return C.pn_timestamp_t(secs*1000 + msecs)
+//
+// Note: t.IsZero() is converted to C.pn_timestamp_t(0) and vice-versa. These
+// are used as "not set" sentinel values by the Go and Proton APIs, so it is
+// better to conserve the "zeroness" even though they don't represent the same
+// time instant.
+//
+func pnTime(t time.Time) (pnt C.pn_timestamp_t) {
+ if !t.IsZero() {
+ pnt = C.pn_timestamp_t(t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond))
+ }
+ return
}
// goTime converts a pn_timestamp_t to a Go time.Time.
-func goTime(t C.pn_timestamp_t) time.Time {
- secs := int64(t) / 1000
- nsecs := (int64(t) % 1000) * int64(time.Millisecond)
- return time.Unix(secs, nsecs)
+//
+// Note: C.pn_timestamp_t(0) is converted to a zero time.Time and
+// vice-versa. These are used as "not set" sentinel values by the Go and Proton
+// APIs, so it is better to conserve the "zeroness" even though they don't
+// represent the same time instant.
+//
+func goTime(pnt C.pn_timestamp_t) (t time.Time) {
+ if pnt != 0 {
+ t = time.Unix(int64(pnt/1000), int64(pnt%1000)*int64(time.Millisecond))
+ }
+ return
}
// Special treatment for Transport.Head, return value is unsafe.Pointer not string
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce80f9d7/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
index 38c76cc..629caa6 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
@@ -812,7 +812,7 @@ func (t Transport) IdleTimeout() time.Duration {
return (time.Duration(C.pn_transport_get_idle_timeout(t.pn)) * time.Millisecond)
}
func (t Transport) SetIdleTimeout(timeout time.Duration) {
- C.pn_transport_set_idle_timeout(t.pn, C.pn_millis_t(timeout))
+ C.pn_transport_set_idle_timeout(t.pn, C.pn_millis_t(timeout/time.Millisecond))
}
func (t Transport) RemoteIdleTimeout() time.Duration {
return (time.Duration(C.pn_transport_get_remote_idle_timeout(t.pn)) * time.Millisecond)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org