You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by iw...@apache.org on 2016/04/20 01:32:48 UTC
[5/7] incubator-htrace git commit: HTRACE-357. Rename
htrace-htraced/go/src/org/apache/htrace to htrace-htraced/go/src/htrace
(Colin Patrick McCabe via iwasakims)
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go b/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go
new file mode 100644
index 0000000..9157965
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "htrace/common"
+ "htrace/conf"
+ "testing"
+ "time"
+)
+
+func TestHeartbeaterStartupShutdown(t *testing.T) {
+ cnfBld := conf.Builder{
+ Values: conf.TEST_VALUES(),
+ Defaults: conf.DEFAULTS,
+ }
+ cnf, err := cnfBld.Build()
+ if err != nil {
+ t.Fatalf("failed to create conf: %s", err.Error())
+ }
+ lg := common.NewLogger("heartbeater", cnf)
+ hb := NewHeartbeater("ExampleHeartbeater", 1, lg)
+ if hb.String() != "ExampleHeartbeater" {
+ t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
+ }
+ hb.Shutdown()
+}
+
+// The number of milliseconds between heartbeats
+const HEARTBEATER_PERIOD = 5
+
+// The number of heartbeats to send in the test.
+const NUM_TEST_HEARTBEATS = 3
+
+// TestHeartbeaterSendsHeartbeats runs testHeartbeaterSendsHeartbeatsImpl
+// repeatedly until a run takes strictly longer than the time needed for
+// NUM_TEST_HEARTBEATS heartbeats spaced HEARTBEATER_PERIOD milliseconds apart.
+func TestHeartbeaterSendsHeartbeats(t *testing.T) {
+	builder := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnf, err := builder.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	lg := common.NewLogger("heartbeater", cnf)
+	// The minimum amount of time which the heartbeater test should take
+	minDuration := time.Millisecond * (NUM_TEST_HEARTBEATS * HEARTBEATER_PERIOD)
+	for {
+		start := time.Now()
+		testHeartbeaterSendsHeartbeatsImpl(t, lg)
+		elapsed := time.Since(start)
+		lg.Debugf("Measured duration: %v; minimum expected duration: %v\n",
+			elapsed, minDuration)
+		if elapsed > minDuration {
+			break
+		}
+	}
+}
+
+// testHeartbeaterSendsHeartbeatsImpl performs one run of the heartbeat test.
+// It creates a Heartbeater with a period of HEARTBEATER_PERIOD, registers a
+// single target channel, blocks until NUM_TEST_HEARTBEATS heartbeats have been
+// received on that channel, and then shuts the Heartbeater down.
+func testHeartbeaterSendsHeartbeatsImpl(t *testing.T, lg *common.Logger) {
+ hb := NewHeartbeater("ExampleHeartbeater", HEARTBEATER_PERIOD, lg)
+ if hb.String() != "ExampleHeartbeater" {
+ t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
+ }
+ // Buffered so that NUM_TEST_HEARTBEATS heartbeats can be queued without a
+ // receiver being ready.
+ testChan := make(chan interface{}, NUM_TEST_HEARTBEATS)
+ gotAllHeartbeats := make(chan bool)
+ hb.AddHeartbeatTarget(&HeartbeatTarget{
+ name: "ExampleHeartbeatTarget",
+ targetChan: testChan,
+ })
+ go func() {
+ // Receive the expected number of heartbeats, then wake up the test.
+ for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+ <-testChan
+ }
+ gotAllHeartbeats <- true
+ // Keep receiving up to NUM_TEST_HEARTBEATS further heartbeats, stopping
+ // early if the channel is closed.
+ // NOTE(review): presumably this drains heartbeats sent while Shutdown()
+ // is in progress; confirm against the Heartbeater implementation.
+ for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+ _, open := <-testChan
+ if !open {
+ return
+ }
+ }
+ }()
+ <-gotAllHeartbeats
+ hb.Shutdown()
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/hrpc.go b/htrace-htraced/go/src/htrace/htraced/hrpc.go
new file mode 100644
index 0000000..8b5a728
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/hrpc.go
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bufio"
+ "bytes"
+ "encoding/binary"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "github.com/ugorji/go/codec"
+ "htrace/common"
+ "htrace/conf"
+ "io"
+ "net"
+ "net/rpc"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+const MAX_HRPC_HANDLERS = 32765
+
+// HrpcHandler handles HRPC calls. It is the receiver registered with the
+// net/rpc server in CreateHrpcServer.
+type HrpcHandler struct {
+ // The logger shared by the HRPC server, its codecs and span ingestors.
+ lg *common.Logger
+ // The datastore from which span ingestors are created.
+ store *dataStore
+}
+
+// The HRPC server
+type HrpcServer struct {
+ // The embedded net/rpc server, which provides Register and ServeCodec.
+ *rpc.Server
+ // The handler whose methods are registered with the RPC server.
+ hand *HrpcHandler
+
+ // The listener we are using to accept new connections.
+ listener net.Listener
+
+ // A WaitGroup used to block until the HRPC server has exited.
+ exited sync.WaitGroup
+
+ // A channel containing server codecs to use. This channel is fully
+ // buffered. The number of entries it initially contains determines how
+ // many concurrent codecs we will have running at once.
+ cdcs chan *HrpcServerCodec
+
+ // Used to shut down
+ shutdown chan interface{}
+
+ // The I/O timeout to use when reading requests or sending responses. This
+ // timeout does not apply to the time we spend processing the message.
+ ioTimeo time.Duration
+
+ // A count of all I/O errors that we have encountered since the server
+ // started. This counts errors like improperly formatted message frames,
+ // but not errors like properly formatted but invalid messages.
+ // This count is updated from multiple goroutines via sync/atomic.
+ ioErrorCount uint64
+
+ // The test hooks to use, or nil during normal operation.
+ testHooks *hrpcTestHooks
+}
+
+// hrpcTestHooks holds optional callbacks that tests can install on an
+// HrpcServer. The server is given nil hooks during normal operation.
+type hrpcTestHooks struct {
+ // A callback we make right after calling Accept() but before reading from
+ // the new connection.
+ HandleAdmission func()
+}
+
+// A codec which reads HRPC requests and writes HRPC responses, with message
+// bodies encoded as msgpack. This structure holds the context for a
+// particular client connection.
+type HrpcServerCodec struct {
+ lg *common.Logger
+
+ // The current connection.
+ conn net.Conn
+
+ // The HrpcServer which this connection is part of.
+ hsv *HrpcServer
+
+ // The message length we read from the header.
+ length uint32
+
+ // The number of messages this connection has handled.
+ numHandled int
+
+ // The buffer for reading requests. These buffers are reused for multiple
+ // requests to avoid allocating memory.
+ buf []byte
+
+ // Configuration for msgpack decoding
+ msgpackHandle codec.MsgpackHandle
+}
+
+// asJson renders val as a JSON string, for use in log messages. If val cannot
+// be marshalled, a string describing the encoding error is returned instead.
+func asJson(val interface{}) string {
+	encoded, err := json.Marshal(val)
+	if err == nil {
+		return string(encoded)
+	}
+	return "encoding error: " + err.Error()
+}
+
+func newIoErrorWarn(cdc *HrpcServerCodec, val string) error {
+ return newIoError(cdc, val, common.WARN)
+}
+
+// newIoError builds an error carrying the message val. The message is also
+// logged at the given level, prefixed with the remote address of the codec's
+// connection, when that level is enabled. Errors at INFO level or above are
+// added to the server-wide I/O error count.
+func newIoError(cdc *HrpcServerCodec, val string, level common.Level) error {
+	if cdc.lg.LevelEnabled(level) {
+		remote := cdc.conn.RemoteAddr().String()
+		cdc.lg.Write(level, remote+": "+val+"\n")
+	}
+	if level >= common.INFO {
+		// ioErrorCount is shared by all connections, so update it atomically.
+		atomic.AddUint64(&cdc.hsv.ioErrorCount, 1)
+	}
+	return errors.New(val)
+}
+
+func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
+ hdr := common.HrpcRequestHeader{}
+ if cdc.lg.TraceEnabled() {
+ cdc.lg.Tracef("%s: Reading HRPC request header.\n", cdc.conn.RemoteAddr())
+ }
+ cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
+ err := binary.Read(cdc.conn, binary.LittleEndian, &hdr)
+ if err != nil {
+ if err == io.EOF && cdc.numHandled > 0 {
+ return newIoError(cdc, fmt.Sprintf("Remote closed connection "+
+ "after writing %d message(s)", cdc.numHandled), common.DEBUG)
+ }
+ return newIoError(cdc,
+ fmt.Sprintf("Error reading request header: %s", err.Error()), common.WARN)
+ }
+ if cdc.lg.TraceEnabled() {
+ cdc.lg.Tracef("%s: Read HRPC request header %s\n",
+ cdc.conn.RemoteAddr(), asJson(&hdr))
+ }
+ if hdr.Magic != common.HRPC_MAGIC {
+ return newIoErrorWarn(cdc, fmt.Sprintf("Invalid request header: expected "+
+ "magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic))
+ }
+ if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
+ return newIoErrorWarn(cdc, fmt.Sprintf("Length prefix was too long. "+
+ "Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH,
+ hdr.Length))
+ }
+ req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
+ if req.ServiceMethod == "" {
+ return newIoErrorWarn(cdc, fmt.Sprintf("Unknown MethodID code 0x%04x",
+ hdr.MethodId))
+ }
+ req.Seq = hdr.Seq
+ cdc.length = hdr.Length
+ return nil
+}
+
+// ReadRequestBody reads the request body whose length was announced by the
+// preceding ReadRequestHeader call and decodes it from msgpack into body.
+//
+// net/rpc calls this with a nil body when it only wants the request to be
+// read and discarded; in that case the bytes are consumed and nil is returned.
+//
+// For WriteSpans requests, the spans that follow the request structure in the
+// same buffer are decoded one at a time and handed to a span ingestor, so the
+// WriteSpans handler itself has nothing left to do.
+//
+// Returns an error if the body cannot be read in full, if it cannot be
+// decoded, or if any of the announced spans fails to decode.
+func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+	remoteAddr := cdc.conn.RemoteAddr().String()
+	if cdc.lg.TraceEnabled() {
+		cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n",
+			remoteAddr, cdc.length)
+	}
+	// Grow the reusable buffer to the next power of two that fits the body.
+	if cap(cdc.buf) < int(cdc.length) {
+		var pow uint
+		for pow = 0; (1 << pow) < int(cdc.length); pow++ {
+		}
+		cdc.buf = make([]byte, 0, 1<<pow)
+	}
+	_, err := io.ReadFull(cdc.conn, cdc.buf[:cdc.length])
+	if err != nil {
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte "+
+			"request body: %s", cdc.length, err.Error()))
+	}
+	// The I/O timeout does not apply to the time spent processing the message.
+	var zeroTime time.Time
+	cdc.conn.SetDeadline(zeroTime)
+
+	if body == nil {
+		// The caller only wanted the body consumed from the connection.
+		return nil
+	}
+	dec := codec.NewDecoderBytes(cdc.buf[:cdc.length], &cdc.msgpackHandle)
+	err = dec.Decode(body)
+	if err != nil {
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode %d-byte "+
+			"request body: %s", cdc.length, err.Error()))
+	}
+	if cdc.lg.TraceEnabled() {
+		cdc.lg.Tracef("%s: read HRPC message: %s\n",
+			remoteAddr, asJson(&body))
+	}
+	// Only WriteSpans requests carry trailing spans; anything else is done.
+	req, isWriteSpans := body.(*common.WriteSpansReq)
+	if !isWriteSpans || req == nil {
+		return nil
+	}
+	// We decode WriteSpans requests in a streaming fashion, to avoid overloading the garbage
+	// collector with a ton of trace spans all at once.
+	startTime := time.Now()
+	client, _, err := net.SplitHostPort(remoteAddr)
+	if err != nil {
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to split host and port "+
+			"for %s: %s\n", remoteAddr, err.Error()))
+	}
+	hand := cdc.hsv.hand
+	ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
+	for spanIdx := 0; spanIdx < req.NumSpans; spanIdx++ {
+		var span *common.Span
+		err := dec.Decode(&span)
+		if err != nil {
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode span %d "+
+				"out of %d: %s\n", spanIdx, req.NumSpans, err.Error()))
+		}
+		ing.IngestSpan(span)
+	}
+	ing.Close(startTime)
+	return nil
+}
+
+var EMPTY []byte = make([]byte, 0)
+
+// WriteResponse sends one HRPC response on the connection: a fixed-size
+// header, then the error string (if resp.Error is non-empty), then the
+// msgpack-encoded msg (if msg is non-nil).
+//
+// An I/O deadline of ioTimeo is set on the connection before writing. On
+// success the per-connection count of handled messages is incremented.
+// Returns an error if msg cannot be encoded or any write fails.
+func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error {
+	cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
+	var err error
+	buf := EMPTY
+	if msg != nil {
+		w := bytes.NewBuffer(make([]byte, 0, 128))
+		enc := codec.NewEncoder(w, &cdc.msgpackHandle)
+		err = enc.Encode(msg)
+		if err != nil {
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+
+				"response message: %s", err.Error()))
+		}
+		buf = w.Bytes()
+	}
+	hdr := common.HrpcResponseHeader{}
+	hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod)
+	hdr.Seq = resp.Seq
+	hdr.ErrLength = uint32(len(resp.Error))
+	hdr.Length = uint32(len(buf))
+	// Buffer the pieces so that small responses go out in a single write.
+	writer := bufio.NewWriterSize(cdc.conn, 256)
+	err = binary.Write(writer, binary.LittleEndian, &hdr)
+	if err != nil {
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
+			"header: %s", err.Error()))
+	}
+	if hdr.ErrLength > 0 {
+		_, err = io.WriteString(writer, resp.Error)
+		if err != nil {
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write error "+
+				"string: %s", err.Error()))
+		}
+	}
+	if hdr.Length > 0 {
+		var length int
+		length, err = writer.Write(buf)
+		if err != nil {
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
+				"message: %s", err.Error()))
+		}
+		if uint32(length) != hdr.Length {
+			// err is nil on this path, so report the byte counts rather than
+			// dereferencing it.
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write all of "+
+				"response message: wrote %d of %d bytes", length, hdr.Length))
+		}
+	}
+	err = writer.Flush()
+	if err != nil {
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write the response "+
+			"bytes: %s", err.Error()))
+	}
+	cdc.numHandled++
+	return nil
+}
+
+// Close closes the codec's connection, resets the per-connection state, and
+// puts the codec back on the server's channel of available codecs so that it
+// can serve another connection. Returns the error from closing the connection.
+func (cdc *HrpcServerCodec) Close() error {
+	closeErr := cdc.conn.Close()
+	cdc.conn, cdc.length, cdc.numHandled = nil, 0, 0
+	cdc.hsv.cdcs <- cdc
+	return closeErr
+}
+
+// WriteSpans is the RPC method invoked for WriteSpans requests. It always
+// returns nil and leaves resp untouched.
+func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
+ resp *common.WriteSpansResp) (err error) {
+ // Nothing to do here; WriteSpans is handled in ReadRequestBody.
+ return nil
+}
+
+// CreateHrpcServer creates an HRPC server, starts listening on the address
+// configured by conf.HTRACE_HRPC_ADDRESS, and starts the accept loop in a
+// new goroutine.
+//
+// The number of concurrently served connections is taken from
+// conf.HTRACE_NUM_HRPC_HANDLERS, clamped to the range [1, MAX_HRPC_HANDLERS].
+// testHooks may be nil.
+//
+// Returns an error if the listening socket cannot be opened or the handler
+// cannot be registered with the RPC server.
+func CreateHrpcServer(cnf *conf.Config, store *dataStore,
+	testHooks *hrpcTestHooks) (*HrpcServer, error) {
+	lg := common.NewLogger("hrpc", cnf)
+	numHandlers := cnf.GetInt(conf.HTRACE_NUM_HRPC_HANDLERS)
+	if numHandlers < 1 {
+		lg.Warnf("%s must be positive: using 1 handler.\n", conf.HTRACE_NUM_HRPC_HANDLERS)
+		numHandlers = 1
+	}
+	if numHandlers > MAX_HRPC_HANDLERS {
+		lg.Warnf("%s cannot be more than %d: using %d handlers\n",
+			conf.HTRACE_NUM_HRPC_HANDLERS, MAX_HRPC_HANDLERS, MAX_HRPC_HANDLERS)
+		numHandlers = MAX_HRPC_HANDLERS
+	}
+	hsv := &HrpcServer{
+		Server: rpc.NewServer(),
+		hand: &HrpcHandler{
+			lg:    lg,
+			store: store,
+		},
+		cdcs:     make(chan *HrpcServerCodec, numHandlers),
+		shutdown: make(chan interface{}),
+		ioTimeo: time.Millisecond *
+			time.Duration(cnf.GetInt64(conf.HTRACE_HRPC_IO_TIMEOUT_MS)),
+		testHooks: testHooks,
+	}
+	// Pre-fill the channel with one codec per handler; the accept loop takes
+	// a codec for each connection and the codec puts itself back on Close.
+	for i := 0; i < numHandlers; i++ {
+		hsv.cdcs <- &HrpcServerCodec{
+			lg:  lg,
+			hsv: hsv,
+			msgpackHandle: codec.MsgpackHandle{
+				WriteExt: true,
+			},
+		}
+	}
+	var err error
+	hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
+	if err != nil {
+		return nil, err
+	}
+	err = hsv.Server.Register(hsv.hand)
+	if err != nil {
+		// Do not leave the port bound if the server cannot be used.
+		hsv.listener.Close()
+		return nil, err
+	}
+	hsv.exited.Add(1)
+	go hsv.run()
+	lg.Infof("Started HRPC server on %s with %d handler routines. "+
+		"ioTimeo=%s.\n", hsv.listener.Addr().String(), numHandlers,
+		hsv.ioTimeo.String())
+	return hsv, nil
+}
+
+// run is the accept loop of the HRPC server. For every codec it can take from
+// the codec channel it accepts one connection and serves it in a new
+// goroutine. The loop exits once the shutdown channel has been closed, and
+// marks the exited WaitGroup as done on the way out.
+func (hsv *HrpcServer) run() {
+	lg := hsv.hand.lg
+	srvAddr := hsv.listener.Addr().String()
+	defer func() {
+		lg.Infof("HrpcServer on %s exiting\n", srvAddr)
+		hsv.exited.Done()
+	}()
+	for {
+		select {
+		case cdc := <-hsv.cdcs:
+			conn, err := hsv.listener.Accept()
+			if err != nil {
+				hsv.cdcs <- cdc // never blocks; there is always sufficient buffer space
+				// Close() closes the shutdown channel before closing the
+				// listener, so an accept error after shutdown is expected:
+				// exit quietly instead of logging it and looping again.
+				select {
+				case <-hsv.shutdown:
+					return
+				default:
+				}
+				lg.Errorf("HrpcServer on %s got accept error: %s\n", srvAddr, err.Error())
+				continue
+			}
+			if lg.TraceEnabled() {
+				lg.Tracef("%s: Accepted HRPC connection.\n", conn.RemoteAddr())
+			}
+			cdc.conn = conn
+			cdc.numHandled = 0
+			if hsv.testHooks != nil && hsv.testHooks.HandleAdmission != nil {
+				hsv.testHooks.HandleAdmission()
+			}
+			go hsv.ServeCodec(cdc)
+		case <-hsv.shutdown:
+			return
+		}
+	}
+}
+
+// Addr returns the network address the HRPC server is listening on.
+func (hsv *HrpcServer) Addr() net.Addr {
+	addr := hsv.listener.Addr()
+	return addr
+}
+
+// GetNumIoErrors returns the number of I/O errors counted since the server
+// started. The counter is read atomically.
+func (hsv *HrpcServer) GetNumIoErrors() uint64 {
+	count := atomic.LoadUint64(&hsv.ioErrorCount)
+	return count
+}
+
+// Close shuts down the HRPC server and blocks until the accept loop has
+// exited.
+func (hsv *HrpcServer) Close() {
+ // Signal shutdown first, then close the listener so that a blocked Accept()
+ // in run() returns.
+ close(hsv.shutdown)
+ hsv.listener.Close()
+ hsv.exited.Wait()
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/htraced.go b/htrace-htraced/go/src/htrace/htraced/htraced.go
new file mode 100644
index 0000000..0d41e0d
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/htraced.go
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "github.com/alecthomas/kingpin"
+ "github.com/jmhodges/levigo"
+ "htrace/common"
+ "htrace/conf"
+ "net"
+ "os"
+ "runtime"
+ "time"
+)
+
+var RELEASE_VERSION string
+var GIT_VERSION string
+
+const USAGE = `htraced: the HTrace server daemon.
+
+htraced receives trace spans sent from HTrace clients. It exposes a REST
+interface which others can query. It also runs a web server with a graphical
+user interface. htraced stores its span data in levelDB files on the local
+disks.
+
+Usage:
+--help: this help message
+
+-Dk=v: set configuration key 'k' to value 'v'
+For example -Dweb.address=127.0.0.1:8080 sets the web address to localhost,
+port 8080. -Dlog.level=DEBUG will set the default log level to DEBUG.
+
+-Dk: set configuration key 'k' to 'true'
+
+Normally, configuration options should be set in the ` + conf.CONFIG_FILE_NAME + `
+configuration file. We find this file by searching the paths in the
+` + conf.HTRACED_CONF_DIR + `. The command-line options are just an alternate way
+of setting configuration when launching the daemon.
+`
+
+// main is the entry point of the htraced daemon. It loads the configuration,
+// handles the "version" command, opens the HTTP port, creates the datastore,
+// the REST server and (if an address is configured) the HRPC server,
+// optionally sends a startup notification, and then sleeps forever while the
+// servers run.
+func main() {
+	// Load the htraced configuration.
+	// This also parses the -Dfoo=bar command line arguments and removes them
+	// from os.Args.
+	cnf, cnfLog := conf.LoadApplicationConfig("htraced.")
+
+	// Parse the remaining command-line arguments.
+	app := kingpin.New(os.Args[0], USAGE)
+	version := app.Command("version", "Print server version and exit.")
+	cmd := kingpin.MustParse(app.Parse(os.Args[1:]))
+
+	// Handle the "version" command-line argument.
+	if cmd == version.FullCommand() {
+		fmt.Printf("Running htraced %s [%s].\n", RELEASE_VERSION, GIT_VERSION)
+		os.Exit(0)
+	}
+
+	// Open the HTTP port.
+	// We want to do this first, before initializing the datastore or setting up
+	// logging. That way, if someone accidentally starts two daemons with the
+	// same config file, the second invocation will exit with a "port in use"
+	// error rather than potentially disrupting the first invocation.
+	rstListener, listenErr := net.Listen("tcp", cnf.Get(conf.HTRACE_WEB_ADDRESS))
+	if listenErr != nil {
+		fmt.Fprintf(os.Stderr, "Error opening HTTP port: %s\n",
+			listenErr.Error())
+		os.Exit(1)
+	}
+
+	// Print out the startup banner and information about the daemon
+	// configuration.
+	lg := common.NewLogger("main", cnf)
+	defer lg.Close()
+	lg.Infof("*** Starting htraced %s [%s]***\n", RELEASE_VERSION, GIT_VERSION)
+	scanner := bufio.NewScanner(cnfLog)
+	for scanner.Scan() {
+		// Pass the text as an argument rather than as the format string, so
+		// that any '%' in the configuration log is printed verbatim.
+		lg.Infof("%s\n", scanner.Text())
+	}
+	if scanErr := scanner.Err(); scanErr != nil {
+		lg.Warnf("Error reading the configuration log: %s\n", scanErr.Error())
+	}
+	common.InstallSignalHandlers(cnf)
+	if runtime.GOMAXPROCS(0) == 1 {
+		ncpu := runtime.NumCPU()
+		runtime.GOMAXPROCS(ncpu)
+		lg.Infof("setting GOMAXPROCS=%d\n", ncpu)
+	} else {
+		lg.Infof("GOMAXPROCS=%d\n", runtime.GOMAXPROCS(0))
+	}
+	lg.Infof("leveldb version=%d.%d\n",
+		levigo.GetLevelDBMajorVersion(), levigo.GetLevelDBMinorVersion())
+
+	// Initialize the datastore.
+	store, err := CreateDataStore(cnf, nil)
+	if err != nil {
+		lg.Errorf("Error creating datastore: %s\n", err.Error())
+		os.Exit(1)
+	}
+	var rsv *RestServer
+	rsv, err = CreateRestServer(cnf, store, rstListener)
+	if err != nil {
+		lg.Errorf("Error creating REST server: %s\n", err.Error())
+		os.Exit(1)
+	}
+	var hsv *HrpcServer
+	if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
+		hsv, err = CreateHrpcServer(cnf, store, nil)
+		if err != nil {
+			lg.Errorf("Error creating HRPC server: %s\n", err.Error())
+			os.Exit(1)
+		}
+	} else {
+		lg.Infof("Not starting HRPC server because no value was given for %s.\n",
+			conf.HTRACE_HRPC_ADDRESS)
+	}
+	naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS)
+	if naddr != "" {
+		notif := StartupNotification{
+			HttpAddr:  rsv.Addr().String(),
+			ProcessId: os.Getpid(),
+		}
+		if hsv != nil {
+			notif.HrpcAddr = hsv.Addr().String()
+		}
+		err = sendStartupNotification(naddr, &notif)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+
+				"%s\n", err.Error())
+			os.Exit(1)
+		}
+	}
+	// The servers run in their own goroutines; keep the main goroutine alive.
+	for {
+		time.Sleep(time.Duration(10) * time.Hour)
+	}
+}
+
+// A startup notification message that we optionally send on startup.
+// Used by unit tests.
+type StartupNotification struct {
+ // The address of the REST (HTTP) server.
+ HttpAddr string
+ // The address of the HRPC server; left empty when no HRPC server was started.
+ HrpcAddr string
+ // The process ID of the daemon.
+ ProcessId int
+}
+
+// sendStartupNotification connects to the TCP address naddr, writes notif to
+// the connection as JSON, and closes the connection.
+//
+// Returns an error if the connection cannot be established, if notif cannot
+// be marshalled, or if writing the message fails.
+func sendStartupNotification(naddr string, notif *StartupNotification) error {
+	conn, err := net.Dial("tcp", naddr)
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+	var buf []byte
+	buf, err = json.Marshal(notif)
+	if err != nil {
+		return err
+	}
+	// Report a failed write to the caller instead of silently dropping it.
+	_, err = conn.Write(buf)
+	return err
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/loader.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/loader.go b/htrace-htraced/go/src/htrace/htraced/loader.go
new file mode 100644
index 0000000..95c5c3e
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/loader.go
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "github.com/jmhodges/levigo"
+ "github.com/ugorji/go/codec"
+ "htrace/common"
+ "htrace/conf"
+ "io"
+ "math"
+ "math/rand"
+ "os"
+ "strings"
+ "syscall"
+ "time"
+)
+
+// Routines for loading the datastore.
+
+// The leveldb key which has information about the shard.
+const SHARD_INFO_KEY = 'w'
+
+// A constant signifying that we don't know what the layout version is.
+const UNKNOWN_LAYOUT_VERSION = 0
+
+// The current layout version. We cannot read layout versions newer than this.
+// We may sometimes be able to read older versions, but only by doing an
+// upgrade.
+const CURRENT_LAYOUT_VERSION = 3
+
+// DataStoreLoader holds the state needed to load the leveldb shards of the
+// datastore: one ShardLoader per configured directory, plus the leveldb
+// options shared by all of them.
+type DataStoreLoader struct {
+ // The dataStore logger.
+ lg *common.Logger
+
+ // True if we should clear the stored data.
+ ClearStored bool
+
+ // The shards that we're loading
+ shards []*ShardLoader
+
+ // The options to use for opening datastores in LevelDB.
+ openOpts *levigo.Options
+
+ // The read options to use for LevelDB.
+ readOpts *levigo.ReadOptions
+
+ // The write options to use for LevelDB.
+ writeOpts *levigo.WriteOptions
+}
+
+// Information about a Shard.
+// VerifyShardInfos requires LayoutVersion, DaemonId and TotalShards to be the
+// same on every shard of a datastore, and ShardIndex to be unique per shard.
+type ShardInfo struct {
+ // The layout version of the datastore.
+ // We should always keep this field so that old software can recognize new
+ // layout versions, even if it can't read them.
+ LayoutVersion uint64
+
+ // A random number identifying this daemon.
+ DaemonId uint64
+
+ // The total number of shards in this datastore.
+ TotalShards uint32
+
+ // The index of this shard within the datastore.
+ ShardIndex uint32
+}
+
+// NewDataStoreLoader creates a new datastore loader from the configuration.
+// It sets up the leveldb read, write and open options and one ShardLoader per
+// non-blank entry of conf.HTRACE_DATA_STORE_DIRECTORIES, but does not open any
+// leveldb instances.
+func NewDataStoreLoader(cnf *conf.Config) *DataStoreLoader {
+	dld := &DataStoreLoader{
+		lg:          common.NewLogger("datastore", cnf),
+		ClearStored: cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR),
+	}
+
+	readOpts := levigo.NewReadOptions()
+	readOpts.SetFillCache(true)
+	readOpts.SetVerifyChecksums(false)
+	dld.readOpts = readOpts
+
+	writeOpts := levigo.NewWriteOptions()
+	writeOpts.SetSync(false)
+	dld.writeOpts = writeOpts
+
+	// Split the configured directory list, dropping blank entries.
+	rawDirs := strings.Split(cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES),
+		conf.PATH_LIST_SEP)
+	dirs := make([]string, 0, len(rawDirs))
+	for _, dir := range rawDirs {
+		if strings.TrimSpace(dir) == "" {
+			continue
+		}
+		dirs = append(dirs, dir)
+	}
+	// Each shard lives in the "db" subdirectory of its data directory.
+	dld.shards = make([]*ShardLoader, 0, len(dirs))
+	for _, dir := range dirs {
+		dld.shards = append(dld.shards, &ShardLoader{
+			dld:  dld,
+			path: dir + conf.PATH_SEP + "db",
+		})
+	}
+
+	openOpts := levigo.NewOptions()
+	dld.openOpts = openOpts
+	openOpts.SetCache(levigo.NewLRUCache(cnf.GetInt(conf.HTRACE_LEVELDB_CACHE_SIZE)))
+	openOpts.SetParanoidChecks(false)
+	if bufSize := cnf.GetInt(conf.HTRACE_LEVELDB_WRITE_BUFFER_SIZE); bufSize > 0 {
+		openOpts.SetWriteBufferSize(bufSize)
+	}
+	// Must run after dld.shards is populated: the limit is divided per shard.
+	if maxFds := dld.calculateMaxOpenFilesPerShard(); maxFds > 0 {
+		openOpts.SetMaxOpenFiles(maxFds)
+	}
+	return dld
+}
+
+// Close releases every resource the loader still owns: the logger, the
+// leveldb open/read/write options, and the leveldb instance of each shard.
+// Each field is set to nil once released, so resources that were already
+// released or disowned are skipped.
+func (dld *DataStoreLoader) Close() {
+	if lg := dld.lg; lg != nil {
+		lg.Close()
+		dld.lg = nil
+	}
+	if opts := dld.openOpts; opts != nil {
+		opts.Close()
+		dld.openOpts = nil
+	}
+	if opts := dld.readOpts; opts != nil {
+		opts.Close()
+		dld.readOpts = nil
+	}
+	if opts := dld.writeOpts; opts != nil {
+		opts.Close()
+		dld.writeOpts = nil
+	}
+	// Ranging over a nil slice is a no-op, so no explicit nil check is needed.
+	for _, shd := range dld.shards {
+		if shd != nil {
+			shd.Close()
+		}
+	}
+	dld.shards = nil
+}
+
+// DisownResources clears the loader's references to its logger, leveldb
+// options and shards without closing them, so that a later Close() leaves
+// them untouched.
+func (dld *DataStoreLoader) DisownResources() {
+	dld.lg, dld.shards = nil, nil
+	dld.openOpts, dld.readOpts, dld.writeOpts = nil, nil, nil
+}
+
+// The maximum number of file descriptors we'll use on non-datastore things.
+const NON_DATASTORE_FD_MAX = 300
+
+// The minimum number of file descriptors per shard we will set. Setting fewer
+// than this number could trigger a bug in some early versions of leveldb.
+const MIN_FDS_PER_SHARD = 80
+
+// calculateMaxOpenFilesPerShard works out how many file descriptors each
+// shard may use: the soft RLIMIT_NOFILE limit minus NON_DATASTORE_FD_MAX,
+// divided evenly among the configured shards.
+//
+// Returns 0, after logging a warning, when the limit cannot be read, when
+// there are no shards, or when the result would be below MIN_FDS_PER_SHARD.
+func (dld *DataStoreLoader) calculateMaxOpenFilesPerShard() int {
+	var rlim syscall.Rlimit
+	err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim)
+	if err != nil {
+		dld.lg.Warnf("Unable to calculate maximum open files per shard: "+
+			"getrlimit failed: %s\n", err.Error())
+		return 0
+	}
+	// I think RLIMIT_NOFILE fits in 32 bits on all known operating systems,
+	// but there's no harm in being careful. 'int' in golang always holds at
+	// least 32 bits.
+	var maxFd int
+	if rlim.Cur > uint64(math.MaxInt32) {
+		maxFd = math.MaxInt32
+	} else {
+		maxFd = int(rlim.Cur)
+	}
+	if len(dld.shards) == 0 {
+		dld.lg.Warnf("Unable to calculate maximum open files per shard, " +
+			"since there are 0 shards configured.\n")
+		return 0
+	}
+	fdsPerShard := (maxFd - NON_DATASTORE_FD_MAX) / len(dld.shards)
+	if fdsPerShard < MIN_FDS_PER_SHARD {
+		// Terminate the message with a newline like every other log call here.
+		dld.lg.Warnf("Expected to be able to use at least %d "+
+			"fds per shard, but we have %d shards and %d total fds to allocate, "+
+			"giving us only %d FDs per shard.\n", MIN_FDS_PER_SHARD,
+			len(dld.shards), maxFd-NON_DATASTORE_FD_MAX, fdsPerShard)
+		return 0
+	}
+	dld.lg.Infof("maxFd = %d. Setting maxFdPerShard = %d\n",
+		maxFd, fdsPerShard)
+	return fdsPerShard
+}
+
+// LoadShards loads information about every shard by calling load() on each
+// ShardLoader in order.
+func (dld *DataStoreLoader) LoadShards() {
+	for _, shd := range dld.shards {
+		shd.load()
+	}
+}
+
+// Verify that the shard infos are consistent.
+// Reorders the shardInfo structures based on their ShardIndex.
+func (dld *DataStoreLoader) VerifyShardInfos() error {
+ if len(dld.shards) < 1 {
+ return errors.New("No shard directories found.")
+ }
+ // Make sure no shards had errors.
+ for i := range dld.shards {
+ shd := dld.shards[i]
+ if shd.infoErr != nil {
+ return shd.infoErr
+ }
+ }
+ // Make sure that if any shards are empty, all shards are empty.
+ emptyShards := ""
+ prefix := ""
+ for i := range dld.shards {
+ if dld.shards[i].info == nil {
+ emptyShards = prefix + dld.shards[i].path
+ prefix = ", "
+ }
+ }
+ if emptyShards != "" {
+ for i := range dld.shards {
+ if dld.shards[i].info != nil {
+ return errors.New(fmt.Sprintf("Shards %s were empty, but "+
+ "the other shards had data.", emptyShards))
+ }
+ }
+ // All shards are empty.
+ return nil
+ }
+ // Make sure that all shards have the same layout version, daemonId, and number of total
+ // shards.
+ layoutVersion := dld.shards[0].info.LayoutVersion
+ daemonId := dld.shards[0].info.DaemonId
+ totalShards := dld.shards[0].info.TotalShards
+ for i := 1; i < len(dld.shards); i++ {
+ shd := dld.shards[i]
+ if layoutVersion != shd.info.LayoutVersion {
+ return errors.New(fmt.Sprintf("Layout version mismatch. Shard "+
+ "%s has layout version 0x%016x, but shard %s has layout "+
+ "version 0x%016x.",
+ dld.shards[0].path, layoutVersion, shd.path, shd.info.LayoutVersion))
+ }
+ if daemonId != shd.info.DaemonId {
+ return errors.New(fmt.Sprintf("DaemonId mismatch. Shard %s has "+
+ "daemonId 0x%016x, but shard %s has daemonId 0x%016x.",
+ dld.shards[0].path, daemonId, shd.path, shd.info.DaemonId))
+ }
+ if totalShards != shd.info.TotalShards {
+ return errors.New(fmt.Sprintf("TotalShards mismatch. Shard %s has "+
+ "TotalShards = %d, but shard %s has TotalShards = %d.",
+ dld.shards[0].path, totalShards, shd.path, shd.info.TotalShards))
+ }
+ if shd.info.ShardIndex >= totalShards {
+ return errors.New(fmt.Sprintf("Invalid ShardIndex. Shard %s has "+
+ "ShardIndex = %d, but TotalShards = %d.",
+ shd.path, shd.info.ShardIndex, shd.info.TotalShards))
+ }
+ }
+ if layoutVersion != CURRENT_LAYOUT_VERSION {
+ return errors.New(fmt.Sprintf("The layout version of all shards "+
+ "is %d, but we only support version %d.",
+ layoutVersion, CURRENT_LAYOUT_VERSION))
+ }
+ if totalShards != uint32(len(dld.shards)) {
+ return errors.New(fmt.Sprintf("The TotalShards field of all shards "+
+ "is %d, but we have %d shards.", totalShards, len(dld.shards)))
+ }
+ // Reorder shards in order of their ShardIndex.
+ reorderedShards := make([]*ShardLoader, len(dld.shards))
+ for i := 0; i < len(dld.shards); i++ {
+ shd := dld.shards[i]
+ shardIdx := shd.info.ShardIndex
+ if reorderedShards[shardIdx] != nil {
+ return errors.New(fmt.Sprintf("Both shard %s and "+
+ "shard %s have ShardIndex %d.", shd.path,
+ reorderedShards[shardIdx].path, shardIdx))
+ }
+ reorderedShards[shardIdx] = shd
+ }
+ dld.shards = reorderedShards
+ return nil
+}
+
+// Load prepares the datastore shards for use. It optionally clears existing
+// data, makes sure every shard directory exists, loads and verifies the shard
+// infos, and, when the first shard has no open leveldb instance afterwards,
+// creates a leveldb instance in every shard and writes a fresh ShardInfo
+// with a new random DaemonId to each.
+func (dld *DataStoreLoader) Load() error {
+	// If data.store.clear was set, clear existing data.
+	if dld.ClearStored {
+		if err := dld.clearStored(); err != nil {
+			return err
+		}
+	}
+	// Make sure the shard directories exist in all cases, with a mkdir -p
+	for _, shd := range dld.shards {
+		if err := os.MkdirAll(shd.path, 0777); err != nil {
+			return errors.New(fmt.Sprintf("Failed to MkdirAll(%s): %s",
+				shd.path, err.Error()))
+		}
+	}
+	// Get information about each shard, and verify them.
+	dld.LoadShards()
+	if err := dld.VerifyShardInfos(); err != nil {
+		return err
+	}
+	numShards := len(dld.shards)
+	if dld.shards[0].ldb != nil {
+		dld.lg.Infof("Loaded %d leveldb instances with "+
+			"DaemonId of 0x%016x\n", numShards,
+			dld.shards[0].info.DaemonId)
+		return nil
+	}
+	// Create leveldb instances if needed.
+	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
+	daemonId := uint64(rnd.Int63())
+	dld.lg.Infof("Initializing %d leveldb instances with a new "+
+		"DaemonId of 0x%016x\n", numShards, daemonId)
+	dld.openOpts.SetCreateIfMissing(true)
+	for idx, shd := range dld.shards {
+		var err error
+		shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts)
+		if err != nil {
+			return errors.New(fmt.Sprintf("levigo.Open(%s) failed to "+
+				"create the shard: %s", shd.path, err.Error()))
+		}
+		info := &ShardInfo{
+			LayoutVersion: CURRENT_LAYOUT_VERSION,
+			DaemonId:      daemonId,
+			TotalShards:   uint32(numShards),
+			ShardIndex:    uint32(idx),
+		}
+		if err = shd.writeShardInfo(info); err != nil {
+			return errors.New(fmt.Sprintf("levigo.Open(%s) failed to "+
+				"write shard info: %s", shd.path, err.Error()))
+		}
+		dld.lg.Infof("Shard %s initialized with ShardInfo %s \n",
+			shd.path, asJson(info))
+	}
+	return nil
+}
+
+// clearStored deletes the directory of every shard that currently exists on
+// disk.  Shards whose path does not exist are skipped.  The first stat or
+// removal failure is logged and returned unchanged.
+func (dld *DataStoreLoader) clearStored() error {
+	for _, shd := range dld.shards {
+		dir := shd.path
+		info, statErr := os.Stat(dir)
+		if statErr != nil && !os.IsNotExist(statErr) {
+			dld.lg.Errorf("Failed to stat %s: %s\n", dir, statErr.Error())
+			return statErr
+		}
+		if info == nil {
+			// Nothing exists at this path, so there is nothing to clear.
+			continue
+		}
+		if rmErr := os.RemoveAll(dir); rmErr != nil {
+			dld.lg.Errorf("Failed to clear existing datastore directory %s: %s\n",
+				dir, rmErr.Error())
+			return rmErr
+		}
+		dld.lg.Infof("Cleared existing datastore directory %s\n", dir)
+	}
+	return nil
+}
+
+// ShardLoader holds the state gathered while loading a single shard of the
+// data store: where it lives, its leveldb handle (once opened), and the
+// ShardInfo record read from it or the error that prevented reading it.
+type ShardLoader struct {
+ // The parent DataStoreLoader; supplies the leveldb open/read/write options.
+ dld *DataStoreLoader
+
+ // Path to the shard's leveldb directory
+ path string
+
+ // Leveldb instance of the shard, or nil if it is not currently open
+ ldb *levigo.DB
+
+ // Information about the shard, or nil if none has been loaded
+ info *ShardInfo
+
+ // If non-nil, the error we encountered trying to load the shard info.
+ infoErr error
+}
+
+// Close releases the shard's leveldb instance, if one is open, and clears
+// the handle so that calling Close again does nothing.
+func (shd *ShardLoader) Close() {
+	if shd.ldb == nil {
+		return
+	}
+	shd.ldb.Close()
+	shd.ldb = nil
+}
+
+// Load information about a particular shard.
+func (shd *ShardLoader) load() {
+ shd.info = nil
+ fi, err := os.Stat(shd.path)
+ if err != nil {
+ if os.IsNotExist(err) {
+ shd.infoErr = nil
+ return
+ }
+ shd.infoErr = errors.New(fmt.Sprintf(
+ "stat() error on leveldb directory "+
+ "%s: %s", shd.path, err.Error()))
+ return
+ }
+ if !fi.Mode().IsDir() {
+ shd.infoErr = errors.New(fmt.Sprintf(
+ "stat() error on leveldb directory "+
+ "%s: inode is not directory.", shd.path))
+ return
+ }
+ var dbDir *os.File
+ dbDir, err = os.Open(shd.path)
+ if err != nil {
+ shd.infoErr = errors.New(fmt.Sprintf(
+ "open() error on leveldb directory "+
+ "%s: %s.", shd.path, err.Error()))
+ return
+ }
+ defer func() {
+ if dbDir != nil {
+ dbDir.Close()
+ }
+ }()
+ _, err = dbDir.Readdirnames(1)
+ if err != nil {
+ if err == io.EOF {
+ // The db directory is empty.
+ shd.infoErr = nil
+ return
+ }
+ shd.infoErr = errors.New(fmt.Sprintf(
+ "Readdirnames() error on leveldb directory "+
+ "%s: %s.", shd.path, err.Error()))
+ return
+ }
+ dbDir.Close()
+ dbDir = nil
+ shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts)
+ if err != nil {
+ shd.ldb = nil
+ shd.infoErr = errors.New(fmt.Sprintf(
+ "levigo.Open() error on leveldb directory "+
+ "%s: %s.", shd.path, err.Error()))
+ return
+ }
+ shd.info, err = shd.readShardInfo()
+ if err != nil {
+ shd.infoErr = err
+ return
+ }
+ shd.infoErr = nil
+}
+
+// readShardInfo fetches the value stored under SHARD_INFO_KEY in this
+// shard's leveldb instance and msgpack-decodes it into a ShardInfo.
+//
+// It returns an error if the key cannot be read, if the stored value is
+// empty, or if decoding fails.
+func (shd *ShardLoader) readShardInfo() (*ShardInfo, error) {
+	raw, err := shd.ldb.Get(shd.dld.readOpts, []byte{SHARD_INFO_KEY})
+	switch {
+	case err != nil:
+		return nil, fmt.Errorf("readShardInfo(%s): failed to "+
+			"read shard info key: %s", shd.path, err.Error())
+	case len(raw) == 0:
+		return nil, fmt.Errorf("readShardInfo(%s): got zero-"+
+			"length value for shard info key.", shd.path)
+	}
+	handle := new(codec.MsgpackHandle)
+	handle.WriteExt = true
+	// The layout version is seeded with UNKNOWN_LAYOUT_VERSION before
+	// decoding; presumably so a record lacking the field is recognizable.
+	info := &ShardInfo{
+		LayoutVersion: UNKNOWN_LAYOUT_VERSION,
+	}
+	err = codec.NewDecoder(bytes.NewBuffer(raw), handle).Decode(info)
+	if err != nil {
+		return nil, fmt.Errorf("readShardInfo(%s): msgpack "+
+			"decoding failed for shard info key: %s", shd.path, err.Error())
+	}
+	return info, nil
+}
+
+func (shd *ShardLoader) writeShardInfo(info *ShardInfo) error {
+ mh := new(codec.MsgpackHandle)
+ mh.WriteExt = true
+ w := new(bytes.Buffer)
+ enc := codec.NewEncoder(w, mh)
+ err := enc.Encode(info)
+ if err != nil {
+ return errors.New(fmt.Sprintf("msgpack encoding error: %s",
+ err.Error()))
+ }
+ err = shd.ldb.Put(shd.dld.writeOpts, []byte{SHARD_INFO_KEY}, w.Bytes())
+ if err != nil {
+ return errors.New(fmt.Sprintf("leveldb write error: %s",
+ err.Error()))
+ }
+ return nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/metrics.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/metrics.go b/htrace-htraced/go/src/htrace/htraced/metrics.go
new file mode 100644
index 0000000..d2feca8
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/metrics.go
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "htrace/common"
+ "htrace/conf"
+ "math"
+ "sync"
+ "time"
+)
+
+//
+// The Metrics Sink for HTraced.
+//
+// The Metrics sink keeps track of metrics for the htraced daemon.
+// It is important to have good metrics so that we can properly manage htraced. In particular, we
+// need to know what rate we are receiving spans at, and the main places spans came from. If spans
+// were dropped because of a high sampling rate, we need to know which part of the system dropped
+// them so that we can adjust the sampling rate there.
+//
+
+// The number of writeSpans latency samples that the MetricsSink keeps in its
+// circular buffer.
+const LATENCY_CIRC_BUF_SIZE = 4096
+
+// MetricsSink accumulates the counters and latency samples that htraced
+// reports as server statistics.  All fields are protected by lock.
+type MetricsSink struct {
+ // The metrics sink logger.
+ lg *common.Logger
+
+ // The maximum number of entries we should allow in the HostSpanMetrics map.
+ maxMtx int
+
+ // The total number of spans ingested by the server (counting dropped spans)
+ IngestedSpans uint64
+
+ // The total number of spans written to leveldb since the server started.
+ WrittenSpans uint64
+
+ // The total number of spans dropped by the server.
+ ServerDropped uint64
+
+ // Per-host Span Metrics
+ HostSpanMetrics common.SpanMetricsMap
+
+ // The last few writeSpan latencies
+ wsLatencyCircBuf *CircBufU32
+
+ // Lock protecting all metrics
+ lock sync.Mutex
+}
+
+// NewMetricsSink builds an empty MetricsSink.  The per-host entry limit is
+// read from the HTRACE_METRICS_MAX_ADDR_ENTRIES configuration key, and the
+// latency buffer holds LATENCY_CIRC_BUF_SIZE samples.
+func NewMetricsSink(cnf *conf.Config) *MetricsSink {
+	msink := new(MetricsSink)
+	msink.lg = common.NewLogger("metrics", cnf)
+	msink.maxMtx = cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES)
+	msink.HostSpanMetrics = make(common.SpanMetricsMap)
+	msink.wsLatencyCircBuf = NewCircBufU32(LATENCY_CIRC_BUF_SIZE)
+	return msink
+}
+
+// Update the total number of spans which were ingested, as well as other
+// metrics that get updated during span ingest.
+//
+// addr is the host the spans came from, totalIngested and serverDropped are
+// added to the corresponding server-wide counters (serverDropped is also
+// added to the per-host entry), and wsLatency is recorded, in milliseconds,
+// in the latency circular buffer.  The latency is clamped to the range of a
+// uint32: a negative duration is recorded as 0 instead of wrapping around to
+// a huge value, and an over-large one is recorded as math.MaxUint32.
+func (msink *MetricsSink) UpdateIngested(addr string, totalIngested int,
+	serverDropped int, wsLatency time.Duration) {
+	msink.lock.Lock()
+	defer msink.lock.Unlock()
+	msink.IngestedSpans += uint64(totalIngested)
+	msink.ServerDropped += uint64(serverDropped)
+	msink.updateSpanMetrics(addr, 0, serverDropped)
+	wsLatencyMs := wsLatency.Nanoseconds() / 1000000
+	var wsLatency32 uint32
+	switch {
+	case wsLatencyMs < 0:
+		wsLatency32 = 0
+	case wsLatencyMs > math.MaxUint32:
+		wsLatency32 = math.MaxUint32
+	default:
+		wsLatency32 = uint32(wsLatencyMs)
+	}
+	msink.wsLatencyCircBuf.Append(wsLatency32)
+}
+
+// Add numWritten and serverDropped to the metrics entry for addr, creating
+// the entry if needed.  Must be called with the lock held.
+//
+// When a new entry is required and the map already holds maxMtx or more
+// entries, one existing entry (whichever map iteration yields first) is
+// evicted before the new one is inserted.
+func (msink *MetricsSink) updateSpanMetrics(addr string, numWritten int,
+	serverDropped int) {
+	entry, known := msink.HostSpanMetrics[addr]
+	if !known {
+		// Ensure that the per-host span metrics map doesn't grow too large.
+		if len(msink.HostSpanMetrics) >= msink.maxMtx {
+			for victim := range msink.HostSpanMetrics {
+				msink.lg.Warnf("Evicting metrics entry for addr %s "+
+					"because there are more than %d addrs.\n", victim, msink.maxMtx)
+				delete(msink.HostSpanMetrics, victim)
+				break
+			}
+		}
+		entry = new(common.SpanMetrics)
+		msink.HostSpanMetrics[addr] = entry
+	}
+	entry.Written += uint64(numWritten)
+	entry.ServerDropped += uint64(serverDropped)
+}
+
+// Update the total number of spans which were persisted to disk.
+//
+// totalWritten is added to WrittenSpans and serverDropped to ServerDropped;
+// both are also added to the per-host entry for addr.  Takes the metrics
+// lock for the duration of the update.
+func (msink *MetricsSink) UpdatePersisted(addr string, totalWritten int,
+ serverDropped int) {
+ msink.lock.Lock()
+ defer msink.lock.Unlock()
+ msink.WrittenSpans += uint64(totalWritten)
+ msink.ServerDropped += uint64(serverDropped)
+ msink.updateSpanMetrics(addr, totalWritten, serverDropped)
+}
+
+// Copy the current metrics into stats.
+//
+// The counters and the max/average writeSpans latency are copied by value,
+// and the per-host map is rebuilt with freshly allocated entries so that the
+// caller does not share SpanMetrics objects with the sink.  The metrics lock
+// is held while copying.
+func (msink *MetricsSink) PopulateServerStats(stats *common.ServerStats) {
+	msink.lock.Lock()
+	defer msink.lock.Unlock()
+	stats.IngestedSpans = msink.IngestedSpans
+	stats.WrittenSpans = msink.WrittenSpans
+	stats.ServerDroppedSpans = msink.ServerDropped
+	stats.MaxWriteSpansLatencyMs = msink.wsLatencyCircBuf.Max()
+	stats.AverageWriteSpansLatencyMs = msink.wsLatencyCircBuf.Average()
+	perHost := make(common.SpanMetricsMap)
+	for addr, src := range msink.HostSpanMetrics {
+		snapshot := new(common.SpanMetrics)
+		snapshot.Written = src.Written
+		snapshot.ServerDropped = src.ServerDropped
+		perHost[addr] = snapshot
+	}
+	stats.HostSpanMetrics = perHost
+}
+
+// A circular buffer of uint32s which supports appending and taking the
+// average, and some other things.
+//
+// Once the buffer is full, each Append overwrites the oldest value.
+type CircBufU32 struct {
+	// The next slot to fill
+	slot int
+
+	// The number of slots which are in use.  This starts at 0 and only ever
+	// increases until the buffer is full.
+	slotsUsed int
+
+	// The buffer
+	buf []uint32
+}
+
+// NewCircBufU32 creates an empty circular buffer that can hold size values.
+func NewCircBufU32(size int) *CircBufU32 {
+	return &CircBufU32{
+		buf: make([]uint32, size),
+	}
+}
+
+// Max returns the largest value currently in the buffer, or 0 if the buffer
+// is empty.
+func (cbuf *CircBufU32) Max() uint32 {
+	var max uint32
+	for _, val := range cbuf.buf[:cbuf.slotsUsed] {
+		if val > max {
+			max = val
+		}
+	}
+	return max
+}
+
+// Average returns the (truncated) mean of the values currently in the
+// buffer.  We arbitrarily define that an empty buffer has an average of 0.
+func (cbuf *CircBufU32) Average() uint32 {
+	if cbuf.slotsUsed == 0 {
+		// Explicit guard against dividing by zero.
+		return 0
+	}
+	var total uint64
+	for _, val := range cbuf.buf[:cbuf.slotsUsed] {
+		total += uint64(val)
+	}
+	return uint32(total / uint64(cbuf.slotsUsed))
+}
+
+// Append stores val in the next slot, overwriting the oldest value once the
+// buffer is full.  Appending to a zero-capacity buffer is a no-op.
+func (cbuf *CircBufU32) Append(val uint32) {
+	if len(cbuf.buf) == 0 {
+		return
+	}
+	cbuf.buf[cbuf.slot] = val
+	cbuf.slot++
+	if cbuf.slotsUsed < cbuf.slot {
+		cbuf.slotsUsed = cbuf.slot
+	}
+	if cbuf.slot >= len(cbuf.buf) {
+		cbuf.slot = 0
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/htrace/htraced/metrics_test.go
new file mode 100644
index 0000000..4f27ffd
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/metrics_test.go
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "fmt"
+ htrace "htrace/client"
+ "htrace/common"
+ "htrace/conf"
+ "reflect"
+ "testing"
+ "time"
+)
+
+// compareTotals reports whether a and b hold deeply equal metrics under
+// exactly the same set of keys.
+func compareTotals(a, b common.SpanMetricsMap) bool {
+	// covers reports whether every entry of x has a deeply equal
+	// counterpart under the same key in y.
+	covers := func(x, y common.SpanMetricsMap) bool {
+		for key, val := range x {
+			if !reflect.DeepEqual(val, y[key]) {
+				return false
+			}
+		}
+		return true
+	}
+	return covers(a, b) && covers(b, a)
+}
+
+// Fatalfer is the subset of *testing.T that the assertion helpers in this
+// file need: the ability to report a formatted fatal failure.
+type Fatalfer interface {
+ Fatalf(format string, args ...interface{})
+}
+
+// assertNumWrittenEquals fails the test unless msink reports exactly
+// expectedNumWritten written spans, both in the server-wide WrittenSpans
+// counter and in the per-host entry for 127.0.0.1.
+//
+// The failure messages report expectedNumWritten, the value the counters
+// are actually compared against.
+func assertNumWrittenEquals(t Fatalfer, msink *MetricsSink,
+	expectedNumWritten int) {
+	var sstats common.ServerStats
+	msink.PopulateServerStats(&sstats)
+	if sstats.WrittenSpans != uint64(expectedNumWritten) {
+		t.Fatalf("sstats.WrittenSpans = %d, but expected %d\n",
+			sstats.WrittenSpans, expectedNumWritten)
+	}
+	hostMetrics := sstats.HostSpanMetrics["127.0.0.1"]
+	if hostMetrics == nil {
+		t.Fatalf("no entry for sstats.HostSpanMetrics[127.0.0.1] found.")
+	}
+	if hostMetrics.Written != uint64(expectedNumWritten) {
+		t.Fatalf("sstats.HostSpanMetrics[127.0.0.1].Written = %d, but "+
+			"expected %d\n", hostMetrics.Written, expectedNumWritten)
+	}
+}
+
+// TestMetricsSinkPerHostEviction checks that the per-host metrics map never
+// grows beyond HTRACE_METRICS_MAX_ADDR_ENTRIES: with the limit set to 2,
+// recording metrics for three distinct addresses must leave two entries.
+func TestMetricsSinkPerHostEviction(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2"
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	msink := NewMetricsSink(cnf)
+	msink.UpdatePersisted("192.168.0.100", 20, 10)
+	msink.UpdatePersisted("192.168.0.101", 20, 10)
+	msink.UpdatePersisted("192.168.0.102", 20, 10)
+	msink.lock.Lock()
+	defer msink.lock.Unlock()
+	if len(msink.HostSpanMetrics) != 2 {
+		// Dump the surviving entries to help diagnose the failure.  The
+		// values are struct pointers, so print them with %+v, not %s.
+		for k, v := range msink.HostSpanMetrics {
+			fmt.Printf("HostSpanMetrics[%s] = %+v\n", k, *v)
+		}
+		t.Fatalf("Expected len(msink.HostSpanMetrics) to be 2, but got %d\n",
+			len(msink.HostSpanMetrics))
+	}
+}
+
+// TestIngestedSpansMetricsRest runs the ingested-spans metrics test with
+// usePacked set to false.
+func TestIngestedSpansMetricsRest(t *testing.T) {
+ testIngestedSpansMetricsImpl(t, false)
+}
+
+// TestIngestedSpansMetricsPacked runs the ingested-spans metrics test with
+// usePacked set to true.
+func TestIngestedSpansMetricsPacked(t *testing.T) {
+ testIngestedSpansMetricsImpl(t, true)
+}
+
+// testIngestedSpansMetricsImpl starts a MiniHTraced, writes a batch of
+// random spans through a client, and then polls the server stats until the
+// IngestedSpans counter equals the number of spans written.
+//
+// usePacked selects the transport: HRPC is disabled on the client when it is
+// false.  The polling loop is bounded by a deadline so that a server which
+// never reports the expected count fails the test with a clear message
+// instead of spinning until the whole test binary times out.
+func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) {
+	htraceBld := &MiniHTracedBuilder{Name: "TestIngestedSpansMetrics",
+		DataDirs: make([]string, 2),
+	}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	var hcl *htrace.Client
+	hcl, err = htrace.NewClient(ht.ClientConf(), &htrace.TestHooks{
+		HrpcDisabled: !usePacked,
+	})
+	if err != nil {
+		t.Fatalf("failed to create client: %s", err.Error())
+	}
+
+	NUM_TEST_SPANS := 12
+	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
+	err = hcl.WriteSpans(allSpans)
+	if err != nil {
+		t.Fatalf("WriteSpans failed: %s\n", err.Error())
+	}
+	deadline := time.Now().Add(1 * time.Minute)
+	for {
+		var stats *common.ServerStats
+		stats, err = hcl.GetServerStats()
+		if err != nil {
+			t.Fatalf("GetServerStats failed: %s\n", err.Error())
+		}
+		if stats.IngestedSpans == uint64(NUM_TEST_SPANS) {
+			break
+		}
+		if time.Now().After(deadline) {
+			t.Fatalf("timed out waiting for IngestedSpans to reach %d; "+
+				"last value was %d\n", NUM_TEST_SPANS, stats.IngestedSpans)
+		}
+		time.Sleep(1 * time.Millisecond)
+	}
+}
+
+// TestCircBuf32 exercises CircBufU32 with a three-slot buffer: the empty
+// case, filling it up, and wrapping around so that old values are
+// overwritten.
+func TestCircBuf32(t *testing.T) {
+	cbuf := NewCircBufU32(3)
+	// We arbitrarily define that empty circular buffers have an average of 0.
+	if cbuf.Average() != 0 {
+		t.Fatalf("expected empty CircBufU32 to have an average of 0.\n")
+	}
+	if cbuf.Max() != 0 {
+		t.Fatalf("expected empty CircBufU32 to have a max of 0.\n")
+	}
+	cbuf.Append(2)
+	if cbuf.Average() != 2 {
+		t.Fatalf("expected one-element CircBufU32 to have an average of 2.\n")
+	}
+	cbuf.Append(10)
+	if cbuf.Average() != 6 {
+		t.Fatalf("expected two-element CircBufU32 to have an average of 6.\n")
+	}
+	cbuf.Append(12)
+	if cbuf.Average() != 8 {
+		t.Fatalf("expected three-element CircBufU32 to have an average of 8.\n")
+	}
+	cbuf.Append(14)
+	// The 14 overwrites the original 2 element.
+	if cbuf.Average() != 12 {
+		t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n")
+	}
+	cbuf.Append(1)
+	// The 1 overwrites the original 10 element, leaving 14, 1 and 12.
+	if cbuf.Average() != 9 {
+		t.Fatalf("expected three-element CircBufU32 to have an average of 9.\n")
+	}
+	if cbuf.Max() != 14 {
+		t.Fatalf("expected three-element CircBufU32 to have a max of 14.\n")
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/htrace/htraced/mini_htraced.go
new file mode 100644
index 0000000..af8d379
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/mini_htraced.go
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "fmt"
+ "htrace/common"
+ "htrace/conf"
+ "io/ioutil"
+ "net"
+ "os"
+ "strings"
+)
+
+//
+// MiniHTraceD is used in unit tests to set up a daemon with certain settings.
+// It takes care of things like creating and cleaning up temporary directories.
+//
+
+// The default number of managed data directories to use for a MiniHTraced
+// when the builder is not given an explicit DataDirs list.
+const DEFAULT_NUM_DATA_DIRS = 2
+
+// Builds a MiniHTraced object.  Fill in the fields that matter for the test
+// and call Build; unset fields fall back to defaults.
+type MiniHTracedBuilder struct {
+ // The name of the MiniHTraced to build. This shows up in the test directory name and some
+ // other places.
+ Name string
+
+ // The configuration values to use for the MiniHTraced.
+ // If this is nil, we use the default configuration for everything.
+ Cnf map[string]string
+
+ // The DataDirs to use. Empty entries will turn into random names.
+ DataDirs []string
+
+ // If true, we will keep the data dirs around after MiniHTraced#Close
+ KeepDataDirsOnClose bool
+
+ // If non-null, the WrittenSpans semaphore to use when creating the DataStore.
+ WrittenSpans *common.Semaphore
+
+ // The test hooks to use for the HRPC server
+ HrpcTestHooks *hrpcTestHooks
+}
+
+// MiniHTraced is a running in-process htraced instance created by
+// MiniHTracedBuilder.Build.  Call Close when done with it.
+type MiniHTraced struct {
+ // The name given to the builder.
+ Name string
+ // The configuration the daemon was built with.
+ Cnf *conf.Config
+ // The data directories backing the data store.
+ DataDirs []string
+ // The data store.
+ Store *dataStore
+ // The REST server.
+ Rsv *RestServer
+ // The HRPC server.
+ Hsv *HrpcServer
+ // The logger used by the MiniHTraced itself.
+ Lg *common.Logger
+ // If true, Close leaves the data directories on disk.
+ KeepDataDirsOnClose bool
+}
+
+// Build creates and starts a MiniHTraced from the builder's settings.
+//
+// Unset builder fields are filled in first (and written back to the
+// builder): Name defaults to "HTraceTest", Cnf to an empty map, and DataDirs
+// to DEFAULT_NUM_DATA_DIRS empty entries.  Every empty DataDirs entry is
+// replaced by a freshly created temporary directory.  The test configuration
+// values are merged into Cnf without overriding values that are already set.
+//
+// The data store, the REST server and the HRPC server are then created in
+// that order.  If any of those steps fails, everything created so far is
+// torn down: the REST server and data store are closed, the data directories
+// are removed (unless KeepDataDirsOnClose is set) and the logger is closed.
+//
+// Returns the running MiniHTraced, or the error that stopped construction.
+func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) {
+	var err error
+	var store *dataStore
+	var rsv *RestServer
+	var hsv *HrpcServer
+	if bld.Name == "" {
+		bld.Name = "HTraceTest"
+	}
+	if bld.Cnf == nil {
+		bld.Cnf = make(map[string]string)
+	}
+	if bld.DataDirs == nil {
+		bld.DataDirs = make([]string, DEFAULT_NUM_DATA_DIRS)
+	}
+	for idx := range bld.DataDirs {
+		if bld.DataDirs[idx] == "" {
+			bld.DataDirs[idx], err = ioutil.TempDir(os.TempDir(),
+				fmt.Sprintf("%s%d", bld.Name, idx+1))
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+	// Copy the default test configuration values.
+	for k, v := range conf.TEST_VALUES() {
+		_, hasVal := bld.Cnf[k]
+		if !hasVal {
+			bld.Cnf[k] = v
+		}
+	}
+	bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] =
+		strings.Join(bld.DataDirs, conf.PATH_LIST_SEP)
+	cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS}
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		return nil, err
+	}
+	lg := common.NewLogger("mini.htraced", cnf)
+	// Failure cleanup.  This is keyed on err, so every failure path below
+	// must store its error in err before returning.
+	defer func() {
+		if err == nil {
+			return
+		}
+		// Stop the REST server before closing the store it serves from,
+		// mirroring the order used by MiniHTraced.Close.
+		if rsv != nil {
+			rsv.Close()
+		}
+		if store != nil {
+			store.Close()
+		}
+		if !bld.KeepDataDirsOnClose {
+			for idx := range bld.DataDirs {
+				if bld.DataDirs[idx] != "" {
+					os.RemoveAll(bld.DataDirs[idx])
+				}
+			}
+		}
+		lg.Infof("Failed to create MiniHTraced %s: %s\n", bld.Name, err.Error())
+		lg.Close()
+	}()
+	store, err = CreateDataStore(cnf, bld.WrittenSpans)
+	if err != nil {
+		return nil, err
+	}
+	// The listen error is assigned to err (not to a separate variable) so
+	// that the deferred cleanup above runs when listening fails.
+	var rstListener net.Listener
+	rstListener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_WEB_ADDRESS))
+	if err != nil {
+		return nil, err
+	}
+	// Close the listener ourselves unless the REST server was created
+	// successfully, in which case rstListener is set to nil below.
+	defer func() {
+		if rstListener != nil {
+			rstListener.Close()
+		}
+	}()
+	rsv, err = CreateRestServer(cnf, store, rstListener)
+	if err != nil {
+		return nil, err
+	}
+	rstListener = nil
+	hsv, err = CreateHrpcServer(cnf, store, bld.HrpcTestHooks)
+	if err != nil {
+		return nil, err
+	}
+
+	lg.Infof("Created MiniHTraced %s\n", bld.Name)
+	return &MiniHTraced{
+		Name:                bld.Name,
+		Cnf:                 cnf,
+		DataDirs:            bld.DataDirs,
+		Store:               store,
+		Rsv:                 rsv,
+		Hsv:                 hsv,
+		Lg:                  lg,
+		KeepDataDirsOnClose: bld.KeepDataDirsOnClose,
+	}, nil
+}
+
+// Return a Config object that clients can use to connect to this MiniHTraceD.
+// It is a clone of the daemon's configuration in which the web and HRPC
+// addresses are replaced by the addresses the servers report via Addr().
+func (ht *MiniHTraced) ClientConf() *conf.Config {
+	webAddr := ht.Rsv.Addr().String()
+	hrpcAddr := ht.Hsv.Addr().String()
+	return ht.Cnf.Clone(
+		conf.HTRACE_WEB_ADDRESS, webAddr,
+		conf.HTRACE_HRPC_ADDRESS, hrpcAddr)
+}
+
+// Return a Config object that clients can use to connect to this MiniHTraceD
+// by HTTP only (no HRPC).  The HRPC address in the returned configuration is
+// set to the empty string.
+func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config {
+	webAddr := ht.Rsv.Addr().String()
+	return ht.Cnf.Clone(
+		conf.HTRACE_WEB_ADDRESS, webAddr,
+		conf.HTRACE_HRPC_ADDRESS, "")
+}
+
+// Close shuts the MiniHTraced down: the REST server, the HRPC server and the
+// data store are closed in that order, the data directories are removed
+// unless KeepDataDirsOnClose is set, and finally the logger is closed.
+func (ht *MiniHTraced) Close() {
+	ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name)
+	ht.Rsv.Close()
+	ht.Hsv.Close()
+	ht.Store.Close()
+	if !ht.KeepDataDirsOnClose {
+		for _, dir := range ht.DataDirs {
+			ht.Lg.Infof("Removing %s...\n", dir)
+			os.RemoveAll(dir)
+		}
+	}
+	ht.Lg.Infof("Finished closing MiniHTraced %s\n", ht.Name)
+	ht.Lg.Close()
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/reaper_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/htrace/htraced/reaper_test.go
new file mode 100644
index 0000000..af11e38
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/reaper_test.go
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "fmt"
+ "htrace/common"
+ "htrace/conf"
+ "htrace/test"
+ "math/rand"
+ "testing"
+ "time"
+)
+
+// TestReapingOldSpans verifies that the reaper removes spans older than the
+// reaper date and keeps newer ones.
+//
+// NUM_TEST_SPANS spans are created with Begin times one millisecond apart,
+// the last one beginning at "now".  After they have all been written, the
+// reaper date is set to "now", and the test waits until every span except
+// the final one has disappeared from the store.
+func TestReapingOldSpans(t *testing.T) {
+	const NUM_TEST_SPANS = 20
+	testSpans := make([]*common.Span, NUM_TEST_SPANS)
+	rnd := rand.New(rand.NewSource(2))
+	now := common.TimeToUnixMs(time.Now().UTC())
+	for i := range testSpans {
+		testSpans[i] = test.NewRandomSpan(rnd, testSpans[0:i])
+		testSpans[i].Begin = now - int64(NUM_TEST_SPANS-1-i)
+		testSpans[i].Description = fmt.Sprintf("Span%02d", i)
+	}
+	htraceBld := &MiniHTracedBuilder{Name: "TestReapingOldSpans",
+		Cnf: map[string]string{
+			conf.HTRACE_SPAN_EXPIRY_MS:                fmt.Sprintf("%d", 60*60*1000),
+			conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS:    "1",
+			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "1",
+		},
+		WrittenSpans: common.NewSemaphore(0),
+		DataDirs:     make([]string, 2),
+	}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create mini htraced cluster: %s\n", err.Error())
+	}
+	// Register the cleanup as soon as the daemon exists, so that it is shut
+	// down even if a later step aborts the test.
+	defer ht.Close()
+	ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "")
+	for spanIdx := range testSpans {
+		ing.IngestSpan(testSpans[spanIdx])
+	}
+	ing.Close(time.Now())
+	// Wait for the spans to be created
+	ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS)
+	// Set a reaper date that will remove all the spans except the final one.
+	ht.Store.rpr.SetReaperDate(now)
+
+	common.WaitFor(5*time.Minute, time.Millisecond, func() bool {
+		for i := 0; i < NUM_TEST_SPANS-1; i++ {
+			span := ht.Store.FindSpan(testSpans[i].Id)
+			if span != nil {
+				ht.Store.lg.Debugf("Waiting for %s to be removed...\n",
+					testSpans[i].Description)
+				return false
+			}
+		}
+		span := ht.Store.FindSpan(testSpans[NUM_TEST_SPANS-1].Id)
+		if span == nil {
+			ht.Store.lg.Debugf("Did not expect %s to be removed\n",
+				testSpans[NUM_TEST_SPANS-1].Description)
+			return false
+		}
+		return true
+	})
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/rest.go b/htrace-htraced/go/src/htrace/htraced/rest.go
new file mode 100644
index 0000000..1ba4791
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/rest.go
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "github.com/gorilla/mux"
+ "htrace/common"
+ "htrace/conf"
+ "net"
+ "net/http"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// Set the response headers.
+//
+// Marks the response as JSON by setting Content-Type to application/json.
+// Call it before the status line or body is written.
+func setResponseHeaders(hdr http.Header) {
+ hdr.Set("Content-Type", "application/json")
+}
+
+// Write a JSON error response.
+//
+// The message is logged at info level, the HTTP status is set to errCode,
+// and a body of the form { "error" : "<message>"} is written.  Double quotes
+// in errStr are replaced by single quotes, and the message is then encoded
+// as a JSON string so that newlines, backslashes and control characters
+// cannot produce an invalid JSON body.
+func writeError(lg *common.Logger, w http.ResponseWriter, errCode int,
+	errStr string) {
+	str := strings.Replace(errStr, `"`, `'`, -1)
+	lg.Info(str + "\n")
+	w.WriteHeader(errCode)
+	quoted, err := json.Marshal(str)
+	if err != nil {
+		// Not expected for a plain string; fall back to a fixed message
+		// rather than emitting a malformed body.
+		quoted = []byte(`"internal error encoding error message"`)
+	}
+	w.Write([]byte(`{ "error" : ` + string(quoted) + `}`))
+}
+
+// serverVersionHandler serves the release and git version of the server.
+type serverVersionHandler struct {
+	lg *common.Logger
+}
+
+// ServeHTTP writes a JSON-encoded common.ServerVersion built from
+// RELEASE_VERSION and GIT_VERSION, or a 500 error if it cannot be marshalled.
+func (hand *serverVersionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	payload, err := json.Marshal(&common.ServerVersion{
+		ReleaseVersion: RELEASE_VERSION,
+		GitVersion:     GIT_VERSION,
+	})
+	if err != nil {
+		msg := fmt.Sprintf("error marshalling ServerVersion: %s\n", err.Error())
+		writeError(hand.lg, w, http.StatusInternalServerError, msg)
+		return
+	}
+	if hand.lg.DebugEnabled() {
+		hand.lg.Debugf("Returned ServerVersion %s\n", string(payload))
+	}
+	w.Write(payload)
+}
+
+// serverDebugInfoHandler serves debugging information about the server.
+type serverDebugInfoHandler struct {
+	lg *common.Logger
+}
+
+// ServeHTTP writes a JSON-encoded common.ServerDebugInfo holding the stack
+// traces and GC statistics, or a 500 error if it cannot be marshalled.
+func (hand *serverDebugInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	// A 1 MiB buffer is handed to GetStackTraces to hold the traces.
+	stacks := make([]byte, 1<<20)
+	common.GetStackTraces(&stacks)
+	info := common.ServerDebugInfo{
+		StackTraces: string(stacks),
+		GCStats:     common.GetGCStats(),
+	}
+	payload, err := json.Marshal(&info)
+	if err != nil {
+		msg := fmt.Sprintf("error marshalling ServerDebugInfo: %s\n", err.Error())
+		writeError(hand.lg, w, http.StatusInternalServerError, msg)
+		return
+	}
+	w.Write(payload)
+	hand.lg.Info("Returned ServerDebugInfo\n")
+}
+
+// serverStatsHandler serves the data store's server statistics.
+type serverStatsHandler struct {
+	dataStoreHandler
+}
+
+// ServeHTTP writes the JSON encoding of the store's ServerStats, or a 500
+// error if the statistics cannot be marshalled.
+func (hand *serverStatsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	hand.lg.Debugf("serverStatsHandler\n")
+	serverStats := hand.store.ServerStats()
+	payload, err := json.Marshal(&serverStats)
+	if err != nil {
+		msg := fmt.Sprintf("error marshalling ServerStats: %s\n", err.Error())
+		writeError(hand.lg, w, http.StatusInternalServerError, msg)
+		return
+	}
+	hand.lg.Debugf("Returned ServerStats %s\n", string(payload))
+	w.Write(payload)
+}
+
+// serverConfHandler serves the server's configuration.
+type serverConfHandler struct {
+	cnf *conf.Config
+	lg  *common.Logger
+}
+
+// ServeHTTP writes the JSON encoding of the exported configuration, or a 500
+// error if it cannot be marshalled.
+func (hand *serverConfHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	hand.lg.Debugf("serverConfHandler\n")
+	exported := hand.cnf.Export()
+	payload, err := json.Marshal(&exported)
+	if err != nil {
+		msg := fmt.Sprintf("error marshalling serverConf: %s\n", err.Error())
+		writeError(hand.lg, w, http.StatusInternalServerError, msg)
+		return
+	}
+	hand.lg.Debugf("Returned server configuration %s\n", string(payload))
+	w.Write(payload)
+}
+
+// dataStoreHandler is the common base embedded by the REST handlers that
+// need access to the data store.
+type dataStoreHandler struct {
+ // The logger to use for this handler.
+ lg *common.Logger
+ // The data store that requests are served from.
+ store *dataStore
+}
+
+// parseSid parses str as a span ID.
+//
+// On success it returns the ID and true.  On failure it writes a 400 JSON
+// error response to w and returns common.INVALID_SPAN_ID and false; the
+// caller should stop processing the request.  Nothing is written after the
+// JSON error object, so the response body stays valid JSON.
+func (hand *dataStoreHandler) parseSid(w http.ResponseWriter,
+	str string) (common.SpanId, bool) {
+	var id common.SpanId
+	err := id.FromString(str)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusBadRequest,
+			fmt.Sprintf("Failed to parse span ID %s: %s", str, err.Error()))
+		return common.INVALID_SPAN_ID, false
+	}
+	return id, true
+}
+
+// getReqField32 reads the form value named fieldName from req and parses it
+// as an unsigned hexadecimal (base 16) number.
+//
+// On success it returns the value and true.  If the field is missing or
+// cannot be parsed, it writes a 400 JSON error response to w and returns -1
+// and false.  The value is parsed with a 31-bit limit so that it always fits
+// in the non-negative range of the int32 result; larger values are rejected
+// instead of silently wrapping around to a negative number.
+//
+// NOTE(review): the base-16 parsing is kept as it is; confirm that clients
+// really send this field in hexadecimal.
+func (hand *dataStoreHandler) getReqField32(fieldName string, w http.ResponseWriter,
+	req *http.Request) (int32, bool) {
+	str := req.FormValue(fieldName)
+	if str == "" {
+		writeError(hand.lg, w, http.StatusBadRequest, fmt.Sprintf("No %s specified.", fieldName))
+		return -1, false
+	}
+	val, err := strconv.ParseUint(str, 16, 31)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusBadRequest,
+			fmt.Sprintf("Error parsing %s: %s.", fieldName, err.Error()))
+		return -1, false
+	}
+	return int32(val), true
+}
+
+// findSidHandler looks up a single span by its ID.
+type findSidHandler struct {
+	dataStoreHandler
+}
+
+// ServeHTTP parses the "id" route variable as a span ID and writes the JSON
+// form of the matching span.  If no such span exists, an error response with
+// status 204 (No Content) is written.
+//
+// NOTE(review): net/http does not permit a body on a 204 response, so the
+// error text is probably never delivered to the client; confirm before
+// relying on it.
+func (hand *findSidHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	req.ParseForm()
+	sid, ok := hand.parseSid(w, mux.Vars(req)["id"])
+	if !ok {
+		return
+	}
+	hand.lg.Debugf("findSidHandler(sid=%s)\n", sid.String())
+	if span := hand.store.FindSpan(sid); span != nil {
+		w.Write(span.ToJson())
+		return
+	}
+	writeError(hand.lg, w, http.StatusNoContent,
+		fmt.Sprintf("No such span as %s\n", sid.String()))
+}
+
+// findChildrenHandler lists the children of a span.
+type findChildrenHandler struct {
+	dataStoreHandler
+}
+
+// ServeHTTP parses the "id" route variable as a span ID and the "lim" form
+// field as the maximum number of children to return, then writes the JSON
+// encoding of the children found by the store.
+func (hand *findChildrenHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	req.ParseForm()
+	sid, sidOk := hand.parseSid(w, mux.Vars(req)["id"])
+	if !sidOk {
+		return
+	}
+	lim, limOk := hand.getReqField32("lim", w, req)
+	if !limOk {
+		return
+	}
+	hand.lg.Debugf("findChildrenHandler(sid=%s, lim=%d)\n", sid.String(), lim)
+	payload, err := json.Marshal(hand.store.FindChildren(sid, lim))
+	if err != nil {
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("Error marshalling children: %s", err.Error()))
+		return
+	}
+	w.Write(payload)
+}
+
+// writeSpansHandler ingests spans sent by clients over REST.
+type writeSpansHandler struct {
+	dataStoreHandler
+}
+
+// ServeHTTP reads a WriteSpansReq header followed by NumSpans JSON-encoded
+// spans from the request body and hands each span to a span ingestor created
+// for the client's host.  Malformed input results in a 400 JSON error
+// response that includes the underlying decode error.
+//
+// NOTE(review): when a span fails to decode, the function returns without
+// calling Close on the ingestor; confirm whether spans ingested before the
+// failure are meant to be discarded.
+func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	startTime := time.Now()
+	setResponseHeaders(w.Header())
+	client, _, serr := net.SplitHostPort(req.RemoteAddr)
+	if serr != nil {
+		writeError(hand.lg, w, http.StatusBadRequest,
+			fmt.Sprintf("Failed to split host and port for %s: %s\n",
+				req.RemoteAddr, serr.Error()))
+		return
+	}
+	dec := json.NewDecoder(req.Body)
+	var msg common.WriteSpansReq
+	err := dec.Decode(&msg)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusBadRequest,
+			fmt.Sprintf("Error parsing WriteSpansReq: %s", err.Error()))
+		return
+	}
+	if hand.lg.TraceEnabled() {
+		hand.lg.Tracef("%s: read WriteSpans REST message: %s\n",
+			req.RemoteAddr, asJson(&msg))
+	}
+	ing := hand.store.NewSpanIngestor(hand.lg, client, msg.DefaultTrid)
+	for spanIdx := 0; spanIdx < msg.NumSpans; spanIdx++ {
+		var span *common.Span
+		err := dec.Decode(&span)
+		if err != nil {
+			// The format string needs a verb for the decode error.
+			writeError(hand.lg, w, http.StatusBadRequest,
+				fmt.Sprintf("Failed to decode span %d out of %d: %s",
+					spanIdx, msg.NumSpans, err.Error()))
+			return
+		}
+		ing.IngestSpan(span)
+	}
+	ing.Close(startTime)
+}
+
+// queryHandler runs span queries against the data store. Its own lg field
+// shadows the lg field of the embedded dataStoreHandler, so hand.lg refers to
+// the logger set directly on the queryHandler.
+type queryHandler struct {
+	lg *common.Logger
+	dataStoreHandler
+}
+
+// ServeHTTP decodes the JSON query found in the "query" form value, executes
+// it against the store, and writes the matching spans to the response as
+// JSON. A missing or unparseable query yields http.StatusBadRequest; a failed
+// query or marshalling error yields http.StatusInternalServerError.
+func (hand *queryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	rawQuery := req.FormValue("query")
+	if rawQuery == "" {
+		writeError(hand.lg, w, http.StatusBadRequest, "No query provided.\n")
+		return
+	}
+	var query common.Query
+	if derr := json.NewDecoder(bytes.NewBufferString(rawQuery)).Decode(&query); derr != nil {
+		writeError(hand.lg, w, http.StatusBadRequest,
+			fmt.Sprintf("Error parsing query '%s': %s", rawQuery, derr.Error()))
+		return
+	}
+	var (
+		results []*common.Span
+		err     error
+	)
+	// The third value returned by HandleQuery is not needed here.
+	results, err, _ = hand.store.HandleQuery(&query)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("Internal error processing query %s: %s",
+				query.String(), err.Error()))
+		return
+	}
+	payload, err := json.Marshal(results)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("Error marshalling results: %s", err.Error()))
+		return
+	}
+	w.Write(payload)
+}
+
+// logErrorHandler is the fallback handler for requests that no other route
+// accepts. It only needs a logger.
+type logErrorHandler struct {
+ lg *common.Logger
+}
+
+// ServeHTTP logs the request URI at error level and replies through
+// writeError with http.StatusBadRequest and the text "Unknown request.".
+func (hand *logErrorHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ hand.lg.Errorf("Got unknown request %s\n", req.RequestURI)
+ writeError(hand.lg, w, http.StatusBadRequest, "Unknown request.")
+}
+
+// RestServer is the htraced REST endpoint: an embedded http.Server together
+// with the listener it serves on and the logger it reports to.
+type RestServer struct {
+ http.Server
+ listener net.Listener
+ lg *common.Logger
+}
+
+// CreateRestServer builds the REST router, attaches it to a new RestServer
+// and starts serving on listener in a background goroutine.
+//
+// cnf supplies the logger configuration and is exposed through /server/conf;
+// store backs every data-store handler. Static files are served from the
+// directory named by the HTRACED_WEB_DIR environment variable or, when that is
+// empty, from "../web" relative to the directory of the running executable.
+//
+// An error is returned only when that default web directory cannot be turned
+// into an absolute path. The value returned by Serve in the background
+// goroutine is not inspected.
+func CreateRestServer(cnf *conf.Config, store *dataStore,
+	listener net.Listener) (*RestServer, error) {
+	rsv := &RestServer{}
+	rsv.lg = common.NewLogger("rest", cnf)
+
+	r := mux.NewRouter().StrictSlash(false)
+
+	// Server introspection endpoints.
+	r.Handle("/server/info", &serverVersionHandler{lg: rsv.lg}).Methods("GET")
+	r.Handle("/server/version", &serverVersionHandler{lg: rsv.lg}).Methods("GET")
+	r.Handle("/server/debugInfo", &serverDebugInfoHandler{lg: rsv.lg}).Methods("GET")
+	r.Handle("/server/stats", &serverStatsHandler{
+		dataStoreHandler: dataStoreHandler{store: store, lg: rsv.lg},
+	}).Methods("GET")
+	r.Handle("/server/conf", &serverConfHandler{cnf: cnf, lg: rsv.lg}).Methods("GET")
+
+	// Span ingestion and querying.
+	r.Handle("/writeSpans", &writeSpansHandler{
+		dataStoreHandler: dataStoreHandler{store: store, lg: rsv.lg},
+	}).Methods("POST")
+	// queryHandler carries its own logger; the embedded handler gets only
+	// the store.
+	r.Handle("/query", &queryHandler{
+		lg:               rsv.lg,
+		dataStoreHandler: dataStoreHandler{store: store},
+	}).Methods("GET")
+
+	// Per-span lookups live under /span.
+	spanRouter := r.PathPrefix("/span").Subrouter()
+	spanRouter.Handle("/{id}", &findSidHandler{
+		dataStoreHandler: dataStoreHandler{store: store, lg: rsv.lg},
+	}).Methods("GET")
+	spanRouter.Handle("/{id}/children", &findChildrenHandler{
+		dataStoreHandler: dataStoreHandler{store: store, lg: rsv.lg},
+	}).Methods("GET")
+
+	// Default handler: every remaining GET request is answered from the
+	// static web directory.
+	webdir := os.Getenv("HTRACED_WEB_DIR")
+	if webdir == "" {
+		var err error
+		webdir, err = filepath.Abs(filepath.Join(filepath.Dir(os.Args[0]), "..", "web"))
+		if err != nil {
+			return nil, err
+		}
+	}
+	rsv.lg.Infof(`Serving static files from "%s"`+"\n", webdir)
+	r.PathPrefix("/").Handler(http.FileServer(http.Dir(webdir))).Methods("GET")
+
+	// Registered last: log an error message for unknown non-GET requests.
+	r.PathPrefix("/").Handler(&logErrorHandler{lg: rsv.lg})
+
+	rsv.listener = listener
+	rsv.Handler = r
+	rsv.ErrorLog = rsv.lg.Wrap("[REST] ", common.INFO)
+	go rsv.Serve(rsv.listener)
+	rsv.lg.Infof("Started REST server on %s\n", rsv.listener.Addr().String())
+	return rsv, nil
+}
+
+// Addr returns the network address of the listener this server was created
+// with.
+func (rsv *RestServer) Addr() net.Addr {
+ return rsv.listener.Addr()
+}
+
+// Close closes the server's listener. The error returned by the listener's
+// Close is discarded.
+func (rsv *RestServer) Close() {
+ rsv.listener.Close()
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/htrace/htracedTool/cmd.go
new file mode 100644
index 0000000..65b67e5
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htracedTool/cmd.go
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bufio"
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "github.com/alecthomas/kingpin"
+ htrace "htrace/client"
+ "htrace/common"
+ "htrace/conf"
+ "io"
+ "os"
+ "sort"
+ "strings"
+ "text/tabwriter"
+ "time"
+)
+
+// RELEASE_VERSION and GIT_VERSION are printed by the "version" command. They
+// are not assigned anywhere in this file (presumably injected at build time --
+// TODO confirm against the build scripts).
+var RELEASE_VERSION string
+var GIT_VERSION string
+
+// Process exit codes returned by the command implementations.
+const EXIT_SUCCESS = 0
+const EXIT_FAILURE = 1
+
+// verbose enables extra progress output in doLoadSpans and doDumpAll. It is
+// assigned in main from the --verbose flag.
+var verbose bool
+
+// USAGE is the help text handed to kingpin when the application is created.
+const USAGE = `The Apache HTrace command-line tool. This tool retrieves and modifies settings and
+other data on a running htraced daemon.
+
+If we find an ` + conf.CONFIG_FILE_NAME + ` configuration file in the list of directories
+specified in ` + conf.HTRACED_CONF_DIR + `, we will use that configuration; otherwise,
+the defaults will be used.
+`
+
+// main loads the htraced configuration, parses the command line with kingpin
+// and dispatches to the selected command. Every command terminates the
+// process through os.Exit with EXIT_SUCCESS or EXIT_FAILURE; if no command
+// matched, a usage error is printed to stderr.
+//
+// NOTE(review): os.Exit does not run deferred calls, so the deferred
+// lg.Close() below only takes effect on the fall-through path at the end.
+func main() {
+	// Load htraced configuration
+	cnf, cnfLog := conf.LoadApplicationConfig("htrace.tool.")
+	lg := common.NewLogger("conf", cnf)
+	defer lg.Close()
+	scanner := bufio.NewScanner(cnfLog)
+	for scanner.Scan() {
+		// Log the text as an argument rather than as the format string, so
+		// that a '%' in the configuration log is printed verbatim.
+		lg.Debugf("%s\n", scanner.Text())
+	}
+
+	// Parse argv
+	app := kingpin.New(os.Args[0], USAGE)
+	app.Flag("Dmy.key", "Set configuration key 'my.key' to 'my.value'. Replace 'my.key' "+
+		"with any key you want to set.").Default("my.value").String()
+	addr := app.Flag("addr", "Server address.").String()
+	// Keep the pointer: the flag value is only filled in by app.Parse below.
+	verboseFlag := app.Flag("verbose", "Verbose.").Default("false").Bool()
+	version := app.Command("version", "Print the version of this program.")
+	serverVersion := app.Command("serverVersion", "Print the version of the htraced server.")
+	serverStats := app.Command("serverStats", "Print statistics retrieved from the htraced server.")
+	serverStatsJson := serverStats.Flag("json", "Display statistics as raw JSON.").Default("false").Bool()
+	serverDebugInfo := app.Command("serverDebugInfo", "Print the debug info of the htraced server.")
+	serverConf := app.Command("serverConf", "Print the server configuration retrieved from the htraced server.")
+	findSpan := app.Command("findSpan", "Print information about a trace span with a given ID.")
+	findSpanId := findSpan.Arg("id", "Span ID to find. Example: be305e54-4534-2110-a0b2-e06b9effe112").Required().String()
+	findChildren := app.Command("findChildren", "Print out the span IDs that are children of a given span ID.")
+	parentSpanId := findChildren.Arg("id", "Span ID to print children for. Example: be305e54-4534-2110-a0b2-e06b9effe112").
+		Required().String()
+	childLim := findChildren.Flag("lim", "Maximum number of child IDs to print.").Default("20").Int()
+	loadFile := app.Command("loadFile", "Write whitespace-separated JSON spans from a file to the server.")
+	loadFilePath := loadFile.Arg("path",
+		"A file containing whitespace-separated span JSON.").Required().String()
+	loadJson := app.Command("load", "Write JSON spans from the command-line to the server.")
+	loadJsonArg := loadJson.Arg("json", "A JSON span to write to the server.").Required().String()
+	dumpAll := app.Command("dumpAll", "Dump all spans from the htraced daemon.")
+	dumpAllOutPath := dumpAll.Arg("path", "The path to dump the trace spans to.").Default("-").String()
+	dumpAllLim := dumpAll.Flag("lim", "The number of spans to transfer from the server at once.").
+		Default("100").Int()
+	graph := app.Command("graph", "Visualize span JSON as a graph.")
+	graphJsonFile := graph.Arg("input", "The JSON file to load").Required().String()
+	graphDotFile := graph.Flag("output",
+		"The path to write a GraphViz dotfile to. This file can be used as input to "+
+			"GraphViz, in order to generate a pretty picture. See graphviz.org for more "+
+			"information about generating pictures of graphs.").Default("-").String()
+	query := app.Command("query", "Send a query to htraced.")
+	queryLim := query.Flag("lim", "Maximum number of spans to retrieve.").Default("20").Int()
+	queryArg := query.Arg("query", "The query string to send. Query strings have the format "+
+		"[TYPE] [OPERATOR] [CONST], joined by AND statements.").Required().String()
+	rawQuery := app.Command("rawQuery", "Send a raw JSON query to htraced.")
+	rawQueryArg := rawQuery.Arg("json", "The query JSON to send.").Required().String()
+	cmd := kingpin.MustParse(app.Parse(os.Args[1:]))
+
+	// The flag values are valid only now that parsing has finished.
+	verbose = *verboseFlag
+
+	// Add the command-line settings into the configuration.
+	if *addr != "" {
+		cnf = cnf.Clone(conf.HTRACE_WEB_ADDRESS, *addr)
+	}
+
+	// Handle commands that don't require an HTrace client.
+	switch cmd {
+	case version.FullCommand():
+		os.Exit(printVersion())
+	case graph.FullCommand():
+		err := jsonSpanFileToDotFile(*graphJsonFile, *graphDotFile)
+		if err != nil {
+			fmt.Printf("graphing error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	}
+
+	// Create HTrace client
+	hcl, err := htrace.NewClient(cnf, nil)
+	if err != nil {
+		fmt.Printf("Failed to create HTrace client: %s\n", err.Error())
+		os.Exit(EXIT_FAILURE)
+	}
+
+	// Handle commands that require an HTrace client.
+	switch cmd {
+	case serverVersion.FullCommand():
+		os.Exit(printServerVersion(hcl))
+	case serverStats.FullCommand():
+		if *serverStatsJson {
+			os.Exit(printServerStatsJson(hcl))
+		} else {
+			os.Exit(printServerStats(hcl))
+		}
+	case serverDebugInfo.FullCommand():
+		os.Exit(printServerDebugInfo(hcl))
+	case serverConf.FullCommand():
+		os.Exit(printServerConfJson(hcl))
+	case findSpan.FullCommand():
+		// Use a SpanId value, not a nil *SpanId: dereferencing the nil
+		// pointer crashed this command.
+		// NOTE(review): any result returned by FromString is not inspected
+		// here -- confirm whether it reports parse errors.
+		var id common.SpanId
+		id.FromString(*findSpanId)
+		os.Exit(doFindSpan(hcl, id))
+	case findChildren.FullCommand():
+		// Same nil-pointer fix as for findSpan above.
+		var id common.SpanId
+		id.FromString(*parentSpanId)
+		os.Exit(doFindChildren(hcl, id, *childLim))
+	case loadJson.FullCommand():
+		os.Exit(doLoadSpanJson(hcl, *loadJsonArg))
+	case loadFile.FullCommand():
+		os.Exit(doLoadSpanJsonFile(hcl, *loadFilePath))
+	case dumpAll.FullCommand():
+		err := doDumpAll(hcl, *dumpAllOutPath, *dumpAllLim)
+		if err != nil {
+			fmt.Printf("dumpAll error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	case query.FullCommand():
+		err := doQueryFromString(hcl, *queryArg, *queryLim)
+		if err != nil {
+			fmt.Printf("query error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	case rawQuery.FullCommand():
+		err := doRawQuery(hcl, *rawQueryArg)
+		if err != nil {
+			fmt.Printf("raw query error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	}
+
+	app.UsageErrorf(os.Stderr, "You must supply a command to run.")
+}
+
+// Print the release version and git version of this htracedTool binary.
+// Always returns EXIT_SUCCESS.
+func printVersion() int {
+ fmt.Printf("Running htracedTool %s [%s].\n", RELEASE_VERSION, GIT_VERSION)
+ return EXIT_SUCCESS
+}
+
+// Print the release and git versions reported by an htraced server.
+// Returns EXIT_FAILURE, after printing the error, when the server cannot be
+// queried.
+func printServerVersion(hcl *htrace.Client) int {
+	serverVer, err := hcl.GetServerVersion()
+	if err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Printf("HTraced server version %s (%s)\n",
+		serverVer.ReleaseVersion, serverVer.GitVersion)
+	return EXIT_SUCCESS
+}
+
+// Print a human-readable report of the statistics returned by the client's
+// GetServerStats call: a table of server-wide counters, one section per
+// leveldb directory, and a table of per-host span metrics sorted by host.
+// Returns EXIT_FAILURE, after printing the error, when the statistics cannot
+// be retrieved.
+func printServerStats(hcl *htrace.Client) int {
+	stats, err := hcl.GetServerStats()
+	if err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+
+	// Server-wide counters, aligned on tab stops.
+	summary := tabwriter.NewWriter(os.Stdout, 0, 8, 0, '\t', 0)
+	fmt.Fprintf(summary, "HTRACED SERVER STATS\n")
+	fmt.Fprintf(summary, "Datastore Start\t%s\n",
+		common.UnixMsToTime(stats.LastStartMs).Format(time.RFC3339))
+	fmt.Fprintf(summary, "Server Time\t%s\n",
+		common.UnixMsToTime(stats.CurMs).Format(time.RFC3339))
+	fmt.Fprintf(summary, "Spans reaped\t%d\n", stats.ReapedSpans)
+	fmt.Fprintf(summary, "Spans ingested\t%d\n", stats.IngestedSpans)
+	fmt.Fprintf(summary, "Spans written\t%d\n", stats.WrittenSpans)
+	fmt.Fprintf(summary, "Spans dropped by server\t%d\n", stats.ServerDroppedSpans)
+	avgLatency := time.Millisecond * time.Duration(stats.AverageWriteSpansLatencyMs)
+	fmt.Fprintf(summary, "Average WriteSpan Latency\t%s\n", avgLatency.String())
+	maxLatency := time.Millisecond * time.Duration(stats.MaxWriteSpansLatencyMs)
+	fmt.Fprintf(summary, "Maximum WriteSpan Latency\t%s\n", maxLatency.String())
+	fmt.Fprintf(summary, "Number of leveldb directories\t%d\n", len(stats.Dirs))
+	summary.Flush()
+	fmt.Println("")
+
+	// One free-form section per leveldb directory.
+	for _, dir := range stats.Dirs {
+		fmt.Printf("==== %s ===\n", dir.Path)
+		fmt.Printf("Approximate number of bytes: %d\n", dir.ApproximateBytes)
+		// The stats text carries literal "\n" sequences; expand them into
+		// real newlines for display.
+		levelDbText := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1)
+		fmt.Printf("%s\n", levelDbText)
+	}
+
+	// Per-host metrics, in sorted host order so the output is stable.
+	hostTable := tabwriter.NewWriter(os.Stdout, 0, 8, 0, '\t', 0)
+	fmt.Fprintf(hostTable, "HOST SPAN METRICS\n")
+	hostMetrics := stats.HostSpanMetrics
+	hosts := make([]string, 0, len(hostMetrics))
+	for host := range hostMetrics {
+		hosts = append(hosts, host)
+	}
+	sort.Strings(hosts)
+	for _, host := range hosts {
+		mtx := hostMetrics[host]
+		fmt.Fprintf(hostTable, "%s\twritten: %d\tserver dropped: %d\n",
+			host, mtx.Written, mtx.ServerDropped)
+	}
+	hostTable.Flush()
+	return EXIT_SUCCESS
+}
+
+// Print the statistics returned by the client's GetServerStats call as
+// indented JSON. Returns EXIT_FAILURE, after printing the error, when the
+// statistics cannot be retrieved or marshalled.
+func printServerStatsJson(hcl *htrace.Client) int {
+	serverStats, err := hcl.GetServerStats()
+	if err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+	pretty, err := json.MarshalIndent(serverStats, "", " ")
+	if err != nil {
+		fmt.Printf("Error marshalling server stats: %s", err.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Printf("%s\n", string(pretty))
+	return EXIT_SUCCESS
+}
+
+// Print the goroutine stack traces and GC statistics returned by the client's
+// GetServerDebugInfo call, each framed by begin/end marker lines.
+// Returns EXIT_FAILURE, after printing the error, when the call fails.
+func printServerDebugInfo(hcl *htrace.Client) int {
+	debugInfo, err := hcl.GetServerDebugInfo()
+	if err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+
+	fmt.Println("=== GOROUTINE STACKS ===")
+	fmt.Print(debugInfo.StackTraces)
+	fmt.Println("=== END GOROUTINE STACKS ===")
+
+	fmt.Println("=== GC STATISTICS ===")
+	fmt.Print(debugInfo.GCStats)
+	fmt.Println("=== END GC STATISTICS ===")
+	return EXIT_SUCCESS
+}
+
+// Print the configuration returned by the client's GetServerConf call as
+// indented JSON. Returns EXIT_FAILURE, after printing the error, when the
+// configuration cannot be retrieved or marshalled.
+func printServerConfJson(hcl *htrace.Client) int {
+	serverCnf, err := hcl.GetServerConf()
+	if err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+	pretty, err := json.MarshalIndent(serverCnf, "", " ")
+	if err != nil {
+		fmt.Printf("Error marshalling server conf: %s", err.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Printf("%s\n", string(pretty))
+	return EXIT_SUCCESS
+}
+
+// Look up the span with ID sid on the server and print it as indented JSON.
+// Returns EXIT_FAILURE when the lookup fails, when no span is returned, or
+// when the span cannot be marshalled.
+func doFindSpan(hcl *htrace.Client, sid common.SpanId) int {
+	span, err := hcl.FindSpan(sid)
+	switch {
+	case err != nil:
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	case span == nil:
+		fmt.Printf("Span ID not found.\n")
+		return EXIT_FAILURE
+	}
+	pretty, err := json.MarshalIndent(span, "", " ")
+	if err != nil {
+		fmt.Printf("Error: error pretty-printing span to JSON: %s\n", err.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Printf("%s\n", string(pretty))
+	return EXIT_SUCCESS
+}
+
+// Read JSON spans from the file at spanFile and write them to the server.
+// Returns EXIT_FAILURE when no path is given, when the file cannot be opened,
+// or when loading the spans fails.
+func doLoadSpanJsonFile(hcl *htrace.Client, spanFile string) int {
+	if spanFile == "" {
+		fmt.Printf("You must specify the json file to load.\n")
+		return EXIT_FAILURE
+	}
+	input, openErr := OpenInputFile(spanFile)
+	if openErr != nil {
+		fmt.Printf("Failed to open %s: %s\n", spanFile, openErr.Error())
+		return EXIT_FAILURE
+	}
+	defer input.Close()
+	buffered := bufio.NewReader(input)
+	return doLoadSpans(hcl, buffered)
+}
+
+// Write the JSON spans contained in the string spanJson to the server by
+// handing them to doLoadSpans. Returns the exit code produced by doLoadSpans.
+func doLoadSpanJson(hcl *htrace.Client, spanJson string) int {
+ return doLoadSpans(hcl, bytes.NewBufferString(spanJson))
+}
+
+func doLoadSpans(hcl *htrace.Client, reader io.Reader) int {
+ dec := json.NewDecoder(reader)
+ spans := make([]*common.Span, 0, 32)
+ var err error
+ for {
+ var span common.Span
+ if err = dec.Decode(&span); err != nil {
+ if err == io.EOF {
+ break
+ }
+ fmt.Printf("Failed to decode JSON: %s\n", err.Error())
+ return EXIT_FAILURE
+ }
+ spans = append(spans, &span)
+ }
+ if verbose {
+ fmt.Printf("Writing ")
+ prefix := ""
+ for i := range spans {
+ fmt.Printf("%s%s", prefix, spans[i].ToJson())
+ prefix = ", "
+ }
+ fmt.Printf("\n")
+ }
+ err = hcl.WriteSpans(spans)
+ if err != nil {
+ fmt.Println(err.Error())
+ return EXIT_FAILURE
+ }
+ return EXIT_SUCCESS
+}
+
+// Find the children of the span with ID sid, retrieving at most lim IDs, and
+// print them as indented JSON. Returns EXIT_FAILURE, after printing the
+// error, when the lookup or the JSON encoding fails.
+func doFindChildren(hcl *htrace.Client, sid common.SpanId, lim int) int {
+	spanIds, err := hcl.FindChildren(sid, lim)
+	if err != nil {
+		fmt.Printf("%s\n", err.Error())
+		return EXIT_FAILURE
+	}
+	pbuf, err := json.MarshalIndent(spanIds, "", " ")
+	if err != nil {
+		// Printf, not Println: the message contains a %s verb.
+		fmt.Printf("Error: error pretty-printing span IDs to JSON: %s\n", err.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Printf("%s\n", string(pbuf))
+	return EXIT_SUCCESS
+}
+
+// Dump all spans from the htraced daemon.
+//
+// Spans are fetched by hcl.DumpAll in a background goroutine, lim at a time,
+// and written one JSON document per line to outPath ("-" selects stdout).
+// When verbose is set, a progress line is printed at most every five seconds.
+//
+// Returns an error when the output cannot be created, written, flushed or
+// closed, or when DumpAll itself reports an error.
+func doDumpAll(hcl *htrace.Client, outPath string, lim int) error {
+	file, err := CreateOutputFile(outPath)
+	if err != nil {
+		return err
+	}
+	w := bufio.NewWriter(file)
+	// Best-effort cleanup for the early-return paths; the success path
+	// closes the file itself and sets file to nil to disable this.
+	defer func() {
+		if file != nil {
+			w.Flush()
+			file.Close()
+		}
+	}()
+	out := make(chan *common.Span, 50)
+	// The result of DumpAll is handed over through a channel so that reading
+	// it below is synchronized with the goroutine that produces it. The
+	// buffer lets the goroutine finish even if we return early.
+	dumpErrCh := make(chan error, 1)
+	go func() {
+		dumpErrCh <- hcl.DumpAll(lim, out)
+	}()
+	var numSpans int64
+	nextLogTime := time.Now().Add(time.Second * 5)
+	// Runs until the channel is closed.
+	for span := range out {
+		// After the first write error keep draining the channel, but stop
+		// writing.
+		if err == nil {
+			_, err = fmt.Fprintf(w, "%s\n", span.ToJson())
+		}
+		if verbose {
+			numSpans++
+			now := time.Now()
+			if !now.Before(nextLogTime) {
+				nextLogTime = now.Add(time.Second * 5)
+				fmt.Printf("received %d span(s)...\n", numSpans)
+			}
+		}
+	}
+	if err != nil {
+		return errors.New(fmt.Sprintf("Write error %s", err.Error()))
+	}
+	if dumpErr := <-dumpErrCh; dumpErr != nil {
+		return errors.New(fmt.Sprintf("Dump error %s", dumpErr.Error()))
+	}
+	err = w.Flush()
+	if err != nil {
+		return err
+	}
+	err = file.Close()
+	file = nil
+	if err != nil {
+		return err
+	}
+	return nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htracedTool/file.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htracedTool/file.go b/htrace-htraced/go/src/htrace/htracedTool/file.go
new file mode 100644
index 0000000..ca9c18d
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htracedTool/file.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bufio"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "htrace/common"
+ "io"
+ "os"
+)
+
+// A file used for input.
+// Transparently supports using stdin for input.
+// The embedded *os.File provides the read methods; path records the name the
+// file was opened with, where "-" denotes stdin.
+type InputFile struct {
+ *os.File
+ path string
+}
+
+// Open path for reading. The special path "-" selects os.Stdin instead of a
+// file on disk. Returns the error from os.Open when the file cannot be opened.
+func OpenInputFile(path string) (*InputFile, error) {
+	source := os.Stdin
+	if path != "-" {
+		opened, err := os.Open(path)
+		if err != nil {
+			return nil, err
+		}
+		source = opened
+	}
+	return &InputFile{File: source, path: path}, nil
+}
+
+// Close closes the underlying file, except when it is stdin (path "-"),
+// which is left open. The error from closing the file is discarded.
+func (file *InputFile) Close() {
+	if file.path == "-" {
+		return
+	}
+	file.File.Close()
+}
+
+// A file used for output.
+// Transparently supports using stdout for output.
+// The embedded *os.File provides the write methods; path records the name the
+// file was created with, where "-" denotes stdout.
+type OutputFile struct {
+ *os.File
+ path string
+}
+
+// Create path for writing. The special path "-" selects os.Stdout instead of
+// a file on disk. Returns the error from os.Create when the file cannot be
+// created.
+func CreateOutputFile(path string) (*OutputFile, error) {
+	sink := os.Stdout
+	if path != "-" {
+		created, err := os.Create(path)
+		if err != nil {
+			return nil, err
+		}
+		sink = created
+	}
+	return &OutputFile{File: sink, path: path}, nil
+}
+
+// Close closes the underlying file and returns the resulting error. When the
+// output is stdout (path "-"), nothing is closed and nil is returned.
+func (file *OutputFile) Close() error {
+	if file.path == "-" {
+		return nil
+	}
+	return file.File.Close()
+}
+
+// FailureDeferringWriter is a writer which allows us to call Printf multiple
+// times and then check if all the printfs succeeded at the very end, rather
+// than checking after each call. We will not attempt to write more data
+// after the first write failure.
+//
+// err holds the first write error seen by Printf, or nil if none occurred.
+type FailureDeferringWriter struct {
+ io.Writer
+ err error
+}
+
+// NewFailureDeferringWriter wraps writer in a FailureDeferringWriter that has
+// no recorded error yet.
+func NewFailureDeferringWriter(writer io.Writer) *FailureDeferringWriter {
+	return &FailureDeferringWriter{Writer: writer}
+}
+
+// Printf formats its arguments and writes the result to the wrapped writer
+// in a single Write call. Once a write has failed, later calls do nothing;
+// the failure is kept for retrieval through Error.
+func (w *FailureDeferringWriter) Printf(format string, v ...interface{}) {
+	if w.err == nil {
+		_, w.err = w.Writer.Write([]byte(fmt.Sprintf(format, v...)))
+	}
+}
+
+// Error returns the first write error recorded by Printf, or nil when every
+// write so far has succeeded.
+func (w *FailureDeferringWriter) Error() error {
+ return w.err
+}
+
+// Read the file at path, which holds whitespace-separated span JSON, into a
+// slice of spans. The path "-" reads from stdin. Returns the open error or
+// the decode error from readSpans.
+func readSpansFile(path string) (common.SpanSlice, error) {
+	input, openErr := OpenInputFile(path)
+	if openErr != nil {
+		return nil, openErr
+	}
+	defer input.Close()
+	buffered := bufio.NewReader(input)
+	return readSpans(buffered)
+}
+
+// Read whitespace-separated span JSON into a slice of spans.
+func readSpans(reader io.Reader) (common.SpanSlice, error) {
+ spans := make(common.SpanSlice, 0)
+ dec := json.NewDecoder(reader)
+ for {
+ var span common.Span
+ err := dec.Decode(&span)
+ if err != nil {
+ if err != io.EOF {
+ return nil, errors.New(fmt.Sprintf("Decode error after decoding %d "+
+ "span(s): %s", len(spans), err.Error()))
+ }
+ break
+ }
+ spans = append(spans, &span)
+ }
+ return spans, nil
+}