You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by iw...@apache.org on 2016/04/20 01:32:48 UTC

[5/7] incubator-htrace git commit: HTRACE-357. Rename htrace-htraced/go/src/org/apache/htrace to htrace-htraced/go/src/htrace (Colin Patrick McCabe via iwasakims)

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go b/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go
new file mode 100644
index 0000000..9157965
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"htrace/common"
+	"htrace/conf"
+	"testing"
+	"time"
+)
+
// Verify that a Heartbeater can be created and shut down cleanly, and that
// String() reports the name it was constructed with.
func TestHeartbeaterStartupShutdown(t *testing.T) {
	cnfBld := conf.Builder{
		Values:   conf.TEST_VALUES(),
		Defaults: conf.DEFAULTS,
	}
	cnf, err := cnfBld.Build()
	if err != nil {
		t.Fatalf("failed to create conf: %s", err.Error())
	}
	lg := common.NewLogger("heartbeater", cnf)
	// Period of 1 (milliseconds, per HEARTBEATER_PERIOD's usage below); the
	// interval is irrelevant here because we shut down immediately.
	hb := NewHeartbeater("ExampleHeartbeater", 1, lg)
	if hb.String() != "ExampleHeartbeater" {
		t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
	}
	hb.Shutdown()
}
+
+// The number of milliseconds between heartbeats
+const HEARTBEATER_PERIOD = 5
+
+// The number of heartbeats to send in the test.
+const NUM_TEST_HEARTBEATS = 3
+
// Verify that a Heartbeater actually sends heartbeats.  A single run is
// retried until its measured duration exceeds the minimum implied by the
// heartbeat period, to guard against coarse clock granularity producing a
// meaningless timing measurement.
func TestHeartbeaterSendsHeartbeats(t *testing.T) {
	cnfBld := conf.Builder{
		Values:   conf.TEST_VALUES(),
		Defaults: conf.DEFAULTS,
	}
	cnf, err := cnfBld.Build()
	if err != nil {
		t.Fatalf("failed to create conf: %s", err.Error())
	}
	lg := common.NewLogger("heartbeater", cnf)
	// The minimum amount of time which the heartbeater test should take
	MINIMUM_TEST_DURATION := time.Millisecond * (NUM_TEST_HEARTBEATS * HEARTBEATER_PERIOD)
	// duration starts exactly at the minimum, so the loop always runs at
	// least once.
	duration := MINIMUM_TEST_DURATION
	for duration <= MINIMUM_TEST_DURATION {
		start := time.Now()
		testHeartbeaterSendsHeartbeatsImpl(t, lg)
		end := time.Now()
		duration = end.Sub(start)
		lg.Debugf("Measured duration: %v; minimum expected duration: %v\n",
			duration, MINIMUM_TEST_DURATION)
	}
}
+
// One iteration of the heartbeat test: create a Heartbeater with a single
// target channel, wait until NUM_TEST_HEARTBEATS heartbeats have arrived,
// and then shut the heartbeater down.
func testHeartbeaterSendsHeartbeatsImpl(t *testing.T, lg *common.Logger) {
	hb := NewHeartbeater("ExampleHeartbeater", HEARTBEATER_PERIOD, lg)
	if hb.String() != "ExampleHeartbeater" {
		t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
	}
	testChan := make(chan interface{}, NUM_TEST_HEARTBEATS)
	gotAllHeartbeats := make(chan bool)
	hb.AddHeartbeatTarget(&HeartbeatTarget{
		name:       "ExampleHeartbeatTarget",
		targetChan: testChan,
	})
	go func() {
		// Consume the heartbeats we expect, then signal the main test
		// goroutine that it may shut the heartbeater down.
		for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
			<-testChan
		}
		gotAllHeartbeats <- true
		// Keep draining so the heartbeater is never blocked sending to a
		// full channel while shutting down.  NOTE(review): the early return
		// fires when testChan is closed -- presumably by Shutdown(); confirm
		// against heartbeater.go.
		for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
			_, open := <-testChan
			if !open {
				return
			}
		}
	}()
	<-gotAllHeartbeats
	hb.Shutdown()
}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/hrpc.go b/htrace-htraced/go/src/htrace/htraced/hrpc.go
new file mode 100644
index 0000000..8b5a728
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/hrpc.go
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/binary"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"github.com/ugorji/go/codec"
+	"htrace/common"
+	"htrace/conf"
+	"io"
+	"net"
+	"net/rpc"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+const MAX_HRPC_HANDLERS = 32765
+
+// Handles HRPC calls
+type HrpcHandler struct {
+	lg    *common.Logger
+	store *dataStore
+}
+
+// The HRPC server
+type HrpcServer struct {
+	*rpc.Server
+	hand *HrpcHandler
+
+	// The listener we are using to accept new connections.
+	listener net.Listener
+
+	// A WaitGroup used to block until the HRPC server has exited.
+	exited sync.WaitGroup
+
+	// A channel containing server codecs to use.  This channel is fully
+	// buffered.  The number of entries it initially contains determines how
+	// many concurrent codecs we will have running at once.
+	cdcs chan *HrpcServerCodec
+
+	// Used to shut down
+	shutdown chan interface{}
+
+	// The I/O timeout to use when reading requests or sending responses.  This
+	// timeout does not apply to the time we spend processing the message.
+	ioTimeo time.Duration
+
+	// A count of all I/O errors that we have encountered since the server
+	// started.  This counts errors like improperly formatted message frames,
+	// but not errors like properly formatted but invalid messages.
+	// This count is updated from multiple goroutines via sync/atomic.
+	ioErrorCount uint64
+
+	// The test hooks to use, or nil during normal operation.
+	testHooks *hrpcTestHooks
+}
+
+type hrpcTestHooks struct {
+	// A callback we make right after calling Accept() but before reading from
+	// the new connection.
+	HandleAdmission func()
+}
+
+// A codec which encodes HRPC data via JSON.  This structure holds the context
+// for a particular client connection.
+type HrpcServerCodec struct {
+	lg *common.Logger
+
+	// The current connection.
+	conn net.Conn
+
+	// The HrpcServer which this connection is part of.
+	hsv *HrpcServer
+
+	// The message length we read from the header.
+	length uint32
+
+	// The number of messages this connection has handled.
+	numHandled int
+
+	// The buffer for reading requests.  These buffers are reused for multiple
+	// requests to avoid allocating memory.
+	buf []byte
+
+	// Configuration for msgpack decoding
+	msgpackHandle codec.MsgpackHandle
+}
+
// asJson renders val as JSON for logging purposes.  If val cannot be
// encoded, a description of the encoding failure is returned instead.
func asJson(val interface{}) string {
	js, err := json.Marshal(val)
	if err == nil {
		return string(js)
	}
	return "encoding error: " + err.Error()
}
+
// Convenience wrapper: log and create an I/O error at WARN level.
func newIoErrorWarn(cdc *HrpcServerCodec, val string) error {
	return newIoError(cdc, val, common.WARN)
}

// Log an I/O problem at the given level, tagged with the remote address of
// the current connection, and return it as an error value.
func newIoError(cdc *HrpcServerCodec, val string, level common.Level) error {
	if cdc.lg.LevelEnabled(level) {
		cdc.lg.Write(level, cdc.conn.RemoteAddr().String()+": "+val+"\n")
	}
	// Only events at INFO level or above count towards the server's I/O
	// error counter; DEBUG-level events (e.g. a normal remote close) do not.
	if level >= common.INFO {
		atomic.AddUint64(&cdc.hsv.ioErrorCount, 1)
	}
	return errors.New(val)
}
+
// ReadRequestHeader implements rpc.ServerCodec.  It reads and validates the
// fixed-size HRPC request header and stores the body length for the
// following ReadRequestBody call.
func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
	hdr := common.HrpcRequestHeader{}
	if cdc.lg.TraceEnabled() {
		cdc.lg.Tracef("%s: Reading HRPC request header.\n", cdc.conn.RemoteAddr())
	}
	// Apply the server's I/O timeout to reading the request.
	cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
	err := binary.Read(cdc.conn, binary.LittleEndian, &hdr)
	if err != nil {
		// EOF after at least one complete message is a normal connection
		// close, so it is logged at DEBUG rather than WARN.
		if err == io.EOF && cdc.numHandled > 0 {
			return newIoError(cdc, fmt.Sprintf("Remote closed connection "+
				"after writing %d message(s)", cdc.numHandled), common.DEBUG)
		}
		return newIoError(cdc,
			fmt.Sprintf("Error reading request header: %s", err.Error()), common.WARN)
	}
	if cdc.lg.TraceEnabled() {
		cdc.lg.Tracef("%s: Read HRPC request header %s\n",
			cdc.conn.RemoteAddr(), asJson(&hdr))
	}
	if hdr.Magic != common.HRPC_MAGIC {
		return newIoErrorWarn(cdc, fmt.Sprintf("Invalid request header: expected "+
			"magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic))
	}
	// Reject oversized bodies before any buffer for them is allocated.
	if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
		return newIoErrorWarn(cdc, fmt.Sprintf("Length prefix was too long.  "+
			"Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH,
			hdr.Length))
	}
	req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
	if req.ServiceMethod == "" {
		return newIoErrorWarn(cdc, fmt.Sprintf("Unknown MethodID code 0x%04x",
			hdr.MethodId))
	}
	req.Seq = hdr.Seq
	cdc.length = hdr.Length
	return nil
}
+
+func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+	remoteAddr := cdc.conn.RemoteAddr().String()
+	if cdc.lg.TraceEnabled() {
+		cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n",
+			remoteAddr, cdc.length)
+	}
+	if cap(cdc.buf) < int(cdc.length) {
+		var pow uint
+		for pow = 0; (1 << pow) < int(cdc.length); pow++ {
+		}
+		cdc.buf = make([]byte, 0, 1<<pow)
+	}
+	_, err := io.ReadFull(cdc.conn, cdc.buf[:cdc.length])
+	if err != nil {
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte "+
+			"request body: %s", cdc.length, err.Error()))
+	}
+	var zeroTime time.Time
+	cdc.conn.SetDeadline(zeroTime)
+
+	dec := codec.NewDecoderBytes(cdc.buf[:cdc.length], &cdc.msgpackHandle)
+	err = dec.Decode(body)
+	if cdc.lg.TraceEnabled() {
+		cdc.lg.Tracef("%s: read HRPC message: %s\n",
+			remoteAddr, asJson(&body))
+	}
+	req := body.(*common.WriteSpansReq)
+	if req == nil {
+		return nil
+	}
+	// We decode WriteSpans requests in a streaming fashion, to avoid overloading the garbage
+	// collector with a ton of trace spans all at once.
+	startTime := time.Now()
+	client, _, err := net.SplitHostPort(remoteAddr)
+	if err != nil {
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to split host and port "+
+			"for %s: %s\n", remoteAddr, err.Error()))
+	}
+	hand := cdc.hsv.hand
+	ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
+	for spanIdx := 0; spanIdx < req.NumSpans; spanIdx++ {
+		var span *common.Span
+		err := dec.Decode(&span)
+		if err != nil {
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode span %d "+
+				"out of %d: %s\n", spanIdx, req.NumSpans, err.Error()))
+		}
+		ing.IngestSpan(span)
+	}
+	ing.Close(startTime)
+	return nil
+}
+
+var EMPTY []byte = make([]byte, 0)
+
+func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error {
+	cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
+	var err error
+	buf := EMPTY
+	if msg != nil {
+		w := bytes.NewBuffer(make([]byte, 0, 128))
+		enc := codec.NewEncoder(w, &cdc.msgpackHandle)
+		err := enc.Encode(msg)
+		if err != nil {
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+
+				"response message: %s", err.Error()))
+		}
+		buf = w.Bytes()
+	}
+	hdr := common.HrpcResponseHeader{}
+	hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod)
+	hdr.Seq = resp.Seq
+	hdr.ErrLength = uint32(len(resp.Error))
+	hdr.Length = uint32(len(buf))
+	writer := bufio.NewWriterSize(cdc.conn, 256)
+	err = binary.Write(writer, binary.LittleEndian, &hdr)
+	if err != nil {
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
+			"header: %s", err.Error()))
+	}
+	if hdr.ErrLength > 0 {
+		_, err = io.WriteString(writer, resp.Error)
+		if err != nil {
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write error "+
+				"string: %s", err.Error()))
+		}
+	}
+	if hdr.Length > 0 {
+		var length int
+		length, err = writer.Write(buf)
+		if err != nil {
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
+				"message: %s", err.Error()))
+		}
+		if uint32(length) != hdr.Length {
+			return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write all of "+
+				"response message: %s", err.Error()))
+		}
+	}
+	err = writer.Flush()
+	if err != nil {
+		return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write the response "+
+			"bytes: %s", err.Error()))
+	}
+	cdc.numHandled++
+	return nil
+}
+
// Close implements rpc.ServerCodec.  It closes the connection, resets the
// per-connection state, and returns this codec to the server's pool so that
// another connection can be accepted.
func (cdc *HrpcServerCodec) Close() error {
	err := cdc.conn.Close()
	cdc.conn = nil
	cdc.length = 0
	cdc.numHandled = 0
	// Never blocks: the codec was taken out of this fully-buffered channel
	// when the connection was accepted.
	cdc.hsv.cdcs <- cdc
	return err
}
+
// WriteSpans is the RPC endpoint for writing spans.
// Nothing to do here; WriteSpans is handled in ReadRequestBody, which
// streams the spans into the data store as they are decoded.
func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
	resp *common.WriteSpansResp) (err error) {
	return nil
}
+
// CreateHrpcServer creates an HRPC server which serves requests against the
// given data store, starts its accept loop, and returns it.
// testHooks may be nil outside of unit tests.
func CreateHrpcServer(cnf *conf.Config, store *dataStore,
	testHooks *hrpcTestHooks) (*HrpcServer, error) {
	lg := common.NewLogger("hrpc", cnf)
	// Clamp the configured handler count to [1, MAX_HRPC_HANDLERS].
	numHandlers := cnf.GetInt(conf.HTRACE_NUM_HRPC_HANDLERS)
	if numHandlers < 1 {
		lg.Warnf("%s must be positive: using 1 handler.\n", conf.HTRACE_NUM_HRPC_HANDLERS)
		numHandlers = 1
	}
	if numHandlers > MAX_HRPC_HANDLERS {
		lg.Warnf("%s cannot be more than %d: using %d handlers\n",
			conf.HTRACE_NUM_HRPC_HANDLERS, MAX_HRPC_HANDLERS, MAX_HRPC_HANDLERS)
		numHandlers = MAX_HRPC_HANDLERS
	}
	hsv := &HrpcServer{
		Server: rpc.NewServer(),
		hand: &HrpcHandler{
			lg:    lg,
			store: store,
		},
		cdcs:     make(chan *HrpcServerCodec, numHandlers),
		shutdown: make(chan interface{}),
		ioTimeo: time.Millisecond *
			time.Duration(cnf.GetInt64(conf.HTRACE_HRPC_IO_TIMEOUT_MS)),
		testHooks: testHooks,
	}
	// Pre-fill the codec pool; the number of codecs in the channel bounds
	// the number of connections served concurrently (see run()).
	for i := 0; i < numHandlers; i++ {
		hsv.cdcs <- &HrpcServerCodec{
			lg:  lg,
			hsv: hsv,
			msgpackHandle: codec.MsgpackHandle{
				WriteExt: true,
			},
		}
	}
	var err error
	hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
	if err != nil {
		return nil, err
	}
	hsv.Server.Register(hsv.hand)
	hsv.exited.Add(1)
	go hsv.run()
	lg.Infof("Started HRPC server on %s with %d handler routines. "+
		"ioTimeo=%s.\n", hsv.listener.Addr().String(), numHandlers,
		hsv.ioTimeo.String())
	return hsv, nil
}
+
// run is the accept loop of the HRPC server.  It runs in its own goroutine
// until Close() is called, and signals the exited WaitGroup on return.
func (hsv *HrpcServer) run() {
	lg := hsv.hand.lg
	srvAddr := hsv.listener.Addr().String()
	defer func() {
		lg.Infof("HrpcServer on %s exiting\n", srvAddr)
		hsv.exited.Done()
	}()
	for {
		select {
		// Taking a codec out of the pool before Accept() caps the number of
		// concurrent connections at the number of codecs.
		case cdc := <-hsv.cdcs:
			conn, err := hsv.listener.Accept()
			if err != nil {
				// NOTE(review): also reached when Close() closes the
				// listener; a following iteration then takes the shutdown
				// branch.
				lg.Errorf("HrpcServer on %s got accept error: %s\n", srvAddr, err.Error())
				hsv.cdcs <- cdc // never blocks; there is always sufficient buffer space
				continue
			}
			if lg.TraceEnabled() {
				lg.Tracef("%s: Accepted HRPC connection.\n", conn.RemoteAddr())
			}
			cdc.conn = conn
			cdc.numHandled = 0
			// Let unit tests observe admission before any I/O happens.
			if hsv.testHooks != nil && hsv.testHooks.HandleAdmission != nil {
				hsv.testHooks.HandleAdmission()
			}
			// ServeCodec calls cdc.Close() when the connection ends, which
			// returns the codec to the pool.
			go hsv.ServeCodec(cdc)
		case <-hsv.shutdown:
			return
		}
	}
}
+
// Addr returns the address the HRPC server is listening on.
func (hsv *HrpcServer) Addr() net.Addr {
	return hsv.listener.Addr()
}

