You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by cm...@apache.org on 2015/11/20 01:55:59 UTC
incubator-htrace git commit: HTRACE-302. htraced: Add admissions
control to HRPC to limit the number of incoming messages (cmccabe)
Repository: incubator-htrace
Updated Branches:
refs/heads/master fc0d8f38f -> c715e12eb
HTRACE-302. htraced: Add admissions control to HRPC to limit the number of incoming messages (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/c715e12e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/c715e12e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/c715e12e
Branch: refs/heads/master
Commit: c715e12eb085cf551e90567f80c78886a3cc07f6
Parents: fc0d8f3
Author: Colin P. Mccabe <cm...@apache.org>
Authored: Thu Nov 19 16:45:42 2015 -0800
Committer: Colin P. Mccabe <cm...@apache.org>
Committed: Thu Nov 19 16:52:56 2015 -0800
----------------------------------------------------------------------
.../go/src/org/apache/htrace/client/client.go | 28 ++-
.../go/src/org/apache/htrace/client/hclient.go | 11 +-
.../go/src/org/apache/htrace/common/log.go | 4 +
.../src/org/apache/htrace/conf/config_keys.go | 12 ++
.../org/apache/htrace/htraced/client_test.go | 133 ++++++++++++-
.../src/org/apache/htrace/htraced/datastore.go | 1 +
.../org/apache/htrace/htraced/datastore_test.go | 4 +-
.../go/src/org/apache/htrace/htraced/hrpc.go | 193 ++++++++++++++-----
.../go/src/org/apache/htrace/htraced/htraced.go | 2 +-
.../org/apache/htrace/htraced/metrics_test.go | 7 +-
.../org/apache/htrace/htraced/mini_htraced.go | 6 +-
.../go/src/org/apache/htrace/htracedTool/cmd.go | 2 +-
12 files changed, 333 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go b/htrace-htraced/go/src/org/apache/htrace/client/client.go
index 1140209..0028545 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go
@@ -33,25 +33,35 @@ import (
// A golang client for htraced.
// TODO: fancier APIs for streaming spans in the background, optimize TCP stuff
-
-func NewClient(cnf *conf.Config) (*Client, error) {
- hcl := Client{}
+func NewClient(cnf *conf.Config, testHooks *TestHooks) (*Client, error) {
+ hcl := Client{testHooks: testHooks}
hcl.restAddr = cnf.Get(conf.HTRACE_WEB_ADDRESS)
- hcl.hrpcAddr = cnf.Get(conf.HTRACE_HRPC_ADDRESS)
+ if testHooks != nil && testHooks.HrpcDisabled {
+ hcl.hrpcAddr = ""
+ } else {
+ hcl.hrpcAddr = cnf.Get(conf.HTRACE_HRPC_ADDRESS)
+ }
return &hcl, nil
}
+type TestHooks struct {
+ // If true, HRPC is disabled.
+ HrpcDisabled bool
+
+ // A function which gets called after we connect to the server and send the
+ // message frame, but before we write the message body.
+ HandleWriteRequestBody func()
+}
+
type Client struct {
// REST address of the htraced server.
restAddr string
// HRPC address of the htraced server.
hrpcAddr string
-}
-// Disable HRPC
-func (hcl *Client) DisableHrpc() {
- hcl.hrpcAddr = ""
+ // The test hooks to use, or nil if test hooks are not enabled.
+ testHooks *TestHooks
}
// Get the htraced server version information.
@@ -136,7 +146,7 @@ func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error {
if hcl.hrpcAddr == "" {
return hcl.writeSpansHttp(req)
}
- hcr, err := newHClient(hcl.hrpcAddr)
+ hcr, err := newHClient(hcl.hrpcAddr, hcl.testHooks)
if err != nil {
return err
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
index 608dd59..ef79deb 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
@@ -38,6 +38,7 @@ type hClient struct {
type HrpcClientCodec struct {
rwc io.ReadWriteCloser
length uint32
+ testHooks *TestHooks
}
func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) error {
@@ -72,6 +73,9 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) erro
return errors.New(fmt.Sprintf("Error writing header bytes: %s",
err.Error()))
}
+ if cdc.testHooks != nil && cdc.testHooks.HandleWriteRequestBody != nil {
+ cdc.testHooks.HandleWriteRequestBody()
+ }
_, err = cdc.rwc.Write(buf)
if err != nil {
return errors.New(fmt.Sprintf("Error writing body bytes: %s",
@@ -136,14 +140,17 @@ func (cdc *HrpcClientCodec) Close() error {
return cdc.rwc.Close()
}
-func newHClient(hrpcAddr string) (*hClient, error) {
+func newHClient(hrpcAddr string, testHooks *TestHooks) (*hClient, error) {
hcr := hClient{}
conn, err := net.Dial("tcp", hrpcAddr)
if err != nil {
return nil, errors.New(fmt.Sprintf("Error contacting the HRPC server "+
"at %s: %s", hrpcAddr, err.Error()))
}
- hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{rwc: conn})
+ hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{
+ rwc: conn,
+ testHooks: testHooks,
+ })
return &hcr, nil
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/common/log.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/log.go b/htrace-htraced/go/src/org/apache/htrace/common/log.go
index 4066094..8a26507 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/log.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/log.go
@@ -291,6 +291,10 @@ func (lg *Logger) ErrorEnabled() bool {
return lg.Level <= ERROR
}
+func (lg *Logger) LevelEnabled(level Level) bool {
+ return lg.Level <= level
+}
+
func (lg *Logger) Close() {
lg.sink.Unref()
lg.sink = nil
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index d10f3af..511833c 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -86,6 +86,16 @@ const HTRACE_REAPER_HEARTBEAT_PERIOD_MS = "reaper.heartbeat.period.ms"
// started.
const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address"
+// The maximum number of HRPC handler goroutines we will create at once. If
+// this is too small, we won't get enough concurrency; if it's too big, we will
+// buffer too much data in memory while waiting for the datastore to process
+// requests.
+const HTRACE_NUM_HRPC_HANDLERS = "num.hrpc.handlers"
+
+// The I/O timeout HRPC will use, in milliseconds. If it takes longer than
+// this to read or write a message, we will abort the connection.
+const HTRACE_HRPC_IO_TIMEOUT_MS = "hrpc.io.timeout.ms"
+
// Default values for HTrace configuration keys.
var DEFAULTS = map[string]string{
HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT),
@@ -100,6 +110,8 @@ var DEFAULTS = map[string]string{
HTRACE_METRICS_MAX_ADDR_ENTRIES: "100000",
HTRACE_SPAN_EXPIRY_MS: "0",
HTRACE_REAPER_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 90*1000),
+ HTRACE_NUM_HRPC_HANDLERS: "20",
+ HTRACE_HRPC_IO_TIMEOUT_MS: "60000",
}
// Values to be used when creating test configurations
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index 9a51cd4..e4f2151 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -22,12 +22,15 @@ package main
import (
"fmt"
"math/rand"
- htrace "org/apache/htrace/client"
"org/apache/htrace/common"
+ "org/apache/htrace/conf"
"org/apache/htrace/test"
"sort"
"testing"
"time"
+ "sync"
+ "sync/atomic"
+ htrace "org/apache/htrace/client"
)
func TestClientGetServerVersion(t *testing.T) {
@@ -39,7 +42,7 @@ func TestClientGetServerVersion(t *testing.T) {
}
defer ht.Close()
var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf())
+ hcl, err = htrace.NewClient(ht.ClientConf(), nil)
if err != nil {
t.Fatalf("failed to create client: %s", err.Error())
}
@@ -58,7 +61,7 @@ func TestClientGetServerDebugInfo(t *testing.T) {
}
defer ht.Close()
var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf())
+ hcl, err = htrace.NewClient(ht.ClientConf(), nil)
if err != nil {
t.Fatalf("failed to create client: %s", err.Error())
}
@@ -95,7 +98,7 @@ func TestClientOperations(t *testing.T) {
}
defer ht.Close()
var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf())
+ hcl, err = htrace.NewClient(ht.ClientConf(), nil)
if err != nil {
t.Fatalf("failed to create client: %s", err.Error())
}
@@ -185,7 +188,7 @@ func TestDumpAll(t *testing.T) {
}
defer ht.Close()
var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf())
+ hcl, err = htrace.NewClient(ht.ClientConf(), nil)
if err != nil {
t.Fatalf("failed to create client: %s", err.Error())
}
@@ -246,7 +249,7 @@ func TestClientGetServerConf(t *testing.T) {
}
defer ht.Close()
var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf())
+ hcl, err = htrace.NewClient(ht.ClientConf(), nil)
if err != nil {
t.Fatalf("failed to create client: %s", err.Error())
}
@@ -259,3 +262,121 @@ func TestClientGetServerConf(t *testing.T) {
EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE)
}
}
+
+const TEST_NUM_HRPC_HANDLERS = 2
+
+const TEST_NUM_WRITESPANS = 4
+
+// Tests that HRPC limits the number of simultaneous connections being processed.
+func TestHrpcAdmissionsControl(t *testing.T) {
+ var wg sync.WaitGroup
+ wg.Add(TEST_NUM_WRITESPANS)
+ var numConcurrentHrpcCalls int32
+ testHooks := &hrpcTestHooks {
+ HandleAdmission: func() {
+ defer wg.Done()
+ n := atomic.AddInt32(&numConcurrentHrpcCalls, 1)
+ if n > TEST_NUM_HRPC_HANDLERS {
+ t.Fatalf("The number of concurrent HRPC calls went above " +
+ "%d: it's at %d\n", TEST_NUM_HRPC_HANDLERS, n)
+ }
+ time.Sleep(1 * time.Millisecond)
+ n = atomic.AddInt32(&numConcurrentHrpcCalls, -1)
+ if n >= TEST_NUM_HRPC_HANDLERS {
+ t.Fatalf("The number of concurrent HRPC calls went above " +
+ "%d: it was at %d\n", TEST_NUM_HRPC_HANDLERS, n + 1)
+ }
+ },
+ }
+ htraceBld := &MiniHTracedBuilder{Name: "TestHrpcAdmissionsControl",
+ DataDirs: make([]string, 2),
+ Cnf: map[string]string{
+ conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
+ },
+ WrittenSpans: make(chan *common.Span, TEST_NUM_WRITESPANS),
+ HrpcTestHooks: testHooks,
+ }
+ ht, err := htraceBld.Build()
+ if err != nil {
+ t.Fatalf("failed to create datastore: %s", err.Error())
+ }
+ defer ht.Close()
+ var hcl *htrace.Client
+ hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+ if err != nil {
+ t.Fatalf("failed to create client: %s", err.Error())
+ }
+ // Create some random trace spans.
+ allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
+ for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
+ go func(i int) {
+ err = hcl.WriteSpans(&common.WriteSpansReq{
+ Spans: allSpans[i:i+1],
+ })
+ if err != nil {
+ t.Fatalf("WriteSpans failed: %s\n", err.Error())
+ }
+ }(iter)
+ }
+ wg.Wait()
+ for i := 0; i < TEST_NUM_WRITESPANS; i++ {
+ <-ht.Store.WrittenSpans
+ }
+}
+
+// Tests that HRPC I/O timeouts work.
+func TestHrpcIoTimeout(t *testing.T) {
+ htraceBld := &MiniHTracedBuilder{Name: "TestHrpcIoTimeout",
+ DataDirs: make([]string, 2),
+ Cnf: map[string]string{
+ conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
+ conf.HTRACE_HRPC_IO_TIMEOUT_MS: "1",
+ },
+ }
+ ht, err := htraceBld.Build()
+ if err != nil {
+ t.Fatalf("failed to create datastore: %s", err.Error())
+ }
+ defer ht.Close()
+ var hcl *htrace.Client
+ finishClient := make(chan interface{})
+ defer func() {
+ // Close the finishClient channel, if it hasn't already been closed.
+ defer func() {recover()}()
+ close(finishClient)
+ }()
+ testHooks := &htrace.TestHooks {
+ HandleWriteRequestBody: func() {
+ <-finishClient
+ },
+ }
+ hcl, err = htrace.NewClient(ht.ClientConf(), testHooks)
+ if err != nil {
+ t.Fatalf("failed to create client: %s", err.Error())
+ }
+ // Create some random trace spans.
+ allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
+ var wg sync.WaitGroup
+ wg.Add(TEST_NUM_WRITESPANS)
+ for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
+ go func(i int) {
+ defer wg.Done()
+ // Ignore the error return because there are internal retries in
+ // the client which will make this succeed eventually, usually.
+ // Keep in mind that we only block until we have seen
+ // TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that,
+ // we let requests through so that the test can exit cleanly.
+ hcl.WriteSpans(&common.WriteSpansReq{
+ Spans: allSpans[i:i+1],
+ })
+ }(iter)
+ }
+ for {
+ if ht.Hsv.GetNumIoErrors() >= TEST_NUM_WRITESPANS {
+ break
+ }
+ time.Sleep(1000 * time.Nanosecond)
+ }
+ close(finishClient)
+ wg.Wait()
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index ab2747b..a4bb320 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -289,6 +289,7 @@ func (shd *shard) writeSpan(ispan *IncomingSpan) error {
}
shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg)
if shd.store.WrittenSpans != nil {
+ shd.store.lg.Errorf("WATERMELON: Sending span to shd.store.WrittenSpans\n")
shd.store.WrittenSpans <- span
}
return nil
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index 576ee0b..0443834 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -410,7 +410,7 @@ func TestReloadDataStore(t *testing.T) {
}
}()
var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf())
+ hcl, err = htrace.NewClient(ht.ClientConf(), nil)
if err != nil {
t.Fatalf("failed to create client: %s", err.Error())
}
@@ -444,7 +444,7 @@ func TestReloadDataStore(t *testing.T) {
if err != nil {
t.Fatalf("failed to re-create datastore: %s", err.Error())
}
- hcl, err = htrace.NewClient(ht.ClientConf())
+ hcl, err = htrace.NewClient(ht.ClientConf(), nil)
if err != nil {
t.Fatalf("failed to re-create client: %s", err.Error())
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index 0d72602..a0f2e81 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -33,9 +33,13 @@ import (
"org/apache/htrace/common"
"org/apache/htrace/conf"
"reflect"
+ "sync"
+ "sync/atomic"
"time"
)
+const MAX_HRPC_HANDLERS = 32765
+
// Handles HRPC calls
type HrpcHandler struct {
lg *common.Logger
@@ -46,14 +50,57 @@ type HrpcHandler struct {
type HrpcServer struct {
*rpc.Server
hand *HrpcHandler
+
+ // The listener we are using to accept new connections.
listener net.Listener
+
+ // A WaitGroup used to block until the HRPC server has exited.
+ exited sync.WaitGroup
+
+ // A channel containing server codecs to use. This channel is fully
+ // buffered. The number of entries it initially contains determines how
+ // many concurrent codecs we will have running at once.
+ cdcs chan *HrpcServerCodec
+
+ // Used to shut down
+ shutdown chan interface{}
+
+ // The I/O timeout to use when reading requests or sending responses. This
+ // timeout does not apply to the time we spend processing the message.
+ ioTimeo time.Duration
+
+ // A count of all I/O errors that we have encountered since the server
+ // started. This counts errors like improperly formatted message frames,
+ // but not errors like properly formatted but invalid messages.
+ // This count is updated from multiple goroutines via sync/atomic.
+ ioErrorCount uint64
+
+ // The test hooks to use, or nil during normal operation.
+ testHooks *hrpcTestHooks
+}
+
+type hrpcTestHooks struct {
+ // A callback we make right after calling Accept() but before reading from
+ // the new connection.
+ HandleAdmission func()
}
-// Codec which encodes HRPC data via JSON
+// A codec which encodes HRPC data via JSON. This structure holds the context
+// for a particular client connection.
type HrpcServerCodec struct {
lg *common.Logger
+
+ // The current connection.
conn net.Conn
+
+ // The HrpcServer which this connection is part of.
+ hsv *HrpcServer
+
+ // The message length we read from the header.
length uint32
+
+ // The number of messages this connection has handled.
+ numHandled int
}
func asJson(val interface{}) string {
@@ -64,45 +111,51 @@ func asJson(val interface{}) string {
return string(js)
}
-func createErrAndWarn(lg *common.Logger, val string) error {
- return createErrAndLog(lg, val, common.WARN)
+func newIoErrorWarn(cdc *HrpcServerCodec, val string) error {
+ return newIoError(cdc, val, common.WARN)
}
-func createErrAndLog(lg *common.Logger, val string, level common.Level) error {
- lg.Write(level, val+"\n")
+func newIoError(cdc *HrpcServerCodec, val string, level common.Level) error {
+ if cdc.lg.LevelEnabled(level) {
+ cdc.lg.Write(level, cdc.conn.RemoteAddr().String() + ": " + val + "\n")
+ }
+ if level >= common.INFO {
+ atomic.AddUint64(&cdc.hsv.ioErrorCount, 1)
+ }
return errors.New(val)
}
func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
hdr := common.HrpcRequestHeader{}
if cdc.lg.TraceEnabled() {
- cdc.lg.Tracef("Reading HRPC request header from %s\n", cdc.conn.RemoteAddr())
+ cdc.lg.Tracef("%s: Reading HRPC request header.\n", cdc.conn.RemoteAddr())
}
+ cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
err := binary.Read(cdc.conn, binary.LittleEndian, &hdr)
if err != nil {
- level := common.WARN
- if err == io.EOF {
- level = common.DEBUG
+ if err == io.EOF && cdc.numHandled > 0 {
+ return newIoError(cdc, fmt.Sprintf("Remote closed connection " +
+ "after writing %d message(s)", cdc.numHandled), common.DEBUG)
}
- return createErrAndLog(cdc.lg, fmt.Sprintf("Error reading header bytes: %s",
- err.Error()), level)
+ return newIoError(cdc,
+ fmt.Sprintf("Error reading request header: %s", err.Error()), common.WARN)
}
if cdc.lg.TraceEnabled() {
- cdc.lg.Tracef("Read HRPC request header %s from %s\n",
- asJson(&hdr), cdc.conn.RemoteAddr())
+ cdc.lg.Tracef("%s: Read HRPC request header %s\n",
+ cdc.conn.RemoteAddr(), asJson(&hdr))
}
if hdr.Magic != common.HRPC_MAGIC {
- return createErrAndWarn(cdc.lg, fmt.Sprintf("Invalid request header: expected "+
+ return newIoErrorWarn(cdc, fmt.Sprintf("Invalid request header: expected "+
"magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic))
}
if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
- return createErrAndWarn(cdc.lg, fmt.Sprintf("Length prefix was too long. "+
+ return newIoErrorWarn(cdc, fmt.Sprintf("Length prefix was too long. "+
"Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH,
hdr.Length))
}
req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
if req.ServiceMethod == "" {
- return createErrAndWarn(cdc.lg, fmt.Sprintf("Unknown MethodID code 0x%04x",
+ return newIoErrorWarn(cdc, fmt.Sprintf("Unknown MethodID code 0x%04x",
hdr.MethodId))
}
req.Seq = hdr.Seq
@@ -111,34 +164,36 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
}
func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
- remoteAddr := cdc.conn.RemoteAddr()
if cdc.lg.TraceEnabled() {
- cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n",
- cdc.length, remoteAddr)
+ cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n",
+ cdc.conn.RemoteAddr(), cdc.length)
}
mh := new(codec.MsgpackHandle)
mh.WriteExt = true
dec := codec.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)), mh)
err := dec.Decode(body)
if err != nil {
- return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to read request "+
- "body from %s: %s", remoteAddr, err.Error()))
+ return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte " +
+ "request body: %s", cdc.length, err.Error()))
}
if cdc.lg.TraceEnabled() {
- cdc.lg.Tracef("Read body from %s: %s\n",
- remoteAddr, asJson(&body))
+ cdc.lg.Tracef("%s: read %d-byte request body %s\n",
+ cdc.conn.RemoteAddr(), cdc.length, asJson(&body))
}
val := reflect.ValueOf(body)
addr := val.Elem().FieldByName("Addr")
if addr.IsValid() {
- addr.SetString(remoteAddr.String())
+ addr.SetString(cdc.conn.RemoteAddr().String())
}
+ var zeroTime time.Time
+ cdc.conn.SetDeadline(zeroTime)
return nil
}
var EMPTY []byte = make([]byte, 0)
func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error {
+ cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
var err error
buf := EMPTY
if msg != nil {
@@ -148,7 +203,7 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e
enc := codec.NewEncoder(w, mh)
err := enc.Encode(msg)
if err != nil {
- return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to marshal "+
+ return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+
"response message: %s", err.Error()))
}
buf = w.Bytes()
@@ -161,13 +216,13 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e
writer := bufio.NewWriterSize(cdc.conn, 256)
err = binary.Write(writer, binary.LittleEndian, &hdr)
if err != nil {
- return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write response "+
+ return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
"header: %s", err.Error()))
}
if hdr.ErrLength > 0 {
_, err = io.WriteString(writer, resp.Error)
if err != nil {
- return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write error "+
+ return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write error "+
"string: %s", err.Error()))
}
}
@@ -175,24 +230,30 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e
var length int
length, err = writer.Write(buf)
if err != nil {
- return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write response "+
+ return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
"message: %s", err.Error()))
}
if uint32(length) != hdr.Length {
- return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write all of "+
+ return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write all of "+
"response message: %s", err.Error()))
}
}
err = writer.Flush()
if err != nil {
- return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write the response "+
+ return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write the response "+
"bytes: %s", err.Error()))
}
+ cdc.numHandled++
return nil
}
func (cdc *HrpcServerCodec) Close() error {
- return cdc.conn.Close()
+ err := cdc.conn.Close()
+ cdc.conn = nil
+ cdc.length = 0
+ cdc.numHandled = 0
+ cdc.hsv.cdcs <- cdc
+ return err
}
func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
@@ -228,14 +289,36 @@ func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
return nil
}
-func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) {
+func CreateHrpcServer(cnf *conf.Config, store *dataStore,
+ testHooks *hrpcTestHooks) (*HrpcServer, error) {
lg := common.NewLogger("hrpc", cnf)
+ numHandlers := cnf.GetInt(conf.HTRACE_NUM_HRPC_HANDLERS)
+ if numHandlers < 1 {
+ lg.Warnf("%s must be positive: using 1 handler.\n", conf.HTRACE_NUM_HRPC_HANDLERS)
+ numHandlers = 1
+ }
+ if numHandlers > MAX_HRPC_HANDLERS {
+ lg.Warnf("%s cannot be more than %d: using %d handlers\n",
+ conf.HTRACE_NUM_HRPC_HANDLERS, MAX_HRPC_HANDLERS, MAX_HRPC_HANDLERS)
+ numHandlers = MAX_HRPC_HANDLERS
+ }
hsv := &HrpcServer{
Server: rpc.NewServer(),
hand: &HrpcHandler{
lg: lg,
store: store,
},
+ cdcs: make(chan *HrpcServerCodec, numHandlers),
+ shutdown: make(chan interface{}),
+ ioTimeo: time.Millisecond *
+ time.Duration(cnf.GetInt64(conf.HTRACE_HRPC_IO_TIMEOUT_MS)),
+ testHooks: testHooks,
+ }
+ for i := 0; i < numHandlers; i++ {
+ hsv.cdcs <- &HrpcServerCodec{
+ lg: lg,
+ hsv: hsv,
+ }
}
var err error
hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
@@ -243,26 +326,42 @@ func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) {
return nil, err
}
hsv.Server.Register(hsv.hand)
+ hsv.exited.Add(1)
go hsv.run()
- lg.Infof("Started HRPC server on %s...\n", hsv.listener.Addr().String())
+ lg.Infof("Started HRPC server on %s with %d handler routines. " +
+ "ioTimeo=%s.\n", hsv.listener.Addr().String(), numHandlers,
+ hsv.ioTimeo.String())
return hsv, nil
}
func (hsv *HrpcServer) run() {
lg := hsv.hand.lg
+ srvAddr := hsv.listener.Addr().String()
+ defer func() {
+ lg.Infof("HrpcServer on %s exiting\n", srvAddr)
+ hsv.exited.Done()
+ }()
for {
- conn, err := hsv.listener.Accept()
- if err != nil {
- lg.Errorf("HRPC Accept error: %s\n", err.Error())
- continue
- }
- if lg.TraceEnabled() {
- lg.Tracef("Accepted HRPC connection from %s\n", conn.RemoteAddr())
+ select {
+ case cdc:=<-hsv.cdcs:
+ conn, err := hsv.listener.Accept()
+ if err != nil {
+ lg.Errorf("HrpcServer on %s got accept error: %s\n", srvAddr, err.Error())
+ hsv.cdcs<-cdc // never blocks; there is always sufficient buffer space
+ continue
+ }
+ if lg.TraceEnabled() {
+ lg.Tracef("%s: Accepted HRPC connection.\n", conn.RemoteAddr())
+ }
+ cdc.conn = conn
+ cdc.numHandled = 0
+ if hsv.testHooks != nil && hsv.testHooks.HandleAdmission != nil {
+ hsv.testHooks.HandleAdmission()
+ }
+ go hsv.ServeCodec(cdc)
+ case <-hsv.shutdown:
+ return
}
- go hsv.ServeCodec(&HrpcServerCodec{
- lg: lg,
- conn: conn,
- })
}
}
@@ -270,6 +369,12 @@ func (hsv *HrpcServer) Addr() net.Addr {
return hsv.listener.Addr()
}
+func (hsv *HrpcServer) GetNumIoErrors() uint64 {
+ return atomic.LoadUint64(&hsv.ioErrorCount)
+}
+
func (hsv *HrpcServer) Close() {
+ close(hsv.shutdown)
hsv.listener.Close()
+ hsv.exited.Wait()
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
index 97b72ca..5b0dfc6 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
@@ -110,7 +110,7 @@ func main() {
}
var hsv *HrpcServer
if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
- hsv, err = CreateHrpcServer(cnf, store)
+ hsv, err = CreateHrpcServer(cnf, store, nil)
if err != nil {
lg.Errorf("Error creating HRPC server: %s\n", err.Error())
os.Exit(1)
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
index 48c20f0..5243d9e 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -205,13 +205,12 @@ func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) {
}
defer ht.Close()
var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf())
+ hcl, err = htrace.NewClient(ht.ClientConf(), &htrace.TestHooks {
+ HrpcDisabled: !usePacked,
+ })
if err != nil {
t.Fatalf("failed to create client: %s", err.Error())
}
- if !usePacked {
- hcl.DisableHrpc()
- }
NUM_TEST_SPANS := 12
allSpans := createRandomTestSpans(NUM_TEST_SPANS)
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index a50799a..353beae 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -55,6 +55,9 @@ type MiniHTracedBuilder struct {
// If non-null, the WrittenSpans channel to use when creating the DataStore.
WrittenSpans chan *common.Span
+
+ // The test hooks to use for the HRPC server
+ HrpcTestHooks *hrpcTestHooks
}
type MiniHTraced struct {
@@ -141,7 +144,7 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) {
return nil, err
}
rstListener = nil
- hsv, err = CreateHrpcServer(cnf, store)
+ hsv, err = CreateHrpcServer(cnf, store, bld.HrpcTestHooks)
if err != nil {
return nil, err
}
@@ -175,6 +178,7 @@ func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config {
func (ht *MiniHTraced) Close() {
ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name)
ht.Rsv.Close()
+ ht.Hsv.Close()
ht.Store.Close()
if !ht.KeepDataDirsOnClose {
for idx := range ht.DataDirs {
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
index 394e1c1..7b5e433 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
@@ -123,7 +123,7 @@ func main() {
}
// Create HTrace client
- hcl, err := htrace.NewClient(cnf)
+ hcl, err := htrace.NewClient(cnf, nil)
if err != nil {
fmt.Printf("Failed to create HTrace client: %s\n", err.Error())
os.Exit(EXIT_FAILURE)