You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by cm...@apache.org on 2015/11/21 22:26:47 UTC

incubator-htrace git commit: HTRACE-298. htraced: improve datastore serialization and metrics (cmccabe)

Repository: incubator-htrace
Updated Branches:
  refs/heads/master fe19368a3 -> 699c8cf80


HTRACE-298. htraced: improve datastore serialization and metrics (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/699c8cf8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/699c8cf8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/699c8cf8

Branch: refs/heads/master
Commit: 699c8cf80058913a89988552ef2d357690309b6f
Parents: fe19368
Author: Colin P. Mccabe <cm...@apache.org>
Authored: Sat Nov 21 13:12:20 2015 -0800
Committer: Colin P. Mccabe <cm...@apache.org>
Committed: Sat Nov 21 13:26:04 2015 -0800

----------------------------------------------------------------------
 htrace-htraced/go/Godeps/Godeps.json            |   2 +-
 .../go/src/org/apache/htrace/common/rpc.go      |  31 +-
 .../src/org/apache/htrace/conf/config_keys.go   |   8 +-
 .../org/apache/htrace/htraced/client_test.go    |  98 ++++++
 .../src/org/apache/htrace/htraced/datastore.go  | 254 +++++++++++----
 .../org/apache/htrace/htraced/datastore_test.go |  61 ++--
 .../go/src/org/apache/htrace/htraced/hrpc.go    |  23 +-
 .../go/src/org/apache/htrace/htraced/metrics.go | 319 ++++++-------------
 .../org/apache/htrace/htraced/metrics_test.go   | 156 ++-------
 .../org/apache/htrace/htraced/reaper_test.go    |  11 +-
 .../go/src/org/apache/htrace/htraced/rest.go    |  23 +-
 .../go/src/org/apache/htrace/htracedTool/cmd.go |   9 +-
 .../src/main/webapp/app/server_info_view.js     |   4 +-
 .../src/main/webapp/app/server_stats.js         |   7 +-
 htrace-webapp/src/main/webapp/index.html        |  12 +-
 15 files changed, 491 insertions(+), 527 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/Godeps/Godeps.json
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/Godeps/Godeps.json b/htrace-htraced/go/Godeps/Godeps.json
index 47aa90e..7c737fe 100644
--- a/htrace-htraced/go/Godeps/Godeps.json
+++ b/htrace-htraced/go/Godeps/Godeps.json
@@ -24,7 +24,7 @@
         },
         {
             "ImportPath": "github.com/ugorji/go/codec",
-            "Rev": "08bbe4aa39b9f189f4e294b5c8408b5fa5787bb2"
+            "Rev": "1a8bf87a90ddcdc7deaa0038f127ac62135fdd58"
         }
     ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 2627c26..6375688 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -38,7 +38,7 @@ const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024
 
 // A request to write spans to htraced.
 type WriteSpansReq struct {
-	Addr          string // This gets filled in by the RPC layer.
+	Addr          string `json:",omitempty"` // This gets filled in by the RPC layer.
 	DefaultTrid   string `json:",omitempty"`
 	Spans         []*Span
 	ClientDropped uint64 `json:",omitempty"`
@@ -98,10 +98,19 @@ type SpanMetrics struct {
 	// The total number of spans dropped by the server.
 	ServerDropped uint64
 
-	// The total number of spans dropped by the client.  Note that this number
-	// is tracked on the client itself and doesn't get updated if the client
-	// can't contact the server.
-	ClientDropped uint64
+	// The total number of spans dropped by the client.
+	//
+	// This number is just an estimate and may be incorrect for many reasons.
+	// If the client can't contact the server at all, then obviously the server
+	// will never increment ClientDropped... even though spans are being
+	// dropped.  The client may also tell the server about some new spans it
+	// has dropped, but then for some reason fail to get the acknowledgement
+	// from the server.  In that case, the client would re-send its client
+	// dropped estimate and it would be double-counted by the server.
+	//
+	// The intention here is to provide a rough estimate of how overloaded
+	// htraced clients are, not to provide strongly consistent numbers.
+	ClientDroppedEstimate uint64
 }
 
 // A map from network address strings to SpanMetrics structures.
@@ -130,9 +139,15 @@ type ServerStats struct {
 	// those that did.
 	IngestedSpans uint64
 
-	// The total number of spans which have been dropped by clients since the server started,
-	// as reported by WriteSpans requests.
-	ClientDroppedSpans uint64
+	// The total number of spans which have been written to leveldb since the server started.
+	WrittenSpans uint64
+
+	// The total number of spans dropped by the server since the server started.
+	ServerDroppedSpans uint64
+
+	// An estimate of the total number of spans dropped by clients since the server started.
+	// See SpanMetrics#ClientDroppedEstimate
+	ClientDroppedEstimate uint64
 
 	// The maximum latency of a writeSpans request, in milliseconds.
 	MaxWriteSpansLatencyMs uint32

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index 511833c..573ce21 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -68,9 +68,9 @@ const HTRACE_LOG_PATH = "log.path"
 // The log level to use for the logs in htrace.
 const HTRACE_LOG_LEVEL = "log.level"
 
-// The period between metrics heartbeats.  This is the approximate interval at which we will
-// update global metrics.
-const HTRACE_METRICS_HEARTBEAT_PERIOD_MS = "metrics.heartbeat.period.ms"
+// The period between datastore heartbeats.  This is the approximate interval at which we will
+// prune expired spans.
+const HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS = "datastore.heartbeat.period.ms"
 
 // The maximum number of addresses for which we will maintain metrics.
 const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries"
@@ -106,7 +106,7 @@ var DEFAULTS = map[string]string{
 	HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100",
 	HTRACE_LOG_PATH:                    "",
 	HTRACE_LOG_LEVEL:                   "INFO",
-	HTRACE_METRICS_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
+	HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
 	HTRACE_METRICS_MAX_ADDR_ENTRIES:    "100000",
 	HTRACE_SPAN_EXPIRY_MS:              "0",
 	HTRACE_REAPER_HEARTBEAT_PERIOD_MS:  fmt.Sprintf("%d", 90*1000),

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index fae871c..36e8369 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -21,6 +21,8 @@ package main
 
 import (
 	"fmt"
+	"github.com/ugorji/go/codec"
+	"math"
 	"math/rand"
 	"org/apache/htrace/common"
 	"org/apache/htrace/conf"
@@ -391,3 +393,99 @@ func TestHrpcIoTimeout(t *testing.T) {
 	close(finishClient)
 	wg.Wait()
 }
+
+func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) {
+	htraceBld := &MiniHTracedBuilder{Name: "doWriteSpans",
+		Cnf: map[string]string{
+			conf.HTRACE_LOG_LEVEL: "INFO",
+		},
+		WrittenSpans: common.NewSemaphore(int64(1-N)),
+	}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		panic(err)
+	}
+	defer ht.Close()
+	rnd := rand.New(rand.NewSource(1))
+	allSpans := make([]*common.Span, N)
+	for n := 0; n < N; n++ {
+		allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n])
+	}
+	// Determine how many calls to WriteSpans we should make.  Each writeSpans
+	// message should be small enough so that it doesn't exceed the max RPC
+	// body length limit.  TODO: a production-quality golang client would do
+	// this internally rather than needing us to do it here in the unit test.
+	bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5
+	reqs := make([]*common.WriteSpansReq, 0, 4)
+	curReq := -1
+	curReqLen := bodyLen
+	var curReqSpans uint32
+	mh := new(codec.MsgpackHandle)
+	mh.WriteExt = true
+	var mbuf [8192]byte
+	buf := mbuf[:0]
+	enc := codec.NewEncoderBytes(&buf, mh)
+	for n := 0; n < N; n++ {
+		span := allSpans[n]
+		if (curReqSpans >= maxSpansPerRpc) ||
+			   (curReqLen >= bodyLen) {
+			reqs = append(reqs, &common.WriteSpansReq{})
+			curReqLen = 0
+			curReq++
+			curReqSpans = 0
+		}
+		buf = mbuf[:0]
+		enc.ResetBytes(&buf)
+		err := enc.Encode(span)
+		if err != nil {
+			panic(fmt.Sprintf("Error encoding span %s: %s\n",
+				span.String(), err.Error()))
+		}
+		bufLen := len(buf)
+		if bufLen > (bodyLen / 5) {
+			panic(fmt.Sprintf("Span too long at %d bytes\n", bufLen))
+		}
+		curReqLen += bufLen
+		reqs[curReq].Spans = append(reqs[curReq].Spans, span)
+		curReqSpans++
+	}
+	ht.Store.lg.Infof("num spans: %d.  num WriteSpansReq calls: %d\n", N, len(reqs))
+	var hcl *htrace.Client
+	hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+	if err != nil {
+		panic(fmt.Sprintf("failed to create client: %s", err.Error()))
+	}
+	defer hcl.Close()
+
+	// Reset the timer to avoid including the time required to create new
+	// random spans in the benchmark total.
+	if b != nil {
+		b.ResetTimer()
+	}
+
+	// Write many random spans.
+	for reqIdx := range(reqs) {
+		go func() {
+			err = hcl.WriteSpans(reqs[reqIdx])
+			if err != nil {
+				panic(fmt.Sprintf("failed to send WriteSpans request %d: %s",
+					reqIdx, err.Error()))
+			}
+		}()
+	}
+	// Wait for all the spans to be written.
+	ht.Store.WrittenSpans.Wait()
+}
+
+// This is a test of how quickly we can create new spans via WriteSpans RPCs.
+// Like BenchmarkDatastoreWrites, it creates b.N spans in the datastore.
+// Unlike that benchmark, it sends the spans via RPC.
+// Suggested flags for running this:
+// -tags unsafe -cpu 16 -benchtime=1m
+func BenchmarkWriteSpans(b *testing.B) {
+	doWriteSpans("BenchmarkWriteSpans", b.N, math.MaxUint32, b)
+}
+
+func TestWriteSpansRpcs(t *testing.T) {
+	doWriteSpans("TestWriteSpansRpcs", 3000, 1000, nil)
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 8cd1526..9310e6e 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -26,6 +26,7 @@ import (
 	"errors"
 	"fmt"
 	"github.com/jmhodges/levigo"
+	"github.com/ugorji/go/codec"
 	"org/apache/htrace/common"
 	"org/apache/htrace/conf"
 	"os"
@@ -66,7 +67,7 @@ import (
 //
 
 const UNKNOWN_LAYOUT_VERSION = 0
-const CURRENT_LAYOUT_VERSION = 2
+const CURRENT_LAYOUT_VERSION = 3
 
 var EMPTY_BYTE_BUF []byte = []byte{}
 
@@ -89,6 +90,9 @@ type IncomingSpan struct {
 
 	// The span.
 	*common.Span
+
+	// Serialized span data
+	SpanDataBytes []byte
 }
 
 // A single directory containing a levelDB instance.
@@ -103,19 +107,13 @@ type shard struct {
 	path string
 
 	// Incoming requests to write Spans.
-	incoming chan *IncomingSpan
+	incoming chan []*IncomingSpan
 
 	// A channel for incoming heartbeats
 	heartbeats chan interface{}
 
 	// Tracks whether the shard goroutine has exited.
 	exited sync.WaitGroup
-
-	// Per-address metrics
-	mtxMap ServerSpanMetricsMap
-
-	// The maximum number of metrics to allow in our map
-	maxMtx int
 }
 
 // Process incoming spans for a shard.
@@ -127,24 +125,33 @@ func (shd *shard) processIncoming() {
 	}()
 	for {
 		select {
-		case span := <-shd.incoming:
-			if span == nil {
+		case spans := <-shd.incoming:
+			if spans == nil {
 				return
 			}
-			err := shd.writeSpan(span)
-			if err != nil {
-				lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error())
-			} else if lg.TraceEnabled() {
-				lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson())
+			totalWritten := 0
+			totalDropped := 0
+			for spanIdx := range(spans) {
+				err := shd.writeSpan(spans[spanIdx])
+				if err != nil {
+					lg.Errorf("Shard processor for %s got fatal error %s.\n",
+						shd.path, err.Error())
+					totalDropped++
+				} else {
+					if lg.TraceEnabled() {
+						lg.Tracef("Shard processor for %s wrote span %s.\n",
+							shd.path, spans[spanIdx].ToJson())
+					}
+					totalWritten++
+				}
+			}
+			shd.store.msink.UpdatePersisted(spans[0].Addr, totalWritten, totalDropped)
+			if shd.store.WrittenSpans != nil {
+				lg.Debugf("Shard %s incrementing WrittenSpans by %d\n", shd.path, len(spans))
+				shd.store.WrittenSpans.Posts(int64(len(spans)))
 			}
 		case <-shd.heartbeats:
 			lg.Tracef("Shard processor for %s handling heartbeat.\n", shd.path)
-			mtxMap := make(ServerSpanMetricsMap)
-			for addr, mtx := range shd.mtxMap {
-				mtxMap[addr] = mtx.Clone()
-				mtx.Clear()
-			}
-			shd.store.msink.UpdateMetrics(mtxMap)
 			shd.pruneExpired()
 		}
 	}
@@ -246,21 +253,10 @@ func u64toSlice(val uint64) []byte {
 func (shd *shard) writeSpan(ispan *IncomingSpan) error {
 	batch := levigo.NewWriteBatch()
 	defer batch.Close()
-
-	// Add SpanData to batch.
-	spanDataBuf := new(bytes.Buffer)
-	spanDataEnc := gob.NewEncoder(spanDataBuf)
 	span := ispan.Span
-	err := spanDataEnc.Encode(span.SpanData)
-	if err != nil {
-		shd.store.lg.Errorf("Error encoding span %s: %s\n",
-			span.String(), err.Error())
-		shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg)
-		return err
-	}
 	primaryKey :=
 		append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...)
-	batch.Put(primaryKey, spanDataBuf.Bytes())
+	batch.Put(primaryKey, ispan.SpanDataBytes)
 
 	// Add this to the parent index.
 	for parentIdx := range span.Parents {
@@ -280,17 +276,12 @@ func (shd *shard) writeSpan(ispan *IncomingSpan) error {
 		u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...)
 	batch.Put(durationKey, EMPTY_BYTE_BUF)
 
-	err = shd.ldb.Write(shd.store.writeOpts, batch)
+	err := shd.ldb.Write(shd.store.writeOpts, batch)
 	if err != nil {
 		shd.store.lg.Errorf("Error writing span %s to leveldb at %s: %s\n",
 			span.String(), shd.path, err.Error())
-		shd.mtxMap.IncrementDropped(ispan.Addr, shd.maxMtx, shd.store.lg)
 		return err
 	}
-	shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg)
-	if shd.store.WrittenSpans != nil {
-		shd.store.WrittenSpans.Post()
-	}
 	return nil
 }
 
@@ -510,13 +501,11 @@ func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) (*dataSto
 	for idx := range store.shards {
 		shd := store.shards[idx]
 		shd.heartbeats = make(chan interface{}, 1)
-		shd.mtxMap = make(ServerSpanMetricsMap)
-		shd.maxMtx = store.msink.maxMtx
 		shd.exited.Add(1)
 		go shd.processIncoming()
 	}
 	store.hb = NewHeartbeater("DatastoreHeartbeater",
-		cnf.GetInt64(conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS), lg)
+		cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), lg)
 	for shdIdx := range store.shards {
 		shd := store.shards[shdIdx]
 		store.hb.AddHeartbeatTarget(&HeartbeatTarget{
@@ -606,7 +595,7 @@ func CreateShard(store *dataStore, cnf *conf.Config, path string,
 	}
 	spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
 	shd = &shard{store: store, ldb: ldb, path: path,
-		incoming: make(chan *IncomingSpan, spanBufferSize)}
+		incoming: make(chan []*IncomingSpan, spanBufferSize)}
 	return shd, nil
 }
 
@@ -654,10 +643,6 @@ func (store *dataStore) Close() {
 		store.rpr.Shutdown()
 		store.rpr = nil
 	}
-	if store.msink != nil {
-		store.msink.Shutdown()
-		store.msink = nil
-	}
 	if store.readOpts != nil {
 		store.readOpts.Close()
 		store.readOpts = nil
@@ -677,8 +662,158 @@ func (store *dataStore) getShardIndex(sid common.SpanId) int {
 	return int(sid.Hash32() % uint32(len(store.shards)))
 }
 
-func (store *dataStore) WriteSpan(span *IncomingSpan) {
-	store.shards[store.getShardIndex(span.Id)].incoming <- span
+const WRITESPANS_BATCH_SIZE = 128
+
+// SpanIngestor is a class used internally to ingest spans from an RPC
+// endpoint.  It groups spans destined for a particular shard into small
+// batches, so that we can reduce the number of objects that need to be sent
+// over the shard's "incoming" channel.  Since sending objects over a channel
+// requires goroutine synchronization, this improves performance.
+//
+// SpanIngestor also allows us to reuse the same encoder object for many spans,
+// rather than creating a new encoder per span.  This avoids re-doing the
+// encoder setup for each span, and also generates less garbage.
+type SpanIngestor struct {
+	// The logger to use.
+	lg              *common.Logger
+
+	// The dataStore we are ingesting spans into.
+	store           *dataStore
+
+	// The remote address these spans are coming from.
+	addr            string
+
+	// Default TracerId
+	defaultTrid     string
+
+	// The msgpack handle to use to serialize the spans.
+	mh              codec.MsgpackHandle
+
+	// The msgpack encoder to use to serialize the spans.
+	// Caching this avoids generating a lot of garbage and burning CPUs 
+	// creating new encoder objects for each span.
+	enc             *codec.Encoder
+
+	// The buffer which codec.Encoder is currently serializing to. 
+	// We have to create a new buffer for each span because once we hand it off to the shard, the
+	// shard manages the buffer lifecycle.
+	spanDataBytes   []byte
+
+	// An array mapping shard index to span batch.
+	batches         []*SpanIngestorBatch
+
+	// The total number of spans ingested.  Includes dropped spans. 
+	totalIngested	int
+
+	// The total number of spans the ingestor dropped because of a server-side error.
+	serverDropped   int
+}
+
+// A batch of spans destined for a particular shard.
+type SpanIngestorBatch struct {
+	incoming        []*IncomingSpan
+}
+
+func (store *dataStore) NewSpanIngestor(lg *common.Logger,
+		addr string, defaultTrid string) *SpanIngestor {
+	ing := &SpanIngestor {
+		lg: lg,
+		store: store,
+		addr: addr,
+		defaultTrid: defaultTrid,
+		spanDataBytes: make([]byte, 0, 1024),
+		batches: make([]*SpanIngestorBatch, len(store.shards)),
+	}
+	ing.mh.WriteExt = true
+	ing.enc = codec.NewEncoderBytes(&ing.spanDataBytes, &ing.mh)
+	for batchIdx := range(ing.batches) {
+		ing.batches[batchIdx] = &SpanIngestorBatch {
+			incoming: make([]*IncomingSpan, 0, WRITESPANS_BATCH_SIZE),
+		}
+	}
+	return ing
+}
+
+func (ing *SpanIngestor) IngestSpan(span *common.Span) {
+	ing.totalIngested++
+	// Make sure the span ID is valid.
+	spanIdProblem := span.Id.FindProblem()
+	if spanIdProblem != "" {
+		// Can't print the invalid span ID because String() might fail.
+		ing.lg.Warnf("Invalid span ID: %s\n", spanIdProblem)
+		ing.serverDropped++
+		return
+	}
+
+	// Set the default tracer id, if needed.
+	if span.TracerId == "" {
+		span.TracerId = ing.defaultTrid
+	}
+
+	// Encode the span data.  Doing the encoding here is better than doing it
+	// in the shard goroutine, because we can achieve more parallelism.
+	// There is one shard goroutine per shard, but potentially many more
+	// ingestors per shard.
+	err := ing.enc.Encode(span.SpanData)
+	if err != nil {
+		ing.lg.Warnf("Failed to encode span ID %s: %s\n",
+			span.Id.String(), err.Error())
+		ing.serverDropped++
+		return
+	}
+	spanDataBytes := ing.spanDataBytes
+	ing.spanDataBytes = make([]byte, 0, 1024)
+	ing.enc.ResetBytes(&ing.spanDataBytes)
+
+	// Determine which shard this span should go to.
+	shardIdx := ing.store.getShardIndex(span.Id)
+	batch := ing.batches[shardIdx]
+	incomingLen := len(batch.incoming)
+	if ing.lg.TraceEnabled() {
+		ing.lg.Tracef("SpanIngestor#IngestSpan: spanId=%s, shardIdx=%d, " +
+			"incomingLen=%d, cap(batch.incoming)=%d\n",
+			span.Id.String(), shardIdx, incomingLen, cap(batch.incoming))
+	}
+	if incomingLen + 1 == cap(batch.incoming) {
+		if ing.lg.TraceEnabled() {
+			ing.lg.Tracef("SpanIngestor#IngestSpan: flushing %d spans for " +
+				"shard %d\n", len(batch.incoming), shardIdx)
+		}
+		ing.store.WriteSpans(shardIdx, batch.incoming)
+		batch.incoming = make([]*IncomingSpan, 1, WRITESPANS_BATCH_SIZE)
+		incomingLen = 0
+	} else {
+		batch.incoming = batch.incoming[0:incomingLen+1]
+	}
+	batch.incoming[incomingLen] = &IncomingSpan {
+		Addr: ing.addr,
+		Span: span,
+		SpanDataBytes: spanDataBytes,
+	}
+}
+
+func (ing *SpanIngestor) Close(clientDropped int, startTime time.Time) {
+	for shardIdx := range(ing.batches) {
+		batch := ing.batches[shardIdx]
+		if len(batch.incoming) > 0 {
+			if ing.lg.TraceEnabled() {
+				ing.lg.Tracef("SpanIngestor#Close: flushing %d span(s) for " +
+					"shard %d\n", len(batch.incoming), shardIdx)
+			}
+			ing.store.WriteSpans(shardIdx, batch.incoming)
+		}
+		batch.incoming = nil
+	}
+	ing.lg.Debugf("Closed span ingestor for %s.  Ingested %d span(s); dropped " +
+		"%d span(s).\n", ing.addr, ing.totalIngested, ing.serverDropped)
+
+	endTime := time.Now()
+	ing.store.msink.UpdateIngested(ing.addr, ing.totalIngested,
+		ing.serverDropped, clientDropped, endTime.Sub(startTime))
+}
+
+func (store *dataStore) WriteSpans(shardIdx int, ispans []*IncomingSpan) {
+	store.shards[shardIdx].incoming <- ispans
 }
 
 func (store *dataStore) FindSpan(sid common.SpanId) *common.Span {
@@ -709,14 +844,14 @@ func (shd *shard) FindSpan(sid common.SpanId) *common.Span {
 
 func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, error) {
 	r := bytes.NewBuffer(buf)
-	decoder := gob.NewDecoder(r)
+	mh := new(codec.MsgpackHandle)
+	mh.WriteExt = true
+	decoder := codec.NewDecoder(r, mh)
 	data := common.SpanData{}
 	err := decoder.Decode(&data)
 	if err != nil {
 		return nil, err
 	}
-	// Gob encoding translates empty slices to nil.  Reverse this so that we're always dealing with
-	// non-nil slices.
 	if data.Parents == nil {
 		data.Parents = []common.SpanId{}
 	}
@@ -1259,19 +1394,6 @@ func (store *dataStore) ServerStats() *common.ServerStats {
 	serverStats.LastStartMs = store.startMs
 	serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC())
 	serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans)
-	wsData := store.msink.wsm.GetData()
-	serverStats.HostSpanMetrics = store.msink.AccessServerTotals()
-	for k, v := range wsData.clientDroppedMap {
-		smtx := serverStats.HostSpanMetrics[k]
-		if smtx == nil {
-			smtx = &common.SpanMetrics {}
-			serverStats.HostSpanMetrics[k] = smtx
-		}
-		smtx.ClientDropped = v
-	}
-	serverStats.IngestedSpans = wsData.ingestedSpans
-	serverStats.ClientDroppedSpans = wsData.clientDroppedSpans
-	serverStats.MaxWriteSpansLatencyMs = wsData.latencyMax
-	serverStats.AverageWriteSpansLatencyMs = wsData.latencyAverage
+	store.msink.PopulateServerStats(&serverStats)
 	return &serverStats
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index d9f4a0a..e6d1df7 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -73,12 +73,11 @@ var SIMPLE_TEST_SPANS []common.Span = []common.Span{
 }
 
 func createSpans(spans []common.Span, store *dataStore) {
+	ing := store.NewSpanIngestor(store.lg, "127.0.0.1", "")
 	for idx := range spans {
-		store.WriteSpan(&IncomingSpan{
-			Addr: "127.0.0.1",
-			Span: &spans[idx],
-		})
+		ing.IngestSpan(&spans[idx])
 	}
+	ing.Close(0, time.Now())
 	store.WrittenSpans.Waits(int64(len(spans)))
 }
 
@@ -87,7 +86,7 @@ func TestDatastoreWriteAndRead(t *testing.T) {
 	t.Parallel()
 	htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead",
 		Cnf: map[string]string{
-			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
 		},
 		WrittenSpans: common.NewSemaphore(0),
 	}
@@ -151,7 +150,7 @@ func TestSimpleQuery(t *testing.T) {
 	t.Parallel()
 	htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
 		Cnf: map[string]string{
-			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
 		},
 		WrittenSpans: common.NewSemaphore(0),
 	}
@@ -161,11 +160,8 @@ func TestSimpleQuery(t *testing.T) {
 	}
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
-	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-		"127.0.0.1": &common.SpanMetrics{
-			Written: uint64(len(SIMPLE_TEST_SPANS)),
-		},
-	})
+
+	assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
 
 	testQuery(t, ht, &common.Query{
 		Predicates: []common.Predicate{
@@ -183,7 +179,7 @@ func TestQueries2(t *testing.T) {
 	t.Parallel()
 	htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
 		Cnf: map[string]string{
-			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
 		},
 		WrittenSpans: common.NewSemaphore(0),
 	}
@@ -193,11 +189,7 @@ func TestQueries2(t *testing.T) {
 	}
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
-	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-		"127.0.0.1": &common.SpanMetrics{
-			Written: uint64(len(SIMPLE_TEST_SPANS)),
-		},
-	})
+	assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
 	testQuery(t, ht, &common.Query{
 		Predicates: []common.Predicate{
 			common.Predicate{
@@ -241,7 +233,7 @@ func TestQueries3(t *testing.T) {
 	t.Parallel()
 	htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
 		Cnf: map[string]string{
-			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
 		},
 		WrittenSpans: common.NewSemaphore(0),
 	}
@@ -251,11 +243,7 @@ func TestQueries3(t *testing.T) {
 	}
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
-	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-		"127.0.0.1": &common.SpanMetrics{
-			Written: uint64(len(SIMPLE_TEST_SPANS)),
-		},
-	})
+	assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
 	testQuery(t, ht, &common.Query{
 		Predicates: []common.Predicate{
 			common.Predicate{
@@ -299,7 +287,7 @@ func TestQueries4(t *testing.T) {
 	t.Parallel()
 	htraceBld := &MiniHTracedBuilder{Name: "TestQueries4",
 		Cnf: map[string]string{
-			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
 		},
 		WrittenSpans: common.NewSemaphore(0),
 	}
@@ -345,7 +333,7 @@ func TestQueries4(t *testing.T) {
 func BenchmarkDatastoreWrites(b *testing.B) {
 	htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
 		Cnf: map[string]string{
-			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "15000",
+			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
 			conf.HTRACE_LOG_LEVEL: "INFO",
 		},
 		WrittenSpans: common.NewSemaphore(0),
@@ -372,25 +360,20 @@ func BenchmarkDatastoreWrites(b *testing.B) {
 	b.ResetTimer()
 
 	// Write many random spans.
+	ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "")
 	for n := 0; n < b.N; n++ {
-		ht.Store.WriteSpan(&IncomingSpan{
-			Addr: "127.0.0.1",
-			Span: allSpans[n],
-		})
+		ing.IngestSpan(allSpans[n])
 	}
+	ing.Close(0, time.Now())
 	// Wait for all the spans to be written.
 	ht.Store.WrittenSpans.Waits(int64(b.N))
-	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-		"127.0.0.1": &common.SpanMetrics{
-			Written: uint64(b.N), // should be less than?
-		},
-	})
+	assertNumWrittenEquals(b, ht.Store.msink, b.N)
 }
 
 func TestReloadDataStore(t *testing.T) {
 	htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore",
 		Cnf: map[string]string{
-			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
 		},
 		DataDirs: make([]string, 2),
 		KeepDataDirsOnClose: true,
@@ -494,7 +477,7 @@ func TestQueriesWithContinuationTokens1(t *testing.T) {
 	t.Parallel()
 	htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1",
 		Cnf: map[string]string{
-			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
 		},
 		WrittenSpans: common.NewSemaphore(0),
 	}
@@ -504,11 +487,7 @@ func TestQueriesWithContinuationTokens1(t *testing.T) {
 	}
 	defer ht.Close()
 	createSpans(SIMPLE_TEST_SPANS, ht.Store)
-	waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-		"127.0.0.1": &common.SpanMetrics{
-			Written: uint64(len(SIMPLE_TEST_SPANS)),
-		},
-	})
+	assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
 	// Adding a prev value to this query excludes the first result that we
 	// would normally get.
 	testQuery(t, ht, &common.Query{

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index a0f2e81..a649420 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -266,26 +266,11 @@ func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
 		return errors.New(fmt.Sprintf("Failed to split host and port " +
 			"for %s: %s\n", req.Addr, err.Error()))
 	}
-	for i := range req.Spans {
-		span := req.Spans[i]
-		spanIdProblem := span.Id.FindProblem()
-		if spanIdProblem != "" {
-			return errors.New(fmt.Sprintf("Invalid span ID: %s", spanIdProblem))
-		}
-		if span.TracerId == "" {
-			span.TracerId = req.DefaultTrid
-		}
-		if hand.lg.TraceEnabled() {
-			hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
-		}
-		hand.store.WriteSpan(&IncomingSpan{
-			Addr: client,
-			Span: span,
-		})
+	ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
+	for spanIdx := range req.Spans {
+		ing.IngestSpan(req.Spans[spanIdx])
 	}
-	endTime := time.Now()
-	hand.store.msink.Update(client, req.ClientDropped, len(req.Spans),
-			endTime.Sub(startTime))
+	ing.Close(int(req.ClientDropped), startTime)
 	return nil
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
index cfff418..5ce3339 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
@@ -20,7 +20,6 @@
 package main
 
 import (
-	"encoding/json"
 	"math"
 	"org/apache/htrace/common"
 	"org/apache/htrace/conf"
@@ -40,252 +39,114 @@ import (
 
 const LATENCY_CIRC_BUF_SIZE = 4096
 
-type ServerSpanMetrics struct {
-	// The total number of spans written to HTraced.
-	Written uint64
-
-	// The total number of spans dropped by the server.
-	ServerDropped uint64
-}
-
-func (spm *ServerSpanMetrics) Clone() *ServerSpanMetrics {
-	return &ServerSpanMetrics{
-		Written:       spm.Written,
-		ServerDropped: spm.ServerDropped,
-	}
-}
-
-func (spm *ServerSpanMetrics) String() string {
-	jbytes, err := json.Marshal(*spm)
-	if err != nil {
-		panic(err)
-	}
-	return string(jbytes)
-}
-
-func (spm *ServerSpanMetrics) Add(ospm *ServerSpanMetrics) {
-	spm.Written += ospm.Written
-	spm.ServerDropped += ospm.ServerDropped
-}
-
-func (spm *ServerSpanMetrics) Clear() {
-	spm.Written = 0
-	spm.ServerDropped = 0
-}
-
-// A map from network address strings to ServerSpanMetrics structures.
-type ServerSpanMetricsMap map[string]*ServerSpanMetrics
-
-func (smtxMap ServerSpanMetricsMap) IncrementDropped(addr string, maxMtx int,
-	lg *common.Logger) {
-	mtx := smtxMap[addr]
-	if mtx == nil {
-		mtx = &ServerSpanMetrics{}
-		smtxMap[addr] = mtx
-	}
-	mtx.ServerDropped++
-	smtxMap.Prune(maxMtx, lg)
-}
-
-func (smtxMap ServerSpanMetricsMap) IncrementWritten(addr string, maxMtx int,
-	lg *common.Logger) {
-	mtx := smtxMap[addr]
-	if mtx == nil {
-		mtx = &ServerSpanMetrics{}
-		smtxMap[addr] = mtx
-	}
-	mtx.Written++
-	smtxMap.Prune(maxMtx, lg)
-}
-
-func (smtxMap ServerSpanMetricsMap) Prune(maxMtx int, lg *common.Logger) {
-	if len(smtxMap) >= maxMtx {
-		// Delete a random entry
-		for k := range smtxMap {
-			lg.Warnf("Evicting metrics entry for addr %s "+
-				"because there are more than %d addrs.\n", k, maxMtx)
-			delete(smtxMap, k)
-			return
-		}
-	}
-}
-
-type AccessReq struct {
-	mtxMap common.SpanMetricsMap
-	done   chan interface{}
-}
-
 type MetricsSink struct {
-	// The total span metrics.
-	smtxMap ServerSpanMetricsMap
-
-	// A channel of incoming shard metrics.
-	// When this is shut down, the MetricsSink will exit.
-	updateReqs chan ServerSpanMetricsMap
-
-	// A channel of incoming requests for shard metrics.
-	accessReqs chan *AccessReq
-
-	// This will be closed when the MetricsSink has exited.
-	exited chan interface{}
-
-	// The logger used by this MetricsSink.
+	// The metrics sink logger.
 	lg *common.Logger
 
-	// The maximum number of metrics totals we will maintain.
+	// The maximum number of entries we should allow in the HostSpanMetrics map.
 	maxMtx int
 
-	// Metrics about WriteSpans requests
-	wsm WriteSpanMetrics
-}
+	// The total number of spans ingested by the server (counting dropped spans)
+	IngestedSpans uint64
 
-func NewMetricsSink(cnf *conf.Config) *MetricsSink {
-	mcl := MetricsSink{
-		smtxMap:          make(ServerSpanMetricsMap),
-		updateReqs:       make(chan ServerSpanMetricsMap, 128),
-		accessReqs:       make(chan *AccessReq),
-		exited:           make(chan interface{}),
-		lg:               common.NewLogger("metrics", cnf),
-		maxMtx:           cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
-		wsm: WriteSpanMetrics {
-			clientDroppedMap: make(map[string]uint64),
-			latencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE),
-		},
-	}
-	go mcl.run()
-	return &mcl
-}
-
-func (msink *MetricsSink) run() {
-	lg := msink.lg
-	defer func() {
-		lg.Info("MetricsSink: stopping service goroutine.\n")
-		close(msink.exited)
-	}()
-	lg.Tracef("MetricsSink: starting.\n")
-	for {
-		select {
-		case updateReq, open := <-msink.updateReqs:
-			if !open {
-				lg.Trace("MetricsSink: shutting down cleanly.\n")
-				return
-			}
-			for addr, umtx := range updateReq {
-				smtx := msink.smtxMap[addr]
-				if smtx == nil {
-					smtx = &ServerSpanMetrics{}
-					msink.smtxMap[addr] = smtx
-				}
-				smtx.Add(umtx)
-				if lg.TraceEnabled() {
-					lg.Tracef("MetricsSink: updated %s to %s\n", addr, smtx.String())
-				}
-			}
-			msink.smtxMap.Prune(msink.maxMtx, lg)
-		case accessReq := <-msink.accessReqs:
-			msink.handleAccessReq(accessReq)
-		}
-	}
-}
-
-func (msink *MetricsSink) handleAccessReq(accessReq *AccessReq) {
-	msink.lg.Debug("MetricsSink: accessing global metrics.\n")
-	defer close(accessReq.done)
-	for addr, smtx := range msink.smtxMap {
-		accessReq.mtxMap[addr] = &common.SpanMetrics{
-			Written:       smtx.Written,
-			ServerDropped: smtx.ServerDropped,
-		}
-	}
-}
-
-func (msink *MetricsSink) AccessServerTotals() common.SpanMetricsMap {
-	accessReq := &AccessReq{
-		mtxMap: make(common.SpanMetricsMap),
-		done:   make(chan interface{}),
-	}
-	msink.accessReqs <- accessReq
-	<-accessReq.done
-	return accessReq.mtxMap
-}
-
-func (msink *MetricsSink) UpdateMetrics(mtxMap ServerSpanMetricsMap) {
-	msink.updateReqs <- mtxMap
-}
-
-func (msink *MetricsSink) Shutdown() {
-	close(msink.updateReqs)
-	<-msink.exited
-}
+	// The total number of spans written to leveldb since the server started.
+	WrittenSpans uint64
 
-type WriteSpanMetrics struct {
-	// Lock protecting WriteSpanMetrics
-	lock sync.Mutex
-
-	// The number of spans which each client has self-reported that it has
-	// dropped.
-	clientDroppedMap map[string]uint64
+	// The total number of spans dropped by the server.
+	ServerDropped uint64
 
-	// The total number of new span writes we've gotten since startup.
-	ingestedSpans uint64
+	// The total number of spans dropped by the client (self-reported).
+	ClientDroppedEstimate uint64
 
-	// The total number of spans all clients have dropped since startup.
-	clientDroppedSpans uint64
+	// Per-host Span Metrics
+	HostSpanMetrics common.SpanMetricsMap
 
 	// The last few writeSpan latencies
-	latencyCircBuf *CircBufU32
-}
+	wsLatencyCircBuf *CircBufU32
 
-type WriteSpanMetricsData struct {
-	clientDroppedMap map[string]uint64
-	ingestedSpans uint64
-	clientDroppedSpans uint64
-	latencyMax uint32
-	latencyAverage uint32
+	// Lock protecting all metrics
+	lock sync.Mutex
 }
 
-func (msink *MetricsSink) Update(client string, clientDropped uint64, clientWritten int,
-		wsLatency time.Duration) {
-	wsLatencyNs := wsLatency.Nanoseconds() / 1000000
+func NewMetricsSink(cnf *conf.Config) *MetricsSink {
+	return &MetricsSink {
+		lg:               common.NewLogger("metrics", cnf),
+		maxMtx:           cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
+		HostSpanMetrics: make(common.SpanMetricsMap),
+		wsLatencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE),
+	}
+}
+
+// Update the total number of spans which were ingested, as well as other
+// metrics that get updated during span ingest. 
+func (msink *MetricsSink) UpdateIngested(addr string, totalIngested int,
+		serverDropped int, clientDroppedEstimate int, wsLatency time.Duration) {
+	msink.lock.Lock()
+	defer msink.lock.Unlock()
+	msink.IngestedSpans += uint64(totalIngested)
+	msink.ServerDropped += uint64(serverDropped)
+	msink.ClientDroppedEstimate += uint64(clientDroppedEstimate)
+	msink.updateSpanMetrics(addr, 0, serverDropped, clientDroppedEstimate)
+	wsLatencyMs := wsLatency.Nanoseconds() / 1000000
 	var wsLatency32 uint32
-	if wsLatencyNs > math.MaxUint32 {
+	if wsLatencyMs > math.MaxUint32 {
 		wsLatency32 = math.MaxUint32
 	} else {
-		wsLatency32 = uint32(wsLatencyNs)
-	}
-	msink.wsm.update(msink.maxMtx, client, clientDropped, clientWritten, wsLatency32)
-}
-
-func (wsm *WriteSpanMetrics) update(maxMtx int, client string, clientDropped uint64,
-		clientWritten int, wsLatency uint32) {
-	wsm.lock.Lock()
-	defer wsm.lock.Unlock()
-	wsm.clientDroppedMap[client] = clientDropped
-	if len(wsm.clientDroppedMap) >= maxMtx {
-		// Delete a random entry
-		for k := range wsm.clientDroppedMap {
-			delete(wsm.clientDroppedMap, k)
-			return
+		wsLatency32 = uint32(wsLatencyMs)
+	}
+	msink.wsLatencyCircBuf.Append(wsLatency32)
+}
+
+// Update the per-host span metrics.  Must be called with the lock held.
+func (msink *MetricsSink) updateSpanMetrics(addr string, numWritten int,
+		serverDropped int, clientDroppedEstimate int) {
+	mtx, found := msink.HostSpanMetrics[addr]
+	if !found {
+		// Ensure that the per-host span metrics map doesn't grow too large.
+		if len(msink.HostSpanMetrics) >= msink.maxMtx {
+			// Delete a random entry
+			for k := range msink.HostSpanMetrics {
+				msink.lg.Warnf("Evicting metrics entry for addr %s "+
+					"because there are more than %d addrs.\n", k, msink.maxMtx)
+				delete(msink.HostSpanMetrics, k)
+				break
+			}
+		}
+		mtx = &common.SpanMetrics { }
+		msink.HostSpanMetrics[addr] = mtx
+	}
+	mtx.Written += uint64(numWritten)
+	mtx.ServerDropped += uint64(serverDropped)
+	mtx.ClientDroppedEstimate += uint64(clientDroppedEstimate)
+}
+
+// Update the total number of spans which were persisted to disk.
+func (msink *MetricsSink) UpdatePersisted(addr string, totalWritten int,
+		serverDropped int) {
+	msink.lock.Lock()
+	defer msink.lock.Unlock()
+	msink.WrittenSpans += uint64(totalWritten)
+	msink.ServerDropped += uint64(serverDropped)
+	msink.updateSpanMetrics(addr, totalWritten, serverDropped, 0)
+}
+
+// Read the server stats.
+func (msink *MetricsSink) PopulateServerStats(stats *common.ServerStats) {
+	msink.lock.Lock()
+	defer msink.lock.Unlock()
+	stats.IngestedSpans = msink.IngestedSpans
+	stats.WrittenSpans = msink.WrittenSpans
+	stats.ServerDroppedSpans = msink.ServerDropped
+	stats.ClientDroppedEstimate = msink.ClientDroppedEstimate
+	stats.MaxWriteSpansLatencyMs = msink.wsLatencyCircBuf.Max()
+	stats.AverageWriteSpansLatencyMs = msink.wsLatencyCircBuf.Average()
+	stats.HostSpanMetrics = make(common.SpanMetricsMap)
+	for k, v := range(msink.HostSpanMetrics) {
+		stats.HostSpanMetrics[k] = &common.SpanMetrics {
+			Written: v.Written,
+			ServerDropped: v.ServerDropped,
+			ClientDroppedEstimate: v.ClientDroppedEstimate,
 		}
-	}
-	wsm.ingestedSpans += uint64(clientWritten)
-	wsm.clientDroppedSpans += uint64(clientDropped)
-	wsm.latencyCircBuf.Append(wsLatency)
-}
-
-func (wsm *WriteSpanMetrics) GetData() *WriteSpanMetricsData {
-	wsm.lock.Lock()
-	defer wsm.lock.Unlock()
-	clientDroppedMap := make(map[string]uint64)
-	for k, v := range wsm.clientDroppedMap {
-		clientDroppedMap[k] = v
-	}
-	return &WriteSpanMetricsData {
-		clientDroppedMap: clientDroppedMap,
-		ingestedSpans: wsm.ingestedSpans,
-		clientDroppedSpans: wsm.clientDroppedSpans,
-		latencyMax: wsm.latencyCircBuf.Max(),
-		latencyAverage: wsm.latencyCircBuf.Average(),
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
index 5243d9e..e1dba1f 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -20,6 +20,7 @@
 package main
 
 import (
+	"fmt"
 	htrace "org/apache/htrace/client"
 	"org/apache/htrace/common"
 	"org/apache/htrace/conf"
@@ -28,43 +29,6 @@ import (
 	"time"
 )
 
-func TestMetricsSinkStartupShutdown(t *testing.T) {
-	cnfBld := conf.Builder{
-		Values:   conf.TEST_VALUES(),
-		Defaults: conf.DEFAULTS,
-	}
-	cnf, err := cnfBld.Build()
-	if err != nil {
-		t.Fatalf("failed to create conf: %s", err.Error())
-	}
-	msink := NewMetricsSink(cnf)
-	msink.Shutdown()
-}
-
-func TestAddSpanMetrics(t *testing.T) {
-	a := &ServerSpanMetrics{
-		Written:       100,
-		ServerDropped: 200,
-	}
-	b := &ServerSpanMetrics{
-		Written:       500,
-		ServerDropped: 100,
-	}
-	a.Add(b)
-	if a.Written != 600 {
-		t.Fatalf("SpanMetrics#Add failed to update #Written")
-	}
-	if a.ServerDropped != 300 {
-		t.Fatalf("SpanMetrics#Add failed to update #Dropped")
-	}
-	if b.Written != 500 {
-		t.Fatalf("SpanMetrics#Add updated b#Written")
-	}
-	if b.ServerDropped != 100 {
-		t.Fatalf("SpanMetrics#Add updated b#Dropped")
-	}
-}
-
 func compareTotals(a, b common.SpanMetricsMap) bool {
 	for k, v := range a {
 		if !reflect.DeepEqual(v, b[k]) {
@@ -79,112 +43,52 @@ func compareTotals(a, b common.SpanMetricsMap) bool {
 	return true
 }
 
-func waitForMetrics(msink *MetricsSink, expectedTotals common.SpanMetricsMap) {
-	for {
-		time.Sleep(1 * time.Millisecond)
-		totals := msink.AccessServerTotals()
-		if compareTotals(totals, expectedTotals) {
-			return
-		}
-	}
+type Fatalfer interface {
+	Fatalf(format string, args ...interface{})
 }
 
-func TestMetricsSinkMessages(t *testing.T) {
-	cnfBld := conf.Builder{
-		Values:   conf.TEST_VALUES(),
-		Defaults: conf.DEFAULTS,
-	}
-	cnf, err := cnfBld.Build()
-	if err != nil {
-		t.Fatalf("failed to create conf: %s", err.Error())
-	}
-	msink := NewMetricsSink(cnf)
-	totals := msink.AccessServerTotals()
-	if len(totals) != 0 {
-		t.Fatalf("Expected no data in the MetricsSink to start with.")
+func assertNumWrittenEquals(t Fatalfer, msink *MetricsSink,
+		expectedNumWritten int) {
+	var sstats common.ServerStats
+	msink.PopulateServerStats(&sstats)
+	if sstats.WrittenSpans != uint64(expectedNumWritten) {
+		t.Fatalf("sstats.WrittenSpans = %d, but expected %d\n",
+			sstats.WrittenSpans, len(SIMPLE_TEST_SPANS))
+	}
+	if sstats.HostSpanMetrics["127.0.0.1"] == nil {
+		t.Fatalf("no entry for sstats.HostSpanMetrics[127.0.0.1] found.")
+	}
+	if sstats.HostSpanMetrics["127.0.0.1"].Written !=
+			uint64(expectedNumWritten) {
+		t.Fatalf("sstats.HostSpanMetrics[127.0.0.1].Written = %d, but " +
+			"expected %d\n", sstats.HostSpanMetrics["127.0.0.1"].Written,
+			len(SIMPLE_TEST_SPANS))
 	}
-	msink.UpdateMetrics(ServerSpanMetricsMap{
-		"192.168.0.100": &ServerSpanMetrics{
-			Written:       20,
-			ServerDropped: 10,
-		},
-	})
-	waitForMetrics(msink, common.SpanMetricsMap{
-		"192.168.0.100": &common.SpanMetrics{
-			Written:       20,
-			ServerDropped: 10,
-		},
-	})
-	msink.UpdateMetrics(ServerSpanMetricsMap{
-		"192.168.0.100": &ServerSpanMetrics{
-			Written:       200,
-			ServerDropped: 100,
-		},
-	})
-	msink.UpdateMetrics(ServerSpanMetricsMap{
-		"192.168.0.100": &ServerSpanMetrics{
-			Written:       1000,
-			ServerDropped: 1000,
-		},
-	})
-	waitForMetrics(msink, common.SpanMetricsMap{
-		"192.168.0.100": &common.SpanMetrics{
-			Written:       1220,
-			ServerDropped: 1110,
-		},
-	})
-	msink.UpdateMetrics(ServerSpanMetricsMap{
-		"192.168.0.200": &ServerSpanMetrics{
-			Written:       200,
-			ServerDropped: 100,
-		},
-	})
-	waitForMetrics(msink, common.SpanMetricsMap{
-		"192.168.0.100": &common.SpanMetrics{
-			Written:       1220,
-			ServerDropped: 1110,
-		},
-		"192.168.0.200": &common.SpanMetrics{
-			Written:       200,
-			ServerDropped: 100,
-		},
-	})
-	msink.Shutdown()
 }
 
-func TestMetricsSinkMessagesEviction(t *testing.T) {
+func TestMetricsSinkPerHostEviction(t *testing.T) {
 	cnfBld := conf.Builder{
 		Values:   conf.TEST_VALUES(),
 		Defaults: conf.DEFAULTS,
 	}
 	cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2"
-	cnfBld.Values[conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS] = "1"
 	cnf, err := cnfBld.Build()
 	if err != nil {
 		t.Fatalf("failed to create conf: %s", err.Error())
 	}
 	msink := NewMetricsSink(cnf)
-	msink.UpdateMetrics(ServerSpanMetricsMap{
-		"192.168.0.100": &ServerSpanMetrics{
-			Written:       20,
-			ServerDropped: 10,
-		},
-		"192.168.0.101": &ServerSpanMetrics{
-			Written:       20,
-			ServerDropped: 10,
-		},
-		"192.168.0.102": &ServerSpanMetrics{
-			Written:       20,
-			ServerDropped: 10,
-		},
-	})
-	for {
-		totals := msink.AccessServerTotals()
-		if len(totals) == 2 {
-			break
+	msink.UpdatePersisted("192.168.0.100", 20, 10)
+	msink.UpdatePersisted("192.168.0.101", 20, 10)
+	msink.UpdatePersisted("192.168.0.102", 20, 10)
+	msink.lock.Lock()
+	defer msink.lock.Unlock()
+	if len(msink.HostSpanMetrics) != 2 {
+		for k, v := range(msink.HostSpanMetrics) {
+			fmt.Printf("WATERMELON: [%s] = [%s]\n", k, v)
 		}
+		t.Fatalf("Expected len(msink.HostSpanMetrics) to be 2, but got %d\n",
+			len(msink.HostSpanMetrics))
 	}
-	msink.Shutdown()
 }
 
 func TestIngestedSpansMetricsRest(t *testing.T) {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
index dcc916a..0140dbb 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
@@ -43,7 +43,7 @@ func TestReapingOldSpans(t *testing.T) {
 		Cnf: map[string]string{
 			conf.HTRACE_SPAN_EXPIRY_MS:              fmt.Sprintf("%d", 60*60*1000),
 			conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS:  "1",
-			conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
+			conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "1",
 		},
 		WrittenSpans: common.NewSemaphore(0),
 		DataDirs:     make([]string, 2),
@@ -52,12 +52,11 @@ func TestReapingOldSpans(t *testing.T) {
 	if err != nil {
 		t.Fatalf("failed to create mini htraced cluster: %s\n", err.Error())
 	}
-	for i := range testSpans {
-		ht.Store.WriteSpan(&IncomingSpan{
-			Addr: "127.0.0.1",
-			Span: testSpans[i],
-		})
+	ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "")
+	for spanIdx := range testSpans {
+		ing.IngestSpan(testSpans[spanIdx])
 	}
+	ing.Close(0, time.Now())
 	// Wait the spans to be created
 	ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS)
 	// Set a reaper date that will remove all the spans except final one.

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index 432375d..1b90bd4 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -252,27 +252,12 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
 	}
 	hand.lg.Debugf("writeSpansHandler: received %d span(s).  defaultTrid = %s\n",
 		len(msg.Spans), msg.DefaultTrid)
+
+	ing := hand.store.NewSpanIngestor(hand.lg, client, msg.DefaultTrid)
 	for spanIdx := range msg.Spans {
-		if hand.lg.DebugEnabled() {
-			hand.lg.Debugf("writing span %s\n", msg.Spans[spanIdx].ToJson())
-		}
-		span := msg.Spans[spanIdx]
-		if span.TracerId == "" {
-			span.TracerId = msg.DefaultTrid
-		}
-		spanIdProblem := span.Id.FindProblem()
-		if spanIdProblem != "" {
-			hand.lg.Warnf(fmt.Sprintf("Invalid span ID: %s", spanIdProblem))
-		} else {
-			hand.store.WriteSpan(&IncomingSpan{
-				Addr: client,
-				Span: span,
-			})
-		}
+		ing.IngestSpan(msg.Spans[spanIdx])
 	}
-	endTime := time.Now()
-	hand.store.msink.Update(client, msg.ClientDropped, len(msg.Spans),
-			endTime.Sub(startTime))
+	ing.Close(int(msg.ClientDropped), startTime)
 }
 
 type queryHandler struct {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
index 7b5e433..c81bbb7 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
@@ -216,7 +216,10 @@ func printServerStats(hcl *htrace.Client) int {
 		common.UnixMsToTime(stats.CurMs).Format(time.RFC3339))
 	fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans)
 	fmt.Fprintf(w, "Spans ingested\t%d\n", stats.IngestedSpans)
-	fmt.Fprintf(w, "Spans dropped by clients\t%d\n", stats.ClientDroppedSpans)
+	fmt.Fprintf(w, "Spans written\t%d\n", stats.WrittenSpans)
+	fmt.Fprintf(w, "Spans dropped by server\t%d\n", stats.ServerDroppedSpans)
+	fmt.Fprintf(w, "Estimated spans dropped by clients\t%d\n",
+		stats.ClientDroppedEstimate)
 	dur := time.Millisecond * time.Duration(stats.AverageWriteSpansLatencyMs)
 	fmt.Fprintf(w, "Average WriteSpan Latency\t%s\n", dur.String())
 	dur = time.Millisecond * time.Duration(stats.MaxWriteSpansLatencyMs)
@@ -244,8 +247,8 @@ func printServerStats(hcl *htrace.Client) int {
 	sort.Sort(keys)
 	for k := range keys {
 		mtx := mtxMap[keys[k]]
-		fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\tclient dropped: %d\n",
-			keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDropped)
+		fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\tclient dropped estimate: %d\n",
+			keys[k], mtx.Written, mtx.ServerDropped, mtx.ClientDroppedEstimate)
 	}
 	w.Flush()
 	return EXIT_SUCCESS

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-webapp/src/main/webapp/app/server_info_view.js
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/app/server_info_view.js b/htrace-webapp/src/main/webapp/app/server_info_view.js
index 62775e8..efb7545 100644
--- a/htrace-webapp/src/main/webapp/app/server_info_view.js
+++ b/htrace-webapp/src/main/webapp/app/server_info_view.js
@@ -50,7 +50,7 @@ htrace.ServerInfoView = Backbone.View.extend({
             '<th>Remote</th>' +
             '<th>Written</th>' +
             '<th>ServerDropped</th>' +
-            '<th>ClientDropped</th>' +
+            '<th>ClientDroppedEstimate</th>' +
           '</tr>' +
         '</thead>';
     var remotes = [];
@@ -69,7 +69,7 @@ htrace.ServerInfoView = Backbone.View.extend({
         "<td>" + remote + "</td>" +
         "<td>" + smtx.Written + "</td>" +
         "<td>" + smtx.ServerDropped + "</td>" +
-        "<td>" + smtx.ClientDropped + "</td>" +
+        "<td>" + smtx.ClientDroppedEstimate + "</td>" +
         "</tr>";
     }
     out = out + '</table>';

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-webapp/src/main/webapp/app/server_stats.js
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/app/server_stats.js b/htrace-webapp/src/main/webapp/app/server_stats.js
index 783041c..4cfea92 100644
--- a/htrace-webapp/src/main/webapp/app/server_stats.js
+++ b/htrace-webapp/src/main/webapp/app/server_stats.js
@@ -22,8 +22,13 @@ htrace.ServerStats = Backbone.Model.extend({
   defaults: {
     "LastStartMs": "0",
     "CurMs": "0",
+    "ReapedSpans": "(unknown)",
     "IngestedSpans": "(unknown)",
-    "ReapedSpans": "(unknown)"
+    "WrittenSpans": "(unknown)",
+    "ServerDroppedSpans": "(unknown)",
+    "ClientDroppedSpans": "(unknown)",
+    "MaxWriteSpansLatencyMs": "(unknown)",
+    "AverageWriteSpansLatencyMs": "(unknown)"
   },
 
   url: function() {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/699c8cf8/htrace-webapp/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git a/htrace-webapp/src/main/webapp/index.html b/htrace-webapp/src/main/webapp/index.html
index 16fd2f9..1e20ec0 100644
--- a/htrace-webapp/src/main/webapp/index.html
+++ b/htrace-webapp/src/main/webapp/index.html
@@ -84,8 +84,16 @@
               <td><%= model.stats.get("IngestedSpans") %></td>
             </tr>
             <tr>
-              <td>Client Dropped Spans</td>
-              <td><%= model.stats.get("ClientDroppedSpans") %></td>
+              <td>Spans Written</td>
+              <td><%= model.stats.get("WrittenSpans") %></td>
+            </tr>
+            <tr>
+              <td>Server Dropped Spans</td>
+              <td><%= model.stats.get("ServerDroppedSpans") %></td>
+            </tr>
+            <tr>
+              <td>Estimated Client Dropped Spans</td>
+              <td><%= model.stats.get("ClientDroppedEstimate") %></td>
             </tr>
             <tr>
               <td>Maximum WriteSpans Latency (ms)</td>