// GetNumIoErrors returns the number of I/O errors encountered since the
// server started.  Safe to call from any goroutine (atomic load).
func (hsv *HrpcServer) GetNumIoErrors() uint64 {
	return atomic.LoadUint64(&hsv.ioErrorCount)
}

// Close shuts down the HRPC server and blocks until its accept loop exits.
func (hsv *HrpcServer) Close() {
	close(hsv.shutdown)
	hsv.listener.Close()
	hsv.exited.Wait()
}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/htraced.go b/htrace-htraced/go/src/htrace/htraced/htraced.go
new file mode 100644
index 0000000..0d41e0d
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/htraced.go
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"bufio"
+	"encoding/json"
+	"fmt"
+	"github.com/alecthomas/kingpin"
+	"github.com/jmhodges/levigo"
+	"htrace/common"
+	"htrace/conf"
+	"net"
+	"os"
+	"runtime"
+	"time"
+)
+
+var RELEASE_VERSION string
+var GIT_VERSION string
+
+const USAGE = `htraced: the HTrace server daemon.
+
+htraced receives trace spans sent from HTrace clients.  It exposes a REST
+interface which others can query.  It also runs a web server with a graphical
+user interface.  htraced stores its span data in levelDB files on the local
+disks.
+
+Usage:
+--help: this help message
+
+-Dk=v: set configuration key 'k' to value 'v'
+For example -Dweb.address=127.0.0.1:8080 sets the web address to localhost,
+port 8080.  -Dlog.level=DEBUG will set the default log level to DEBUG.
+
+-Dk: set configuration key 'k' to 'true'
+
+Normally, configuration options should be set in the ` + conf.CONFIG_FILE_NAME + `
+configuration file.  We find this file by searching the paths in the
+` + conf.HTRACED_CONF_DIR + `. The command-line options are just an alternate way
+of setting configuration when launching the daemon.
+`
+
+func main() {
+	// Load the htraced configuration.
+	// This also parses the -Dfoo=bar command line arguments and removes them
+	// from os.Argv.
+	cnf, cnfLog := conf.LoadApplicationConfig("htraced.")
+
+	// Parse the remaining command-line arguments.
+	app := kingpin.New(os.Args[0], USAGE)
+	version := app.Command("version", "Print server version and exit.")
+	cmd := kingpin.MustParse(app.Parse(os.Args[1:]))
+
+	// Handle the "version" command-line argument.
+	if cmd == version.FullCommand() {
+		fmt.Printf("Running htraced %s [%s].\n", RELEASE_VERSION, GIT_VERSION)
+		os.Exit(0)
+	}
+
+	// Open the HTTP port.
+	// We want to do this first, before initializing the datastore or setting up
+	// logging.  That way, if someone accidentally starts two daemons with the
+	// same config file, the second invocation will exit with a "port in use"
+	// error rather than potentially disrupting the first invocation.
+	rstListener, listenErr := net.Listen("tcp", cnf.Get(conf.HTRACE_WEB_ADDRESS))
+	if listenErr != nil {
+		fmt.Fprintf(os.Stderr, "Error opening HTTP port: %s\n",
+			listenErr.Error())
+		os.Exit(1)
+	}
+
+	// Print out the startup banner and information about the daemon
+	// configuration.
+	lg := common.NewLogger("main", cnf)
+	defer lg.Close()
+	lg.Infof("*** Starting htraced %s [%s]***\n", RELEASE_VERSION, GIT_VERSION)
+	scanner := bufio.NewScanner(cnfLog)
+	for scanner.Scan() {
+		lg.Infof(scanner.Text() + "\n")
+	}
+	common.InstallSignalHandlers(cnf)
+	if runtime.GOMAXPROCS(0) == 1 {
+		ncpu := runtime.NumCPU()
+		runtime.GOMAXPROCS(ncpu)
+		lg.Infof("setting GOMAXPROCS=%d\n", ncpu)
+	} else {
+		lg.Infof("GOMAXPROCS=%d\n", runtime.GOMAXPROCS(0))
+	}
+	lg.Infof("leveldb version=%d.%d\n",
+		levigo.GetLevelDBMajorVersion(), levigo.GetLevelDBMinorVersion())
+
+	// Initialize the datastore.
+	store, err := CreateDataStore(cnf, nil)
+	if err != nil {
+		lg.Errorf("Error creating datastore: %s\n", err.Error())
+		os.Exit(1)
+	}
+	var rsv *RestServer
+	rsv, err = CreateRestServer(cnf, store, rstListener)
+	if err != nil {
+		lg.Errorf("Error creating REST server: %s\n", err.Error())
+		os.Exit(1)
+	}
+	var hsv *HrpcServer
+	if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
+		hsv, err = CreateHrpcServer(cnf, store, nil)
+		if err != nil {
+			lg.Errorf("Error creating HRPC server: %s\n", err.Error())
+			os.Exit(1)
+		}
+	} else {
+		lg.Infof("Not starting HRPC server because no value was given for %s.\n",
+			conf.HTRACE_HRPC_ADDRESS)
+	}
+	naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS)
+	if naddr != "" {
+		notif := StartupNotification{
+			HttpAddr:  rsv.Addr().String(),
+			ProcessId: os.Getpid(),
+		}
+		if hsv != nil {
+			notif.HrpcAddr = hsv.Addr().String()
+		}
+		err = sendStartupNotification(naddr, &notif)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+
+				"%s\n", err.Error())
+			os.Exit(1)
+		}
+	}
+	for {
+		time.Sleep(time.Duration(10) * time.Hour)
+	}
+}
+
+// A startup notification message that we optionally send on startup.
+// Used by unit tests.
+type StartupNotification struct {
+	HttpAddr  string
+	HrpcAddr  string
+	ProcessId int
+}
+
+func sendStartupNotification(naddr string, notif *StartupNotification) error {
+	conn, err := net.Dial("tcp", naddr)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if conn != nil {
+			conn.Close()
+		}
+	}()
+	var buf []byte
+	buf, err = json.Marshal(notif)
+	if err != nil {
+		return err
+	}
+	_, err = conn.Write(buf)
+	conn.Close()
+	conn = nil
+	return nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/loader.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/loader.go b/htrace-htraced/go/src/htrace/htraced/loader.go
new file mode 100644
index 0000000..95c5c3e
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/loader.go
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"github.com/jmhodges/levigo"
+	"github.com/ugorji/go/codec"
+	"htrace/common"
+	"htrace/conf"
+	"io"
+	"math"
+	"math/rand"
+	"os"
+	"strings"
+	"syscall"
+	"time"
+)
+
+// Routines for loading the datastore.
+
+// The leveldb key which has information about the shard.
+const SHARD_INFO_KEY = 'w'
+
+// A constant signifying that we don't know what the layout version is.
+const UNKNOWN_LAYOUT_VERSION = 0
+
+// The current layout version.  We cannot read layout versions newer than this.
+// We may sometimes be able to read older versions, but only by doing an
+// upgrade.
+const CURRENT_LAYOUT_VERSION = 3
+
+type DataStoreLoader struct {
+	// The dataStore logger.
+	lg *common.Logger
+
+	// True if we should clear the stored data.
+	ClearStored bool
+
+	// The shards that we're loading
+	shards []*ShardLoader
+
+	// The options to use for opening datastores in LevelDB.
+	openOpts *levigo.Options
+
+	// The read options to use for LevelDB.
+	readOpts *levigo.ReadOptions
+
+	// The write options to use for LevelDB.
+	writeOpts *levigo.WriteOptions
+}
+
+// Information about a Shard.
+type ShardInfo struct {
+	// The layout version of the datastore.
+	// We should always keep this field so that old software can recognize new
+	// layout versions, even if it can't read them.
+	LayoutVersion uint64
+
+	// A random number identifying this daemon.
+	DaemonId uint64
+
+	// The total number of shards in this datastore.
+	TotalShards uint32
+
+	// The index of this shard within the datastore.
+	ShardIndex uint32
+}
+
// Create a new datastore loader.
// Initializes the loader, but does not load any leveldb instances.
func NewDataStoreLoader(cnf *conf.Config) *DataStoreLoader {
	dld := &DataStoreLoader{
		lg:          common.NewLogger("datastore", cnf),
		ClearStored: cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR),
	}
	dld.readOpts = levigo.NewReadOptions()
	dld.readOpts.SetFillCache(true)
	dld.readOpts.SetVerifyChecksums(false)
	dld.writeOpts = levigo.NewWriteOptions()
	// Asynchronous writes: do not fsync on every write.
	dld.writeOpts.SetSync(false)
	dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES)
	rdirs := strings.Split(dirsStr, conf.PATH_LIST_SEP)
	// Filter out empty entries
	dirs := make([]string, 0, len(rdirs))
	for i := range rdirs {
		if strings.TrimSpace(rdirs[i]) != "" {
			dirs = append(dirs, rdirs[i])
		}
	}
	// One shard per configured directory; each shard's leveldb lives in a
	// "db" subdirectory.
	dld.shards = make([]*ShardLoader, len(dirs))
	for i := range dirs {
		dld.shards[i] = &ShardLoader{
			dld:  dld,
			path: dirs[i] + conf.PATH_SEP + "db",
		}
	}
	dld.openOpts = levigo.NewOptions()
	cacheSize := cnf.GetInt(conf.HTRACE_LEVELDB_CACHE_SIZE)
	dld.openOpts.SetCache(levigo.NewLRUCache(cacheSize))
	dld.openOpts.SetParanoidChecks(false)
	writeBufferSize := cnf.GetInt(conf.HTRACE_LEVELDB_WRITE_BUFFER_SIZE)
	if writeBufferSize > 0 {
		dld.openOpts.SetWriteBufferSize(writeBufferSize)
	}
	// A return of 0 means "no sensible limit could be computed"; in that
	// case leveldb's default max-open-files setting is left alone.
	maxFdPerShard := dld.calculateMaxOpenFilesPerShard()
	if maxFdPerShard > 0 {
		dld.openOpts.SetMaxOpenFiles(maxFdPerShard)
	}
	return dld
}
+
// Close frees every resource the loader still owns.  Each field is
// nil-checked and nilled out, so Close is idempotent and is a no-op for
// resources handed away via DisownResources.
func (dld *DataStoreLoader) Close() {
	if dld.lg != nil {
		dld.lg.Close()
		dld.lg = nil
	}
	if dld.openOpts != nil {
		dld.openOpts.Close()
		dld.openOpts = nil
	}
	if dld.readOpts != nil {
		dld.readOpts.Close()
		dld.readOpts = nil
	}
	if dld.writeOpts != nil {
		dld.writeOpts.Close()
		dld.writeOpts = nil
	}
	if dld.shards != nil {
		for i := range dld.shards {
			if dld.shards[i] != nil {
				dld.shards[i].Close()
			}
		}
		dld.shards = nil
	}
}
+
// DisownResources drops the loader's references to its resources without
// closing them, so that a subsequent Close() will not free resources whose
// ownership has been transferred elsewhere.
func (dld *DataStoreLoader) DisownResources() {
	dld.lg = nil
	dld.openOpts = nil
	dld.readOpts = nil
	dld.writeOpts = nil
	dld.shards = nil
}
+
+// The maximum number of file descriptors we'll use on non-datastore things.
+const NON_DATASTORE_FD_MAX = 300
+
+// The minimum number of file descriptors per shard we will set.  Setting fewer
+// than this number could trigger a bug in some early versions of leveldb.
+const MIN_FDS_PER_SHARD = 80
+
// calculateMaxOpenFilesPerShard computes the value to use for leveldb's
// max-open-files option on each shard, based on the process RLIMIT_NOFILE,
// after reserving NON_DATASTORE_FD_MAX descriptors for everything else.
// A return value of 0 means no sensible limit could be computed and the
// caller should leave leveldb's default alone.
func (dld *DataStoreLoader) calculateMaxOpenFilesPerShard() int {
	var rlim syscall.Rlimit
	err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim)
	if err != nil {
		dld.lg.Warnf("Unable to calculate maximum open files per shard: "+
			"getrlimit failed: %s\n", err.Error())
		return 0
	}
	// I think RLIMIT_NOFILE fits in 32 bits on all known operating systems,
	// but there's no harm in being careful.  'int' in golang always holds at
	// least 32 bits.
	var maxFd int
	if rlim.Cur > uint64(math.MaxInt32) {
		maxFd = math.MaxInt32
	} else {
		maxFd = int(rlim.Cur)
	}
	if len(dld.shards) == 0 {
		dld.lg.Warnf("Unable to calculate maximum open files per shard, " +
			"since there are 0 shards configured.\n")
		return 0
	}
	fdsPerShard := (maxFd - NON_DATASTORE_FD_MAX) / len(dld.shards)
	if fdsPerShard < MIN_FDS_PER_SHARD {
		// Too few fds per shard could trigger a bug in some early leveldb
		// versions (see MIN_FDS_PER_SHARD); fall back to the default.
		dld.lg.Warnf("Expected to be able to use at least %d "+
			"fds per shard, but we have %d shards and %d total fds to allocate, "+
			"giving us only %d FDs per shard.", MIN_FDS_PER_SHARD,
			len(dld.shards), maxFd-NON_DATASTORE_FD_MAX, fdsPerShard)
		return 0
	}
	dld.lg.Infof("maxFd = %d.  Setting maxFdPerShard = %d\n",
		maxFd, fdsPerShard)
	return fdsPerShard
}
+
// Load information about all shards.
// NOTE(review): per-shard load failures are not returned here; they are
// recorded on each ShardLoader (see VerifyShardInfos' use of infoErr).
func (dld *DataStoreLoader) LoadShards() {
	for i := range dld.shards {
		shd := dld.shards[i]
		shd.load()
	}
}
+
+// Verify that the shard infos are consistent.
+// Reorders the shardInfo structures based on their ShardIndex.
+func (dld *DataStoreLoader) VerifyShardInfos() error {
+	if len(dld.shards) < 1 {
+		return errors.New("No shard directories found.")
+	}
+	// Make sure no shards had errors.
+	for i := range dld.shards {
+		shd := dld.shards[i]
+		if shd.infoErr != nil {
+			return shd.infoErr
+		}
+	}
+	// Make sure that if any shards are empty, all shards are empty.
+	emptyShards := ""
+	prefix := ""
+	for i := range dld.shards {
+		if dld.shards[i].info == nil {
+			emptyShards = prefix + dld.shards[i].path
+			prefix = ", "
+		}
+	}
+	if emptyShards != "" {
+		for i := range dld.shards {
+			if dld.shards[i].info != nil {
+				return errors.New(fmt.Sprintf("Shards %s were empty, but "+
+					"the other shards had data.", emptyShards))
+			}
+		}
+		// All shards are empty.
+		return nil
+	}
+	// Make sure that all shards have the same layout version, daemonId, and number of total
+	// shards.
+	layoutVersion := dld.shards[0].info.LayoutVersion
+	daemonId := dld.shards[0].info.DaemonId
+	totalShards := dld.shards[0].info.TotalShards
+	for i := 1; i < len(dld.shards); i++ {
+		shd := dld.shards[i]
+		if layoutVersion != shd.info.LayoutVersion {
+			return errors.New(fmt.Sprintf("Layout version mismatch.  Shard "+
+				"%s has layout version 0x%016x, but shard %s has layout "+
+				"version 0x%016x.",
+				dld.shards[0].path, layoutVersion, shd.path, shd.info.LayoutVersion))
+		}
+		if daemonId != shd.info.DaemonId {
+			return errors.New(fmt.Sprintf("DaemonId mismatch. Shard %s has "+
+				"daemonId 0x%016x, but shard %s has daemonId 0x%016x.",
+				dld.shards[0].path, daemonId, shd.path, shd.info.DaemonId))
+		}
+		if totalShards != shd.info.TotalShards {
+			return errors.New(fmt.Sprintf("TotalShards mismatch.  Shard %s has "+
+				"TotalShards = %d, but shard %s has TotalShards = %d.",
+				dld.shards[0].path, totalShards, shd.path, shd.info.TotalShards))
+		}
+		if shd.info.ShardIndex >= totalShards {
+			return errors.New(fmt.Sprintf("Invalid ShardIndex.  Shard %s has "+
+				"ShardIndex = %d, but TotalShards = %d.",
+				shd.path, shd.info.ShardIndex, shd.info.TotalShards))
+		}
+	}
+	if layoutVersion != CURRENT_LAYOUT_VERSION {
+		return errors.New(fmt.Sprintf("The layout version of all shards "+
+			"is %d, but we only support version %d.",
+			layoutVersion, CURRENT_LAYOUT_VERSION))
+	}
+	if totalShards != uint32(len(dld.shards)) {
+		return errors.New(fmt.Sprintf("The TotalShards field of all shards "+
+			"is %d, but we have %d shards.", totalShards, len(dld.shards)))
+	}
+	// Reorder shards in order of their ShardIndex.
+	reorderedShards := make([]*ShardLoader, len(dld.shards))
+	for i := 0; i < len(dld.shards); i++ {
+		shd := dld.shards[i]
+		shardIdx := shd.info.ShardIndex
+		if reorderedShards[shardIdx] != nil {
+			return errors.New(fmt.Sprintf("Both shard %s and "+
+				"shard %s have ShardIndex %d.", shd.path,
+				reorderedShards[shardIdx].path, shardIdx))
+		}
+		reorderedShards[shardIdx] = shd
+	}
+	dld.shards = reorderedShards
+	return nil
+}
+
// Load brings up the datastore shards: optionally clears stored data,
// creates the shard directories, loads existing leveldb instances (or
// initializes fresh ones with a new DaemonId), and verifies that the shard
// metadata is consistent.
func (dld *DataStoreLoader) Load() error {
	var err error
	// If data.store.clear was set, clear existing data.
	if dld.ClearStored {
		err = dld.clearStored()
		if err != nil {
			return err
		}
	}
	// Make sure the shard directories exist in all cases, with a mkdir -p
	for i := range dld.shards {
		err := os.MkdirAll(dld.shards[i].path, 0777)
		if err != nil {
			return errors.New(fmt.Sprintf("Failed to MkdirAll(%s): %s",
				dld.shards[i].path, err.Error()))
		}
	}
	// Get information about each shard, and verify them.
	dld.LoadShards()
	err = dld.VerifyShardInfos()
	if err != nil {
		return err
	}
	// VerifyShardInfos guarantees the shards are either all loaded or all
	// empty, so checking shard 0 is sufficient.
	if dld.shards[0].ldb != nil {
		dld.lg.Infof("Loaded %d leveldb instances with "+
			"DaemonId of 0x%016x\n", len(dld.shards),
			dld.shards[0].info.DaemonId)
	} else {
		// Create leveldb instances if needed.
		rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
		daemonId := uint64(rnd.Int63())
		dld.lg.Infof("Initializing %d leveldb instances with a new "+
			"DaemonId of 0x%016x\n", len(dld.shards), daemonId)
		dld.openOpts.SetCreateIfMissing(true)
		for i := range dld.shards {
			shd := dld.shards[i]
			shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts)
			if err != nil {
				return errors.New(fmt.Sprintf("levigo.Open(%s) failed to "+
					"create the shard: %s", shd.path, err.Error()))
			}
			// Record this shard's identity so future startups can validate
			// it (see VerifyShardInfos).
			info := &ShardInfo{
				LayoutVersion: CURRENT_LAYOUT_VERSION,
				DaemonId:      daemonId,
				TotalShards:   uint32(len(dld.shards)),
				ShardIndex:    uint32(i),
			}
			err = shd.writeShardInfo(info)
			if err != nil {
				return errors.New(fmt.Sprintf("levigo.Open(%s) failed to "+
					"write shard info: %s", shd.path, err.Error()))
			}
			dld.lg.Infof("Shard %s initialized with ShardInfo %s \n",
				shd.path, asJson(info))
		}
	}
	return nil
}
+
+// clearStored removes any existing on-disk data for every shard.
+// Shard directories that do not exist are silently skipped.
+func (dld *DataStoreLoader) clearStored() error {
+	for _, shd := range dld.shards {
+		dirPath := shd.path
+		stat, statErr := os.Stat(dirPath)
+		if statErr != nil {
+			if !os.IsNotExist(statErr) {
+				dld.lg.Errorf("Failed to stat %s: %s\n", dirPath, statErr.Error())
+				return statErr
+			}
+			// Nothing to clear for this shard.
+			continue
+		}
+		if stat == nil {
+			continue
+		}
+		if rmErr := os.RemoveAll(dirPath); rmErr != nil {
+			dld.lg.Errorf("Failed to clear existing datastore directory %s: %s\n",
+				dirPath, rmErr.Error())
+			return rmErr
+		}
+		dld.lg.Infof("Cleared existing datastore directory %s\n", dirPath)
+	}
+	return nil
+}
+
+// ShardLoader loads and inspects a single leveldb shard directory.
+type ShardLoader struct {
+	// The parent DataStoreLoader
+	dld *DataStoreLoader
+
+	// Path to the shard
+	path string
+
+	// Leveldb instance of the shard
+	ldb *levigo.DB
+
+	// If non-nil, the error we encountered trying to load the shard info.
+	info *ShardInfo
+
+	// If non-nil, the error we encountered trying to load the shard info.
+	infoErr error
+}
+
+// Close releases the shard's leveldb handle, if open.
+// Safe to call more than once.
+func (shd *ShardLoader) Close() {
+	if shd.ldb != nil {
+		shd.ldb.Close()
+		shd.ldb = nil
+	}
+}
+
+// Load information about a particular shard.
+//
+// Results are left in shd.info / shd.infoErr rather than returned:
+//   - info == nil, infoErr == nil: the shard dir is absent or empty;
+//   - info != nil, infoErr == nil: the shard was opened successfully
+//     (shd.ldb is left open for later use);
+//   - infoErr != nil: loading failed.
+func (shd *ShardLoader) load() {
+	shd.info = nil
+	fi, err := os.Stat(shd.path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			// A missing directory simply means an uninitialized shard.
+			shd.infoErr = nil
+			return
+		}
+		shd.infoErr = errors.New(fmt.Sprintf(
+			"stat() error on leveldb directory "+
+				"%s: %s", shd.path, err.Error()))
+		return
+	}
+	if !fi.Mode().IsDir() {
+		shd.infoErr = errors.New(fmt.Sprintf(
+			"stat() error on leveldb directory "+
+				"%s: inode is not directory.", shd.path))
+		return
+	}
+	// Peek into the directory to see whether it contains anything.
+	var dbDir *os.File
+	dbDir, err = os.Open(shd.path)
+	if err != nil {
+		shd.infoErr = errors.New(fmt.Sprintf(
+			"open() error on leveldb directory "+
+				"%s: %s.", shd.path, err.Error()))
+		return
+	}
+	// Close dbDir on any early return; it is set to nil below once it
+	// has been closed manually, so this never double-closes.
+	defer func() {
+		if dbDir != nil {
+			dbDir.Close()
+		}
+	}()
+	// Reading a single name is enough to distinguish empty from non-empty.
+	_, err = dbDir.Readdirnames(1)
+	if err != nil {
+		if err == io.EOF {
+			// The db directory is empty.
+			shd.infoErr = nil
+			return
+		}
+		shd.infoErr = errors.New(fmt.Sprintf(
+			"Readdirnames() error on leveldb directory "+
+				"%s: %s.", shd.path, err.Error()))
+		return
+	}
+	dbDir.Close()
+	dbDir = nil
+	// The directory is non-empty; open it as a leveldb database.
+	shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts)
+	if err != nil {
+		shd.ldb = nil
+		shd.infoErr = errors.New(fmt.Sprintf(
+			"levigo.Open() error on leveldb directory "+
+				"%s: %s.", shd.path, err.Error()))
+		return
+	}
+	shd.info, err = shd.readShardInfo()
+	if err != nil {
+		shd.infoErr = err
+		return
+	}
+	shd.infoErr = nil
+}
+
+// readShardInfo fetches and decodes the ShardInfo record stored under
+// SHARD_INFO_KEY in this shard's leveldb instance.
+func (shd *ShardLoader) readShardInfo() (*ShardInfo, error) {
+	buf, err := shd.ldb.Get(shd.dld.readOpts, []byte{SHARD_INFO_KEY})
+	if err != nil {
+		return nil, errors.New(fmt.Sprintf("readShardInfo(%s): failed to "+
+			"read shard info key: %s", shd.path, err.Error()))
+	}
+	if len(buf) == 0 {
+		return nil, errors.New(fmt.Sprintf("readShardInfo(%s): got zero-"+
+			"length value for shard info key.", shd.path))
+	}
+	// Decode the msgpack-encoded ShardInfo.  LayoutVersion defaults to
+	// UNKNOWN_LAYOUT_VERSION so a record missing that field is detectable.
+	handle := new(codec.MsgpackHandle)
+	handle.WriteExt = true
+	shardInfo := &ShardInfo{
+		LayoutVersion: UNKNOWN_LAYOUT_VERSION,
+	}
+	dec := codec.NewDecoder(bytes.NewBuffer(buf), handle)
+	if err = dec.Decode(shardInfo); err != nil {
+		return nil, errors.New(fmt.Sprintf("readShardInfo(%s): msgpack "+
+			"decoding failed for shard info key: %s", shd.path, err.Error()))
+	}
+	return shardInfo, nil
+}
+
+func (shd *ShardLoader) writeShardInfo(info *ShardInfo) error {
+	mh := new(codec.MsgpackHandle)
+	mh.WriteExt = true
+	w := new(bytes.Buffer)
+	enc := codec.NewEncoder(w, mh)
+	err := enc.Encode(info)
+	if err != nil {
+		return errors.New(fmt.Sprintf("msgpack encoding error: %s",
+			err.Error()))
+	}
+	err = shd.ldb.Put(shd.dld.writeOpts, []byte{SHARD_INFO_KEY}, w.Bytes())
+	if err != nil {
+		return errors.New(fmt.Sprintf("leveldb write error: %s",
+			err.Error()))
+	}
+	return nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/metrics.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/metrics.go b/htrace-htraced/go/src/htrace/htraced/metrics.go
new file mode 100644
index 0000000..d2feca8
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/metrics.go
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"htrace/common"
+	"htrace/conf"
+	"math"
+	"sync"
+	"time"
+)
+
+//
+// The Metrics Sink for HTraced.
+//
+// The Metrics sink keeps track of metrics for the htraced daemon.
+// It is important to have good metrics so that we can properly manage htraced.  In particular, we
+// need to know what rate we are receiving spans at, and the main places spans came from.  If spans
+// were dropped because of a high sampling rate, we need to know which part of the system dropped
+// them so that we can adjust the sampling rate there.
+//
+
+// The number of recent writeSpans latencies retained for stats reporting.
+const LATENCY_CIRC_BUF_SIZE = 4096
+
+// MetricsSink accumulates span-related metrics for the htraced daemon.
+// All fields are guarded by 'lock'.
+type MetricsSink struct {
+	// The metrics sink logger.
+	lg *common.Logger
+
+	// The maximum number of entries we should allow in the HostSpanMetrics map.
+	maxMtx int
+
+	// The total number of spans ingested by the server (counting dropped spans)
+	IngestedSpans uint64
+
+	// The total number of spans written to leveldb since the server started.
+	WrittenSpans uint64
+
+	// The total number of spans dropped by the server.
+	ServerDropped uint64
+
+	// Per-host Span Metrics
+	HostSpanMetrics common.SpanMetricsMap
+
+	// The last few writeSpan latencies
+	wsLatencyCircBuf *CircBufU32
+
+	// Lock protecting all metrics
+	lock sync.Mutex
+}
+
+// NewMetricsSink builds a MetricsSink configured from cnf.
+func NewMetricsSink(cnf *conf.Config) *MetricsSink {
+	msink := &MetricsSink{}
+	msink.lg = common.NewLogger("metrics", cnf)
+	msink.maxMtx = cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES)
+	msink.HostSpanMetrics = make(common.SpanMetricsMap)
+	msink.wsLatencyCircBuf = NewCircBufU32(LATENCY_CIRC_BUF_SIZE)
+	return msink
+}
+
+// Update the total number of spans which were ingested, as well as other
+// metrics that get updated during span ingest.
+// Update the total number of spans which were ingested, as well as other
+// metrics that get updated during span ingest.
+//
+// wsLatency is the observed writeSpans latency; it is recorded in
+// milliseconds, clamped so it fits in a uint32.
+func (msink *MetricsSink) UpdateIngested(addr string, totalIngested int,
+	serverDropped int, wsLatency time.Duration) {
+	msink.lock.Lock()
+	defer msink.lock.Unlock()
+	msink.IngestedSpans += uint64(totalIngested)
+	msink.ServerDropped += uint64(serverDropped)
+	// Nothing has been written during ingest, so numWritten is 0 here.
+	msink.updateSpanMetrics(addr, 0, serverDropped)
+	wsLatencyMs := wsLatency.Nanoseconds() / 1000000
+	var wsLatency32 uint32
+	if wsLatencyMs > math.MaxUint32 {
+		// Clamp extremely large latencies rather than overflowing.
+		wsLatency32 = math.MaxUint32
+	} else {
+		wsLatency32 = uint32(wsLatencyMs)
+	}
+	msink.wsLatencyCircBuf.Append(wsLatency32)
+}
+
+// Update the per-host span metrics.  Must be called with the lock held.
+func (msink *MetricsSink) updateSpanMetrics(addr string, numWritten int,
+	serverDropped int) {
+	mtx, found := msink.HostSpanMetrics[addr]
+	if !found {
+		// Ensure that the per-host span metrics map doesn't grow too large.
+		if len(msink.HostSpanMetrics) >= msink.maxMtx {
+			// Delete a random entry
+			for k := range msink.HostSpanMetrics {
+				msink.lg.Warnf("Evicting metrics entry for addr %s "+
+					"because there are more than %d addrs.\n", k, msink.maxMtx)
+				delete(msink.HostSpanMetrics, k)
+				break
+			}
+		}
+		mtx = &common.SpanMetrics{}
+		msink.HostSpanMetrics[addr] = mtx
+	}
+	mtx.Written += uint64(numWritten)
+	mtx.ServerDropped += uint64(serverDropped)
+}
+
+// Update the total number of spans which were persisted to disk.
+func (msink *MetricsSink) UpdatePersisted(addr string, totalWritten int,
+	serverDropped int) {
+	msink.lock.Lock()
+	defer msink.lock.Unlock()
+	msink.WrittenSpans += uint64(totalWritten)
+	msink.ServerDropped += uint64(serverDropped)
+	msink.updateSpanMetrics(addr, totalWritten, serverDropped)
+}
+
+// Read the server stats.
+func (msink *MetricsSink) PopulateServerStats(stats *common.ServerStats) {
+	msink.lock.Lock()
+	defer msink.lock.Unlock()
+	stats.IngestedSpans = msink.IngestedSpans
+	stats.WrittenSpans = msink.WrittenSpans
+	stats.ServerDroppedSpans = msink.ServerDropped
+	stats.MaxWriteSpansLatencyMs = msink.wsLatencyCircBuf.Max()
+	stats.AverageWriteSpansLatencyMs = msink.wsLatencyCircBuf.Average()
+	stats.HostSpanMetrics = make(common.SpanMetricsMap)
+	for k, v := range msink.HostSpanMetrics {
+		stats.HostSpanMetrics[k] = &common.SpanMetrics{
+			Written:       v.Written,
+			ServerDropped: v.ServerDropped,
+		}
+	}
+}
+
+// A circular buffer of uint32s which supports appending and taking the
+// average, and some other things.
+//
+// Not safe for concurrent use; callers must provide their own locking.
+type CircBufU32 struct {
+	// The next slot to fill
+	slot int
+
+	// The number of slots which are in use.  This number only ever
+	// increases until the buffer is full.
+	// It starts at -1 (see NewCircBufU32) so that Average() can divide
+	// by uint64(slotsUsed) without a divide-by-zero on an empty buffer.
+	slotsUsed int
+
+	// The buffer
+	buf []uint32
+}
+
+// NewCircBufU32 creates an empty circular buffer holding up to 'size' values.
+func NewCircBufU32(size int) *CircBufU32 {
+	return &CircBufU32{
+		// -1 is the "empty" sentinel: uint64(-1) is a huge divisor, so
+		// Average() yields 0 for an empty buffer instead of panicking.
+		slotsUsed: -1,
+		buf:       make([]uint32, size),
+	}
+}
+
+// Max returns the largest value currently stored, or 0 for an empty buffer.
+func (cbuf *CircBufU32) Max() uint32 {
+	var largest uint32
+	// slotsUsed may be -1 when empty; the loop then runs zero times.
+	for i := 0; i < cbuf.slotsUsed; i++ {
+		cur := cbuf.buf[i]
+		if cur > largest {
+			largest = cur
+		}
+	}
+	return largest
+}
+
+// Average returns the mean of the values currently in the buffer.
+// An empty circular buffer is defined to have an average of 0.
+func (cbuf *CircBufU32) Average() uint32 {
+	// Robustness fix: guard explicitly against an empty buffer.  The
+	// previous code relied on slotsUsed starting at -1, so that the final
+	// division was by uint64(-1) (a huge number) yielding 0 — and it would
+	// have panicked with divide-by-zero had slotsUsed ever been 0.
+	if cbuf.slotsUsed <= 0 {
+		return 0
+	}
+	var total uint64
+	for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
+		total += uint64(cbuf.buf[bufIdx])
+	}
+	return uint32(total / uint64(cbuf.slotsUsed))
+}
+
+// Append adds a value, overwriting the oldest entry once the buffer is full.
+func (cbuf *CircBufU32) Append(val uint32) {
+	cbuf.buf[cbuf.slot] = val
+	cbuf.slot++
+	// slotsUsed starts at -1, so this also turns "empty" into "1 used".
+	if cbuf.slotsUsed < cbuf.slot {
+		cbuf.slotsUsed = cbuf.slot
+	}
+	// Wrap around to the start of the buffer.
+	if cbuf.slot >= len(cbuf.buf) {
+		cbuf.slot = 0
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/htrace/htraced/metrics_test.go
new file mode 100644
index 0000000..4f27ffd
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/metrics_test.go
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"fmt"
+	htrace "htrace/client"
+	"htrace/common"
+	"htrace/conf"
+	"reflect"
+	"testing"
+	"time"
+)
+
+// compareTotals returns true if the two maps hold deeply-equal entries
+// for exactly the same set of keys.
+func compareTotals(a, b common.SpanMetricsMap) bool {
+	for key := range a {
+		if !reflect.DeepEqual(a[key], b[key]) {
+			return false
+		}
+	}
+	for key := range b {
+		if !reflect.DeepEqual(b[key], a[key]) {
+			return false
+		}
+	}
+	return true
+}
+
+// Fatalfer is the subset of *testing.T needed by the assertion helpers
+// in this file, allowing them to take other Fatalf implementations.
+type Fatalfer interface {
+	Fatalf(format string, args ...interface{})
+}
+
+// assertNumWrittenEquals checks that the sink reports exactly
+// expectedNumWritten spans written, both globally and for 127.0.0.1.
+func assertNumWrittenEquals(t Fatalfer, msink *MetricsSink,
+	expectedNumWritten int) {
+	var sstats common.ServerStats
+	msink.PopulateServerStats(&sstats)
+	if sstats.WrittenSpans != uint64(expectedNumWritten) {
+		// Bug fix: the messages below previously printed
+		// len(SIMPLE_TEST_SPANS) rather than the expected count that is
+		// actually being compared against.
+		t.Fatalf("sstats.WrittenSpans = %d, but expected %d\n",
+			sstats.WrittenSpans, expectedNumWritten)
+	}
+	if sstats.HostSpanMetrics["127.0.0.1"] == nil {
+		t.Fatalf("no entry for sstats.HostSpanMetrics[127.0.0.1] found.")
+	}
+	if sstats.HostSpanMetrics["127.0.0.1"].Written !=
+		uint64(expectedNumWritten) {
+		t.Fatalf("sstats.HostSpanMetrics[127.0.0.1].Written = %d, but "+
+			"expected %d\n", sstats.HostSpanMetrics["127.0.0.1"].Written,
+			expectedNumWritten)
+	}
+}
+
+// TestMetricsSinkPerHostEviction verifies that the per-host metrics map
+// never grows beyond HTRACE_METRICS_MAX_ADDR_ENTRIES entries.
+func TestMetricsSinkPerHostEviction(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	// Allow at most two per-host entries so the third insert must evict.
+	cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2"
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	msink := NewMetricsSink(cnf)
+	msink.UpdatePersisted("192.168.0.100", 20, 10)
+	msink.UpdatePersisted("192.168.0.101", 20, 10)
+	msink.UpdatePersisted("192.168.0.102", 20, 10)
+	msink.lock.Lock()
+	defer msink.lock.Unlock()
+	if len(msink.HostSpanMetrics) != 2 {
+		// Dump the surviving entries to aid debugging.
+		for k, v := range msink.HostSpanMetrics {
+			fmt.Printf("WATERMELON: [%s] = [%s]\n", k, v)
+		}
+		t.Fatalf("Expected len(msink.HostSpanMetrics) to be 2, but got %d\n",
+			len(msink.HostSpanMetrics))
+	}
+}
+
+// TestIngestedSpansMetricsRest exercises span-count metrics over REST.
+func TestIngestedSpansMetricsRest(t *testing.T) {
+	testIngestedSpansMetricsImpl(t, false)
+}
+
+// TestIngestedSpansMetricsPacked exercises span-count metrics over the
+// packed (HRPC) transport.
+func TestIngestedSpansMetricsPacked(t *testing.T) {
+	testIngestedSpansMetricsImpl(t, true)
+}
+
+// testIngestedSpansMetricsImpl writes spans over REST (usePacked=false) or
+// the packed HRPC transport (usePacked=true), then polls the server until
+// its IngestedSpans stat matches the number of spans written.
+// NOTE(review): the polling loop has no internal deadline; it relies on
+// the surrounding 'go test' timeout to bound the wait if ingestion stalls.
+func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) {
+	htraceBld := &MiniHTracedBuilder{Name: "TestIngestedSpansMetrics",
+		DataDirs: make([]string, 2),
+	}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	var hcl *htrace.Client
+	hcl, err = htrace.NewClient(ht.ClientConf(), &htrace.TestHooks{
+		HrpcDisabled: !usePacked,
+	})
+	if err != nil {
+		t.Fatalf("failed to create client: %s", err.Error())
+	}
+
+	NUM_TEST_SPANS := 12
+	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
+	err = hcl.WriteSpans(allSpans)
+	if err != nil {
+		t.Fatalf("WriteSpans failed: %s\n", err.Error())
+	}
+	// Ingestion is asynchronous; poll the stats until the count arrives.
+	for {
+		var stats *common.ServerStats
+		stats, err = hcl.GetServerStats()
+		if err != nil {
+			t.Fatalf("GetServerStats failed: %s\n", err.Error())
+		}
+		if stats.IngestedSpans == uint64(NUM_TEST_SPANS) {
+			break
+		}
+		time.Sleep(1 * time.Millisecond)
+	}
+}
+
+// TestCircBuf32 exercises CircBufU32's Append, Average, and Max,
+// including wraparound overwriting of the oldest entries.
+func TestCircBuf32(t *testing.T) {
+	cbuf := NewCircBufU32(3)
+	// We arbitrarily define that empty circular buffers have an average of 0.
+	if cbuf.Average() != 0 {
+		t.Fatalf("expected empty CircBufU32 to have an average of 0.\n")
+	}
+	if cbuf.Max() != 0 {
+		t.Fatalf("expected empty CircBufU32 to have a max of 0.\n")
+	}
+	cbuf.Append(2)
+	if cbuf.Average() != 2 {
+		t.Fatalf("expected one-element CircBufU32 to have an average of 2.\n")
+	}
+	cbuf.Append(10)
+	if cbuf.Average() != 6 {
+		t.Fatalf("expected two-element CircBufU32 to have an average of 6.\n")
+	}
+	cbuf.Append(12)
+	if cbuf.Average() != 8 {
+		t.Fatalf("expected three-element CircBufU32 to have an average of 8.\n")
+	}
+	cbuf.Append(14)
+	// The 14 overwrites the original 2 element.
+	if cbuf.Average() != 12 {
+		t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n")
+	}
+	cbuf.Append(1)
+	// The 1 overwrites the original 10 element.
+	// Bug fix: the failure message previously said "average of 12" even
+	// though the assertion checks for 9.
+	if cbuf.Average() != 9 {
+		t.Fatalf("expected three-element CircBufU32 to have an average of 9.\n")
+	}
+	if cbuf.Max() != 14 {
+		t.Fatalf("expected three-element CircBufU32 to have a max of 14.\n")
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/htrace/htraced/mini_htraced.go
new file mode 100644
index 0000000..af8d379
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/mini_htraced.go
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"fmt"
+	"htrace/common"
+	"htrace/conf"
+	"io/ioutil"
+	"net"
+	"os"
+	"strings"
+)
+
+//
+// MiniHTraceD is used in unit tests to set up a daemon with certain settings.
+// It takes care of things like creating and cleaning up temporary directories.
+//
+
+// The default number of managed data directories to use.
+const DEFAULT_NUM_DATA_DIRS = 2
+
+// Builds a MiniHTraced object.
+type MiniHTracedBuilder struct {
+	// The name of the MiniHTraced to build.  This shows up in the test directory name and some
+	// other places.
+	Name string
+
+	// The configuration values to use for the MiniHTraced.
+	// If ths is nil, we use the default configuration for everything.
+	Cnf map[string]string
+
+	// The DataDirs to use.  Empty entries will turn into random names.
+	DataDirs []string
+
+	// If true, we will keep the data dirs around after MiniHTraced#Close
+	KeepDataDirsOnClose bool
+
+	// If non-null, the WrittenSpans semaphore to use when creating the DataStore.
+	WrittenSpans *common.Semaphore
+
+	// The test hooks to use for the HRPC server
+	HrpcTestHooks *hrpcTestHooks
+}
+
+type MiniHTraced struct {
+	Name                string
+	Cnf                 *conf.Config
+	DataDirs            []string
+	Store               *dataStore
+	Rsv                 *RestServer
+	Hsv                 *HrpcServer
+	Lg                  *common.Logger
+	KeepDataDirsOnClose bool
+}
+
+// Build constructs the MiniHTraced: it creates temporary data directories
+// as needed, merges the default test configuration, and starts the data
+// store plus the REST and HRPC servers.  On failure, everything already
+// created is torn down before the error is returned.
+func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) {
+	var err error
+	var store *dataStore
+	var rsv *RestServer
+	var hsv *HrpcServer
+	if bld.Name == "" {
+		bld.Name = "HTraceTest"
+	}
+	if bld.Cnf == nil {
+		bld.Cnf = make(map[string]string)
+	}
+	if bld.DataDirs == nil {
+		// Consistency fix: use the named constant rather than a magic 2.
+		bld.DataDirs = make([]string, DEFAULT_NUM_DATA_DIRS)
+	}
+	for idx := range bld.DataDirs {
+		if bld.DataDirs[idx] == "" {
+			bld.DataDirs[idx], err = ioutil.TempDir(os.TempDir(),
+				fmt.Sprintf("%s%d", bld.Name, idx+1))
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+	// Copy the default test configuration values.
+	for k, v := range conf.TEST_VALUES() {
+		_, hasVal := bld.Cnf[k]
+		if !hasVal {
+			bld.Cnf[k] = v
+		}
+	}
+	bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] =
+		strings.Join(bld.DataDirs, conf.PATH_LIST_SEP)
+	cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS}
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		return nil, err
+	}
+	lg := common.NewLogger("mini.htraced", cnf)
+	// Clean up partially-constructed state if any later step fails.  This
+	// closure keys off 'err', so every failure below must flow through it.
+	defer func() {
+		if err != nil {
+			if store != nil {
+				store.Close()
+			}
+			for idx := range bld.DataDirs {
+				if !bld.KeepDataDirsOnClose {
+					if bld.DataDirs[idx] != "" {
+						os.RemoveAll(bld.DataDirs[idx])
+					}
+				}
+			}
+			if rsv != nil {
+				rsv.Close()
+			}
+			lg.Infof("Failed to create MiniHTraced %s: %s\n", bld.Name, err.Error())
+			lg.Close()
+		}
+	}()
+	store, err = CreateDataStore(cnf, bld.WrittenSpans)
+	if err != nil {
+		return nil, err
+	}
+	// Bug fix: the listen error was previously held in a separate
+	// 'listenErr' variable, so the deferred cleanup above (which checks
+	// 'err') never ran on this path and the data store leaked.
+	var rstListener net.Listener
+	rstListener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_WEB_ADDRESS))
+	if err != nil {
+		return nil, err
+	}
+	// Close the listener unless CreateRestServer takes ownership of it
+	// (rstListener is set to nil once it does).
+	defer func() {
+		if rstListener != nil {
+			rstListener.Close()
+		}
+	}()
+	rsv, err = CreateRestServer(cnf, store, rstListener)
+	if err != nil {
+		return nil, err
+	}
+	rstListener = nil
+	hsv, err = CreateHrpcServer(cnf, store, bld.HrpcTestHooks)
+	if err != nil {
+		return nil, err
+	}
+
+	lg.Infof("Created MiniHTraced %s\n", bld.Name)
+	return &MiniHTraced{
+		Name:                bld.Name,
+		Cnf:                 cnf,
+		DataDirs:            bld.DataDirs,
+		Store:               store,
+		Rsv:                 rsv,
+		Hsv:                 hsv,
+		Lg:                  lg,
+		KeepDataDirsOnClose: bld.KeepDataDirsOnClose,
+	}, nil
+}
+
+// Return a Config object that clients can use to connect to this MiniHTraceD.
+// Return a Config object that clients can use to connect to this MiniHTraceD.
+// Both the REST (web) and HRPC addresses are filled in.
+func (ht *MiniHTraced) ClientConf() *conf.Config {
+	return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(),
+		conf.HTRACE_HRPC_ADDRESS, ht.Hsv.Addr().String())
+}
+
+// Return a Config object that clients can use to connect to this MiniHTraceD
+// by HTTP only (no HRPC).  The empty HRPC address disables that transport.
+func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config {
+	return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(),
+		conf.HTRACE_HRPC_ADDRESS, "")
+}
+
+// Close shuts down the MiniHTraced's servers and data store, and removes
+// the data directories unless KeepDataDirsOnClose was set.
+func (ht *MiniHTraced) Close() {
+	ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name)
+	ht.Rsv.Close()
+	ht.Hsv.Close()
+	ht.Store.Close()
+	if !ht.KeepDataDirsOnClose {
+		for _, dataDir := range ht.DataDirs {
+			ht.Lg.Infof("Removing %s...\n", dataDir)
+			os.RemoveAll(dataDir)
+		}
+	}
+	ht.Lg.Infof("Finished closing MiniHTraced %s\n", ht.Name)
+	ht.Lg.Close()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/reaper_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/htrace/htraced/reaper_test.go
new file mode 100644
index 0000000..af11e38
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/reaper_test.go
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"fmt"
+	"htrace/common"
+	"htrace/conf"
+	"htrace/test"
+	"math/rand"
+	"testing"
+	"time"
+)
+
+// TestReapingOldSpans verifies that the reaper removes every span older
+// than the reaper date while leaving the newest span intact.
+func TestReapingOldSpans(t *testing.T) {
+	const NUM_TEST_SPANS = 20
+	testSpans := make([]*common.Span, NUM_TEST_SPANS)
+	rnd := rand.New(rand.NewSource(2))
+	now := common.TimeToUnixMs(time.Now().UTC())
+	for i := range testSpans {
+		testSpans[i] = test.NewRandomSpan(rnd, testSpans[0:i])
+		// Span i begins (NUM_TEST_SPANS-1-i) ms before 'now': span 0 is the
+		// oldest and the final span begins exactly at 'now'.
+		testSpans[i].Begin = now - int64(NUM_TEST_SPANS-1-i)
+		testSpans[i].Description = fmt.Sprintf("Span%02d", i)
+	}
+	htraceBld := &MiniHTracedBuilder{Name: "TestReapingOldSpans",
+		Cnf: map[string]string{
+			conf.HTRACE_SPAN_EXPIRY_MS:                fmt.Sprintf("%d", 60*60*1000),
+			conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS:    "1",
+			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "1",
+		},
+		WrittenSpans: common.NewSemaphore(0),
+		DataDirs:     make([]string, 2),
+	}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create mini htraced cluster: %s\n", err.Error())
+	}
+	// Bug fix: register the cleanup as soon as Build succeeds rather than
+	// at the end of the test, so the cluster is torn down even if a later
+	// step fails or panics.
+	defer ht.Close()
+	ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "")
+	for spanIdx := range testSpans {
+		ing.IngestSpan(testSpans[spanIdx])
+	}
+	ing.Close(time.Now())
+	// Wait for the spans to be created.
+	ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS)
+	// Set a reaper date that will remove all the spans except the final one.
+	ht.Store.rpr.SetReaperDate(now)
+
+	common.WaitFor(5*time.Minute, time.Millisecond, func() bool {
+		for i := 0; i < NUM_TEST_SPANS-1; i++ {
+			span := ht.Store.FindSpan(testSpans[i].Id)
+			if span != nil {
+				ht.Store.lg.Debugf("Waiting for %s to be removed...\n",
+					testSpans[i].Description)
+				return false
+			}
+		}
+		span := ht.Store.FindSpan(testSpans[NUM_TEST_SPANS-1].Id)
+		if span == nil {
+			ht.Store.lg.Debugf("Did not expect %s to be removed\n",
+				testSpans[NUM_TEST_SPANS-1].Description)
+			return false
+		}
+		return true
+	})
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/rest.go b/htrace-htraced/go/src/htrace/htraced/rest.go
new file mode 100644
index 0000000..1ba4791
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/rest.go
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"github.com/gorilla/mux"
+	"htrace/common"
+	"htrace/conf"
+	"net"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"time"
+)
+
+// Set the response headers.
+func setResponseHeaders(hdr http.Header) {
+	hdr.Set("Content-Type", "application/json")
+}
+
+// Write a JSON error response.
+func writeError(lg *common.Logger, w http.ResponseWriter, errCode int,
+	errStr string) {
+	str := strings.Replace(errStr, `"`, `'`, -1)
+	lg.Info(str + "\n")
+	w.WriteHeader(errCode)
+	w.Write([]byte(`{ "error" : "` + str + `"}`))
+}
+
+// serverVersionHandler serves the htraced release and git version info.
+type serverVersionHandler struct {
+	lg *common.Logger
+}
+
+// ServeHTTP writes a ServerVersion structure as JSON.
+func (hand *serverVersionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	version := common.ServerVersion{ReleaseVersion: RELEASE_VERSION,
+		GitVersion: GIT_VERSION}
+	buf, err := json.Marshal(&version)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("error marshalling ServerVersion: %s\n", err.Error()))
+		return
+	}
+	if hand.lg.DebugEnabled() {
+		hand.lg.Debugf("Returned ServerVersion %s\n", string(buf))
+	}
+	w.Write(buf)
+}
+
+// serverDebugInfoHandler serves goroutine stack traces and GC statistics.
+type serverDebugInfoHandler struct {
+	lg *common.Logger
+}
+
+// ServeHTTP writes a ServerDebugInfo structure as JSON.
+func (hand *serverDebugInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	// 1 MiB scratch buffer for the stack traces.
+	buf := make([]byte, 1<<20)
+	common.GetStackTraces(&buf)
+	resp := common.ServerDebugInfo{
+		StackTraces: string(buf),
+		GCStats:     common.GetGCStats(),
+	}
+	// Note: 'buf' is reused here to hold the marshalled JSON output.
+	buf, err := json.Marshal(&resp)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("error marshalling ServerDebugInfo: %s\n", err.Error()))
+		return
+	}
+	w.Write(buf)
+	hand.lg.Info("Returned ServerDebugInfo\n")
+}
+
+// serverStatsHandler serves the server's span-count and latency stats.
+type serverStatsHandler struct {
+	dataStoreHandler
+}
+
+// ServeHTTP writes the current ServerStats as JSON.
+func (hand *serverStatsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	hand.lg.Debugf("serverStatsHandler\n")
+	stats := hand.store.ServerStats()
+	buf, err := json.Marshal(&stats)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("error marshalling ServerStats: %s\n", err.Error()))
+		return
+	}
+	hand.lg.Debugf("Returned ServerStats %s\n", string(buf))
+	w.Write(buf)
+}
+
+// serverConfHandler serves the daemon's effective configuration.
+type serverConfHandler struct {
+	cnf *conf.Config
+	lg  *common.Logger
+}
+
+// ServeHTTP writes the exported configuration map as JSON.
+func (hand *serverConfHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	hand.lg.Debugf("serverConfHandler\n")
+	cnfMap := hand.cnf.Export()
+	buf, err := json.Marshal(&cnfMap)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("error marshalling serverConf: %s\n", err.Error()))
+		return
+	}
+	hand.lg.Debugf("Returned server configuration %s\n", string(buf))
+	w.Write(buf)
+}
+
+// dataStoreHandler is the shared base for REST handlers that need access
+// to the span data store; it provides common request-parsing helpers.
+type dataStoreHandler struct {
+	lg    *common.Logger
+	store *dataStore
+}
+
+// parseSid parses str as a span ID.  On failure, it writes a JSON error
+// response and returns (INVALID_SPAN_ID, false).
+func (hand *dataStoreHandler) parseSid(w http.ResponseWriter,
+	str string) (common.SpanId, bool) {
+	var id common.SpanId
+	err := id.FromString(str)
+	if err != nil {
+		// Bug fix: writeError already emits a complete JSON error document;
+		// the extra w.Write that used to follow it appended plain text after
+		// the JSON, corrupting the response body.
+		writeError(hand.lg, w, http.StatusBadRequest,
+			fmt.Sprintf("Failed to parse span ID %s: %s", str, err.Error()))
+		return common.INVALID_SPAN_ID, false
+	}
+	return id, true
+}
+
+// getReqField32 extracts a 32-bit form field from the request.
+// Returns (value, true) on success, or (-1, false) after writing an
+// error response.
+// NOTE(review): the value is parsed with base 16 (hexadecimal) — confirm
+// that clients send these fields (e.g. "lim") in hex, not decimal.
+func (hand *dataStoreHandler) getReqField32(fieldName string, w http.ResponseWriter,
+	req *http.Request) (int32, bool) {
+	str := req.FormValue(fieldName)
+	if str == "" {
+		writeError(hand.lg, w, http.StatusBadRequest, fmt.Sprintf("No %s specified.", fieldName))
+		return -1, false
+	}
+	val, err := strconv.ParseUint(str, 16, 32)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusBadRequest,
+			fmt.Sprintf("Error parsing %s: %s.", fieldName, err.Error()))
+		return -1, false
+	}
+	return int32(val), true
+}
+
+// findSidHandler looks up a single span by its span ID.
+type findSidHandler struct {
+	dataStoreHandler
+}
+
+// ServeHTTP writes the requested span as JSON, or an error document when
+// the ID is unparseable or unknown.
+func (hand *findSidHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	req.ParseForm()
+	vars := mux.Vars(req)
+	stringSid := vars["id"]
+	sid, ok := hand.parseSid(w, stringSid)
+	if !ok {
+		return
+	}
+	hand.lg.Debugf("findSidHandler(sid=%s)\n", sid.String())
+	span := hand.store.FindSpan(sid)
+	if span == nil {
+		// NOTE(review): this sends a body along with 204 No Content, which
+		// HTTP forbids — clients may never see the error JSON.  Confirm
+		// whether 404 was intended.
+		writeError(hand.lg, w, http.StatusNoContent,
+			fmt.Sprintf("No such span as %s\n", sid.String()))
+		return
+	}
+	w.Write(span.ToJson())
+}
+
+// findChildrenHandler returns the children of a span, up to a
+// client-supplied limit.
+type findChildrenHandler struct {
+	dataStoreHandler
+}
+
+// ServeHTTP parses the span ID from the URL and the "lim" form value,
+// then writes the child spans as a JSON array.
+func (hand *findChildrenHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	req.ParseForm()
+	vars := mux.Vars(req)
+	stringSid := vars["id"]
+	sid, ok := hand.parseSid(w, stringSid)
+	if !ok {
+		return
+	}
+	var lim int32
+	lim, ok = hand.getReqField32("lim", w, req)
+	if !ok {
+		return
+	}
+	hand.lg.Debugf("findChildrenHandler(sid=%s, lim=%d)\n", sid.String(), lim)
+	children := hand.store.FindChildren(sid, lim)
+	jbytes, err := json.Marshal(children)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("Error marshalling children: %s", err.Error()))
+		return
+	}
+	w.Write(jbytes)
+}
+
+// writeSpansHandler serves POST /writeSpans: ingest a batch of spans.
+type writeSpansHandler struct {
+	dataStoreHandler
+}
+
+// ServeHTTP reads a WriteSpansReq header followed by NumSpans JSON-encoded
+// spans from the request body, and feeds them to a span ingestor keyed by
+// the client address.
+func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	startTime := time.Now()
+	setResponseHeaders(w.Header())
+	client, _, serr := net.SplitHostPort(req.RemoteAddr)
+	if serr != nil {
+		writeError(hand.lg, w, http.StatusBadRequest,
+			fmt.Sprintf("Failed to split host and port for %s: %s\n",
+				req.RemoteAddr, serr.Error()))
+		return
+	}
+	dec := json.NewDecoder(req.Body)
+	var msg common.WriteSpansReq
+	err := dec.Decode(&msg)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusBadRequest,
+			fmt.Sprintf("Error parsing WriteSpansReq: %s", err.Error()))
+		return
+	}
+	if hand.lg.TraceEnabled() {
+		hand.lg.Tracef("%s: read WriteSpans REST message: %s\n",
+			req.RemoteAddr, asJson(&msg))
+	}
+	ing := hand.store.NewSpanIngestor(hand.lg, client, msg.DefaultTrid)
+	for spanIdx := 0; spanIdx < msg.NumSpans; spanIdx++ {
+		var span *common.Span
+		err := dec.Decode(&span)
+		if err != nil {
+			// The format string previously had two verbs for three
+			// arguments; add %s so the decode error actually appears.
+			writeError(hand.lg, w, http.StatusBadRequest,
+				fmt.Sprintf("Failed to decode span %d out of %d: %s",
+					spanIdx, msg.NumSpans, err.Error()))
+			return
+		}
+		ing.IngestSpan(span)
+	}
+	ing.Close(startTime)
+}
+
+// queryHandler serves GET /query: run a JSON query against the data store.
+type queryHandler struct {
+	lg *common.Logger
+	dataStoreHandler
+}
+
+// ServeHTTP decodes the JSON query in the "query" form value, runs it, and
+// writes the matching spans back as a JSON array.
+func (hand *queryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	setResponseHeaders(w.Header())
+	queryString := req.FormValue("query")
+	if queryString == "" {
+		writeError(hand.lg, w, http.StatusBadRequest, "No query provided.\n")
+		return
+	}
+	var query common.Query
+	err := json.NewDecoder(bytes.NewBufferString(queryString)).Decode(&query)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusBadRequest,
+			fmt.Sprintf("Error parsing query '%s': %s", queryString, err.Error()))
+		return
+	}
+	var results []*common.Span
+	results, err, _ = hand.store.HandleQuery(&query)
+	if err != nil {
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("Internal error processing query %s: %s",
+				query.String(), err.Error()))
+		return
+	}
+	jbytes, jerr := json.Marshal(results)
+	if jerr != nil {
+		writeError(hand.lg, w, http.StatusInternalServerError,
+			fmt.Sprintf("Error marshalling results: %s", jerr.Error()))
+		return
+	}
+	w.Write(jbytes)
+}
+
+// logErrorHandler is the catch-all route: any request that matched no other
+// handler lands here.
+type logErrorHandler struct {
+	lg *common.Logger
+}
+
+// ServeHTTP logs the unrecognized request and answers with HTTP 400.
+func (hand *logErrorHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	hand.lg.Errorf("Got unknown request %s\n", req.RequestURI)
+	writeError(hand.lg, w, http.StatusBadRequest, "Unknown request.")
+}
+
+// RestServer serves the htraced REST API and the static web UI.
+type RestServer struct {
+	http.Server
+	listener net.Listener // the TCP listener Serve runs on
+	lg       *common.Logger
+}
+
+// CreateRestServer creates and starts the REST server on the given listener,
+// registering routes for server info, span ingest, queries, span lookup,
+// and the static web UI.  Serve runs in a background goroutine until Close
+// is called on the returned RestServer.
+//
+// NOTE(review): registration order appears deliberate -- the static-file
+// handler and the catch-all error handler must stay last so explicit routes
+// win; confirm against gorilla/mux matching semantics before reordering.
+func CreateRestServer(cnf *conf.Config, store *dataStore,
+	listener net.Listener) (*RestServer, error) {
+	var err error
+	rsv := &RestServer{}
+	rsv.lg = common.NewLogger("rest", cnf)
+
+	r := mux.NewRouter().StrictSlash(false)
+
+	r.Handle("/server/info", &serverVersionHandler{lg: rsv.lg}).Methods("GET")
+	r.Handle("/server/version", &serverVersionHandler{lg: rsv.lg}).Methods("GET")
+	r.Handle("/server/debugInfo", &serverDebugInfoHandler{lg: rsv.lg}).Methods("GET")
+
+	serverStatsH := &serverStatsHandler{dataStoreHandler: dataStoreHandler{
+		store: store, lg: rsv.lg}}
+	r.Handle("/server/stats", serverStatsH).Methods("GET")
+
+	serverConfH := &serverConfHandler{cnf: cnf, lg: rsv.lg}
+	r.Handle("/server/conf", serverConfH).Methods("GET")
+
+	writeSpansH := &writeSpansHandler{dataStoreHandler: dataStoreHandler{
+		store: store, lg: rsv.lg}}
+	r.Handle("/writeSpans", writeSpansH).Methods("POST")
+
+	queryH := &queryHandler{lg: rsv.lg, dataStoreHandler: dataStoreHandler{store: store}}
+	r.Handle("/query", queryH).Methods("GET")
+
+	span := r.PathPrefix("/span").Subrouter()
+	findSidH := &findSidHandler{dataStoreHandler: dataStoreHandler{store: store, lg: rsv.lg}}
+	span.Handle("/{id}", findSidH).Methods("GET")
+
+	findChildrenH := &findChildrenHandler{dataStoreHandler: dataStoreHandler{store: store,
+		lg: rsv.lg}}
+	span.Handle("/{id}/children", findChildrenH).Methods("GET")
+
+	// Default Handler. This will serve requests for static requests.
+	webdir := os.Getenv("HTRACED_WEB_DIR")
+	if webdir == "" {
+		// Fall back to ../web relative to the executable's directory.
+		webdir, err = filepath.Abs(filepath.Join(filepath.Dir(os.Args[0]), "..", "web"))
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	rsv.lg.Infof(`Serving static files from "%s"`+"\n", webdir)
+	r.PathPrefix("/").Handler(http.FileServer(http.Dir(webdir))).Methods("GET")
+
+	// Log an error message for unknown non-GET requests.
+	r.PathPrefix("/").Handler(&logErrorHandler{lg: rsv.lg})
+
+	rsv.listener = listener
+	rsv.Handler = r
+	rsv.ErrorLog = rsv.lg.Wrap("[REST] ", common.INFO)
+	go rsv.Serve(rsv.listener)
+	rsv.lg.Infof("Started REST server on %s\n", rsv.listener.Addr().String())
+	return rsv, nil
+}
+
+// Addr returns the address the REST server is listening on.
+func (rsv *RestServer) Addr() net.Addr {
+	return rsv.listener.Addr()
+}
+
+// Close shuts the server down by closing its listener, which causes the
+// background Serve goroutine to return.
+func (rsv *RestServer) Close() {
+	rsv.listener.Close()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/htrace/htracedTool/cmd.go
new file mode 100644
index 0000000..65b67e5
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htracedTool/cmd.go
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"github.com/alecthomas/kingpin"
+	htrace "htrace/client"
+	"htrace/common"
+	"htrace/conf"
+	"io"
+	"os"
+	"sort"
+	"strings"
+	"text/tabwriter"
+	"time"
+)
+
+// Version strings printed by "htrace version".  They are empty unless set
+// at build time (presumably via linker flags -- confirm against the build
+// files).
+var RELEASE_VERSION string
+var GIT_VERSION string
+
+// Process exit codes returned by the subcommand handlers.
+const EXIT_SUCCESS = 0
+const EXIT_FAILURE = 1
+
+// verbose is set from the --verbose flag and enables progress output.
+var verbose bool
+
+const USAGE = `The Apache HTrace command-line tool.  This tool retrieves and modifies settings and
+other data on a running htraced daemon.
+
+If we find an ` + conf.CONFIG_FILE_NAME + ` configuration file in the list of directories
+specified in ` + conf.HTRACED_CONF_DIR + `, we will use that configuration; otherwise,
+the defaults will be used.
+`
+
+// main loads the htraced configuration, parses the command line, and
+// dispatches to the handler for the selected subcommand, exiting with
+// EXIT_SUCCESS or EXIT_FAILURE.
+func main() {
+	// Load htraced configuration
+	cnf, cnfLog := conf.LoadApplicationConfig("htrace.tool.")
+	lg := common.NewLogger("conf", cnf)
+	defer lg.Close()
+	scanner := bufio.NewScanner(cnfLog)
+	for scanner.Scan() {
+		lg.Debugf(scanner.Text() + "\n")
+	}
+
+	// Parse argv
+	app := kingpin.New(os.Args[0], USAGE)
+	app.Flag("Dmy.key", "Set configuration key 'my.key' to 'my.value'.  Replace 'my.key' "+
+		"with any key you want to set.").Default("my.value").String()
+	addr := app.Flag("addr", "Server address.").String()
+	verboseFlag := app.Flag("verbose", "Verbose.").Default("false").Bool()
+	version := app.Command("version", "Print the version of this program.")
+	serverVersion := app.Command("serverVersion", "Print the version of the htraced server.")
+	serverStats := app.Command("serverStats", "Print statistics retrieved from the htraced server.")
+	serverStatsJson := serverStats.Flag("json", "Display statistics as raw JSON.").Default("false").Bool()
+	serverDebugInfo := app.Command("serverDebugInfo", "Print the debug info of the htraced server.")
+	serverConf := app.Command("serverConf", "Print the server configuration retrieved from the htraced server.")
+	findSpan := app.Command("findSpan", "Print information about a trace span with a given ID.")
+	findSpanId := findSpan.Arg("id", "Span ID to find. Example: be305e54-4534-2110-a0b2-e06b9effe112").Required().String()
+	findChildren := app.Command("findChildren", "Print out the span IDs that are children of a given span ID.")
+	parentSpanId := findChildren.Arg("id", "Span ID to print children for. Example: be305e54-4534-2110-a0b2-e06b9effe112").
+		Required().String()
+	childLim := findChildren.Flag("lim", "Maximum number of child IDs to print.").Default("20").Int()
+	loadFile := app.Command("loadFile", "Write whitespace-separated JSON spans from a file to the server.")
+	loadFilePath := loadFile.Arg("path",
+		"A file containing whitespace-separated span JSON.").Required().String()
+	loadJson := app.Command("load", "Write JSON spans from the command-line to the server.")
+	loadJsonArg := loadJson.Arg("json", "A JSON span to write to the server.").Required().String()
+	dumpAll := app.Command("dumpAll", "Dump all spans from the htraced daemon.")
+	dumpAllOutPath := dumpAll.Arg("path", "The path to dump the trace spans to.").Default("-").String()
+	dumpAllLim := dumpAll.Flag("lim", "The number of spans to transfer from the server at once.").
+		Default("100").Int()
+	graph := app.Command("graph", "Visualize span JSON as a graph.")
+	graphJsonFile := graph.Arg("input", "The JSON file to load").Required().String()
+	graphDotFile := graph.Flag("output",
+		"The path to write a GraphViz dotfile to.  This file can be used as input to "+
+			"GraphViz, in order to generate a pretty picture.  See graphviz.org for more "+
+			"information about generating pictures of graphs.").Default("-").String()
+	query := app.Command("query", "Send a query to htraced.")
+	queryLim := query.Flag("lim", "Maximum number of spans to retrieve.").Default("20").Int()
+	queryArg := query.Arg("query", "The query string to send.  Query strings have the format "+
+		"[TYPE] [OPERATOR] [CONST], joined by AND statements.").Required().String()
+	rawQuery := app.Command("rawQuery", "Send a raw JSON query to htraced.")
+	rawQueryArg := rawQuery.Arg("json", "The query JSON to send.").Required().String()
+	cmd := kingpin.MustParse(app.Parse(os.Args[1:]))
+
+	// Flag values are only populated once Parse has run; the old code
+	// dereferenced the --verbose pointer before parsing, so verbose was
+	// always false.
+	verbose = *verboseFlag
+
+	// Add the command-line settings into the configuration.
+	if *addr != "" {
+		cnf = cnf.Clone(conf.HTRACE_WEB_ADDRESS, *addr)
+	}
+
+	// Handle commands that don't require an HTrace client.
+	switch cmd {
+	case version.FullCommand():
+		os.Exit(printVersion())
+	case graph.FullCommand():
+		err := jsonSpanFileToDotFile(*graphJsonFile, *graphDotFile)
+		if err != nil {
+			fmt.Printf("graphing error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	}
+
+	// Create HTrace client
+	hcl, err := htrace.NewClient(cnf, nil)
+	if err != nil {
+		fmt.Printf("Failed to create HTrace client: %s\n", err.Error())
+		os.Exit(EXIT_FAILURE)
+	}
+
+	// Handle commands that require an HTrace client.
+	switch cmd {
+	case serverVersion.FullCommand():
+		os.Exit(printServerVersion(hcl))
+	case serverStats.FullCommand():
+		if *serverStatsJson {
+			os.Exit(printServerStatsJson(hcl))
+		} else {
+			os.Exit(printServerStats(hcl))
+		}
+	case serverDebugInfo.FullCommand():
+		os.Exit(printServerDebugInfo(hcl))
+	case serverConf.FullCommand():
+		os.Exit(printServerConfJson(hcl))
+	case findSpan.FullCommand():
+		// The old code called FromString through a nil *common.SpanId,
+		// which crashed; parse into a value and check the error instead.
+		var id common.SpanId
+		if perr := id.FromString(*findSpanId); perr != nil {
+			fmt.Printf("Failed to parse span ID %s: %s\n", *findSpanId, perr.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(doFindSpan(hcl, id))
+	case findChildren.FullCommand():
+		var id common.SpanId
+		if perr := id.FromString(*parentSpanId); perr != nil {
+			fmt.Printf("Failed to parse span ID %s: %s\n", *parentSpanId, perr.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(doFindChildren(hcl, id, *childLim))
+	case loadJson.FullCommand():
+		os.Exit(doLoadSpanJson(hcl, *loadJsonArg))
+	case loadFile.FullCommand():
+		os.Exit(doLoadSpanJsonFile(hcl, *loadFilePath))
+	case dumpAll.FullCommand():
+		err := doDumpAll(hcl, *dumpAllOutPath, *dumpAllLim)
+		if err != nil {
+			fmt.Printf("dumpAll error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	case query.FullCommand():
+		err := doQueryFromString(hcl, *queryArg, *queryLim)
+		if err != nil {
+			fmt.Printf("query error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	case rawQuery.FullCommand():
+		err := doRawQuery(hcl, *rawQueryArg)
+		if err != nil {
+			fmt.Printf("raw query error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	}
+
+	app.UsageErrorf(os.Stderr, "You must supply a command to run.")
+}
+
+// printVersion reports the release and git version of this binary.
+func printVersion() int {
+	fmt.Printf("Running htracedTool %s [%s].\n",
+		RELEASE_VERSION, GIT_VERSION)
+	return EXIT_SUCCESS
+}
+
+// printServerVersion fetches and prints the version of the htraced server.
+func printServerVersion(hcl *htrace.Client) int {
+	ver, err := hcl.GetServerVersion()
+	if err == nil {
+		fmt.Printf("HTraced server version %s (%s)\n",
+			ver.ReleaseVersion, ver.GitVersion)
+		return EXIT_SUCCESS
+	}
+	fmt.Println(err.Error())
+	return EXIT_FAILURE
+}
+
+// printServerStats pretty-prints statistics retrieved from the htraced
+// server (/server/stats) as aligned text tables.  Returns a process exit
+// code.
+func printServerStats(hcl *htrace.Client) int {
+	stats, err := hcl.GetServerStats()
+	if err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+	w := new(tabwriter.Writer)
+	w.Init(os.Stdout, 0, 8, 0, '\t', 0)
+	fmt.Fprintf(w, "HTRACED SERVER STATS\n")
+	fmt.Fprintf(w, "Datastore Start\t%s\n",
+		common.UnixMsToTime(stats.LastStartMs).Format(time.RFC3339))
+	fmt.Fprintf(w, "Server Time\t%s\n",
+		common.UnixMsToTime(stats.CurMs).Format(time.RFC3339))
+	fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans)
+	fmt.Fprintf(w, "Spans ingested\t%d\n", stats.IngestedSpans)
+	fmt.Fprintf(w, "Spans written\t%d\n", stats.WrittenSpans)
+	fmt.Fprintf(w, "Spans dropped by server\t%d\n", stats.ServerDroppedSpans)
+	dur := time.Millisecond * time.Duration(stats.AverageWriteSpansLatencyMs)
+	fmt.Fprintf(w, "Average WriteSpan Latency\t%s\n", dur.String())
+	dur = time.Millisecond * time.Duration(stats.MaxWriteSpansLatencyMs)
+	fmt.Fprintf(w, "Maximum WriteSpan Latency\t%s\n", dur.String())
+	fmt.Fprintf(w, "Number of leveldb directories\t%d\n", len(stats.Dirs))
+	w.Flush()
+	fmt.Println("")
+	for i := range stats.Dirs {
+		dir := stats.Dirs[i]
+		fmt.Printf("==== %s ===\n", dir.Path)
+		fmt.Printf("Approximate number of bytes: %d\n", dir.ApproximateBytes)
+		// Renamed from "stats", which shadowed the outer server stats.  The
+		// LevelDB stats string arrives with literal "\n" sequences that we
+		// expand into real newlines.
+		dirStats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1)
+		fmt.Printf("%s\n", dirStats)
+	}
+	w = new(tabwriter.Writer)
+	w.Init(os.Stdout, 0, 8, 0, '\t', 0)
+	fmt.Fprintf(w, "HOST SPAN METRICS\n")
+	mtxMap := stats.HostSpanMetrics
+	// Sort the metric keys so output order is deterministic.
+	keys := make([]string, 0, len(mtxMap))
+	for k := range mtxMap {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	for _, k := range keys {
+		mtx := mtxMap[k]
+		fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\n",
+			k, mtx.Written, mtx.ServerDropped)
+	}
+	w.Flush()
+	return EXIT_SUCCESS
+}
+
+// Print information retrieved from an htraced server via /server/info as JSON
+func printServerStatsJson(hcl *htrace.Client) int {
+	stats, err := hcl.GetServerStats()
+	if err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+	buf, err := json.MarshalIndent(stats, "", "  ")
+	if err != nil {
+		fmt.Printf("Error marshalling server stats: %s", err.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Printf("%s\n", string(buf))
+	return EXIT_SUCCESS
+}
+
+// Print information retrieved from an htraced server via /server/debugInfo
+func printServerDebugInfo(hcl *htrace.Client) int {
+	stats, err := hcl.GetServerDebugInfo()
+	if err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Println("=== GOROUTINE STACKS ===")
+	fmt.Print(stats.StackTraces)
+	fmt.Println("=== END GOROUTINE STACKS ===")
+	fmt.Println("=== GC STATISTICS ===")
+	fmt.Print(stats.GCStats)
+	fmt.Println("=== END GC STATISTICS ===")
+	return EXIT_SUCCESS
+}
+
+// Print information retrieved from an htraced server via /server/conf as JSON
+func printServerConfJson(hcl *htrace.Client) int {
+	cnf, err := hcl.GetServerConf()
+	if err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+	buf, err := json.MarshalIndent(cnf, "", "  ")
+	if err != nil {
+		fmt.Printf("Error marshalling server conf: %s", err.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Printf("%s\n", string(buf))
+	return EXIT_SUCCESS
+}
+
+// doFindSpan fetches a single span by ID and pretty-prints it as JSON.
+func doFindSpan(hcl *htrace.Client, sid common.SpanId) int {
+	span, err := hcl.FindSpan(sid)
+	if err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+	if span == nil {
+		fmt.Printf("Span ID not found.\n")
+		return EXIT_FAILURE
+	}
+	pretty, perr := json.MarshalIndent(span, "", "  ")
+	if perr != nil {
+		fmt.Printf("Error: error pretty-printing span to JSON: %s\n", perr.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Printf("%s\n", string(pretty))
+	return EXIT_SUCCESS
+}
+
+// doLoadSpanJsonFile reads whitespace-separated span JSON from spanFile
+// ("-" for stdin) and writes the spans to the server.
+func doLoadSpanJsonFile(hcl *htrace.Client, spanFile string) int {
+	if spanFile == "" {
+		fmt.Printf("You must specify the json file to load.\n")
+		return EXIT_FAILURE
+	}
+	file, err := OpenInputFile(spanFile)
+	if err != nil {
+		fmt.Printf("Failed to open %s: %s\n", spanFile, err.Error())
+		return EXIT_FAILURE
+	}
+	defer file.Close()
+	return doLoadSpans(hcl, bufio.NewReader(file))
+}
+
+// doLoadSpanJson writes spans given as literal JSON text to the server.
+func doLoadSpanJson(hcl *htrace.Client, spanJson string) int {
+	return doLoadSpans(hcl, bytes.NewBufferString(spanJson))
+}
+
+// doLoadSpans decodes whitespace-separated span JSON from reader and writes
+// the whole batch to the server in a single WriteSpans call.
+func doLoadSpans(hcl *htrace.Client, reader io.Reader) int {
+	dec := json.NewDecoder(reader)
+	spans := make([]*common.Span, 0, 32)
+	for {
+		var span common.Span
+		err := dec.Decode(&span)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			fmt.Printf("Failed to decode JSON: %s\n", err.Error())
+			return EXIT_FAILURE
+		}
+		spans = append(spans, &span)
+	}
+	if verbose {
+		fmt.Printf("Writing ")
+		sep := ""
+		for _, s := range spans {
+			fmt.Printf("%s%s", sep, s.ToJson())
+			sep = ", "
+		}
+		fmt.Printf("\n")
+	}
+	if err := hcl.WriteSpans(spans); err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+	return EXIT_SUCCESS
+}
+
+// doFindChildren prints, as JSON, the IDs of the children of the given
+// span.  Returns a process exit code.
+func doFindChildren(hcl *htrace.Client, sid common.SpanId, lim int) int {
+	spanIds, err := hcl.FindChildren(sid, lim)
+	if err != nil {
+		fmt.Printf("%s\n", err.Error())
+		return EXIT_FAILURE
+	}
+	pbuf, err := json.MarshalIndent(spanIds, "", "  ")
+	if err != nil {
+		// Was fmt.Println with a %s verb (printed the verb literally);
+		// use Printf so the error text is actually formatted in.
+		fmt.Printf("Error: error pretty-printing span IDs to JSON: %s\n", err.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Printf("%s\n", string(pbuf))
+	return EXIT_SUCCESS
+}
+
+// doDumpAll streams every span in the htraced daemon to outPath ("-" means
+// stdout), one JSON object per line.  lim is the number of spans fetched
+// from the server per round trip.
+func doDumpAll(hcl *htrace.Client, outPath string, lim int) error {
+	file, err := CreateOutputFile(outPath)
+	if err != nil {
+		return err
+	}
+	w := bufio.NewWriter(file)
+	// Cleanup for early-error returns; on the success path below, file is
+	// set to nil after the explicit flush-and-close, making this a no-op.
+	defer func() {
+		if file != nil {
+			w.Flush()
+			file.Close()
+		}
+	}()
+	out := make(chan *common.Span, 50)
+	var dumpErr error
+	go func() {
+		dumpErr = hcl.DumpAll(lim, out)
+	}()
+	var numSpans int64
+	nextLogTime := time.Now().Add(time.Second * 5)
+	for {
+		span, channelOpen := <-out
+		if !channelOpen {
+			// Channel closed (presumably by DumpAll when it finishes --
+			// confirm in the client); any failure is left in dumpErr.
+			break
+		}
+		// After the first write failure we stop writing but keep draining
+		// the channel so the DumpAll goroutine is not blocked forever.
+		if err == nil {
+			_, err = fmt.Fprintf(w, "%s\n", span.ToJson())
+		}
+		if verbose {
+			numSpans++
+			now := time.Now()
+			if !now.Before(nextLogTime) {
+				// Progress report at most once every five seconds.
+				nextLogTime = now.Add(time.Second * 5)
+				fmt.Printf("received %d span(s)...\n", numSpans)
+			}
+		}
+	}
+	if err != nil {
+		return errors.New(fmt.Sprintf("Write error %s", err.Error()))
+	}
+	if dumpErr != nil {
+		return errors.New(fmt.Sprintf("Dump error %s", dumpErr.Error()))
+	}
+	err = w.Flush()
+	if err != nil {
+		return err
+	}
+	err = file.Close()
+	file = nil
+	if err != nil {
+		return err
+	}
+	return nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htracedTool/file.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htracedTool/file.go b/htrace-htraced/go/src/htrace/htracedTool/file.go
new file mode 100644
index 0000000..ca9c18d
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htracedTool/file.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"bufio"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"htrace/common"
+	"io"
+	"os"
+)
+
+// InputFile is a file used for input.  It transparently supports reading
+// from stdin.
+type InputFile struct {
+	*os.File
+	path string
+}
+
+// OpenInputFile opens an input file.  Stdin is used when path is "-".
+func OpenInputFile(path string) (*InputFile, error) {
+	if path == "-" {
+		return &InputFile{File: os.Stdin, path: path}, nil
+	}
+	file, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	return &InputFile{File: file, path: path}, nil
+}
+
+// Close closes the underlying file, unless it is stdin, which is left open.
+// It now reports the close error for consistency with OutputFile.Close;
+// existing "defer file.Close()" callers are unaffected.
+func (file *InputFile) Close() error {
+	if file.path != "-" {
+		return file.File.Close()
+	}
+	return nil
+}
+
+// OutputFile is a file used for output.  It transparently supports writing
+// to stdout.
+type OutputFile struct {
+	*os.File
+	path string
+}
+
+// CreateOutputFile creates an output file.  Stdout is used when path is "-".
+func CreateOutputFile(path string) (*OutputFile, error) {
+	if path == "-" {
+		return &OutputFile{File: os.Stdout, path: path}, nil
+	}
+	f, err := os.Create(path)
+	if err != nil {
+		return nil, err
+	}
+	return &OutputFile{File: f, path: path}, nil
+}
+
+// Close closes the underlying file, except when it is stdout, which must
+// remain open for the rest of the process.
+func (file *OutputFile) Close() error {
+	if file.path == "-" {
+		return nil
+	}
+	return file.File.Close()
+}
+
+// FailureDeferringWriter lets callers issue many Printf calls and check a
+// single latched error at the end, instead of checking after every call.
+// Once a write fails, no further data is written.
+type FailureDeferringWriter struct {
+	io.Writer
+	err error
+}
+
+// NewFailureDeferringWriter wraps writer with no error latched yet.
+func NewFailureDeferringWriter(writer io.Writer) *FailureDeferringWriter {
+	return &FailureDeferringWriter{Writer: writer, err: nil}
+}
+
+// Printf formats and writes, unless an earlier write already failed, in
+// which case it does nothing.
+func (w *FailureDeferringWriter) Printf(format string, v ...interface{}) {
+	if w.err == nil {
+		if _, werr := w.Writer.Write([]byte(fmt.Sprintf(format, v...))); werr != nil {
+			w.err = werr
+		}
+	}
+}
+
+// Error returns the first write failure, or nil if every write succeeded.
+func (w *FailureDeferringWriter) Error() error {
+	return w.err
+}
+
+// readSpansFile reads a file full of whitespace-separated span JSON into a
+// slice of spans.  The path "-" means stdin.
+func readSpansFile(path string) (common.SpanSlice, error) {
+	file, err := OpenInputFile(path)
+	if err != nil {
+		return nil, err
+	}
+	defer file.Close()
+	return readSpans(bufio.NewReader(file))
+}
+
+// readSpans decodes whitespace-separated span JSON from reader until EOF,
+// returning the accumulated spans.
+func readSpans(reader io.Reader) (common.SpanSlice, error) {
+	spans := make(common.SpanSlice, 0)
+	dec := json.NewDecoder(reader)
+	for {
+		var span common.Span
+		derr := dec.Decode(&span)
+		if derr == io.EOF {
+			break
+		}
+		if derr != nil {
+			return nil, errors.New(fmt.Sprintf("Decode error after decoding %d "+
+				"span(s): %s", len(spans), derr.Error()))
+		}
+		spans = append(spans, &span)
+	}
+	return spans, nil
+}