You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by cm...@apache.org on 2015/11/21 22:26:47 UTC
incubator-htrace git commit: HTRACE-298. htraced: improve datastore
serialization and metrics (cmccabe)
Repository: incubator-htrace
Updated Branches:
refs/heads/master fe19368a3 -> 699c8cf80
HTRACE-298. htraced: improve datastore serialization and metrics (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/699c8cf8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/699c8cf8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/699c8cf8
Branch: refs/heads/master
Commit: 699c8cf80058913a89988552ef2d357690309b6f
Parents: fe19368
Author: Colin P. Mccabe <cm...@apache.org>
Authored: Sat Nov 21 13:12:20 2015 -0800
Committer: Colin P. Mccabe <cm...@apache.org>
Committed: Sat Nov 21 13:26:04 2015 -0800
----------------------------------------------------------------------
htrace-htraced/go/Godeps/Godeps.json | 2 +-
.../go/src/org/apache/htrace/common/rpc.go | 31 +-
.../src/org/apache/htrace/conf/config_keys.go | 8 +-
.../org/apache/htrace/htraced/client_test.go | 98 ++++++
.../src/org/apache/htrace/htraced/datastore.go | 254 +++++++++++----
.../org/apache/htrace/htraced/datastore_test.go | 61 ++--
.../go/src/org/apache/htrace/htraced/hrpc.go | 23 +-
.../go/src/org/apache/htrace/htraced/metrics.go | 319 ++++++-------------
.../org/apache/htrace/htraced/metrics_test.go | 156 ++-------
.../org/apache/htrace/htraced/reaper_test.go | 11 +-
.../go/src/org/apache/htrace/htraced/rest.go | 23 +-
.../go/src/org/apache/htrace/htracedTool/cmd.go | 9 +-
.../src/main/webapp/app/server_info_view.js | 4 +-
.../src/main/webapp/app/server_stats.js | 7 +-
htrace-webapp/src/main/webapp/index.html | 12 +-
15 files changed, 491 insertions(+), 527 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/Godeps/Godeps.json
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/Godeps/Godeps.json b/htrace-htraced/go/Godeps/Godeps.json
index 47aa90e..7c737fe 100644
--- a/htrace-htraced/go/Godeps/Godeps.json
+++ b/htrace-htraced/go/Godeps/Godeps.json
@@ -24,7 +24,7 @@
},
{
"ImportPath": "github.com/ugorji/go/codec",
- "Rev": "08bbe4aa39b9f189f4e294b5c8408b5fa5787bb2"
+ "Rev": "1a8bf87a90ddcdc7deaa0038f127ac62135fdd58"
}
]
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 2627c26..6375688 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -38,7 +38,7 @@ const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024
// A request to write spans to htraced.
type WriteSpansReq struct {
- Addr string // This gets filled in by the RPC layer.
+ Addr string `json:",omitempty"` // This gets filled in by the RPC layer.
DefaultTrid string `json:",omitempty"`
Spans []*Span
ClientDropped uint64 `json:",omitempty"`
@@ -98,10 +98,19 @@ type SpanMetrics struct {
// The total number of spans dropped by the server.
ServerDropped uint64
- // The total number of spans dropped by the client. Note that this number
- // is tracked on the client itself and doesn't get updated if the client
- // can't contact the server.
- ClientDropped uint64
+ // The total number of spans dropped by the client.
+ //
+ // This number is just an estimate and may be incorrect for many reasons.
+ // If the client can't contact the server at all, then obviously the server
+ // will never increment ClientDropped... even though spans are being
+ // dropped. The client may also tell the server about some new spans it
+ // has dropped, but then for some reason fail to get the acknowledgement
+ // from the server. In that case, the client would re-send its client
+ // dropped estimate and it would be double-counted by the server
+ //
+ // The intention here is to provide a rough estimate of how overloaded
+ // htraced clients are, not to provide strongly consistent numbers.
+ ClientDroppedEstimate uint64
}
// A map from network address strings to SpanMetrics structures.
@@ -130,9 +139,15 @@ type ServerStats struct {
// those that did.
IngestedSpans uint64
- // The total number of spans which have been dropped by clients since the server started,
- // as reported by WriteSpans requests.
- ClientDroppedSpans uint64
+ // The total number of spans which have been written to leveldb since the server started.
+ WrittenSpans uint64
+
+ // The total number of spans dropped by the server since the server started.
+ ServerDroppedSpans uint64
+
+ // An estimate of the total number of spans dropped by clients since the server started.
+ // See SpanMetrics#ClientDroppedEstimate
+ ClientDroppedEstimate uint64
// The maximum latency of a writeSpans request, in milliseconds.
MaxWriteSpansLatencyMs uint32
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index 511833c..573ce21 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -68,9 +68,9 @@ const HTRACE_LOG_PATH = "log.path"
// The log level to use for the logs in htrace.
const HTRACE_LOG_LEVEL = "log.level"
-// The period between metrics heartbeats. This is the approximate interval at which we will
-// update global metrics.
-const HTRACE_METRICS_HEARTBEAT_PERIOD_MS = "metrics.heartbeat.period.ms"
+// The period between datastore heartbeats. This is the approximate interval at which we will
+// prune expired spans.
+const HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS = "datastore.heartbeat.period.ms"
// The maximum number of addresses for which we will maintain metrics.
const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries"
@@ -106,7 +106,7 @@ var DEFAULTS = map[string]string{
HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100",
HTRACE_LOG_PATH: "",
HTRACE_LOG_LEVEL: "INFO",
- HTRACE_METRICS_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
+ HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
HTRACE_METRICS_MAX_ADDR_ENTRIES: "100000",
HTRACE_SPAN_EXPIRY_MS: "0",
HTRACE_REAPER_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 90*1000),
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index fae871c..36e8369 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -21,6 +21,8 @@ package main
import (
"fmt"
+ "github.com/ugorji/go/codec"
+ "math"
"math/rand"
"org/apache/htrace/common"
"org/apache/htrace/conf"
@@ -391,3 +393,99 @@ func TestHrpcIoTimeout(t *testing.T) {
close(finishClient)
wg.Wait()
}
+
+// doWriteSpans creates N random spans and splits them into WriteSpans
+// requests. Each request holds at most maxSpansPerRpc spans and roughly
+// 4/5 of MAX_HRPC_BODY_LENGTH bytes of msgpack-encoded span data.
+// It sends all requests concurrently through a single client, waits for
+// every RPC to return (panicking if any failed), and then waits on the
+// datastore's WrittenSpans semaphore. If b is non-nil, the benchmark timer is
+// reset after span generation and client setup, before any RPC is sent.
+func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) {
+	// Name the MiniHTraced instance after the calling test or benchmark.
+	htraceBld := &MiniHTracedBuilder{Name: name,
+		Cnf: map[string]string{
+			conf.HTRACE_LOG_LEVEL: "INFO",
+		},
+		WrittenSpans: common.NewSemaphore(int64(1 - N)),
+	}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		panic(err)
+	}
+	defer ht.Close()
+	rnd := rand.New(rand.NewSource(1))
+	allSpans := make([]*common.Span, N)
+	for n := 0; n < N; n++ {
+		allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n])
+	}
+	// Determine how many calls to WriteSpans we should make. Each writeSpans
+	// message should be small enough so that it doesn't exceed the max RPC
+	// body length limit. TODO: a production-quality golang client would do
+	// this internally rather than needing us to do it here in the unit test.
+	bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5
+	reqs := make([]*common.WriteSpansReq, 0, 4)
+	curReq := -1
+	// Start out "full" so that the first span always opens a new request.
+	curReqLen := bodyLen
+	var curReqSpans uint32
+	mh := new(codec.MsgpackHandle)
+	mh.WriteExt = true
+	var mbuf [8192]byte
+	buf := mbuf[:0]
+	enc := codec.NewEncoderBytes(&buf, mh)
+	for n := 0; n < N; n++ {
+		span := allSpans[n]
+		if (curReqSpans >= maxSpansPerRpc) ||
+			(curReqLen >= bodyLen) {
+			reqs = append(reqs, &common.WriteSpansReq{})
+			curReqLen = 0
+			curReq++
+			curReqSpans = 0
+		}
+		buf = mbuf[:0]
+		enc.ResetBytes(&buf)
+		err := enc.Encode(span)
+		if err != nil {
+			panic(fmt.Sprintf("Error encoding span %s: %s\n",
+				span.String(), err.Error()))
+		}
+		bufLen := len(buf)
+		if bufLen > (bodyLen / 5) {
+			panic(fmt.Sprintf("Span too long at %d bytes\n", bufLen))
+		}
+		curReqLen += bufLen
+		reqs[curReq].Spans = append(reqs[curReq].Spans, span)
+		curReqSpans++
+	}
+	ht.Store.lg.Infof("num spans: %d. num WriteSpansReq calls: %d\n", N, len(reqs))
+	var hcl *htrace.Client
+	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+	if err != nil {
+		panic(fmt.Sprintf("failed to create client: %s", err.Error()))
+	}
+	defer hcl.Close()
+
+	// Reset the timer to avoid including the time required to create new
+	// random spans in the benchmark total.
+	if b != nil {
+		b.ResetTimer()
+	}
+
+	// Write many random spans.
+	//
+	// The request and its index are passed to each goroutine as arguments.
+	// A closure over the range variable could instead see whatever value the
+	// loop holds when the goroutine runs.
+	//
+	// Each goroutine stores its result in its own slot of errs rather than
+	// racing on the enclosing err.
+	//
+	// The WaitGroup ensures no RPC is still in flight when the deferred
+	// hcl.Close() and ht.Close() run.
+	var wg sync.WaitGroup
+	errs := make([]error, len(reqs))
+	for reqIdx := range reqs {
+		wg.Add(1)
+		go func(idx int, req *common.WriteSpansReq) {
+			defer wg.Done()
+			errs[idx] = hcl.WriteSpans(req)
+		}(reqIdx, reqs[reqIdx])
+	}
+	wg.Wait()
+	for reqIdx, reqErr := range errs {
+		if reqErr != nil {
+			panic(fmt.Sprintf("failed to send WriteSpans request %d: %s",
+				reqIdx, reqErr.Error()))
+		}
+	}
+	// Wait for all the spans to be written.
+	ht.Store.WrittenSpans.Wait()
+}
+
+// BenchmarkWriteSpans measures how quickly we can create new spans via
+// WriteSpans RPCs.
+// Like BenchmarkDatastoreWrites, it creates b.N spans in the datastore.
+// Unlike that benchmark, it sends the spans via RPC.
+// Suggested flags for running this:
+// -tags unsafe -cpu 16 -benchtime=1m
+func BenchmarkWriteSpans(b *testing.B) {
+ // math.MaxUint32 effectively removes the per-request span limit, so
+ // requests are split only by encoded size.
+ doWriteSpans("BenchmarkWriteSpans", b.N, math.MaxUint32, b)
+}
+
+// TestWriteSpansRpcs sends 3000 random spans through WriteSpans RPCs, with at
+// most 1000 spans per request, and waits for the datastore to account for
+// all of them via its WrittenSpans semaphore.
+func TestWriteSpansRpcs(t *testing.T) {
+ doWriteSpans("TestWriteSpansRpcs", 3000, 1000, nil)
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 8cd1526..9310e6e 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -26,6 +26,7 @@ import (
"errors"
"fmt"
"github.com/jmhodges/levigo"
+ "github.com/ugorji/go/codec"
"org/apache/htrace/common"
"org/apache/htrace/conf"
"os"
@@ -66,7 +67,7 @@ import (
//
const UNKNOWN_LAYOUT_VERSION = 0
-const CURRENT_LAYOUT_VERSION = 2
+const CURRENT_LAYOUT_VERSION = 3
var EMPTY_BYTE_BUF []byte = []byte{}
@@ -89,6 +90,9 @@ type IncomingSpan struct {
// The span.
*common.Span
+
+ // Serialized span data
+ SpanDataBytes []byte
}
// A single directory containing a levelDB instance.
@@ -103,19 +107,13 @@ type shard struct {
path string
// Incoming requests to write Spans.
- incoming chan *IncomingSpan
+ incoming chan []*IncomingSpan
// A channel for incoming heartbeats
heartbeats chan interface{}
// Tracks whether the shard goroutine has exited.
exited sync.WaitGroup
-
- // Per-address metrics
- mtxMap ServerSpanMetricsMap
-
- // The maximum number of metrics to allow in our map
- maxMtx int
}
// Process incoming spans for a shard.
@@ -127,24 +125,33 @@ func (shd *shard) processIncoming() {
}()
for {
select {
- case span := <-shd.incoming:
- if span == nil {
+ case spans := <-shd.incoming:
+ if spans == nil {
return
}
- err := shd.writeSpan(span)
- if err != nil {
- lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error())
- } else if lg.TraceEnabled() {
- lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson())
+ totalWritten := 0
+ totalDropped := 0
+ for spanIdx := range(spans) {
+ err := shd.writeSpan(spans[spanIdx])
+ if err != nil {
+ lg.Errorf("Shard processor for %s got fatal error %s.\n",
+ shd.path, err.Error())
+ totalDropped++
+ } else {
+ if lg.TraceEnabled() {
+ lg.Tracef("Shard processor for %s wrote span %s.\n",
+ shd.path, spans[spanIdx].ToJson())
+ }
+ totalWritten++
+ }
+ }
+ shd.store.msink.UpdatePersisted(spans[0].Addr, totalWritten, totalDropped)
+ if shd.store.WrittenSpans != nil {
+ lg.Debugf("Shard %s incrementing WrittenSpans by %d\n", shd.path, len(spans))
+ shd.store.WrittenSpans.Posts(int64(len(spans)))
}
case <-shd.heartbeats:
lg.Tracef("Shard processor for %s handling heartbeat.\n", shd.path)
- mtxMap := make(ServerSpanMetricsMap)
- for addr, mtx := range shd.mtxMap {
- mtxMap[addr] = mtx.Clone()
- mtx.Clear()
- }
- shd.store.msink.UpdateMetrics(mtxMap)
shd.pruneExpired()
}
}
@@ -246,21 +253,10 @@ func u64toSlice(val uint64) []byte {
func (shd *shard) writeSpan(ispan *IncomingSpan) error {
batch := levigo.NewWriteBatch()
defer batch.Close()
-
- // Add SpanData to batch.
- spanDataBuf := new(bytes.Buffer)
- spanDataEnc := gob.NewEncoder(spanDataBuf)
span := ispan.Span
- err := spanDataEnc.Encode(span.SpanData)
- if err != nil {
- shd.store.lg.Errorf("Error encoding span %s: %s\n",
- span.String(), err.Error())
- shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg)
- return err
- }
primaryKey :=
append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...)
- batch.Put(primaryKey, spanDataBuf.Bytes())
+ batch.Put(primaryKey, ispan.SpanDataBytes)
// Add this to the parent index.
for parentIdx := range span.Parents {
@@ -280,17 +276,12 @@ func (shd *shard) writeSpan(ispan *IncomingSpan) error {
u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...)
batch.Put(durationKey, EMPTY_BYTE_BUF)
- err = shd.ldb.Write(shd.store.writeOpts, batch)
+ err := shd.ldb.Write(shd.store.writeOpts, batch)
if err != nil {
shd.store.lg.Errorf("Error writing span %s to leveldb at %s: %s\n",
span.String(), shd.path, err.Error())
- shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg)
return err
}
- shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg)
- if shd.store.WrittenSpans != nil {
- shd.store.WrittenSpans.Post()
- }
return nil
}
@@ -510,13 +501,11 @@ func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) (*dataSto
for idx := range store.shards {
shd := store.shards[idx]
shd.heartbeats = make(chan interface{}, 1)
- shd.mtxMap = make(ServerSpanMetricsMap)
- shd.maxMtx = store.msink.maxMtx
shd.exited.Add(1)
go shd.processIncoming()
}
store.hb = NewHeartbeater("DatastoreHeartbeater",
- cnf.GetInt64(conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS), lg)
+ cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), lg)
for shdIdx := range store.shards {
shd := store.shards[shdIdx]
store.hb.AddHeartbeatTarget(&HeartbeatTarget{
@@ -606,7 +595,7 @@ func CreateShard(store *dataStore, cnf *conf.Config, path string,
}
spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
shd = &shard{store: store, ldb: ldb, path: path,
- incoming: make(chan *IncomingSpan, spanBufferSize)}
+ incoming: make(chan []*IncomingSpan, spanBufferSize)}
return shd, nil
}
@@ -654,10 +643,6 @@ func (store *dataStore) Close() {
store.rpr.Shutdown()
store.rpr = nil
}
- if store.msink != nil {
- store.msink.Shutdown()
- store.msink = nil
- }
if store.readOpts != nil {
store.readOpts.Close()
store.readOpts = nil
@@ -677,8 +662,158 @@ func (store *dataStore) getShardIndex(sid common.SpanId) int {
return int(sid.Hash32() % uint32(len(store.shards)))
}
-func (store *dataStore) WriteSpan(span *IncomingSpan) {
- store.shards[store.getShardIndex(span.Id)].incoming <- span
+// WRITESPANS_BATCH_SIZE is the capacity of each per-shard span batch that a
+// SpanIngestor accumulates before sending it over the shard's incoming channel.
+const WRITESPANS_BATCH_SIZE = 128
+
+// SpanIngestor is a class used internally to ingest spans from an RPC
+// endpoint. It groups spans destined for a particular shard into small
+// batches, so that we can reduce the number of objects that need to be sent
+// over the shard's "incoming" channel. Since sending objects over a channel
+// requires goroutine synchronization, this improves performance.
+//
+// SpanIngestor also allows us to reuse the same encoder object for many spans,
+// rather than creating a new encoder per span. This avoids re-doing the
+// encoder setup for each span, and also generates less garbage.
+//
+// NOTE(review): the ingestor mutates its counters, buffer, and batches without
+// any locking, so it appears intended for use by one goroutine at a time —
+// confirm before sharing an instance.
+type SpanIngestor struct {
+ // The logger to use.
+ lg *common.Logger
+
+ // The dataStore we are ingesting spans into.
+ store *dataStore
+
+ // The remote address these spans are coming from.
+ addr string
+
+ // The TracerId assigned to ingested spans that arrive without one.
+ defaultTrid string
+
+ // The msgpack handle to use to serialize the spans.
+ mh codec.MsgpackHandle
+
+ // The msgpack encoder to use to serialize the spans.
+ // Caching this avoids generating a lot of garbage and burning CPUs
+ // creating new encoder objects for each span.
+ enc *codec.Encoder
+
+ // The buffer which codec.Encoder is currently serializing to.
+ // We have to create a new buffer for each span because once we hand it
+ // off to the shard, the shard manages the buffer lifecycle.
+ spanDataBytes []byte
+
+ // A slice mapping shard index to that shard's pending span batch.
+ batches []*SpanIngestorBatch
+
+ // The total number of spans ingested. Includes dropped spans.
+ totalIngested int
+
+ // The total number of spans the ingestor dropped because of a server-side error.
+ serverDropped int
+}
+
+// A batch of spans destined for a particular shard.
+type SpanIngestorBatch struct {
+ // Spans buffered for this shard that have not yet been passed to
+ // dataStore#WriteSpans.
+ incoming []*IncomingSpan
+}
+
+// NewSpanIngestor creates a SpanIngestor that feeds spans received from the
+// client at addr into this dataStore. Spans with an empty TracerId are
+// assigned defaultTrid.
+//
+// It sets up a reusable msgpack encoder and one empty batch per shard, each
+// with capacity WRITESPANS_BATCH_SIZE.
+//
+// Callers should call Close when done. Close flushes any partially-filled
+// batches and reports the ingestor's totals to the metrics sink.
+func (store *dataStore) NewSpanIngestor(lg *common.Logger,
+ addr string, defaultTrid string) *SpanIngestor {
+ ing := &SpanIngestor {
+ lg: lg,
+ store: store,
+ addr: addr,
+ defaultTrid: defaultTrid,
+ spanDataBytes: make([]byte, 0, 1024),
+ batches: make([]*SpanIngestorBatch, len(store.shards)),
+ }
+ // Same handle setting as the decoder in shard#decodeSpan.
+ ing.mh.WriteExt = true
+ // The encoder serializes into ing.spanDataBytes.
+ ing.enc = codec.NewEncoderBytes(&ing.spanDataBytes, &ing.mh)
+ for batchIdx := range(ing.batches) {
+ ing.batches[batchIdx] = &SpanIngestorBatch {
+ incoming: make([]*IncomingSpan, 0, WRITESPANS_BATCH_SIZE),
+ }
+ }
+ return ing
+}
+
+// IngestSpan validates a single span and fills in the default tracer ID if
+// needed. It then serializes the span's SpanData with msgpack and appends the
+// span to the batch for the shard that owns its ID.
+//
+// A batch is handed to its shard as soon as it holds WRITESPANS_BATCH_SIZE
+// spans. Partially-filled batches are sent by Close.
+//
+// Spans with an invalid ID, or that fail to encode, are counted in
+// serverDropped and discarded.
+func (ing *SpanIngestor) IngestSpan(span *common.Span) {
+	ing.totalIngested++
+	// Make sure the span ID is valid.
+	spanIdProblem := span.Id.FindProblem()
+	if spanIdProblem != "" {
+		// Can't print the invalid span ID because String() might fail.
+		ing.lg.Warnf("Invalid span ID: %s\n", spanIdProblem)
+		ing.serverDropped++
+		return
+	}
+
+	// Set the default tracer id, if needed.
+	if span.TracerId == "" {
+		span.TracerId = ing.defaultTrid
+	}
+
+	// Encode the span data. Doing the encoding here is better than doing it
+	// in the shard goroutine, because we can achieve more parallelism.
+	// There is one shard goroutine per shard, but potentially many more
+	// ingestors per shard.
+	err := ing.enc.Encode(span.SpanData)
+	if err != nil {
+		ing.lg.Warnf("Failed to encode span ID %s: %s\n",
+			span.Id.String(), err.Error())
+		ing.serverDropped++
+		// A failed Encode may leave partial output inside the encoder. Point
+		// it back at an empty buffer so that output cannot end up in front of
+		// the next span's serialized bytes.
+		ing.spanDataBytes = ing.spanDataBytes[:0]
+		ing.enc.ResetBytes(&ing.spanDataBytes)
+		return
+	}
+	// The encoded bytes now belong to the IncomingSpan, and eventually to the
+	// shard. Give the encoder a fresh buffer for the next span.
+	spanDataBytes := ing.spanDataBytes
+	ing.spanDataBytes = make([]byte, 0, 1024)
+	ing.enc.ResetBytes(&ing.spanDataBytes)
+
+	// Determine which shard this span should go to.
+	shardIdx := ing.store.getShardIndex(span.Id)
+	batch := ing.batches[shardIdx]
+	if ing.lg.TraceEnabled() {
+		ing.lg.Tracef("SpanIngestor#IngestSpan: spanId=%s, shardIdx=%d, "+
+			"incomingLen=%d, cap(batch.incoming)=%d\n",
+			span.Id.String(), shardIdx, len(batch.incoming), cap(batch.incoming))
+	}
+	batch.incoming = append(batch.incoming, &IncomingSpan{
+		Addr:          ing.addr,
+		Span:          span,
+		SpanDataBytes: spanDataBytes,
+	})
+	// Flush as soon as the batch reaches WRITESPANS_BATCH_SIZE. Batches are
+	// allocated with exactly that capacity, so the append above never has to
+	// grow the slice.
+	if len(batch.incoming) >= WRITESPANS_BATCH_SIZE {
+		if ing.lg.TraceEnabled() {
+			ing.lg.Tracef("SpanIngestor#IngestSpan: flushing %d spans for "+
+				"shard %d\n", len(batch.incoming), shardIdx)
+		}
+		ing.store.WriteSpans(shardIdx, batch.incoming)
+		batch.incoming = make([]*IncomingSpan, 0, WRITESPANS_BATCH_SIZE)
+	}
+}
+
+// Close flushes every non-empty per-shard batch to its shard and logs how
+// many spans were ingested and dropped.
+//
+// It then reports this ingestor's totals to the metrics sink, together with
+// clientDropped and the time elapsed since startTime. Each batch is set to nil
+// afterwards, so the ingestor should not be used again after Close.
+func (ing *SpanIngestor) Close(clientDropped int, startTime time.Time) {
+ for shardIdx := range(ing.batches) {
+ batch := ing.batches[shardIdx]
+ if len(batch.incoming) > 0 {
+ if ing.lg.TraceEnabled() {
+ ing.lg.Tracef("SpanIngestor#Close: flushing %d span(s) for " +
+ "shard %d\n", len(batch.incoming), shardIdx)
+ }
+ ing.store.WriteSpans(shardIdx, batch.incoming)
+ }
+ // The slice (if any) has been handed to the shard; drop our reference.
+ batch.incoming = nil
+ }
+ ing.lg.Debugf("Closed span ingestor for %s. Ingested %d span(s); dropped " +
+ "%d span(s).\n", ing.addr, ing.totalIngested, ing.serverDropped)
+
+ endTime := time.Now()
+ ing.store.msink.UpdateIngested(ing.addr, ing.totalIngested,
+ ing.serverDropped, clientDropped, endTime.Sub(startTime))
+}
+
+// WriteSpans hands a batch of ingested spans to the shard at shardIdx by
+// sending it on that shard's incoming channel. The send blocks while the
+// channel's buffer is full; the buffer is sized by
+// HTRACE_DATA_STORE_SPAN_BUFFER_SIZE.
+//
+// The shard goroutine receives the batch and writes each span. It treats a
+// nil batch as a signal to exit, so callers should pass only non-nil batches.
+func (store *dataStore) WriteSpans(shardIdx int, ispans []*IncomingSpan) {
+ store.shards[shardIdx].incoming <- ispans
+}
func (store *dataStore) FindSpan(sid common.SpanId) *common.Span {
@@ -709,14 +844,14 @@ func (shd *shard) FindSpan(sid common.SpanId) *common.Span {
func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, error) {
r := bytes.NewBuffer(buf)
- decoder := gob.NewDecoder(r)
+ mh := new(codec.MsgpackHandle)
+ mh.WriteExt = true
+ decoder := codec.NewDecoder(r, mh)
data := common.SpanData{}
err := decoder.Decode(&data)
if err != nil {
return nil, err
}
- // Gob encoding translates empty slices to nil. Reverse this so that we're always dealing with
- // non-nil slices.
if data.Parents == nil {
data.Parents = []common.SpanId{}
}
@@ -1259,19 +1394,6 @@ func (store *dataStore) ServerStats() *common.ServerStats {
serverStats.LastStartMs = store.startMs
serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC())
serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans)
- wsData := store.msink.wsm.GetData()
- serverStats.HostSpanMetrics = store.msink.AccessServerTotals()
- for k, v := range wsData.clientDroppedMap {
- smtx := serverStats.HostSpanMetrics[k]
- if smtx == nil {
- smtx = &common.SpanMetrics {}
- serverStats.HostSpanMetrics[k] = smtx
- }
- smtx.ClientDropped = v
- }
- serverStats.IngestedSpans = wsData.ingestedSpans
- serverStats.ClientDroppedSpans = wsData.clientDroppedSpans
- serverStats.MaxWriteSpansLatencyMs = wsData.latencyMax
- serverStats.AverageWriteSpansLatencyMs = wsData.latencyAverage
+ store.msink.PopulateServerStats(&serverStats)
return &serverStats
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index d9f4a0a..e6d1df7 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -73,12 +73,11 @@ var SIMPLE_TEST_SPANS []common.Span = []common.Span{
}
func createSpans(spans []common.Span, store *dataStore) {
+ ing := store.NewSpanIngestor(store.lg, "127.0.0.1", "")
for idx := range spans {
- store.WriteSpan(&IncomingSpan{
- Addr: "127.0.0.1",
- Span: &spans[idx],
- })
+ ing.IngestSpan(&spans[idx])
}
+ ing.Close(0, time.Now())
store.WrittenSpans.Waits(int64(len(spans)))
}
@@ -87,7 +86,7 @@ func TestDatastoreWriteAndRead(t *testing.T) {
t.Parallel()
htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead",
Cnf: map[string]string{
- conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
},
WrittenSpans: common.NewSemaphore(0),
}
@@ -151,7 +150,7 @@ func TestSimpleQuery(t *testing.T) {
t.Parallel()
htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
Cnf: map[string]string{
- conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
},
WrittenSpans: common.NewSemaphore(0),
}
@@ -161,11 +160,8 @@ func TestSimpleQuery(t *testing.T) {
}
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
- waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
- "127.0.0.1": &common.SpanMetrics{
- Written: uint64(len(SIMPLE_TEST_SPANS)),
- },
- })
+
+ assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
testQuery(t, ht, &common.Query{
Predicates: []common.Predicate{
@@ -183,7 +179,7 @@ func TestQueries2(t *testing.T) {
t.Parallel()
htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
Cnf: map[string]string{
- conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
},
WrittenSpans: common.NewSemaphore(0),
}
@@ -193,11 +189,7 @@ func TestQueries2(t *testing.T) {
}
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
- waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
- "127.0.0.1": &common.SpanMetrics{
- Written: uint64(len(SIMPLE_TEST_SPANS)),
- },
- })
+ assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
testQuery(t, ht, &common.Query{
Predicates: []common.Predicate{
common.Predicate{
@@ -241,7 +233,7 @@ func TestQueries3(t *testing.T) {
t.Parallel()
htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
Cnf: map[string]string{
- conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
},
WrittenSpans: common.NewSemaphore(0),
}
@@ -251,11 +243,7 @@ func TestQueries3(t *testing.T) {
}
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
- waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
- "127.0.0.1": &common.SpanMetrics{
- Written: uint64(len(SIMPLE_TEST_SPANS)),
- },
- })
+ assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
testQuery(t, ht, &common.Query{
Predicates: []common.Predicate{
common.Predicate{
@@ -299,7 +287,7 @@ func TestQueries4(t *testing.T) {
t.Parallel()
htraceBld := &MiniHTracedBuilder{Name: "TestQueries4",
Cnf: map[string]string{
- conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
},
WrittenSpans: common.NewSemaphore(0),
}
@@ -345,7 +333,7 @@ func TestQueries4(t *testing.T) {
func BenchmarkDatastoreWrites(b *testing.B) {
htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
Cnf: map[string]string{
- conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "15000",
+ conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
conf.HTRACE_LOG_LEVEL: "INFO",
},
WrittenSpans: common.NewSemaphore(0),
@@ -372,25 +360,20 @@ func BenchmarkDatastoreWrites(b *testing.B) {
b.ResetTimer()
// Write many random spans.
+ ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "")
for n := 0; n < b.N; n++ {
- ht.Store.WriteSpan(&IncomingSpan{
- Addr: "127.0.0.1",
- Span: allSpans[n],
- })
+ ing.IngestSpan(allSpans[n])
}
+ ing.Close(0, time.Now())
// Wait for all the spans to be written.
ht.Store.WrittenSpans.Waits(int64(b.N))
- waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
- "127.0.0.1": &common.SpanMetrics{
- Written: uint64(b.N), // should be less than?
- },
- })
+ assertNumWrittenEquals(b, ht.Store.msink, b.N)
}
func TestReloadDataStore(t *testing.T) {
htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore",
Cnf: map[string]string{
- conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
},
DataDirs: make([]string, 2),
KeepDataDirsOnClose: true,
@@ -494,7 +477,7 @@ func TestQueriesWithContinuationTokens1(t *testing.T) {
t.Parallel()
htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1",
Cnf: map[string]string{
- conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
},
WrittenSpans: common.NewSemaphore(0),
}
@@ -504,11 +487,7 @@ func TestQueriesWithContinuationTokens1(t *testing.T) {
}
defer ht.Close()
createSpans(SIMPLE_TEST_SPANS, ht.Store)
- waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
- "127.0.0.1": &common.SpanMetrics{
- Written: uint64(len(SIMPLE_TEST_SPANS)),
- },
- })
+ assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
// Adding a prev value to this query excludes the first result that we
// would normally get.
testQuery(t, ht, &common.Query{
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index a0f2e81..a649420 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -266,26 +266,11 @@ func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
return errors.New(fmt.Sprintf("Failed to split host and port " +
"for %s: %s\n", req.Addr, err.Error()))
}
- for i := range req.Spans {
- span := req.Spans[i]
- spanIdProblem := span.Id.FindProblem()
- if spanIdProblem != "" {
- return errors.New(fmt.Sprintf("Invalid span ID: %s", spanIdProblem))
- }
- if span.TracerId == "" {
- span.TracerId = req.DefaultTrid
- }
- if hand.lg.TraceEnabled() {
- hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
- }
- hand.store.WriteSpan(&IncomingSpan{
- Addr: client,
- Span: span,
- })
+ ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
+ for spanIdx := range req.Spans {
+ ing.IngestSpan(req.Spans[spanIdx])
}
- endTime := time.Now()
- hand.store.msink.Update(client, req.ClientDropped, len(req.Spans),
- endTime.Sub(startTime))
+ ing.Close(int(req.ClientDropped), startTime)
return nil
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
index cfff418..5ce3339 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
@@ -20,7 +20,6 @@
package main
import (
- "encoding/json"
"math"
"org/apache/htrace/common"
"org/apache/htrace/conf"
@@ -40,252 +39,114 @@ import (
const LATENCY_CIRC_BUF_SIZE = 4096
-type ServerSpanMetrics struct {
- // The total number of spans written to HTraced.
- Written uint64
-
- // The total number of spans dropped by the server.
- ServerDropped uint64
-}
-
-func (spm *ServerSpanMetrics) Clone() *ServerSpanMetrics {
- return &ServerSpanMetrics{
- Written: spm.Written,
- ServerDropped: spm.ServerDropped,
- }
-}
-
-func (spm *ServerSpanMetrics) String() string {
- jbytes, err := json.Marshal(*spm)
- if err != nil {
- panic(err)
- }
- return string(jbytes)
-}
-
-func (spm *ServerSpanMetrics) Add(ospm *ServerSpanMetrics) {
- spm.Written += ospm.Written
- spm.ServerDropped += ospm.ServerDropped
-}
-
-func (spm *ServerSpanMetrics) Clear() {
- spm.Written = 0
- spm.ServerDropped = 0
-}
-
-// A map from network address strings to ServerSpanMetrics structures.
-type ServerSpanMetricsMap map[string]*ServerSpanMetrics
-
-func (smtxMap ServerSpanMetricsMap) IncrementDropped(addr string, maxMtx int,
- lg *common.Logger) {
- mtx := smtxMap[addr]
- if mtx == nil {
- mtx = &ServerSpanMetrics{}
- smtxMap[addr] = mtx
- }
- mtx.ServerDropped++
- smtxMap.Prune(maxMtx, lg)
-}
-
-func (smtxMap ServerSpanMetricsMap) IncrementWritten(addr string, maxMtx int,
- lg *common.Logger) {
- mtx := smtxMap[addr]
- if mtx == nil {
- mtx = &ServerSpanMetrics{}
- smtxMap[addr] = mtx
- }
- mtx.Written++
- smtxMap.Prune(maxMtx, lg)
-}
-
-func (smtxMap ServerSpanMetricsMap) Prune(maxMtx int, lg *common.Logger) {
- if len(smtxMap) >= maxMtx {
- // Delete a random entry
- for k := range smtxMap {
- lg.Warnf("Evicting metrics entry for addr %s "+
- "because there are more than %d addrs.\n", k, maxMtx)
- delete(smtxMap, k)
- return
- }
- }
-}
-
-type AccessReq struct {
- mtxMap common.SpanMetricsMap
- done chan interface{}
-}
-
type MetricsSink struct {
- // The total span metrics.
- smtxMap ServerSpanMetricsMap
-
- // A channel of incoming shard metrics.
- // When this is shut down, the MetricsSink will exit.
- updateReqs chan ServerSpanMetricsMap
-
- // A channel of incoming requests for shard metrics.
- accessReqs chan *AccessReq
-
- // This will be closed when the MetricsSink has exited.
- exited chan interface{}
-
- // The logger used by this MetricsSink.
+ // The metrics sink logger.
lg *common.Logger
- // The maximum number of metrics totals we will maintain.
+ // The maximum number of entries we should allow in the HostSpanMetrics map.
maxMtx int
- // Metrics about WriteSpans requests
- wsm WriteSpanMetrics
-}
+ // The total number of spans ingested by the server (counting dropped spans).
+ IngestedSpans uint64
-func NewMetricsSink(cnf *conf.Config) *MetricsSink {
- mcl := MetricsSink{
- smtxMap: make(ServerSpanMetricsMap),
- updateReqs: make(chan ServerSpanMetricsMap, 128),
- accessReqs: make(chan *AccessReq),
- exited: make(chan interface{}),
- lg: common.NewLogger("metrics", cnf),
- maxMtx: cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
- wsm: WriteSpanMetrics {
- clientDroppedMap: make(map[string]uint64),
- latencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE),
- },
- }
- go mcl.run()
- return &mcl
-}
-
-func (msink *MetricsSink) run() {
- lg := msink.lg
- defer func() {
- lg.Info("MetricsSink: stopping service goroutine.\n")
- close(msink.exited)
- }()
- lg.Tracef("MetricsSink: starting.\n")
- for {
- select {
- case updateReq, open := <-msink.updateReqs:
- if !open {
- lg.Trace("MetricsSink: shutting down cleanly.\n")
- return
- }
- for addr, umtx := range updateReq {
- smtx := msink.smtxMap[addr]
- if smtx == nil {
- smtx = &ServerSpanMetrics{}
- msink.smtxMap[addr] = smtx
- }
- smtx.Add(umtx)
- if lg.TraceEnabled() {
- lg.Tracef("MetricsSink: updated %s to %s\n", addr, smtx.String())
- }
- }
- msink.smtxMap.Prune(msink.maxMtx, lg)
- case accessReq := <-msink.accessReqs:
- msink.handleAccessReq(accessReq)
- }
- }
-}
-
-func (msink *MetricsSink) handleAccessReq(accessReq *AccessReq) {
- msink.lg.Debug("MetricsSink: accessing global metrics.\n")
- defer close(accessReq.done)
- for addr, smtx := range msink.smtxMap {
- accessReq.mtxMap[addr] = &common.SpanMetrics{
- Written: smtx.Written,
- ServerDropped: smtx.ServerDropped,
- }
- }
-}
-
-func (msink *MetricsSink) AccessServerTotals() common.SpanMetricsMap {
- accessReq := &AccessReq{
- mtxMap: make(common.SpanMetricsMap),
- done: make(chan interface{}),
- }
- msink.accessReqs <- accessReq
- <-accessReq.done
- return accessReq.mtxMap
-}
-
-func (msink *MetricsSink) UpdateMetrics(mtxMap ServerSpanMetricsMap) {
- msink.updateReqs <- mtxMap
-}
-
-func (msink *MetricsSink) Shutdown() {
- close(msink.updateReqs)
- <-msink.exited
-}
+ // The total number of spans written to leveldb since the server started.
+ WrittenSpans uint64
-type WriteSpanMetrics struct {
- // Lock protecting WriteSpanMetrics
- lock sync.Mutex
-
- // The number of spans which each client has self-reported that it has
- // dropped.
- clientDroppedMap map[string]uint64
+ // The total number of spans dropped by the server.
+ ServerDropped uint64
- // The total number of new span writes we've gotten since startup.
- ingestedSpans uint64
+ // The total number of spans dropped by the client (self-reported).
+ ClientDroppedEstimate uint64
- // The total number of spans all clients have dropped since startup.
- clientDroppedSpans uint64
+ // Per-host span metrics, keyed by address; capped at maxMtx entries.
+ HostSpanMetrics common.SpanMetricsMap
// The last few writeSpan latencies
- latencyCircBuf *CircBufU32
-}
+ wsLatencyCircBuf *CircBufU32
-type WriteSpanMetricsData struct {
- clientDroppedMap map[string]uint64
- ingestedSpans uint64
- clientDroppedSpans uint64
- latencyMax uint32
- latencyAverage uint32
+ // Lock protecting all of the metrics fields above.
+ lock sync.Mutex
}
-func (msink *MetricsSink) Update(client string, clientDropped uint64, clientWritten int,
- wsLatency time.Duration) {
- wsLatencyNs := wsLatency.Nanoseconds() / 1000000
+func NewMetricsSink(cnf *conf.Config) *MetricsSink {
+ return &MetricsSink{
+ lg: common.NewLogger("metrics", cnf),
+ maxMtx: cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
+ HostSpanMetrics: make(common.SpanMetricsMap),
+ wsLatencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE),
+ }
+}
+
+// UpdateIngested updates the total number of spans which were ingested, as
+// well as other metrics that get updated during span ingest.
+func (msink *MetricsSink) UpdateIngested(addr string, totalIngested int,
+ serverDropped int, clientDroppedEstimate int, wsLatency time.Duration) {
+ msink.lock.Lock()
+ defer msink.lock.Unlock()
+ msink.IngestedSpans += uint64(totalIngested)
+ msink.ServerDropped += uint64(serverDropped)
+ msink.ClientDroppedEstimate += uint64(clientDroppedEstimate)
+ msink.updateSpanMetrics(addr, 0, serverDropped, clientDroppedEstimate)
+ wsLatencyMs := wsLatency.Nanoseconds() / 1000000 // ms, clamped to uint32 below
var wsLatency32 uint32
- if wsLatencyNs > math.MaxUint32 {
+ if wsLatencyMs > math.MaxUint32 {
wsLatency32 = math.MaxUint32
} else {
- wsLatency32 = uint32(wsLatencyNs)
- }
- msink.wsm.update(msink.maxMtx, client, clientDropped, clientWritten, wsLatency32)
-}
-
-func (wsm *WriteSpanMetrics) update(maxMtx int, client string, clientDropped uint64,
- clientWritten int, wsLatency uint32) {
- wsm.lock.Lock()
- defer wsm.lock.Unlock()
- wsm.clientDroppedMap[client] = clientDropped
- if len(wsm.clientDroppedMap) >= maxMtx {
- // Delete a random entry
- for k := range wsm.clientDroppedMap {
- delete(wsm.clientDroppedMap, k)
- return
+ wsLatency32 = uint32(wsLatencyMs)
+ }
+ msink.wsLatencyCircBuf.Append(wsLatency32)
+}
+
+// updateSpanMetrics adds to the per-host span metrics for addr. Must be called with msink.lock held.
+func (msink *MetricsSink) updateSpanMetrics(addr string, numWritten int,
+ serverDropped int, clientDroppedEstimate int) {
+ mtx, found := msink.HostSpanMetrics[addr]
+ if !found {
+ // Ensure that the per-host span metrics map doesn't grow too large.
+ if len(msink.HostSpanMetrics) >= msink.maxMtx {
+ // Evict an arbitrary entry (map iteration order is unspecified).
+ for k := range msink.HostSpanMetrics {
+ msink.lg.Warnf("Evicting metrics entry for addr %s "+
+ "because there are more than %d addrs.\n", k, msink.maxMtx)
+ delete(msink.HostSpanMetrics, k)
+ break
+ }
+ }
+ mtx = &common.SpanMetrics{}
+ msink.HostSpanMetrics[addr] = mtx
+ }
+ mtx.Written += uint64(numWritten)
+ mtx.ServerDropped += uint64(serverDropped)
+ mtx.ClientDroppedEstimate += uint64(clientDroppedEstimate)
+}
+
+// UpdatePersisted updates the total number of spans which were persisted to disk.
+func (msink *MetricsSink) UpdatePersisted(addr string, totalWritten int,
+ serverDropped int) {
+ msink.lock.Lock()
+ defer msink.lock.Unlock()
+ msink.WrittenSpans += uint64(totalWritten)
+ msink.ServerDropped += uint64(serverDropped)
+ msink.updateSpanMetrics(addr, totalWritten, serverDropped, 0)
+}
+
+// PopulateServerStats copies a snapshot of the current metrics into stats.
+func (msink *MetricsSink) PopulateServerStats(stats *common.ServerStats) {
+ msink.lock.Lock()
+ defer msink.lock.Unlock()
+ stats.IngestedSpans = msink.IngestedSpans
+ stats.WrittenSpans = msink.WrittenSpans
+ stats.ServerDroppedSpans = msink.ServerDropped
+ stats.ClientDroppedEstimate = msink.ClientDroppedEstimate
+ stats.MaxWriteSpansLatencyMs = msink.wsLatencyCircBuf.Max()
+ stats.AverageWriteSpansLatencyMs = msink.wsLatencyCircBuf.Average()
+ stats.HostSpanMetrics = make(common.SpanMetricsMap, len(msink.HostSpanMetrics))
+ for k, v := range msink.HostSpanMetrics {
+ stats.HostSpanMetrics[k] = &common.SpanMetrics{
+ Written: v.Written,
+ ServerDropped: v.ServerDropped,
+ ClientDroppedEstimate: v.ClientDroppedEstimate,
}
- }
- wsm.ingestedSpans += uint64(clientWritten)
- wsm.clientDroppedSpans += uint64(clientDropped)
- wsm.latencyCircBuf.Append(wsLatency)
-}
-
-func (wsm *WriteSpanMetrics) GetData() *WriteSpanMetricsData {
- wsm.lock.Lock()
- defer wsm.lock.Unlock()
- clientDroppedMap := make(map[string]uint64)
- for k, v := range wsm.clientDroppedMap {
- clientDroppedMap[k] = v
- }
- return &WriteSpanMetricsData {
- clientDroppedMap: clientDroppedMap,
- ingestedSpans: wsm.ingestedSpans,
- clientDroppedSpans: wsm.clientDroppedSpans,
- latencyMax: wsm.latencyCircBuf.Max(),
- latencyAverage: wsm.latencyCircBuf.Average(),
}
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
index 5243d9e..e1dba1f 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -20,6 +20,7 @@
package main
import (
+ "fmt"
htrace "org/apache/htrace/client"
"org/apache/htrace/common"
"org/apache/htrace/conf"
@@ -28,43 +29,6 @@ import (
"time"
)
-func TestMetricsSinkStartupShutdown(t *testing.T) {
- cnfBld := conf.Builder{
- Values: conf.TEST_VALUES(),
- Defaults: conf.DEFAULTS,
- }
- cnf, err := cnfBld.Build()
- if err != nil {
- t.Fatalf("failed to create conf: %s", err.Error())
- }
- msink := NewMetricsSink(cnf)
- msink.Shutdown()
-}
-
-func TestAddSpanMetrics(t *testing.T) {
- a := &ServerSpanMetrics{
- Written: 100,
- ServerDropped: 200,
- }
- b := &ServerSpanMetrics{
- Written: 500,
- ServerDropped: 100,
- }
- a.Add(b)
- if a.Written != 600 {
- t.Fatalf("SpanMetrics#Add failed to update #Written")
- }
- if a.ServerDropped != 300 {
- t.Fatalf("SpanMetrics#Add failed to update #Dropped")
- }
- if b.Written != 500 {
- t.Fatalf("SpanMetrics#Add updated b#Written")
- }
- if b.ServerDropped != 100 {
- t.Fatalf("SpanMetrics#Add updated b#Dropped")
- }
-}
-
func compareTotals(a, b common.SpanMetricsMap) bool {
for k, v := range a {
if !reflect.DeepEqual(v, b[k]) {
@@ -79,112 +43,52 @@ func compareTotals(a, b common.SpanMetricsMap) bool {
return true
}
-func waitForMetrics(msink *MetricsSink, expectedTotals common.SpanMetricsMap) {
- for {
- time.Sleep(1 * time.Millisecond)
- totals := msink.AccessServerTotals()
- if compareTotals(totals, expectedTotals) {
- return
- }
- }
+type Fatalfer interface {
+ Fatalf(format string, args ...interface{})
}
-func TestMetricsSinkMessages(t *testing.T) {
- cnfBld := conf.Builder{
- Values: conf.TEST_VALUES(),
- Defaults: conf.DEFAULTS,
- }
- cnf, err := cnfBld.Build()
- if err != nil {
- t.Fatalf("failed to create conf: %s", err.Error())
- }
- msink := NewMetricsSink(cnf)
- totals := msink.AccessServerTotals()
- if len(totals) != 0 {
- t.Fatalf("Expected no data in the MetricsSink to start with.")
+func assertNumWrittenEquals(t Fatalfer, msink *MetricsSink,
+ expectedNumWritten int) {
+ var sstats common.ServerStats
+ msink.PopulateServerStats(&sstats)
+ if sstats.WrittenSpans != uint64(expectedNumWritten) {
+ t.Fatalf("sstats.WrittenSpans = %d, but expected %d\n",
+ sstats.WrittenSpans, expectedNumWritten)
+ }
+ if sstats.HostSpanMetrics["127.0.0.1"] == nil {
+ t.Fatalf("no entry for sstats.HostSpanMetrics[127.0.0.1] found.")
+ }
+ if sstats.HostSpanMetrics["127.0.0.1"].Written !=
+ uint64(expectedNumWritten) {
+ t.Fatalf("sstats.HostSpanMetrics[127.0.0.1].Written = %d, but " +
+ "expected %d\n", sstats.HostSpanMetrics["127.0.0.1"].Written,
+ expectedNumWritten)
}
- msink.UpdateMetrics(ServerSpanMetricsMap{
- "192.168.0.100": &ServerSpanMetrics{
- Written: 20,
- ServerDropped: 10,
- },
- })
- waitForMetrics(msink, common.SpanMetricsMap{
- "192.168.0.100": &common.SpanMetrics{
- Written: 20,
- ServerDropped: 10,
- },
- })
- msink.UpdateMetrics(ServerSpanMetricsMap{
- "192.168.0.100": &ServerSpanMetrics{
- Written: 200,
- ServerDropped: 100,
- },
- })
- msink.UpdateMetrics(ServerSpanMetricsMap{
- "192.168.0.100": &ServerSpanMetrics{
- Written: 1000,
- ServerDropped: 1000,
- },
- })
- waitForMetrics(msink, common.SpanMetricsMap{
- "192.168.0.100": &common.SpanMetrics{
- Written: 1220,
- ServerDropped: 1110,
- },
- })
- msink.UpdateMetrics(ServerSpanMetricsMap{
- "192.168.0.200": &ServerSpanMetrics{
- Written: 200,
- ServerDropped: 100,
- },
- })
- waitForMetrics(msink, common.SpanMetricsMap{
- "192.168.0.100": &common.SpanMetrics{
- Written: 1220,
- ServerDropped: 1110,
- },
- "192.168.0.200": &common.SpanMetrics{
- Written: 200,
- ServerDropped: 100,
- },
- })
- msink.Shutdown()
}
-func TestMetricsSinkMessagesEviction(t *testing.T) {
+func TestMetricsSinkPerHostEviction(t *testing.T) {
cnfBld := conf.Builder{
Values: conf.TEST_VALUES(),
Defaults: conf.DEFAULTS,
}
cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2"
- cnfBld.Values[conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS] = "1"
cnf, err := cnfBld.Build()
if err != nil {
t.Fatalf("failed to create conf: %s", err.Error())
}
msink := NewMetricsSink(cnf)
- msink.UpdateMetrics(ServerSpanMetricsMap{
- "192.168.0.100": &ServerSpanMetrics{
- Written: 20,
- ServerDropped: 10,
- },
- "192.168.0.101": &ServerSpanMetrics{
- Written: 20,
- ServerDropped: 10,
- },
- "192.168.0.102": &ServerSpanMetrics{
- Written: 20,
- ServerDropped: 10,
- },
- })
- for {
- totals := msink.AccessServerTotals()
- if len(totals) == 2 {
- break
+ msink.UpdatePersisted("192.168.0.100", 20, 10)
+ msink.UpdatePersisted("192.168.0.101", 20, 10)
+ msink.UpdatePersisted("192.168.0.102", 20, 10)
+ msink.lock.Lock()
+ defer msink.lock.Unlock()
+ if len(msink.HostSpanMetrics) != 2 {
+ for k, v := range msink.HostSpanMetrics {
+ t.Log(fmt.Sprintf("HostSpanMetrics[%s] = %+v", k, v))
}
+ t.Fatalf("Expected len(msink.HostSpanMetrics) to be 2, but got %d\n",
+ len(msink.HostSpanMetrics))
}
- msink.Shutdown()
}
func TestIngestedSpansMetricsRest(t *testing.T) {
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
index dcc916a..0140dbb 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
@@ -43,7 +43,7 @@ func TestReapingOldSpans(t *testing.T) {
Cnf: map[string]string{
conf.HTRACE_SPAN_EXPIRY_MS: fmt.Sprintf("%d", 60*60*1000),
conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS: "1",
- conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+ conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "1",
},
WrittenSpans: common.NewSemaphore(0),
DataDirs: make([]string, 2),
@@ -52,12 +52,11 @@ func TestReapingOldSpans(t *testing.T) {
if err != nil {
t.Fatalf("failed to create mini htraced cluster: %s\n", err.Error())
}
- for i := range testSpans {
- ht.Store.WriteSpan(&IncomingSpan{
- Addr: "127.0.0.1",
- Span: testSpans[i],
- })
+ ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "")
+ for spanIdx := range testSpans {
+ ing.IngestSpan(testSpans[spanIdx])
}
+ ing.Close(0, time.Now()) // 0 client-dropped spans (writeSpansHandler passes msg.ClientDropped here)
// Wait the spans to be created
ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS)
// Set a reaper date that will remove all the spans except final one.
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index 432375d..1b90bd4 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -252,27 +252,12 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
}
hand.lg.Debugf("writeSpansHandler: received %d span(s). defaultTrid = %s\n",
len(msg.Spans), msg.DefaultTrid)
+ // NOTE(review): the old loop defaulted TracerId to msg.DefaultTrid and skipped invalid span IDs; presumably SpanIngestor does both now — confirm in datastore.go.
+ ing := hand.store.NewSpanIngestor(hand.lg, client, msg.DefaultTrid)
for spanIdx := range msg.Spans {
- if hand.lg.DebugEnabled() {
- hand.lg.Debugf("writing span %s\n", msg.Spans[spanIdx].ToJson())
- }
- span := msg.Spans[spanIdx]
- if span.TracerId == "" {
- span.TracerId = msg.DefaultTrid
- }
- spanIdProblem := span.Id.FindProblem()
- if spanIdProblem != "" {
- hand.lg.Warnf(fmt.Sprintf("Invalid span ID: %s", spanIdProblem))
- } else {
- hand.store.WriteSpan(&IncomingSpan{
- Addr: client,
- Span: span,
- })
- }
+ ing.IngestSpan(msg.Spans[spanIdx])
}
- endTime := time.Now()
- hand.store.msink.Update(client, msg.ClientDropped, len(msg.Spans),
- endTime.Sub(startTime))
+ ing.Close(int(msg.ClientDropped), startTime)
}
type queryHandler struct {
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
index 7b5e433..c81bbb7 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
@@ -216,7 +216,10 @@ func printServerStats(hcl *htrace.Client) int {
common.UnixMsToTime(stats.CurMs).Format(time.RFC3339))
fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans)
fmt.Fprintf(w, "Spans ingested\t%d\n", stats.IngestedSpans)
- fmt.Fprintf(w, "Spans dropped by clients\t%d\n", stats.ClientDroppedSpans)
+ fmt.Fprintf(w, "Spans written\t%d\n", stats.WrittenSpans)
+ fmt.Fprintf(w, "Spans dropped by server\t%d\n", stats.ServerDroppedSpans)
+ fmt.Fprintf(w, "Estimated spans dropped by clients\t%d\n",
+ stats.ClientDroppedEstimate)
dur := time.Millisecond * time.Duration(stats.AverageWriteSpansLatencyMs)
fmt.Fprintf(w, "Average WriteSpan Latency\t%s\n", dur.String())
dur = time.Millisecond * time.Duration(stats.MaxWriteSpansLatencyMs)
@@ -244,8 +247,8 @@ func printServerStats(hcl *htrace.Client) int {
sort.Sort(keys)
for k := range keys {
mtx := mtxMap[keys[k]]
- fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\tclient dropped: %d\n",
- keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDropped)
+ fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\tclient dropped estimate: %d\n",
+ keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDroppedEstimate)
}
w.Flush()
return EXIT_SUCCESS
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-webapp/src/main/webapp/app/server_info_view.js
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/app/server_info_view.js b/htrace-webapp/src/main/webapp/app/server_info_view.js
index 62775e8..efb7545 100644
--- a/htrace-webapp/src/main/webapp/app/server_info_view.js
+++ b/htrace-webapp/src/main/webapp/app/server_info_view.js
@@ -50,7 +50,7 @@ htrace.ServerInfoView = Backbone.View.extend({
'<th>Remote</th>' +
'<th>Written</th>' +
'<th>ServerDropped</th>' +
- '<th>ClientDropped</th>' +
+ '<th>ClientDroppedEstimate</th>' +
'</tr>' +
'</thead>';
var remotes = [];
@@ -69,7 +69,7 @@ htrace.ServerInfoView = Backbone.View.extend({
"<td>" + remote + "</td>" +
"<td>" + smtx.Written + "</td>" +
"<td>" + smtx.ServerDropped + "</td>" +
- "<td>" + smtx.ClientDropped + "</td>" +
+ "<td>" + smtx.ClientDroppedEstimate + "</td>" +
"</tr>";
}
out = out + '</table>';
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-webapp/src/main/webapp/app/server_stats.js
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/app/server_stats.js b/htrace-webapp/src/main/webapp/app/server_stats.js
index 783041c..4cfea92 100644
--- a/htrace-webapp/src/main/webapp/app/server_stats.js
+++ b/htrace-webapp/src/main/webapp/app/server_stats.js
@@ -22,8 +22,13 @@ htrace.ServerStats = Backbone.Model.extend({
defaults: {
"LastStartMs": "0",
"CurMs": "0",
+ "ReapedSpans": "(unknown)",
"IngestedSpans": "(unknown)",
- "ReapedSpans": "(unknown)"
+ "WrittenSpans": "(unknown)",
+ "ServerDroppedSpans": "(unknown)",
+ "ClientDroppedEstimate": "(unknown)",
+ "MaxWriteSpansLatencyMs": "(unknown)",
+ "AverageWriteSpansLatencyMs": "(unknown)"
},
url: function() {
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-webapp/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/index.html b/htrace-webapp/src/main/webapp/index.html
index 16fd2f9..1e20ec0 100644
--- a/htrace-webapp/src/main/webapp/index.html
+++ b/htrace-webapp/src/main/webapp/index.html
@@ -84,8 +84,16 @@
<td><%= model.stats.get("IngestedSpans") %></td>
</tr>
<tr>
- <td>Client Dropped Spans</td>
- <td><%= model.stats.get("ClientDroppedSpans") %></td>
+ <td>Spans Written</td>
+ <td><%= model.stats.get("WrittenSpans") %></td>
+ </tr>
+ <tr>
+ <td>Server Dropped Spans</td>
+ <td><%= model.stats.get("ServerDroppedSpans") %></td>
+ </tr>
+ <tr>
+ <td>Estimated Client Dropped Spans</td>
+ <td><%= model.stats.get("ClientDroppedEstimate") %></td>
</tr>
<tr>
<td>Maximum WriteSpans Latency (ms)</td>