You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by cm...@apache.org on 2015/11/12 21:40:15 UTC
incubator-htrace git commit: HTRACE-294. htraced: fix some metrics
issues (cmccabe)
Repository: incubator-htrace
Updated Branches:
refs/heads/master ef46897ff -> 021e49144
HTRACE-294. htraced: fix some metrics issues (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/021e4914
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/021e4914
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/021e4914
Branch: refs/heads/master
Commit: 021e491446716a57eb7f37f9328fe72d103b9823
Parents: ef46897
Author: Colin P. Mccabe <cm...@apache.org>
Authored: Thu Nov 12 12:39:36 2015 -0800
Committer: Colin P. Mccabe <cm...@apache.org>
Committed: Thu Nov 12 12:39:36 2015 -0800
----------------------------------------------------------------------
.../go/src/org/apache/htrace/client/client.go | 11 +-
.../go/src/org/apache/htrace/common/log.go | 32 ++++
.../go/src/org/apache/htrace/common/rpc.go | 20 ++-
.../src/org/apache/htrace/htraced/datastore.go | 29 ++--
.../go/src/org/apache/htrace/htraced/hrpc.go | 14 +-
.../go/src/org/apache/htrace/htraced/htraced.go | 7 +-
.../go/src/org/apache/htrace/htraced/metrics.go | 148 ++++++++++++++++---
.../org/apache/htrace/htraced/metrics_test.go | 90 ++++++++++-
.../org/apache/htrace/htraced/mini_htraced.go | 7 +
.../go/src/org/apache/htrace/htraced/rest.go | 22 ++-
.../go/src/org/apache/htrace/htracedTool/cmd.go | 8 +-
.../src/main/webapp/app/server_info_view.js | 14 ++
.../src/main/webapp/app/server_stats.js | 5 +-
htrace-webapp/src/main/webapp/index.html | 19 +++
14 files changed, 373 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go b/htrace-htraced/go/src/org/apache/htrace/client/client.go
index fb46e62..28b9e29 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go
@@ -47,9 +47,11 @@ type Client struct {
// HRPC address of the htraced server.
hrpcAddr string
+}
- // The HRPC client, or null if it is not enabled.
- hcr *hClient
+// Disable HRPC
+func (hcl *Client) DisableHrpc() {
+ hcl.hrpcAddr = ""
}
// Get the htraced server version information.
@@ -243,9 +245,6 @@ func (hcl *Client) DumpAll(lim int, out chan *common.Span) error {
}
func (hcl *Client) Close() {
- if hcl.hcr != nil {
- hcl.hcr.Close()
- }
hcl.restAddr = ""
- hcl.hcr = nil
+ hcl.hrpcAddr = ""
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/common/log.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/log.go b/htrace-htraced/go/src/org/apache/htrace/common/log.go
index 2e3e267..4066094 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/log.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/log.go
@@ -22,6 +22,7 @@ package common
import (
"errors"
"fmt"
+ "log"
"org/apache/htrace/conf"
"os"
"path/filepath"
@@ -294,3 +295,34 @@ func (lg *Logger) Close() {
lg.sink.Unref()
lg.sink = nil
}
+
+// Wraps an htrace logger in a golang standard logger.
+//
+// This is a bit messy because of the difference in interfaces between the
+// golang standard logger and the htrace logger. The golang standard logger
+// doesn't support log levels directly, so you must choose up front what htrace
+// log level all messages should be treated as. Golang standard loggers expect
+// to be able to write to an io.Writer, but make no guarantees about whether
+// they will break messages into multiple Write() calls (although this does
+// not seem to be a major problem in practice.)
+//
+// Despite these limitations, it's still useful to have this method to be able
+// to log things that come out of the go HTTP server and other standard library
+// systems.
+type WrappedLogger struct {
+ lg *Logger
+ level Level
+}
+
+func (lg *Logger) Wrap(prefix string, level Level) *log.Logger {
+ wlg := &WrappedLogger {
+ lg: lg,
+ level: level,
+ }
+ return log.New(wlg, prefix, 0)
+}
+
+func (wlg *WrappedLogger) Write(p []byte) (int, error) {
+ wlg.lg.Write(wlg.level, string(p))
+ return len(p), nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index f071e37..74008bc 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -124,14 +124,28 @@ type ServerStats struct {
// The total number of spans which have been reaped.
ReapedSpans uint64
+
+ // The total number of spans which have been ingested since the server started, by WriteSpans
+ // requests. This number counts spans that didn't get written to persistent storage as well as
+ // those that did.
+ IngestedSpans uint64
+
+ // The total number of spans which have been dropped by clients since the server started,
+ // as reported by WriteSpans requests.
+ ClientDroppedSpans uint64
+
+ // The maximum latency of a writeSpans request, in milliseconds.
+ MaxWriteSpansLatencyMs uint32
+
+ // The average latency of a writeSpans request, in milliseconds.
+ AverageWriteSpansLatencyMs uint32
}
type StorageDirectoryStats struct {
Path string
- // The approximate number of spans present in this shard. This may be an
- // underestimate.
- ApproxNumSpans uint64
+ // The approximate number of bytes on disk present in this shard.
+ ApproximateBytes uint64
// leveldb.stats information
LevelDbStats string
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index d0296c3..c676088 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -643,10 +643,6 @@ func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error {
return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes())
}
-func (store *dataStore) GetSpanMetrics() common.SpanMetricsMap {
- return store.msink.AccessTotals()
-}
-
// Close the DataStore.
func (store *dataStore) Close() {
if store.hb != nil {
@@ -1241,21 +1237,32 @@ func (store *dataStore) ServerStats() *common.ServerStats {
shard := store.shards[shardIdx]
serverStats.Dirs[shardIdx].Path = shard.path
r := levigo.Range{
- Start: append([]byte{SPAN_ID_INDEX_PREFIX},
- common.INVALID_SPAN_ID.Val()...),
- Limit: append([]byte{SPAN_ID_INDEX_PREFIX + 1},
- common.INVALID_SPAN_ID.Val()...),
+ Start: []byte{0},
+ Limit: []byte{0xff},
}
vals := shard.ldb.GetApproximateSizes([]levigo.Range{r})
- serverStats.Dirs[shardIdx].ApproxNumSpans = vals[0]
+ serverStats.Dirs[shardIdx].ApproximateBytes = vals[0]
serverStats.Dirs[shardIdx].LevelDbStats =
shard.ldb.PropertyValue("leveldb.stats")
- store.lg.Infof("levedb.stats for %s: %s\n",
+ store.msink.lg.Debugf("levedb.stats for %s: %s\n",
shard.path, shard.ldb.PropertyValue("leveldb.stats"))
}
- serverStats.HostSpanMetrics = store.msink.AccessTotals()
serverStats.LastStartMs = store.startMs
serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC())
serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans)
+ wsData := store.msink.wsm.GetData()
+ serverStats.HostSpanMetrics = store.msink.AccessServerTotals()
+ for k, v := range wsData.clientDroppedMap {
+ smtx := serverStats.HostSpanMetrics[k]
+ if smtx == nil {
+ smtx = &common.SpanMetrics {}
+ serverStats.HostSpanMetrics[k] = smtx
+ }
+ smtx.ClientDropped = v
+ }
+ serverStats.IngestedSpans = wsData.ingestedSpans
+ serverStats.ClientDroppedSpans = wsData.clientDroppedSpans
+ serverStats.MaxWriteSpansLatencyMs = wsData.latencyMax
+ serverStats.AverageWriteSpansLatencyMs = wsData.latencyAverage
return &serverStats
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index 49587bb..0d72602 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -33,6 +33,7 @@ import (
"org/apache/htrace/common"
"org/apache/htrace/conf"
"reflect"
+ "time"
)
// Handles HRPC calls
@@ -195,9 +196,15 @@ func (cdc *HrpcServerCodec) Close() error {
}
func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
- resp *common.WriteSpansResp) (err error) {
+ resp *common.WriteSpansResp) (err error) {
+ startTime := time.Now()
hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s). "+
"defaultTrid = %s\n", len(req.Spans), req.DefaultTrid)
+ client, _, err := net.SplitHostPort(req.Addr)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Failed to split host and port " +
+ "for %s: %s\n", req.Addr, err.Error()))
+ }
for i := range req.Spans {
span := req.Spans[i]
spanIdProblem := span.Id.FindProblem()
@@ -211,10 +218,13 @@ func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
}
hand.store.WriteSpan(&IncomingSpan{
- Addr: req.Addr,
+ Addr: client,
Span: span,
})
}
+ endTime := time.Now()
+ hand.store.msink.Update(client, req.ClientDropped, len(req.Spans),
+ endTime.Sub(startTime))
return nil
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
index b482aa3..97b72ca 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
@@ -23,10 +23,12 @@ import (
"bufio"
"encoding/json"
"fmt"
+ "github.com/jmhodges/levigo"
"net"
"org/apache/htrace/common"
"org/apache/htrace/conf"
"os"
+ "runtime"
"strings"
"time"
)
@@ -84,12 +86,15 @@ func main() {
// configuration.
lg := common.NewLogger("main", cnf)
defer lg.Close()
- lg.Infof("*** Starting htraced ***\n")
+ lg.Infof("*** Starting htraced %s [%s]***\n", RELEASE_VERSION, GIT_VERSION)
scanner := bufio.NewScanner(cnfLog)
for scanner.Scan() {
lg.Infof(scanner.Text() + "\n")
}
common.InstallSignalHandlers(cnf)
+ lg.Infof("GOMAXPROCS=%d\n", runtime.GOMAXPROCS(0))
+ lg.Infof("leveldb version=%d.%d\n",
+ levigo.GetLevelDBMajorVersion(), levigo.GetLevelDBMinorVersion())
// Initialize the datastore.
store, err := CreateDataStore(cnf, nil)
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
index 672f5f6..cfff418 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
@@ -21,9 +21,11 @@ package main
import (
"encoding/json"
+ "math"
"org/apache/htrace/common"
"org/apache/htrace/conf"
"sync"
+ "time"
)
//
@@ -36,6 +38,8 @@ import (
// them so that we can adjust the sampling rate there.
//
+const LATENCY_CIRC_BUF_SIZE = 4096
+
type ServerSpanMetrics struct {
// The total number of spans written to HTraced.
Written uint64
@@ -131,12 +135,8 @@ type MetricsSink struct {
// The maximum number of metrics totals we will maintain.
maxMtx int
- // The number of spans which each client has self-reported that it has
- // dropped.
- clientDroppedMap map[string]uint64
-
- // Lock protecting clientDropped
- clientDroppedLock sync.Mutex
+ // Metrics about WriteSpans requests
+ wsm WriteSpanMetrics
}
func NewMetricsSink(cnf *conf.Config) *MetricsSink {
@@ -147,7 +147,10 @@ func NewMetricsSink(cnf *conf.Config) *MetricsSink {
exited: make(chan interface{}),
lg: common.NewLogger("metrics", cnf),
maxMtx: cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
- clientDroppedMap: make(map[string]uint64),
+ wsm: WriteSpanMetrics {
+ clientDroppedMap: make(map[string]uint64),
+ latencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE),
+ },
}
go mcl.run()
return &mcl
@@ -187,21 +190,16 @@ func (msink *MetricsSink) run() {
func (msink *MetricsSink) handleAccessReq(accessReq *AccessReq) {
msink.lg.Debug("MetricsSink: accessing global metrics.\n")
- msink.clientDroppedLock.Lock()
- defer func() {
- msink.clientDroppedLock.Unlock()
- close(accessReq.done)
- }()
+ defer close(accessReq.done)
for addr, smtx := range msink.smtxMap {
accessReq.mtxMap[addr] = &common.SpanMetrics{
Written: smtx.Written,
ServerDropped: smtx.ServerDropped,
- ClientDropped: msink.clientDroppedMap[addr],
}
}
}
-func (msink *MetricsSink) AccessTotals() common.SpanMetricsMap {
+func (msink *MetricsSink) AccessServerTotals() common.SpanMetricsMap {
accessReq := &AccessReq{
mtxMap: make(common.SpanMetricsMap),
done: make(chan interface{}),
@@ -220,15 +218,123 @@ func (msink *MetricsSink) Shutdown() {
<-msink.exited
}
-func (msink *MetricsSink) UpdateClientDropped(client string, clientDropped uint64) {
- msink.clientDroppedLock.Lock()
- defer msink.clientDroppedLock.Unlock()
- msink.clientDroppedMap[client] = clientDropped
- if len(msink.clientDroppedMap) >= msink.maxMtx {
+type WriteSpanMetrics struct {
+ // Lock protecting WriteSpanMetrics
+ lock sync.Mutex
+
+ // The number of spans which each client has self-reported that it has
+ // dropped.
+ clientDroppedMap map[string]uint64
+
+ // The total number of new span writes we've gotten since startup.
+ ingestedSpans uint64
+
+ // The total number of spans all clients have dropped since startup.
+ clientDroppedSpans uint64
+
+ // The last few writeSpan latencies
+ latencyCircBuf *CircBufU32
+}
+
+type WriteSpanMetricsData struct {
+ clientDroppedMap map[string]uint64
+ ingestedSpans uint64
+ clientDroppedSpans uint64
+ latencyMax uint32
+ latencyAverage uint32
+}
+
+func (msink *MetricsSink) Update(client string, clientDropped uint64, clientWritten int,
+ wsLatency time.Duration) {
+ wsLatencyNs := wsLatency.Nanoseconds() / 1000000
+ var wsLatency32 uint32
+ if wsLatencyNs > math.MaxUint32 {
+ wsLatency32 = math.MaxUint32
+ } else {
+ wsLatency32 = uint32(wsLatencyNs)
+ }
+ msink.wsm.update(msink.maxMtx, client, clientDropped, clientWritten, wsLatency32)
+}
+
+func (wsm *WriteSpanMetrics) update(maxMtx int, client string, clientDropped uint64,
+ clientWritten int, wsLatency uint32) {
+ wsm.lock.Lock()
+ defer wsm.lock.Unlock()
+ wsm.clientDroppedMap[client] = clientDropped
+ if len(wsm.clientDroppedMap) >= maxMtx {
// Delete a random entry
- for k := range msink.clientDroppedMap {
- delete(msink.clientDroppedMap, k)
+ for k := range wsm.clientDroppedMap {
+ delete(wsm.clientDroppedMap, k)
return
}
}
+ wsm.ingestedSpans += uint64(clientWritten)
+ wsm.clientDroppedSpans += uint64(clientDropped)
+ wsm.latencyCircBuf.Append(wsLatency)
+}
+
+func (wsm *WriteSpanMetrics) GetData() *WriteSpanMetricsData {
+ wsm.lock.Lock()
+ defer wsm.lock.Unlock()
+ clientDroppedMap := make(map[string]uint64)
+ for k, v := range wsm.clientDroppedMap {
+ clientDroppedMap[k] = v
+ }
+ return &WriteSpanMetricsData {
+ clientDroppedMap: clientDroppedMap,
+ ingestedSpans: wsm.ingestedSpans,
+ clientDroppedSpans: wsm.clientDroppedSpans,
+ latencyMax: wsm.latencyCircBuf.Max(),
+ latencyAverage: wsm.latencyCircBuf.Average(),
+ }
+}
+
+// A circular buffer of uint32s which supports appending and taking the
+// average, and some other things.
+type CircBufU32 struct {
+ // The next slot to fill
+ slot int
+
+ // The number of slots which are in use. This number only ever
+ // increases until the buffer is full.
+ slotsUsed int
+
+ // The buffer
+ buf []uint32
+}
+
+func NewCircBufU32(size int) *CircBufU32 {
+ return &CircBufU32 {
+ slotsUsed: -1,
+ buf: make([]uint32, size),
+ }
+}
+
+func (cbuf *CircBufU32) Max() uint32 {
+ var max uint32
+ for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
+ if cbuf.buf[bufIdx] > max {
+ max = cbuf.buf[bufIdx]
+ }
+ }
+ return max
+}
+
+func (cbuf *CircBufU32) Average() uint32 {
+ var total uint64
+ for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
+ total += uint64(cbuf.buf[bufIdx])
+ }
+ return uint32(total / uint64(cbuf.slotsUsed))
+}
+
+func (cbuf *CircBufU32) Append(val uint32) {
+ cbuf.buf[cbuf.slot] = val
+ cbuf.slot++
+ if cbuf.slotsUsed < cbuf.slot {
+ cbuf.slotsUsed = cbuf.slot
+ }
+ if cbuf.slot >= len(cbuf.buf) {
+ cbuf.slot = 0
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
index c90d1da..48c20f0 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -20,6 +20,7 @@
package main
import (
+ htrace "org/apache/htrace/client"
"org/apache/htrace/common"
"org/apache/htrace/conf"
"reflect"
@@ -81,7 +82,7 @@ func compareTotals(a, b common.SpanMetricsMap) bool {
func waitForMetrics(msink *MetricsSink, expectedTotals common.SpanMetricsMap) {
for {
time.Sleep(1 * time.Millisecond)
- totals := msink.AccessTotals()
+ totals := msink.AccessServerTotals()
if compareTotals(totals, expectedTotals) {
return
}
@@ -98,7 +99,7 @@ func TestMetricsSinkMessages(t *testing.T) {
t.Fatalf("failed to create conf: %s", err.Error())
}
msink := NewMetricsSink(cnf)
- totals := msink.AccessTotals()
+ totals := msink.AccessServerTotals()
if len(totals) != 0 {
t.Fatalf("Expected no data in the MetricsSink to start with.")
}
@@ -178,10 +179,93 @@ func TestMetricsSinkMessagesEviction(t *testing.T) {
},
})
for {
- totals := msink.AccessTotals()
+ totals := msink.AccessServerTotals()
if len(totals) == 2 {
break
}
}
msink.Shutdown()
}
+
+func TestIngestedSpansMetricsRest(t *testing.T) {
+ testIngestedSpansMetricsImpl(t, false)
+}
+
+func TestIngestedSpansMetricsPacked(t *testing.T) {
+ testIngestedSpansMetricsImpl(t, true)
+}
+
+func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) {
+ htraceBld := &MiniHTracedBuilder{Name: "TestIngestedSpansMetrics",
+ DataDirs: make([]string, 2),
+ }
+ ht, err := htraceBld.Build()
+ if err != nil {
+ t.Fatalf("failed to create datastore: %s", err.Error())
+ }
+ defer ht.Close()
+ var hcl *htrace.Client
+ hcl, err = htrace.NewClient(ht.ClientConf())
+ if err != nil {
+ t.Fatalf("failed to create client: %s", err.Error())
+ }
+ if !usePacked {
+ hcl.DisableHrpc()
+ }
+
+ NUM_TEST_SPANS := 12
+ allSpans := createRandomTestSpans(NUM_TEST_SPANS)
+ err = hcl.WriteSpans(&common.WriteSpansReq{
+ Spans: allSpans,
+ })
+ if err != nil {
+ t.Fatalf("WriteSpans failed: %s\n", err.Error())
+ }
+ for {
+ var stats *common.ServerStats
+ stats, err = hcl.GetServerStats()
+ if err != nil {
+ t.Fatalf("GetServerStats failed: %s\n", err.Error())
+ }
+ if stats.IngestedSpans == uint64(NUM_TEST_SPANS) {
+ break
+ }
+ time.Sleep(1 * time.Millisecond)
+ }
+}
+
+func TestCircBuf32(t *testing.T) {
+ cbuf := NewCircBufU32(3)
+ // We arbitrarily define that empty circular buffers have an average of 0.
+ if cbuf.Average() != 0 {
+ t.Fatalf("expected empty CircBufU32 to have an average of 0.\n")
+ }
+ if cbuf.Max() != 0 {
+ t.Fatalf("expected empty CircBufU32 to have a max of 0.\n")
+ }
+ cbuf.Append(2)
+ if cbuf.Average() != 2 {
+ t.Fatalf("expected one-element CircBufU32 to have an average of 2.\n")
+ }
+ cbuf.Append(10)
+ if cbuf.Average() != 6 {
+ t.Fatalf("expected two-element CircBufU32 to have an average of 6.\n")
+ }
+ cbuf.Append(12)
+ if cbuf.Average() != 8 {
+ t.Fatalf("expected three-element CircBufU32 to have an average of 8.\n")
+ }
+ cbuf.Append(14)
+ // The 14 overwrites the original 2 element.
+ if cbuf.Average() != 12 {
+ t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n")
+ }
+ cbuf.Append(1)
+ // The 1 overwrites the original 10 element.
+ if cbuf.Average() != 9 {
+ t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n")
+ }
+ if cbuf.Max() != 14 {
+ t.Fatalf("expected three-element CircBufU32 to have a max of 14.\n")
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index 80df676..a50799a 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -165,6 +165,13 @@ func (ht *MiniHTraced) ClientConf() *conf.Config {
conf.HTRACE_HRPC_ADDRESS, ht.Hsv.Addr().String())
}
+// Return a Config object that clients can use to connect to this MiniHTraceD
+// by HTTP only (no HRPC).
+func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config {
+ return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(),
+ conf.HTRACE_HRPC_ADDRESS, "")
+}
+
func (ht *MiniHTraced) Close() {
ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name)
ht.Rsv.Close()
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index a41e1c7..9b78d15 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -34,6 +34,7 @@ import (
"path/filepath"
"strconv"
"strings"
+ "time"
)
// Set the response headers.
@@ -198,7 +199,15 @@ type writeSpansHandler struct {
}
func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ startTime := time.Now()
setResponseHeaders(w.Header())
+ client, _, serr := net.SplitHostPort(req.RemoteAddr)
+ if serr != nil {
+ writeError(hand.lg, w, http.StatusBadRequest,
+ fmt.Sprintf("Failed to split host and port for %s: %s\n",
+ req.RemoteAddr, serr.Error()))
+ return
+ }
var dec *json.Decoder
if hand.lg.TraceEnabled() {
b, err := ioutil.ReadAll(req.Body)
@@ -234,12 +243,14 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
hand.lg.Warnf(fmt.Sprintf("Invalid span ID: %s", spanIdProblem))
} else {
hand.store.WriteSpan(&IncomingSpan{
- Addr: req.RemoteAddr,
+ Addr: client,
Span: span,
})
}
}
- hand.store.msink.UpdateClientDropped(req.RemoteAddr, msg.ClientDropped)
+ endTime := time.Now()
+ hand.store.msink.Update(client, msg.ClientDropped, len(msg.Spans),
+ endTime.Sub(startTime))
}
type queryHandler struct {
@@ -291,6 +302,7 @@ func (hand *logErrorHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
}
type RestServer struct {
+ http.Server
listener net.Listener
lg *common.Logger
}
@@ -337,14 +349,16 @@ func CreateRestServer(cnf *conf.Config, store *dataStore,
}
}
- rsv.lg.Infof(`Serving static files from "%s"\n`, webdir)
+ rsv.lg.Infof(`Serving static files from "%s"` + "\n", webdir)
r.PathPrefix("/").Handler(http.FileServer(http.Dir(webdir))).Methods("GET")
// Log an error message for unknown non-GET requests.
r.PathPrefix("/").Handler(&logErrorHandler{lg: rsv.lg})
rsv.listener = listener
- go http.Serve(rsv.listener, r)
+ rsv.Handler = r
+ rsv.ErrorLog = rsv.lg.Wrap("[REST] ", common.INFO)
+ go rsv.Serve(rsv.listener)
rsv.lg.Infof("Started REST server on %s\n", rsv.listener.Addr().String())
return rsv, nil
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
index 04dc269..88071c7 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
@@ -212,13 +212,19 @@ func printServerStats(hcl *htrace.Client) int {
fmt.Fprintf(w, "Server Time\t%s\n",
common.UnixMsToTime(stats.CurMs).Format(time.RFC3339))
fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans)
+ fmt.Fprintf(w, "Spans ingested\t%d\n", stats.IngestedSpans)
+ fmt.Fprintf(w, "Spans dropped by clients\t%d\n", stats.ClientDroppedSpans)
+ dur := time.Millisecond * time.Duration(stats.AverageWriteSpansLatencyMs)
+ fmt.Fprintf(w, "Average WriteSpan Latency\t%s\n", dur.String())
+ dur = time.Millisecond * time.Duration(stats.MaxWriteSpansLatencyMs)
+ fmt.Fprintf(w, "Maximum WriteSpan Latency\t%s\n", dur.String())
fmt.Fprintf(w, "Number of leveldb directories\t%d\n", len(stats.Dirs))
w.Flush()
fmt.Println("")
for i := range stats.Dirs {
dir := stats.Dirs[i]
fmt.Printf("==== %s ===\n", dir.Path)
- fmt.Printf("Approximate number of spans: %d\n", dir.ApproxNumSpans)
+ fmt.Printf("Approximate number of bytes: %d\n", dir.ApproximateBytes)
stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1)
fmt.Printf("%s\n", stats)
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-webapp/src/main/webapp/app/server_info_view.js
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/app/server_info_view.js b/htrace-webapp/src/main/webapp/app/server_info_view.js
index f3473d3..4255995 100644
--- a/htrace-webapp/src/main/webapp/app/server_info_view.js
+++ b/htrace-webapp/src/main/webapp/app/server_info_view.js
@@ -22,6 +22,7 @@ var htrace = htrace || {};
htrace.ServerInfoView = Backbone.View.extend({
events: {
"click .serverConfigurationButton": "showServerConfigurationModal",
+ "click .storageDirectoryStatsButton": "showStorageDirectoryStatsModal",
},
render: function() {
@@ -110,5 +111,18 @@ htrace.ServerInfoView = Backbone.View.extend({
{title: "HTraced Server Configuration", body: out}));
}
})
+ },
+
+ showStorageDirectoryStatsModal: function() {
+ var dirs = this.model.stats.get("Dirs");
+ var out = "";
+ for (var dirIdx = 0; dirIdx < dirs.length; dirIdx++) {
+ var dir = dirs[dirIdx];
+ out += "<h3>" + dir.Path + "</h3>";
+ out += "Approximate size in bytes: " + dir.ApproximateBytes + "<br/>";
+ out += "<pre>" + dir.LevelDbStats + "</pre></pre><br/><p/>";
+ }
+ htrace.showModal(_.template($("#modal-table-template").html())(
+ {title: "HTraced Storage Directory Statistics", body: out}));
}
});
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-webapp/src/main/webapp/app/server_stats.js
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/app/server_stats.js b/htrace-webapp/src/main/webapp/app/server_stats.js
index e4289ef..783041c 100644
--- a/htrace-webapp/src/main/webapp/app/server_stats.js
+++ b/htrace-webapp/src/main/webapp/app/server_stats.js
@@ -20,7 +20,10 @@
// htraced server statistics. See rest.go.
htrace.ServerStats = Backbone.Model.extend({
defaults: {
- "ReapedSpans": "(unknown)",
+ "LastStartMs": "0",
+ "CurMs": "0",
+ "IngestedSpans": "(unknown)",
+ "ReapedSpans": "(unknown)"
},
url: function() {
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/021e4914/htrace-webapp/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/index.html b/htrace-webapp/src/main/webapp/index.html
index 2cebefe..a59282a 100644
--- a/htrace-webapp/src/main/webapp/index.html
+++ b/htrace-webapp/src/main/webapp/index.html
@@ -79,6 +79,22 @@
<td>Spans Reaped</td>
<td><%= model.stats.get("ReapedSpans") %></td>
</tr>
+ <tr>
+ <td>Spans Ingested</td>
+ <td><%= model.stats.get("IngestedSpans") %></td>
+ </tr>
+ <tr>
+ <td>Client Dropped Spans</td>
+ <td><%= model.stats.get("ClientDroppedSpans") %></td>
+ </tr>
+ <tr>
+ <td>Maximum WriteSpans Latency (ms)</td>
+ <td><%= model.stats.get("MaxWriteSpansLatencyMs") %></td>
+ </tr>
+ <tr>
+ <td>Average WriteSpans Latency (ms)</td>
+ <td><%= model.stats.get("AverageWriteSpansLatencyMs") %></td>
+ </tr>
</tr>
<td>Datastore Start Time</td>
<td><%= htrace.dateToString(model.stats.get("LastStartMs")) %></td>
@@ -93,6 +109,9 @@
<%= view.getServerStatsTableHtml() %>
</div>
<button type="button" class="btn btn-info serverConfigurationButton">Server Configuration</button>
+ <button type="button" class="btn btn-success storageDirectoryStatsButton">Storage Directory Stats</button>
+ <br/>
+ <p/>
</div>
<div class="col-md-1">
</div>