You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dc...@apache.org on 2020/06/08 13:06:30 UTC

[thrift] branch master updated: THRIFT-5214: Push read deadline in socketConn

This is an automated email from the ASF dual-hosted git repository.

dcelasun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git


The following commit(s) were added to refs/heads/master by this push:
     new e382275  THRIFT-5214: Push read deadline in socketConn
e382275 is described below

commit e382275bad2bd11fb5df33dd7db520fd7596f4ac
Author: Yuxuan 'fishy' Wang <yu...@reddit.com>
AuthorDate: Mon Jun 8 06:06:17 2020 -0700

    THRIFT-5214: Push read deadline in socketConn
    
    Client: go
    
    We added socketConn to the Go library for connectivity checks in
    https://github.com/apache/thrift/pull/2153, but forgot to push the read
    deadline on the socket when doing the connectivity checks. This caused
    a large number of connectivity checks to fail with I/O timeout
    errors.
---
 lib/go/thrift/socket.go           |  8 +++++++-
 lib/go/thrift/socket_conn.go      |  5 ++++-
 lib/go/thrift/socket_unix_conn.go | 16 ++++++++++++----
 lib/go/thrift/ssl_socket.go       |  9 ++++++++-
 4 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/lib/go/thrift/socket.go b/lib/go/thrift/socket.go
index 7c765f5..5080894 100644
--- a/lib/go/thrift/socket.go
+++ b/lib/go/thrift/socket.go
@@ -58,7 +58,9 @@ func NewTSocketFromAddrTimeout(addr net.Addr, connTimeout time.Duration, soTimeo
 
 // Creates a TSocket from an existing net.Conn
 func NewTSocketFromConnTimeout(conn net.Conn, connTimeout time.Duration) *TSocket {
-	return &TSocket{conn: wrapSocketConn(conn), addr: conn.RemoteAddr(), connectTimeout: connTimeout, socketTimeout: connTimeout}
+	sock := &TSocket{conn: wrapSocketConn(conn), addr: conn.RemoteAddr(), connectTimeout: connTimeout, socketTimeout: connTimeout}
+	sock.conn.socketTimeout = connTimeout
+	return sock
 }
 
 // Sets the connect timeout
@@ -70,6 +72,9 @@ func (p *TSocket) SetConnTimeout(timeout time.Duration) error {
 // Sets the socket timeout
 func (p *TSocket) SetSocketTimeout(timeout time.Duration) error {
 	p.socketTimeout = timeout
+	if p.conn != nil {
+		p.conn.socketTimeout = timeout
+	}
 	return nil
 }
 
@@ -109,6 +114,7 @@ func (p *TSocket) Open() error {
 	)); err != nil {
 		return NewTTransportException(NOT_OPEN, err.Error())
 	}
+	p.conn.socketTimeout = p.socketTimeout
 	return nil
 }
 
diff --git a/lib/go/thrift/socket_conn.go b/lib/go/thrift/socket_conn.go
index b0f7b3e..5ed598e 100644
--- a/lib/go/thrift/socket_conn.go
+++ b/lib/go/thrift/socket_conn.go
@@ -23,13 +23,16 @@ import (
 	"bytes"
 	"io"
 	"net"
+	"time"
 )
 
 // socketConn is a wrapped net.Conn that tries to do connectivity check.
 type socketConn struct {
 	net.Conn
 
-	buf bytes.Buffer
+	socketTimeout time.Duration
+	buf           bytes.Buffer
+	buffer        [1]byte
 }
 
 var _ net.Conn = (*socketConn)(nil)
diff --git a/lib/go/thrift/socket_unix_conn.go b/lib/go/thrift/socket_unix_conn.go
index f18e0e6..789b4fa 100644
--- a/lib/go/thrift/socket_unix_conn.go
+++ b/lib/go/thrift/socket_unix_conn.go
@@ -24,6 +24,7 @@ package thrift
 import (
 	"io"
 	"syscall"
+	"time"
 )
 
 func (sc *socketConn) read0() error {
@@ -36,16 +37,23 @@ func (sc *socketConn) checkConn() error {
 		// No way to check, return nil
 		return nil
 	}
+
+	// Push read deadline
+	var t time.Time
+	if sc.socketTimeout > 0 {
+		t = time.Now().Add(sc.socketTimeout)
+	}
+	sc.Conn.SetReadDeadline(t)
+
 	rc, err := syscallConn.SyscallConn()
 	if err != nil {
 		return err
 	}
 
 	var n int
-	var buf [1]byte
 
 	if readErr := rc.Read(func(fd uintptr) bool {
-		n, err = syscall.Read(int(fd), buf[:])
+		n, err = syscall.Read(int(fd), sc.buffer[:])
 		return true
 	}); readErr != nil {
 		return readErr
@@ -58,9 +66,9 @@ func (sc *socketConn) checkConn() error {
 	}
 
 	if n > 0 {
-		// We got 1 byte,
+		// We got something,
 		// put it to sc's buf for the next real read to use.
-		sc.buf.Write(buf[:])
+		sc.buf.Write(sc.buffer[:n])
 		return nil
 	}
 
diff --git a/lib/go/thrift/ssl_socket.go b/lib/go/thrift/ssl_socket.go
index 661111c..6e90438 100644
--- a/lib/go/thrift/ssl_socket.go
+++ b/lib/go/thrift/ssl_socket.go
@@ -62,12 +62,17 @@ func NewTSSLSocketFromAddrTimeout(addr net.Addr, cfg *tls.Config, timeout time.D
 
 // Creates a TSSLSocket from an existing net.Conn
 func NewTSSLSocketFromConnTimeout(conn net.Conn, cfg *tls.Config, timeout time.Duration) *TSSLSocket {
-	return &TSSLSocket{conn: wrapSocketConn(conn), addr: conn.RemoteAddr(), timeout: timeout, cfg: cfg}
+	sock := &TSSLSocket{conn: wrapSocketConn(conn), addr: conn.RemoteAddr(), timeout: timeout, cfg: cfg}
+	sock.conn.socketTimeout = timeout
+	return sock
 }
 
 // Sets the socket timeout
 func (p *TSSLSocket) SetTimeout(timeout time.Duration) error {
 	p.timeout = timeout
+	if p.conn != nil {
+		p.conn.socketTimeout = timeout
+	}
 	return nil
 }
 
@@ -101,6 +106,7 @@ func (p *TSSLSocket) Open() error {
 		)); err != nil {
 			return NewTTransportException(NOT_OPEN, err.Error())
 		}
+		p.conn.socketTimeout = p.timeout
 	} else {
 		if p.conn.isValid() {
 			return NewTTransportException(ALREADY_OPEN, "Socket already connected.")
@@ -124,6 +130,7 @@ func (p *TSSLSocket) Open() error {
 		)); err != nil {
 			return NewTTransportException(NOT_OPEN, err.Error())
 		}
+		p.conn.socketTimeout = p.timeout
 	}
 	return nil
 }