You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by iw...@apache.org on 2016/04/20 01:32:45 UTC
[2/7] incubator-htrace git commit: HTRACE-357. Rename
htrace-htraced/go/src/org/apache/htrace to htrace-htraced/go/src/htrace
(Colin Patrick McCabe via iwasakims)
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
deleted file mode 100644
index 281ee2d..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ /dev/null
@@ -1,761 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "bytes"
- "encoding/json"
- "math/rand"
- htrace "org/apache/htrace/client"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "org/apache/htrace/test"
- "os"
- "reflect"
- "sort"
- "testing"
- "time"
-)
-
-// Test creating and tearing down a datastore.
-func TestCreateDatastore(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestCreateDatastore",
- DataDirs: make([]string, 3)}
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
-}
-
-var SIMPLE_TEST_SPANS []common.Span = []common.Span{
- common.Span{Id: common.TestId("00000000000000000000000000000001"),
- SpanData: common.SpanData{
- Begin: 123,
- End: 456,
- Description: "getFileDescriptors",
- Parents: []common.SpanId{},
- TracerId: "firstd",
- }},
- common.Span{Id: common.TestId("00000000000000000000000000000002"),
- SpanData: common.SpanData{
- Begin: 125,
- End: 200,
- Description: "openFd",
- Parents: []common.SpanId{common.TestId("00000000000000000000000000000001")},
- TracerId: "secondd",
- }},
- common.Span{Id: common.TestId("00000000000000000000000000000003"),
- SpanData: common.SpanData{
- Begin: 200,
- End: 456,
- Description: "passFd",
- Parents: []common.SpanId{common.TestId("00000000000000000000000000000001")},
- TracerId: "thirdd",
- }},
-}
-
-func createSpans(spans []common.Span, store *dataStore) {
- ing := store.NewSpanIngestor(store.lg, "127.0.0.1", "")
- for idx := range spans {
- ing.IngestSpan(&spans[idx])
- }
- ing.Close(time.Now())
- store.WrittenSpans.Waits(int64(len(spans)))
-}
-
-// Test creating a datastore and adding some spans.
-func TestDatastoreWriteAndRead(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead",
- Cnf: map[string]string{
- conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
- },
- WrittenSpans: common.NewSemaphore(0),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(SIMPLE_TEST_SPANS, ht.Store)
-
- span := ht.Store.FindSpan(common.TestId("00000000000000000000000000000001"))
- if span == nil {
- t.Fatal()
- }
- if !span.Id.Equal(common.TestId("00000000000000000000000000000001")) {
- t.Fatal()
- }
- common.ExpectSpansEqual(t, &SIMPLE_TEST_SPANS[0], span)
- children := ht.Store.FindChildren(common.TestId("00000000000000000000000000000001"), 1)
- if len(children) != 1 {
- t.Fatalf("expected 1 child, but got %d\n", len(children))
- }
- children = ht.Store.FindChildren(common.TestId("00000000000000000000000000000001"), 2)
- if len(children) != 2 {
- t.Fatalf("expected 2 children, but got %d\n", len(children))
- }
- sort.Sort(common.SpanIdSlice(children))
- if !children[0].Equal(common.TestId("00000000000000000000000000000002")) {
- t.Fatal()
- }
- if !children[1].Equal(common.TestId("00000000000000000000000000000003")) {
- t.Fatal()
- }
-}
-
-func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query,
- expectedSpans []common.Span) {
- testQueryExt(t, ht, query, expectedSpans, nil)
-}
-
-func testQueryExt(t *testing.T, ht *MiniHTraced, query *common.Query,
- expectedSpans []common.Span, expectedNumScanned []int) {
- spans, err, numScanned := ht.Store.HandleQuery(query)
- if err != nil {
- t.Fatalf("Query %s failed: %s\n", query.String(), err.Error())
- }
- expectedBuf := new(bytes.Buffer)
- dec := json.NewEncoder(expectedBuf)
- err = dec.Encode(expectedSpans)
- if err != nil {
- t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", err.Error())
- }
- spansBuf := new(bytes.Buffer)
- dec = json.NewEncoder(spansBuf)
- err = dec.Encode(spans)
- if err != nil {
- t.Fatalf("Failed to encode result spans to JSON: %s\n", err.Error())
- }
- t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans),
- len(expectedSpans))
- common.ExpectStrEqual(t, string(expectedBuf.Bytes()), string(spansBuf.Bytes()))
- if expectedNumScanned != nil {
- if !reflect.DeepEqual(expectedNumScanned, numScanned) {
- t.Fatalf("Invalid values for numScanned: got %v, expected %v\n",
- expectedNumScanned, numScanned)
- }
- }
-}
-
-// Test queries on the datastore.
-func TestSimpleQuery(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
- Cnf: map[string]string{
- conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
- },
- WrittenSpans: common.NewSemaphore(0),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(SIMPLE_TEST_SPANS, ht.Store)
-
- assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
-
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.GREATER_THAN_OR_EQUALS,
- Field: common.BEGIN_TIME,
- Val: "125",
- },
- },
- Lim: 5,
- }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
-}
-
-func TestQueries2(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
- Cnf: map[string]string{
- conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
- },
- WrittenSpans: common.NewSemaphore(0),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(SIMPLE_TEST_SPANS, ht.Store)
- assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.LESS_THAN_OR_EQUALS,
- Field: common.BEGIN_TIME,
- Val: "125",
- },
- },
- Lim: 5,
- }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
-
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.LESS_THAN_OR_EQUALS,
- Field: common.BEGIN_TIME,
- Val: "125",
- },
- common.Predicate{
- Op: common.EQUALS,
- Field: common.DESCRIPTION,
- Val: "getFileDescriptors",
- },
- },
- Lim: 2,
- }, []common.Span{SIMPLE_TEST_SPANS[0]})
-
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.EQUALS,
- Field: common.DESCRIPTION,
- Val: "getFileDescriptors",
- },
- },
- Lim: 2,
- }, []common.Span{SIMPLE_TEST_SPANS[0]})
-}
-
-func TestQueries3(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
- Cnf: map[string]string{
- conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
- },
- WrittenSpans: common.NewSemaphore(0),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(SIMPLE_TEST_SPANS, ht.Store)
- assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.CONTAINS,
- Field: common.DESCRIPTION,
- Val: "Fd",
- },
- common.Predicate{
- Op: common.GREATER_THAN_OR_EQUALS,
- Field: common.BEGIN_TIME,
- Val: "100",
- },
- },
- Lim: 5,
- }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
-
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.LESS_THAN_OR_EQUALS,
- Field: common.SPAN_ID,
- Val: common.TestId("00000000000000000000000000000000").String(),
- },
- },
- Lim: 200,
- }, []common.Span{})
-
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.LESS_THAN_OR_EQUALS,
- Field: common.SPAN_ID,
- Val: common.TestId("00000000000000000000000000000002").String(),
- },
- },
- Lim: 200,
- }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
-}
-
-func TestQueries4(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestQueries4",
- Cnf: map[string]string{
- conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
- },
- WrittenSpans: common.NewSemaphore(0),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(SIMPLE_TEST_SPANS, ht.Store)
-
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.GREATER_THAN,
- Field: common.BEGIN_TIME,
- Val: "125",
- },
- },
- Lim: 5,
- }, []common.Span{SIMPLE_TEST_SPANS[2]})
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.GREATER_THAN_OR_EQUALS,
- Field: common.DESCRIPTION,
- Val: "openFd",
- },
- },
- Lim: 2,
- }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.GREATER_THAN,
- Field: common.DESCRIPTION,
- Val: "openFd",
- },
- },
- Lim: 2,
- }, []common.Span{SIMPLE_TEST_SPANS[2]})
-}
-
-var TEST_QUERIES5_SPANS []common.Span = []common.Span{
- common.Span{Id: common.TestId("10000000000000000000000000000001"),
- SpanData: common.SpanData{
- Begin: 123,
- End: 456,
- Description: "span1",
- Parents: []common.SpanId{},
- TracerId: "myTracer",
- }},
- common.Span{Id: common.TestId("10000000000000000000000000000002"),
- SpanData: common.SpanData{
- Begin: 123,
- End: 200,
- Description: "span2",
- Parents: []common.SpanId{common.TestId("10000000000000000000000000000001")},
- TracerId: "myTracer",
- }},
- common.Span{Id: common.TestId("10000000000000000000000000000003"),
- SpanData: common.SpanData{
- Begin: 124,
- End: 457,
- Description: "span3",
- Parents: []common.SpanId{common.TestId("10000000000000000000000000000001")},
- TracerId: "myTracer",
- }},
-}
-
-func TestQueries5(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestQueries5",
- WrittenSpans: common.NewSemaphore(0),
- DataDirs: make([]string, 1),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(TEST_QUERIES5_SPANS, ht.Store)
-
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.GREATER_THAN,
- Field: common.BEGIN_TIME,
- Val: "123",
- },
- },
- Lim: 5,
- }, []common.Span{TEST_QUERIES5_SPANS[2]})
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.GREATER_THAN,
- Field: common.END_TIME,
- Val: "200",
- },
- },
- Lim: 500,
- }, []common.Span{TEST_QUERIES5_SPANS[0], TEST_QUERIES5_SPANS[2]})
-
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.LESS_THAN_OR_EQUALS,
- Field: common.END_TIME,
- Val: "999",
- },
- },
- Lim: 500,
- }, []common.Span{TEST_QUERIES5_SPANS[2],
- TEST_QUERIES5_SPANS[0],
- TEST_QUERIES5_SPANS[1],
- })
-}
-
-func BenchmarkDatastoreWrites(b *testing.B) {
- htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
- Cnf: map[string]string{
- conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
- conf.HTRACE_LOG_LEVEL: "INFO",
- },
- WrittenSpans: common.NewSemaphore(0),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- b.Fatalf("Error creating MiniHTraced: %s\n", err.Error())
- }
- ht.Store.lg.Infof("BenchmarkDatastoreWrites: b.N = %d\n", b.N)
- defer func() {
- if r := recover(); r != nil {
- ht.Store.lg.Infof("panic: %s\n", r.(error))
- }
- ht.Close()
- }()
- rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
- allSpans := make([]*common.Span, b.N)
- for n := range allSpans {
- allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n])
- }
-
- // Reset the timer to avoid including the time required to create new
- // random spans in the benchmark total.
- b.ResetTimer()
-
- // Write many random spans.
- ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "")
- for n := 0; n < b.N; n++ {
- ing.IngestSpan(allSpans[n])
- }
- ing.Close(time.Now())
- // Wait for all the spans to be written.
- ht.Store.WrittenSpans.Waits(int64(b.N))
- assertNumWrittenEquals(b, ht.Store.msink, b.N)
-}
-
-func verifySuccessfulLoad(t *testing.T, allSpans common.SpanSlice,
- dataDirs []string) {
- htraceBld := &MiniHTracedBuilder{
- Name: "TestReloadDataStore#verifySuccessfulLoad",
- DataDirs: dataDirs,
- KeepDataDirsOnClose: true,
- }
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf(), nil)
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
- defer hcl.Close()
- for i := 0; i < len(allSpans); i++ {
- span, err := hcl.FindSpan(allSpans[i].Id)
- if err != nil {
- t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
- }
- common.ExpectSpansEqual(t, allSpans[i], span)
- }
- // Look up the spans we wrote.
- var span *common.Span
- for i := 0; i < len(allSpans); i++ {
- span, err = hcl.FindSpan(allSpans[i].Id)
- if err != nil {
- t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
- }
- common.ExpectSpansEqual(t, allSpans[i], span)
- }
-}
-
-func verifyFailedLoad(t *testing.T, dataDirs []string, expectedErr string) {
- htraceBld := &MiniHTracedBuilder{
- Name: "TestReloadDataStore#verifyFailedLoad",
- DataDirs: dataDirs,
- KeepDataDirsOnClose: true,
- }
- _, err := htraceBld.Build()
- if err == nil {
- t.Fatalf("expected failure to load, but the load succeeded.")
- }
- common.AssertErrContains(t, err, expectedErr)
-}
-
-func TestReloadDataStore(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore",
- Cnf: map[string]string{
- conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
- },
- DataDirs: make([]string, 2),
- KeepDataDirsOnClose: true,
- WrittenSpans: common.NewSemaphore(0),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- dataDirs := make([]string, len(ht.DataDirs))
- copy(dataDirs, ht.DataDirs)
- defer func() {
- if ht != nil {
- ht.Close()
- }
- for i := range dataDirs {
- os.RemoveAll(dataDirs[i])
- }
- }()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf(), nil)
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
- hcnf := ht.Cnf.Clone()
-
- // Create some random trace spans.
- NUM_TEST_SPANS := 5
- allSpans := createRandomTestSpans(NUM_TEST_SPANS)
- err = hcl.WriteSpans(allSpans)
- if err != nil {
- t.Fatalf("WriteSpans failed: %s\n", err.Error())
- }
- ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS))
-
- // Look up the spans we wrote.
- var span *common.Span
- for i := 0; i < NUM_TEST_SPANS; i++ {
- span, err = hcl.FindSpan(allSpans[i].Id)
- if err != nil {
- t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
- }
- common.ExpectSpansEqual(t, allSpans[i], span)
- }
- hcl.Close()
- ht.Close()
- ht = nil
-
- // Verify that we can reload the datastore, even if we configure the data
- // directories in a different order.
- verifySuccessfulLoad(t, allSpans, []string{dataDirs[1], dataDirs[0]})
-
- // If we try to reload the datastore with only one directory, it won't work
- // (we need both).
- verifyFailedLoad(t, []string{dataDirs[1]},
- "The TotalShards field of all shards is 2, but we have 1 shards.")
-
- // Test that we give an intelligent error message when 0 directories are
- // configured.
- verifyFailedLoad(t, []string{}, "No shard directories found.")
-
- // Can't specify the same directory more than once... will get "lock
- // already held by process"
- verifyFailedLoad(t, []string{dataDirs[0], dataDirs[1], dataDirs[1]},
- " already held by process.")
-
- // Open the datastore and modify it to have the wrong DaemonId
- dld := NewDataStoreLoader(hcnf)
- defer func() {
- if dld != nil {
- dld.Close()
- dld = nil
- }
- }()
- dld.LoadShards()
- sinfo, err := dld.shards[0].readShardInfo()
- if err != nil {
- t.Fatalf("error reading shard info for shard %s: %s\n",
- dld.shards[0].path, err.Error())
- }
- newDaemonId := sinfo.DaemonId + 1
- dld.lg.Infof("Read %s from shard %s. Changing daemonId to 0x%016x\n.",
- asJson(sinfo), dld.shards[0].path, newDaemonId)
- sinfo.DaemonId = newDaemonId
- err = dld.shards[0].writeShardInfo(sinfo)
- if err != nil {
- t.Fatalf("error writing shard info for shard %s: %s\n",
- dld.shards[0].path, err.Error())
- }
- dld.Close()
- dld = nil
- verifyFailedLoad(t, dataDirs, "DaemonId mismatch.")
-
- // Open the datastore and modify it to have the wrong TotalShards
- dld = NewDataStoreLoader(hcnf)
- dld.LoadShards()
- sinfo, err = dld.shards[0].readShardInfo()
- if err != nil {
- t.Fatalf("error reading shard info for shard %s: %s\n",
- dld.shards[0].path, err.Error())
- }
- newDaemonId = sinfo.DaemonId - 1
- dld.lg.Infof("Read %s from shard %s. Changing daemonId to 0x%016x, " +
- "TotalShards to 3\n.",
- asJson(sinfo), dld.shards[0].path, newDaemonId)
- sinfo.DaemonId = newDaemonId
- sinfo.TotalShards = 3
- err = dld.shards[0].writeShardInfo(sinfo)
- if err != nil {
- t.Fatalf("error writing shard info for shard %s: %s\n",
- dld.shards[0].path, err.Error())
- }
- dld.Close()
- dld = nil
- verifyFailedLoad(t, dataDirs, "TotalShards mismatch.")
-
- // Open the datastore and modify it to have the wrong LayoutVersion
- dld = NewDataStoreLoader(hcnf)
- dld.LoadShards()
- for shardIdx := range(dld.shards) {
- sinfo, err = dld.shards[shardIdx].readShardInfo()
- if err != nil {
- t.Fatalf("error reading shard info for shard %s: %s\n",
- dld.shards[shardIdx].path, err.Error())
- }
- dld.lg.Infof("Read %s from shard %s. Changing TotalShards to 2, " +
- "LayoutVersion to 2\n", asJson(sinfo), dld.shards[shardIdx].path)
- sinfo.TotalShards = 2
- sinfo.LayoutVersion = 2
- err = dld.shards[shardIdx].writeShardInfo(sinfo)
- if err != nil {
- t.Fatalf("error writing shard info for shard %s: %s\n",
- dld.shards[0].path, err.Error())
- }
- }
- dld.Close()
- dld = nil
- verifyFailedLoad(t, dataDirs, "The layout version of all shards is 2, " +
- "but we only support")
-
- // It should work with data.store.clear set.
- htraceBld = &MiniHTracedBuilder{
- Name: "TestReloadDataStore#clear",
- DataDirs: dataDirs,
- KeepDataDirsOnClose: true,
- Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"},
- }
- ht, err = htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
-}
-
-func TestQueriesWithContinuationTokens1(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1",
- Cnf: map[string]string{
- conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000",
- },
- WrittenSpans: common.NewSemaphore(0),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(SIMPLE_TEST_SPANS, ht.Store)
- assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
- // Adding a prev value to this query excludes the first result that we
- // would normally get.
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.GREATER_THAN,
- Field: common.BEGIN_TIME,
- Val: "120",
- },
- },
- Lim: 5,
- Prev: &SIMPLE_TEST_SPANS[0],
- }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
-
- // There is only one result from an EQUALS query on SPAN_ID.
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.EQUALS,
- Field: common.SPAN_ID,
- Val: common.TestId("00000000000000000000000000000001").String(),
- },
- },
- Lim: 100,
- Prev: &SIMPLE_TEST_SPANS[0],
- }, []common.Span{})
-
- // When doing a LESS_THAN_OR_EQUALS search, we still don't get back the
- // span we pass as a continuation token. (Primary index edition).
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.LESS_THAN_OR_EQUALS,
- Field: common.SPAN_ID,
- Val: common.TestId("00000000000000000000000000000002").String(),
- },
- },
- Lim: 100,
- Prev: &SIMPLE_TEST_SPANS[1],
- }, []common.Span{SIMPLE_TEST_SPANS[0]})
-
- // When doing a GREATER_THAN_OR_EQUALS search, we still don't get back the
- // span we pass as a continuation token. (Secondary index edition).
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.GREATER_THAN,
- Field: common.DURATION,
- Val: "0",
- },
- },
- Lim: 100,
- Prev: &SIMPLE_TEST_SPANS[1],
- }, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]})
-}
-
-func TestQueryRowsScanned(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestQueryRowsScanned",
- WrittenSpans: common.NewSemaphore(0),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(SIMPLE_TEST_SPANS, ht.Store)
- assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS))
- testQueryExt(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.EQUALS,
- Field: common.SPAN_ID,
- Val: common.TestId("00000000000000000000000000000001").String(),
- },
- },
- Lim: 100,
- Prev: nil,
- }, []common.Span{SIMPLE_TEST_SPANS[0]},
- []int{2, 1})
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
deleted file mode 100644
index 49a21ee..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "org/apache/htrace/common"
- "sync"
- "time"
-)
-
-type Heartbeater struct {
- // The name of this heartbeater
- name string
-
- // How long to sleep between heartbeats, in milliseconds.
- periodMs int64
-
- // The logger to use.
- lg *common.Logger
-
- // The channels to send the heartbeat on.
- targets []HeartbeatTarget
-
- // Incoming requests to the heartbeater. When this is closed, the
- // heartbeater will exit.
- req chan *HeartbeatTarget
-
- wg sync.WaitGroup
-}
-
-type HeartbeatTarget struct {
- // The name of the heartbeat target.
- name string
-
- // The channel for the heartbeat target.
- targetChan chan interface{}
-}
-
-func (tgt *HeartbeatTarget) String() string {
- return tgt.name
-}
-
-func NewHeartbeater(name string, periodMs int64, lg *common.Logger) *Heartbeater {
- hb := &Heartbeater{
- name: name,
- periodMs: periodMs,
- lg: lg,
- targets: make([]HeartbeatTarget, 0, 4),
- req: make(chan *HeartbeatTarget),
- }
- hb.wg.Add(1)
- go hb.run()
- return hb
-}
-
-func (hb *Heartbeater) AddHeartbeatTarget(tgt *HeartbeatTarget) {
- hb.req <- tgt
-}
-
-func (hb *Heartbeater) Shutdown() {
- close(hb.req)
- hb.wg.Wait()
-}
-
-func (hb *Heartbeater) String() string {
- return hb.name
-}
-
-func (hb *Heartbeater) run() {
- defer func() {
- hb.lg.Debugf("%s: exiting.\n", hb.String())
- hb.wg.Done()
- }()
- period := time.Duration(hb.periodMs) * time.Millisecond
- for {
- periodEnd := time.Now().Add(period)
- for {
- timeToWait := periodEnd.Sub(time.Now())
- if timeToWait <= 0 {
- break
- } else if timeToWait > period {
- // Smooth over jitter or clock changes
- timeToWait = period
- periodEnd = time.Now().Add(period)
- }
- select {
- case tgt, open := <-hb.req:
- if !open {
- return
- }
- hb.targets = append(hb.targets, *tgt)
- hb.lg.Debugf("%s: added %s.\n", hb.String(), tgt.String())
- case <-time.After(timeToWait):
- }
- }
- for targetIdx := range hb.targets {
- select {
- case hb.targets[targetIdx].targetChan <- nil:
- default:
- // We failed to send a heartbeat because the other goroutine was busy and
- // hasn't cleared the previous one from its channel. This could indicate a
- // stuck goroutine.
- hb.lg.Infof("%s: could not send heartbeat to %s.\n",
- hb.String(), hb.targets[targetIdx])
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go
deleted file mode 100644
index cbde7fc..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "testing"
- "time"
-)
-
-func TestHeartbeaterStartupShutdown(t *testing.T) {
- cnfBld := conf.Builder{
- Values: conf.TEST_VALUES(),
- Defaults: conf.DEFAULTS,
- }
- cnf, err := cnfBld.Build()
- if err != nil {
- t.Fatalf("failed to create conf: %s", err.Error())
- }
- lg := common.NewLogger("heartbeater", cnf)
- hb := NewHeartbeater("ExampleHeartbeater", 1, lg)
- if hb.String() != "ExampleHeartbeater" {
- t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
- }
- hb.Shutdown()
-}
-
-// The number of milliseconds between heartbeats
-const HEARTBEATER_PERIOD = 5
-
-// The number of heartbeats to send in the test.
-const NUM_TEST_HEARTBEATS = 3
-
-func TestHeartbeaterSendsHeartbeats(t *testing.T) {
- cnfBld := conf.Builder{
- Values: conf.TEST_VALUES(),
- Defaults: conf.DEFAULTS,
- }
- cnf, err := cnfBld.Build()
- if err != nil {
- t.Fatalf("failed to create conf: %s", err.Error())
- }
- lg := common.NewLogger("heartbeater", cnf)
- // The minimum amount of time which the heartbeater test should take
- MINIMUM_TEST_DURATION := time.Millisecond * (NUM_TEST_HEARTBEATS * HEARTBEATER_PERIOD)
- duration := MINIMUM_TEST_DURATION
- for duration <= MINIMUM_TEST_DURATION {
- start := time.Now()
- testHeartbeaterSendsHeartbeatsImpl(t, lg)
- end := time.Now()
- duration = end.Sub(start)
- lg.Debugf("Measured duration: %v; minimum expected duration: %v\n",
- duration, MINIMUM_TEST_DURATION)
- }
-}
-
-func testHeartbeaterSendsHeartbeatsImpl(t *testing.T, lg *common.Logger) {
- hb := NewHeartbeater("ExampleHeartbeater", HEARTBEATER_PERIOD, lg)
- if hb.String() != "ExampleHeartbeater" {
- t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
- }
- testChan := make(chan interface{}, NUM_TEST_HEARTBEATS)
- gotAllHeartbeats := make(chan bool)
- hb.AddHeartbeatTarget(&HeartbeatTarget{
- name: "ExampleHeartbeatTarget",
- targetChan: testChan,
- })
- go func() {
- for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
- <-testChan
- }
- gotAllHeartbeats <- true
- for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
- _, open := <-testChan
- if !open {
- return
- }
- }
- }()
- <-gotAllHeartbeats
- hb.Shutdown()
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
deleted file mode 100644
index ecd13d4..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "bufio"
- "bytes"
- "encoding/binary"
- "encoding/json"
- "errors"
- "fmt"
- "github.com/ugorji/go/codec"
- "io"
- "net"
- "net/rpc"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "sync"
- "sync/atomic"
- "time"
-)
-
-const MAX_HRPC_HANDLERS = 32765
-
-// Handles HRPC calls
-type HrpcHandler struct {
- lg *common.Logger
- store *dataStore
-}
-
-// The HRPC server
-type HrpcServer struct {
- *rpc.Server
- hand *HrpcHandler
-
- // The listener we are using to accept new connections.
- listener net.Listener
-
- // A WaitGroup used to block until the HRPC server has exited.
- exited sync.WaitGroup
-
- // A channel containing server codecs to use. This channel is fully
- // buffered. The number of entries it initially contains determines how
- // many concurrent codecs we will have running at once.
- cdcs chan *HrpcServerCodec
-
- // Used to shut down
- shutdown chan interface{}
-
- // The I/O timeout to use when reading requests or sending responses. This
- // timeout does not apply to the time we spend processing the message.
- ioTimeo time.Duration
-
- // A count of all I/O errors that we have encountered since the server
- // started. This counts errors like improperly formatted message frames,
- // but not errors like properly formatted but invalid messages.
- // This count is updated from multiple goroutines via sync/atomic.
- ioErrorCount uint64
-
- // The test hooks to use, or nil during normal operation.
- testHooks *hrpcTestHooks
-}
-
-type hrpcTestHooks struct {
- // A callback we make right after calling Accept() but before reading from
- // the new connection.
- HandleAdmission func()
-}
-
-// A codec which encodes HRPC data via JSON. This structure holds the context
-// for a particular client connection.
-type HrpcServerCodec struct {
- lg *common.Logger
-
- // The current connection.
- conn net.Conn
-
- // The HrpcServer which this connection is part of.
- hsv *HrpcServer
-
- // The message length we read from the header.
- length uint32
-
- // The number of messages this connection has handled.
- numHandled int
-
- // The buffer for reading requests. These buffers are reused for multiple
- // requests to avoid allocating memory.
- buf []byte
-
- // Configuration for msgpack decoding
- msgpackHandle codec.MsgpackHandle
-}
-
-func asJson(val interface{}) string {
- js, err := json.Marshal(val)
- if err != nil {
- return "encoding error: " + err.Error()
- }
- return string(js)
-}
-
-func newIoErrorWarn(cdc *HrpcServerCodec, val string) error {
- return newIoError(cdc, val, common.WARN)
-}
-
-func newIoError(cdc *HrpcServerCodec, val string, level common.Level) error {
- if cdc.lg.LevelEnabled(level) {
- cdc.lg.Write(level, cdc.conn.RemoteAddr().String()+": "+val+"\n")
- }
- if level >= common.INFO {
- atomic.AddUint64(&cdc.hsv.ioErrorCount, 1)
- }
- return errors.New(val)
-}
-
-func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
- hdr := common.HrpcRequestHeader{}
- if cdc.lg.TraceEnabled() {
- cdc.lg.Tracef("%s: Reading HRPC request header.\n", cdc.conn.RemoteAddr())
- }
- cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
- err := binary.Read(cdc.conn, binary.LittleEndian, &hdr)
- if err != nil {
- if err == io.EOF && cdc.numHandled > 0 {
- return newIoError(cdc, fmt.Sprintf("Remote closed connection "+
- "after writing %d message(s)", cdc.numHandled), common.DEBUG)
- }
- return newIoError(cdc,
- fmt.Sprintf("Error reading request header: %s", err.Error()), common.WARN)
- }
- if cdc.lg.TraceEnabled() {
- cdc.lg.Tracef("%s: Read HRPC request header %s\n",
- cdc.conn.RemoteAddr(), asJson(&hdr))
- }
- if hdr.Magic != common.HRPC_MAGIC {
- return newIoErrorWarn(cdc, fmt.Sprintf("Invalid request header: expected "+
- "magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic))
- }
- if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
- return newIoErrorWarn(cdc, fmt.Sprintf("Length prefix was too long. "+
- "Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH,
- hdr.Length))
- }
- req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
- if req.ServiceMethod == "" {
- return newIoErrorWarn(cdc, fmt.Sprintf("Unknown MethodID code 0x%04x",
- hdr.MethodId))
- }
- req.Seq = hdr.Seq
- cdc.length = hdr.Length
- return nil
-}
-
-func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
- remoteAddr := cdc.conn.RemoteAddr().String()
- if cdc.lg.TraceEnabled() {
- cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n",
- remoteAddr, cdc.length)
- }
- if cap(cdc.buf) < int(cdc.length) {
- var pow uint
- for pow=0;(1<<pow) < int(cdc.length);pow++ {
- }
- cdc.buf = make([]byte, 0, 1<<pow)
- }
- _, err := io.ReadFull(cdc.conn, cdc.buf[:cdc.length])
- if err != nil {
- return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte "+
- "request body: %s", cdc.length, err.Error()))
- }
- var zeroTime time.Time
- cdc.conn.SetDeadline(zeroTime)
-
- dec := codec.NewDecoderBytes(cdc.buf[:cdc.length], &cdc.msgpackHandle)
- err = dec.Decode(body)
- if cdc.lg.TraceEnabled() {
- cdc.lg.Tracef("%s: read HRPC message: %s\n",
- remoteAddr, asJson(&body))
- }
- req := body.(*common.WriteSpansReq)
- if req == nil {
- return nil
- }
- // We decode WriteSpans requests in a streaming fashion, to avoid overloading the garbage
- // collector with a ton of trace spans all at once.
- startTime := time.Now()
- client, _, err := net.SplitHostPort(remoteAddr)
- if err != nil {
- return newIoErrorWarn(cdc, fmt.Sprintf("Failed to split host and port "+
- "for %s: %s\n", remoteAddr, err.Error()))
- }
- hand := cdc.hsv.hand
- ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid)
- for spanIdx := 0; spanIdx < req.NumSpans; spanIdx++ {
- var span *common.Span
- err := dec.Decode(&span)
- if err != nil {
- return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode span %d " +
- "out of %d: %s\n", spanIdx, req.NumSpans, err.Error()))
- }
- ing.IngestSpan(span)
- }
- ing.Close(startTime)
- return nil
-}
-
-var EMPTY []byte = make([]byte, 0)
-
-func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error {
- cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
- var err error
- buf := EMPTY
- if msg != nil {
- w := bytes.NewBuffer(make([]byte, 0, 128))
- enc := codec.NewEncoder(w, &cdc.msgpackHandle)
- err := enc.Encode(msg)
- if err != nil {
- return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+
- "response message: %s", err.Error()))
- }
- buf = w.Bytes()
- }
- hdr := common.HrpcResponseHeader{}
- hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod)
- hdr.Seq = resp.Seq
- hdr.ErrLength = uint32(len(resp.Error))
- hdr.Length = uint32(len(buf))
- writer := bufio.NewWriterSize(cdc.conn, 256)
- err = binary.Write(writer, binary.LittleEndian, &hdr)
- if err != nil {
- return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
- "header: %s", err.Error()))
- }
- if hdr.ErrLength > 0 {
- _, err = io.WriteString(writer, resp.Error)
- if err != nil {
- return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write error "+
- "string: %s", err.Error()))
- }
- }
- if hdr.Length > 0 {
- var length int
- length, err = writer.Write(buf)
- if err != nil {
- return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+
- "message: %s", err.Error()))
- }
- if uint32(length) != hdr.Length {
- return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write all of "+
- "response message: %s", err.Error()))
- }
- }
- err = writer.Flush()
- if err != nil {
- return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write the response "+
- "bytes: %s", err.Error()))
- }
- cdc.numHandled++
- return nil
-}
-
-func (cdc *HrpcServerCodec) Close() error {
- err := cdc.conn.Close()
- cdc.conn = nil
- cdc.length = 0
- cdc.numHandled = 0
- cdc.hsv.cdcs <- cdc
- return err
-}
-
-func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
- resp *common.WriteSpansResp) (err error) {
- // Nothing to do here; WriteSpans is handled in ReadRequestBody.
- return nil
-}
-
-func CreateHrpcServer(cnf *conf.Config, store *dataStore,
- testHooks *hrpcTestHooks) (*HrpcServer, error) {
- lg := common.NewLogger("hrpc", cnf)
- numHandlers := cnf.GetInt(conf.HTRACE_NUM_HRPC_HANDLERS)
- if numHandlers < 1 {
- lg.Warnf("%s must be positive: using 1 handler.\n", conf.HTRACE_NUM_HRPC_HANDLERS)
- numHandlers = 1
- }
- if numHandlers > MAX_HRPC_HANDLERS {
- lg.Warnf("%s cannot be more than %d: using %d handlers\n",
- conf.HTRACE_NUM_HRPC_HANDLERS, MAX_HRPC_HANDLERS, MAX_HRPC_HANDLERS)
- numHandlers = MAX_HRPC_HANDLERS
- }
- hsv := &HrpcServer{
- Server: rpc.NewServer(),
- hand: &HrpcHandler{
- lg: lg,
- store: store,
- },
- cdcs: make(chan *HrpcServerCodec, numHandlers),
- shutdown: make(chan interface{}),
- ioTimeo: time.Millisecond *
- time.Duration(cnf.GetInt64(conf.HTRACE_HRPC_IO_TIMEOUT_MS)),
- testHooks: testHooks,
- }
- for i := 0; i < numHandlers; i++ {
- hsv.cdcs <- &HrpcServerCodec{
- lg: lg,
- hsv: hsv,
- msgpackHandle: codec.MsgpackHandle {
- WriteExt: true,
- },
- }
- }
- var err error
- hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
- if err != nil {
- return nil, err
- }
- hsv.Server.Register(hsv.hand)
- hsv.exited.Add(1)
- go hsv.run()
- lg.Infof("Started HRPC server on %s with %d handler routines. "+
- "ioTimeo=%s.\n", hsv.listener.Addr().String(), numHandlers,
- hsv.ioTimeo.String())
- return hsv, nil
-}
-
-func (hsv *HrpcServer) run() {
- lg := hsv.hand.lg
- srvAddr := hsv.listener.Addr().String()
- defer func() {
- lg.Infof("HrpcServer on %s exiting\n", srvAddr)
- hsv.exited.Done()
- }()
- for {
- select {
- case cdc := <-hsv.cdcs:
- conn, err := hsv.listener.Accept()
- if err != nil {
- lg.Errorf("HrpcServer on %s got accept error: %s\n", srvAddr, err.Error())
- hsv.cdcs <- cdc // never blocks; there is always sufficient buffer space
- continue
- }
- if lg.TraceEnabled() {
- lg.Tracef("%s: Accepted HRPC connection.\n", conn.RemoteAddr())
- }
- cdc.conn = conn
- cdc.numHandled = 0
- if hsv.testHooks != nil && hsv.testHooks.HandleAdmission != nil {
- hsv.testHooks.HandleAdmission()
- }
- go hsv.ServeCodec(cdc)
- case <-hsv.shutdown:
- return
- }
- }
-}
-
-func (hsv *HrpcServer) Addr() net.Addr {
- return hsv.listener.Addr()
-}
-
-func (hsv *HrpcServer) GetNumIoErrors() uint64 {
- return atomic.LoadUint64(&hsv.ioErrorCount)
-}
-
-func (hsv *HrpcServer) Close() {
- close(hsv.shutdown)
- hsv.listener.Close()
- hsv.exited.Wait()
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
deleted file mode 100644
index 35ee753..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "bufio"
- "encoding/json"
- "fmt"
- "github.com/alecthomas/kingpin"
- "github.com/jmhodges/levigo"
- "net"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "os"
- "runtime"
- "time"
-)
-
-var RELEASE_VERSION string
-var GIT_VERSION string
-
-const USAGE = `htraced: the HTrace server daemon.
-
-htraced receives trace spans sent from HTrace clients. It exposes a REST
-interface which others can query. It also runs a web server with a graphical
-user interface. htraced stores its span data in levelDB files on the local
-disks.
-
-Usage:
---help: this help message
-
--Dk=v: set configuration key 'k' to value 'v'
-For example -Dweb.address=127.0.0.1:8080 sets the web address to localhost,
-port 8080. -Dlog.level=DEBUG will set the default log level to DEBUG.
-
--Dk: set configuration key 'k' to 'true'
-
-Normally, configuration options should be set in the ` + conf.CONFIG_FILE_NAME + `
-configuration file. We find this file by searching the paths in the
-` + conf.HTRACED_CONF_DIR + `. The command-line options are just an alternate way
-of setting configuration when launching the daemon.
-`
-
-func main() {
- // Load the htraced configuration.
- // This also parses the -Dfoo=bar command line arguments and removes them
- // from os.Argv.
- cnf, cnfLog := conf.LoadApplicationConfig("htraced.")
-
- // Parse the remaining command-line arguments.
- app := kingpin.New(os.Args[0], USAGE)
- version := app.Command("version", "Print server version and exit.")
- cmd := kingpin.MustParse(app.Parse(os.Args[1:]))
-
- // Handle the "version" command-line argument.
- if cmd == version.FullCommand() {
- fmt.Printf("Running htraced %s [%s].\n", RELEASE_VERSION, GIT_VERSION)
- os.Exit(0)
- }
-
- // Open the HTTP port.
- // We want to do this first, before initializing the datastore or setting up
- // logging. That way, if someone accidentally starts two daemons with the
- // same config file, the second invocation will exit with a "port in use"
- // error rather than potentially disrupting the first invocation.
- rstListener, listenErr := net.Listen("tcp", cnf.Get(conf.HTRACE_WEB_ADDRESS))
- if listenErr != nil {
- fmt.Fprintf(os.Stderr, "Error opening HTTP port: %s\n",
- listenErr.Error())
- os.Exit(1)
- }
-
- // Print out the startup banner and information about the daemon
- // configuration.
- lg := common.NewLogger("main", cnf)
- defer lg.Close()
- lg.Infof("*** Starting htraced %s [%s]***\n", RELEASE_VERSION, GIT_VERSION)
- scanner := bufio.NewScanner(cnfLog)
- for scanner.Scan() {
- lg.Infof(scanner.Text() + "\n")
- }
- common.InstallSignalHandlers(cnf)
- if runtime.GOMAXPROCS(0) == 1 {
- ncpu := runtime.NumCPU()
- runtime.GOMAXPROCS(ncpu)
- lg.Infof("setting GOMAXPROCS=%d\n", ncpu)
- } else {
- lg.Infof("GOMAXPROCS=%d\n", runtime.GOMAXPROCS(0))
- }
- lg.Infof("leveldb version=%d.%d\n",
- levigo.GetLevelDBMajorVersion(), levigo.GetLevelDBMinorVersion())
-
- // Initialize the datastore.
- store, err := CreateDataStore(cnf, nil)
- if err != nil {
- lg.Errorf("Error creating datastore: %s\n", err.Error())
- os.Exit(1)
- }
- var rsv *RestServer
- rsv, err = CreateRestServer(cnf, store, rstListener)
- if err != nil {
- lg.Errorf("Error creating REST server: %s\n", err.Error())
- os.Exit(1)
- }
- var hsv *HrpcServer
- if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
- hsv, err = CreateHrpcServer(cnf, store, nil)
- if err != nil {
- lg.Errorf("Error creating HRPC server: %s\n", err.Error())
- os.Exit(1)
- }
- } else {
- lg.Infof("Not starting HRPC server because no value was given for %s.\n",
- conf.HTRACE_HRPC_ADDRESS)
- }
- naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS)
- if naddr != "" {
- notif := StartupNotification{
- HttpAddr: rsv.Addr().String(),
- ProcessId: os.Getpid(),
- }
- if hsv != nil {
- notif.HrpcAddr = hsv.Addr().String()
- }
- err = sendStartupNotification(naddr, ¬if)
- if err != nil {
- fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+
- "%s\n", err.Error())
- os.Exit(1)
- }
- }
- for {
- time.Sleep(time.Duration(10) * time.Hour)
- }
-}
-
-// A startup notification message that we optionally send on startup.
-// Used by unit tests.
-type StartupNotification struct {
- HttpAddr string
- HrpcAddr string
- ProcessId int
-}
-
-func sendStartupNotification(naddr string, notif *StartupNotification) error {
- conn, err := net.Dial("tcp", naddr)
- if err != nil {
- return err
- }
- defer func() {
- if conn != nil {
- conn.Close()
- }
- }()
- var buf []byte
- buf, err = json.Marshal(notif)
- if err != nil {
- return err
- }
- _, err = conn.Write(buf)
- conn.Close()
- conn = nil
- return nil
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go b/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go
deleted file mode 100644
index 5914004..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "bytes"
- "errors"
- "fmt"
- "github.com/jmhodges/levigo"
- "github.com/ugorji/go/codec"
- "io"
- "math"
- "math/rand"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "os"
- "strings"
- "syscall"
- "time"
-)
-
-// Routines for loading the datastore.
-
-// The leveldb key which has information about the shard.
-const SHARD_INFO_KEY = 'w'
-
-// A constant signifying that we don't know what the layout version is.
-const UNKNOWN_LAYOUT_VERSION = 0
-
-// The current layout version. We cannot read layout versions newer than this.
-// We may sometimes be able to read older versions, but only by doing an
-// upgrade.
-const CURRENT_LAYOUT_VERSION = 3
-
-type DataStoreLoader struct {
- // The dataStore logger.
- lg *common.Logger
-
- // True if we should clear the stored data.
- ClearStored bool
-
- // The shards that we're loading
- shards []*ShardLoader
-
- // The options to use for opening datastores in LevelDB.
- openOpts *levigo.Options
-
- // The read options to use for LevelDB.
- readOpts *levigo.ReadOptions
-
- // The write options to use for LevelDB.
- writeOpts *levigo.WriteOptions
-}
-
-// Information about a Shard.
-type ShardInfo struct {
- // The layout version of the datastore.
- // We should always keep this field so that old software can recognize new
- // layout versions, even if it can't read them.
- LayoutVersion uint64
-
- // A random number identifying this daemon.
- DaemonId uint64
-
- // The total number of shards in this datastore.
- TotalShards uint32
-
- // The index of this shard within the datastore.
- ShardIndex uint32
-}
-
-// Create a new datastore loader.
-// Initializes the loader, but does not load any leveldb instances.
-func NewDataStoreLoader(cnf *conf.Config) *DataStoreLoader {
- dld := &DataStoreLoader{
- lg: common.NewLogger("datastore", cnf),
- ClearStored: cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR),
- }
- dld.readOpts = levigo.NewReadOptions()
- dld.readOpts.SetFillCache(true)
- dld.readOpts.SetVerifyChecksums(false)
- dld.writeOpts = levigo.NewWriteOptions()
- dld.writeOpts.SetSync(false)
- dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES)
- rdirs := strings.Split(dirsStr, conf.PATH_LIST_SEP)
- // Filter out empty entries
- dirs := make([]string, 0, len(rdirs))
- for i := range(rdirs) {
- if strings.TrimSpace(rdirs[i]) != "" {
- dirs = append(dirs, rdirs[i])
- }
- }
- dld.shards = make([]*ShardLoader, len(dirs))
- for i := range(dirs) {
- dld.shards[i] = &ShardLoader{
- dld: dld,
- path: dirs[i] + conf.PATH_SEP + "db",
- }
- }
- dld.openOpts = levigo.NewOptions()
- cacheSize := cnf.GetInt(conf.HTRACE_LEVELDB_CACHE_SIZE)
- dld.openOpts.SetCache(levigo.NewLRUCache(cacheSize))
- dld.openOpts.SetParanoidChecks(false)
- writeBufferSize := cnf.GetInt(conf.HTRACE_LEVELDB_WRITE_BUFFER_SIZE)
- if writeBufferSize > 0 {
- dld.openOpts.SetWriteBufferSize(writeBufferSize)
- }
- maxFdPerShard := dld.calculateMaxOpenFilesPerShard()
- if maxFdPerShard > 0 {
- dld.openOpts.SetMaxOpenFiles(maxFdPerShard)
- }
- return dld
-}
-
-func (dld *DataStoreLoader) Close() {
- if dld.lg != nil {
- dld.lg.Close()
- dld.lg = nil
- }
- if dld.openOpts != nil {
- dld.openOpts.Close()
- dld.openOpts = nil
- }
- if dld.readOpts != nil {
- dld.readOpts.Close()
- dld.readOpts = nil
- }
- if dld.writeOpts != nil {
- dld.writeOpts.Close()
- dld.writeOpts = nil
- }
- if dld.shards != nil {
- for i := range(dld.shards) {
- if dld.shards[i] != nil {
- dld.shards[i].Close()
- }
- }
- dld.shards = nil
- }
-}
-
-func (dld *DataStoreLoader) DisownResources() {
- dld.lg = nil
- dld.openOpts = nil
- dld.readOpts = nil
- dld.writeOpts = nil
- dld.shards = nil
-}
-
-// The maximum number of file descriptors we'll use on non-datastore things.
-const NON_DATASTORE_FD_MAX = 300
-
-// The minimum number of file descriptors per shard we will set. Setting fewer
-// than this number could trigger a bug in some early versions of leveldb.
-const MIN_FDS_PER_SHARD = 80
-
-func (dld *DataStoreLoader) calculateMaxOpenFilesPerShard() int {
- var rlim syscall.Rlimit
- err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim)
- if err != nil {
- dld.lg.Warnf("Unable to calculate maximum open files per shard: " +
- "getrlimit failed: %s\n", err.Error())
- return 0
- }
- // I think RLIMIT_NOFILE fits in 32 bits on all known operating systems,
- // but there's no harm in being careful. 'int' in golang always holds at
- // least 32 bits.
- var maxFd int
- if rlim.Cur > uint64(math.MaxInt32) {
- maxFd = math.MaxInt32
- } else {
- maxFd = int(rlim.Cur)
- }
- if len(dld.shards) == 0 {
- dld.lg.Warnf("Unable to calculate maximum open files per shard, " +
- "since there are 0 shards configured.\n")
- return 0
- }
- fdsPerShard := (maxFd - NON_DATASTORE_FD_MAX) / len(dld.shards)
- if fdsPerShard < MIN_FDS_PER_SHARD {
- dld.lg.Warnf("Expected to be able to use at least %d " +
- "fds per shard, but we have %d shards and %d total fds to allocate, " +
- "giving us only %d FDs per shard.", MIN_FDS_PER_SHARD,
- len(dld.shards), maxFd - NON_DATASTORE_FD_MAX, fdsPerShard)
- return 0
- }
- dld.lg.Infof("maxFd = %d. Setting maxFdPerShard = %d\n",
- maxFd, fdsPerShard)
- return fdsPerShard
-}
-
-// Load information about all shards.
-func (dld *DataStoreLoader) LoadShards() {
- for i := range(dld.shards) {
- shd := dld.shards[i]
- shd.load()
- }
-}
-
-// Verify that the shard infos are consistent.
-// Reorders the shardInfo structures based on their ShardIndex.
-func (dld *DataStoreLoader) VerifyShardInfos() error {
- if len(dld.shards) < 1 {
- return errors.New("No shard directories found.")
- }
- // Make sure no shards had errors.
- for i := range(dld.shards) {
- shd := dld.shards[i]
- if shd.infoErr != nil {
- return shd.infoErr
- }
- }
- // Make sure that if any shards are empty, all shards are empty.
- emptyShards := ""
- prefix := ""
- for i := range(dld.shards) {
- if dld.shards[i].info == nil {
- emptyShards = prefix + dld.shards[i].path
- prefix = ", "
- }
- }
- if emptyShards != "" {
- for i := range(dld.shards) {
- if dld.shards[i].info != nil {
- return errors.New(fmt.Sprintf("Shards %s were empty, but " +
- "the other shards had data.", emptyShards))
- }
- }
- // All shards are empty.
- return nil
- }
- // Make sure that all shards have the same layout version, daemonId, and number of total
- // shards.
- layoutVersion := dld.shards[0].info.LayoutVersion
- daemonId := dld.shards[0].info.DaemonId
- totalShards := dld.shards[0].info.TotalShards
- for i := 1; i < len(dld.shards); i++ {
- shd := dld.shards[i]
- if layoutVersion != shd.info.LayoutVersion {
- return errors.New(fmt.Sprintf("Layout version mismatch. Shard " +
- "%s has layout version 0x%016x, but shard %s has layout " +
- "version 0x%016x.",
- dld.shards[0].path, layoutVersion, shd.path, shd.info.LayoutVersion))
- }
- if daemonId != shd.info.DaemonId {
- return errors.New(fmt.Sprintf("DaemonId mismatch. Shard %s has " +
- "daemonId 0x%016x, but shard %s has daemonId 0x%016x.",
- dld.shards[0].path, daemonId, shd.path, shd.info.DaemonId))
- }
- if totalShards != shd.info.TotalShards {
- return errors.New(fmt.Sprintf("TotalShards mismatch. Shard %s has " +
- "TotalShards = %d, but shard %s has TotalShards = %d.",
- dld.shards[0].path, totalShards, shd.path, shd.info.TotalShards))
- }
- if shd.info.ShardIndex >= totalShards {
- return errors.New(fmt.Sprintf("Invalid ShardIndex. Shard %s has " +
- "ShardIndex = %d, but TotalShards = %d.",
- shd.path, shd.info.ShardIndex, shd.info.TotalShards))
- }
- }
- if layoutVersion != CURRENT_LAYOUT_VERSION {
- return errors.New(fmt.Sprintf("The layout version of all shards " +
- "is %d, but we only support version %d.",
- layoutVersion, CURRENT_LAYOUT_VERSION))
- }
- if totalShards != uint32(len(dld.shards)) {
- return errors.New(fmt.Sprintf("The TotalShards field of all shards " +
- "is %d, but we have %d shards.", totalShards, len(dld.shards)))
- }
- // Reorder shards in order of their ShardIndex.
- reorderedShards := make([]*ShardLoader, len(dld.shards))
- for i := 0; i < len(dld.shards); i++ {
- shd := dld.shards[i]
- shardIdx := shd.info.ShardIndex
- if reorderedShards[shardIdx] != nil {
- return errors.New(fmt.Sprintf("Both shard %s and " +
- "shard %s have ShardIndex %d.", shd.path,
- reorderedShards[shardIdx].path, shardIdx))
- }
- reorderedShards[shardIdx] = shd
- }
- dld.shards = reorderedShards
- return nil
-}
-
-func (dld *DataStoreLoader) Load() error {
- var err error
- // If data.store.clear was set, clear existing data.
- if dld.ClearStored {
- err = dld.clearStored()
- if err != nil {
- return err
- }
- }
- // Make sure the shard directories exist in all cases, with a mkdir -p
- for i := range dld.shards {
- err := os.MkdirAll(dld.shards[i].path, 0777)
- if err != nil {
- return errors.New(fmt.Sprintf("Failed to MkdirAll(%s): %s",
- dld.shards[i].path, err.Error()))
- }
- }
- // Get information about each shard, and verify them.
- dld.LoadShards()
- err = dld.VerifyShardInfos()
- if err != nil {
- return err
- }
- if dld.shards[0].ldb != nil {
- dld.lg.Infof("Loaded %d leveldb instances with " +
- "DaemonId of 0x%016x\n", len(dld.shards),
- dld.shards[0].info.DaemonId)
- } else {
- // Create leveldb instances if needed.
- rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
- daemonId := uint64(rnd.Int63())
- dld.lg.Infof("Initializing %d leveldb instances with a new " +
- "DaemonId of 0x%016x\n", len(dld.shards), daemonId)
- dld.openOpts.SetCreateIfMissing(true)
- for i := range(dld.shards) {
- shd := dld.shards[i]
- shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts)
- if err != nil {
- return errors.New(fmt.Sprintf("levigo.Open(%s) failed to " +
- "create the shard: %s", shd.path, err.Error()))
- }
- info := &ShardInfo {
- LayoutVersion: CURRENT_LAYOUT_VERSION,
- DaemonId: daemonId,
- TotalShards: uint32(len(dld.shards)),
- ShardIndex: uint32(i),
- }
- err = shd.writeShardInfo(info)
- if err != nil {
- return errors.New(fmt.Sprintf("levigo.Open(%s) failed to " +
- "write shard info: %s", shd.path, err.Error()))
- }
- dld.lg.Infof("Shard %s initialized with ShardInfo %s \n",
- shd.path, asJson(info))
- }
- }
- return nil
-}
-
-func (dld *DataStoreLoader) clearStored() error {
- for i := range dld.shards {
- path := dld.shards[i].path
- fi, err := os.Stat(path)
- if err != nil && !os.IsNotExist(err) {
- dld.lg.Errorf("Failed to stat %s: %s\n", path, err.Error())
- return err
- }
- if fi != nil {
- err = os.RemoveAll(path)
- if err != nil {
- dld.lg.Errorf("Failed to clear existing datastore directory %s: %s\n",
- path, err.Error())
- return err
- }
- dld.lg.Infof("Cleared existing datastore directory %s\n", path)
- }
- }
- return nil
-}
-
-type ShardLoader struct {
- // The parent DataStoreLoader
- dld *DataStoreLoader
-
- // Path to the shard
- path string
-
- // Leveldb instance of the shard
- ldb *levigo.DB
-
- // Information about the shard
- info *ShardInfo
-
- // If non-null, the error we encountered trying to load the shard info.
- infoErr error
-}
-
-func (shd *ShardLoader) Close() {
- if shd.ldb != nil {
- shd.ldb.Close()
- shd.ldb = nil
- }
-}
-
-// Load information about a particular shard.
-func (shd *ShardLoader) load() {
- shd.info = nil
- fi, err := os.Stat(shd.path)
- if err != nil {
- if os.IsNotExist(err) {
- shd.infoErr = nil
- return
- }
- shd.infoErr = errors.New(fmt.Sprintf(
- "stat() error on leveldb directory " +
- "%s: %s", shd.path, err.Error()))
- return
- }
- if !fi.Mode().IsDir() {
- shd.infoErr = errors.New(fmt.Sprintf(
- "stat() error on leveldb directory " +
- "%s: inode is not directory.", shd.path))
- return
- }
- var dbDir *os.File
- dbDir, err = os.Open(shd.path)
- if err != nil {
- shd.infoErr = errors.New(fmt.Sprintf(
- "open() error on leveldb directory " +
- "%s: %s.", shd.path, err.Error()))
- return
- }
- defer func() {
- if dbDir != nil {
- dbDir.Close()
- }
- }()
- _, err = dbDir.Readdirnames(1)
- if err != nil {
- if err == io.EOF {
- // The db directory is empty.
- shd.infoErr = nil
- return
- }
- shd.infoErr = errors.New(fmt.Sprintf(
- "Readdirnames() error on leveldb directory " +
- "%s: %s.", shd.path, err.Error()))
- return
- }
- dbDir.Close()
- dbDir = nil
- shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts)
- if err != nil {
- shd.ldb = nil
- shd.infoErr = errors.New(fmt.Sprintf(
- "levigo.Open() error on leveldb directory " +
- "%s: %s.", shd.path, err.Error()))
- return
- }
- shd.info, err = shd.readShardInfo()
- if err != nil {
- shd.infoErr = err
- return
- }
- shd.infoErr = nil
-}
-
-func (shd *ShardLoader) readShardInfo() (*ShardInfo, error) {
- buf, err := shd.ldb.Get(shd.dld.readOpts, []byte{SHARD_INFO_KEY})
- if err != nil {
- return nil, errors.New(fmt.Sprintf("readShardInfo(%s): failed to " +
- "read shard info key: %s", shd.path, err.Error()))
- }
- if len(buf) == 0 {
- return nil, errors.New(fmt.Sprintf("readShardInfo(%s): got zero-" +
- "length value for shard info key.", shd.path))
- }
- mh := new(codec.MsgpackHandle)
- mh.WriteExt = true
- r := bytes.NewBuffer(buf)
- decoder := codec.NewDecoder(r, mh)
- shardInfo := &ShardInfo {
- LayoutVersion: UNKNOWN_LAYOUT_VERSION,
- }
- err = decoder.Decode(shardInfo)
- if err != nil {
- return nil, errors.New(fmt.Sprintf("readShardInfo(%s): msgpack " +
- "decoding failed for shard info key: %s", shd.path, err.Error()))
- }
- return shardInfo, nil
-}
-
-func (shd *ShardLoader) writeShardInfo(info *ShardInfo) error {
- mh := new(codec.MsgpackHandle)
- mh.WriteExt = true
- w := new(bytes.Buffer)
- enc := codec.NewEncoder(w, mh)
- err := enc.Encode(info)
- if err != nil {
- return errors.New(fmt.Sprintf("msgpack encoding error: %s",
- err.Error()))
- }
- err = shd.ldb.Put(shd.dld.writeOpts, []byte{SHARD_INFO_KEY}, w.Bytes())
- if err != nil {
- return errors.New(fmt.Sprintf("leveldb write error: %s",
- err.Error()))
- }
- return nil
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
deleted file mode 100644
index 9176de0..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "math"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "sync"
- "time"
-)
-
-//
-// The Metrics Sink for HTraced.
-//
-// The Metrics sink keeps track of metrics for the htraced daemon.
-// It is important to have good metrics so that we can properly manager htraced. In particular, we
-// need to know what rate we are receiving spans at, the main places spans came from. If spans
-// were dropped because of a high sampling rates, we need to know which part of the system dropped
-// them so that we can adjust the sampling rate there.
-//
-
-const LATENCY_CIRC_BUF_SIZE = 4096
-
-type MetricsSink struct {
- // The metrics sink logger.
- lg *common.Logger
-
- // The maximum number of entries we shuld allow in the HostSpanMetrics map.
- maxMtx int
-
- // The total number of spans ingested by the server (counting dropped spans)
- IngestedSpans uint64
-
- // The total number of spans written to leveldb since the server started.
- WrittenSpans uint64
-
- // The total number of spans dropped by the server.
- ServerDropped uint64
-
- // Per-host Span Metrics
- HostSpanMetrics common.SpanMetricsMap
-
- // The last few writeSpan latencies
- wsLatencyCircBuf *CircBufU32
-
- // Lock protecting all metrics
- lock sync.Mutex
-}
-
-func NewMetricsSink(cnf *conf.Config) *MetricsSink {
- return &MetricsSink{
- lg: common.NewLogger("metrics", cnf),
- maxMtx: cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES),
- HostSpanMetrics: make(common.SpanMetricsMap),
- wsLatencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE),
- }
-}
-
-// Update the total number of spans which were ingested, as well as other
-// metrics that get updated during span ingest.
-func (msink *MetricsSink) UpdateIngested(addr string, totalIngested int,
- serverDropped int, wsLatency time.Duration) {
- msink.lock.Lock()
- defer msink.lock.Unlock()
- msink.IngestedSpans += uint64(totalIngested)
- msink.ServerDropped += uint64(serverDropped)
- msink.updateSpanMetrics(addr, 0, serverDropped)
- wsLatencyMs := wsLatency.Nanoseconds() / 1000000
- var wsLatency32 uint32
- if wsLatencyMs > math.MaxUint32 {
- wsLatency32 = math.MaxUint32
- } else {
- wsLatency32 = uint32(wsLatencyMs)
- }
- msink.wsLatencyCircBuf.Append(wsLatency32)
-}
-
-// Update the per-host span metrics. Must be called with the lock held.
-func (msink *MetricsSink) updateSpanMetrics(addr string, numWritten int,
- serverDropped int) {
- mtx, found := msink.HostSpanMetrics[addr]
- if !found {
- // Ensure that the per-host span metrics map doesn't grow too large.
- if len(msink.HostSpanMetrics) >= msink.maxMtx {
- // Delete a random entry
- for k := range msink.HostSpanMetrics {
- msink.lg.Warnf("Evicting metrics entry for addr %s "+
- "because there are more than %d addrs.\n", k, msink.maxMtx)
- delete(msink.HostSpanMetrics, k)
- break
- }
- }
- mtx = &common.SpanMetrics{}
- msink.HostSpanMetrics[addr] = mtx
- }
- mtx.Written += uint64(numWritten)
- mtx.ServerDropped += uint64(serverDropped)
-}
-
-// Update the total number of spans which were persisted to disk.
-func (msink *MetricsSink) UpdatePersisted(addr string, totalWritten int,
- serverDropped int) {
- msink.lock.Lock()
- defer msink.lock.Unlock()
- msink.WrittenSpans += uint64(totalWritten)
- msink.ServerDropped += uint64(serverDropped)
- msink.updateSpanMetrics(addr, totalWritten, serverDropped)
-}
-
-// Read the server stats.
-func (msink *MetricsSink) PopulateServerStats(stats *common.ServerStats) {
- msink.lock.Lock()
- defer msink.lock.Unlock()
- stats.IngestedSpans = msink.IngestedSpans
- stats.WrittenSpans = msink.WrittenSpans
- stats.ServerDroppedSpans = msink.ServerDropped
- stats.MaxWriteSpansLatencyMs = msink.wsLatencyCircBuf.Max()
- stats.AverageWriteSpansLatencyMs = msink.wsLatencyCircBuf.Average()
- stats.HostSpanMetrics = make(common.SpanMetricsMap)
- for k, v := range msink.HostSpanMetrics {
- stats.HostSpanMetrics[k] = &common.SpanMetrics{
- Written: v.Written,
- ServerDropped: v.ServerDropped,
- }
- }
-}
-
-// A circular buffer of uint32s which supports appending and taking the
-// average, and some other things.
-type CircBufU32 struct {
- // The next slot to fill
- slot int
-
- // The number of slots which are in use. This number only ever
- // increases until the buffer is full.
- slotsUsed int
-
- // The buffer
- buf []uint32
-}
-
-func NewCircBufU32(size int) *CircBufU32 {
- return &CircBufU32{
- slotsUsed: -1,
- buf: make([]uint32, size),
- }
-}
-
-func (cbuf *CircBufU32) Max() uint32 {
- var max uint32
- for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
- if cbuf.buf[bufIdx] > max {
- max = cbuf.buf[bufIdx]
- }
- }
- return max
-}
-
-func (cbuf *CircBufU32) Average() uint32 {
- var total uint64
- for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ {
- total += uint64(cbuf.buf[bufIdx])
- }
- return uint32(total / uint64(cbuf.slotsUsed))
-}
-
-func (cbuf *CircBufU32) Append(val uint32) {
- cbuf.buf[cbuf.slot] = val
- cbuf.slot++
- if cbuf.slotsUsed < cbuf.slot {
- cbuf.slotsUsed = cbuf.slot
- }
- if cbuf.slot >= len(cbuf.buf) {
- cbuf.slot = 0
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
deleted file mode 100644
index 6daf640..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "fmt"
- htrace "org/apache/htrace/client"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "reflect"
- "testing"
- "time"
-)
-
-func compareTotals(a, b common.SpanMetricsMap) bool {
- for k, v := range a {
- if !reflect.DeepEqual(v, b[k]) {
- return false
- }
- }
- for k, v := range b {
- if !reflect.DeepEqual(v, a[k]) {
- return false
- }
- }
- return true
-}
-
-type Fatalfer interface {
- Fatalf(format string, args ...interface{})
-}
-
-func assertNumWrittenEquals(t Fatalfer, msink *MetricsSink,
- expectedNumWritten int) {
- var sstats common.ServerStats
- msink.PopulateServerStats(&sstats)
- if sstats.WrittenSpans != uint64(expectedNumWritten) {
- t.Fatalf("sstats.WrittenSpans = %d, but expected %d\n",
- sstats.WrittenSpans, len(SIMPLE_TEST_SPANS))
- }
- if sstats.HostSpanMetrics["127.0.0.1"] == nil {
- t.Fatalf("no entry for sstats.HostSpanMetrics[127.0.0.1] found.")
- }
- if sstats.HostSpanMetrics["127.0.0.1"].Written !=
- uint64(expectedNumWritten) {
- t.Fatalf("sstats.HostSpanMetrics[127.0.0.1].Written = %d, but "+
- "expected %d\n", sstats.HostSpanMetrics["127.0.0.1"].Written,
- len(SIMPLE_TEST_SPANS))
- }
-}
-
-func TestMetricsSinkPerHostEviction(t *testing.T) {
- cnfBld := conf.Builder{
- Values: conf.TEST_VALUES(),
- Defaults: conf.DEFAULTS,
- }
- cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2"
- cnf, err := cnfBld.Build()
- if err != nil {
- t.Fatalf("failed to create conf: %s", err.Error())
- }
- msink := NewMetricsSink(cnf)
- msink.UpdatePersisted("192.168.0.100", 20, 10)
- msink.UpdatePersisted("192.168.0.101", 20, 10)
- msink.UpdatePersisted("192.168.0.102", 20, 10)
- msink.lock.Lock()
- defer msink.lock.Unlock()
- if len(msink.HostSpanMetrics) != 2 {
- for k, v := range msink.HostSpanMetrics {
- fmt.Printf("WATERMELON: [%s] = [%s]\n", k, v)
- }
- t.Fatalf("Expected len(msink.HostSpanMetrics) to be 2, but got %d\n",
- len(msink.HostSpanMetrics))
- }
-}
-
-func TestIngestedSpansMetricsRest(t *testing.T) {
- testIngestedSpansMetricsImpl(t, false)
-}
-
-func TestIngestedSpansMetricsPacked(t *testing.T) {
- testIngestedSpansMetricsImpl(t, true)
-}
-
-func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) {
- htraceBld := &MiniHTracedBuilder{Name: "TestIngestedSpansMetrics",
- DataDirs: make([]string, 2),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf(), &htrace.TestHooks{
- HrpcDisabled: !usePacked,
- })
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
-
- NUM_TEST_SPANS := 12
- allSpans := createRandomTestSpans(NUM_TEST_SPANS)
- err = hcl.WriteSpans(allSpans)
- if err != nil {
- t.Fatalf("WriteSpans failed: %s\n", err.Error())
- }
- for {
- var stats *common.ServerStats
- stats, err = hcl.GetServerStats()
- if err != nil {
- t.Fatalf("GetServerStats failed: %s\n", err.Error())
- }
- if stats.IngestedSpans == uint64(NUM_TEST_SPANS) {
- break
- }
- time.Sleep(1 * time.Millisecond)
- }
-}
-
-func TestCircBuf32(t *testing.T) {
- cbuf := NewCircBufU32(3)
- // We arbitrarily define that empty circular buffers have an average of 0.
- if cbuf.Average() != 0 {
- t.Fatalf("expected empty CircBufU32 to have an average of 0.\n")
- }
- if cbuf.Max() != 0 {
- t.Fatalf("expected empty CircBufU32 to have a max of 0.\n")
- }
- cbuf.Append(2)
- if cbuf.Average() != 2 {
- t.Fatalf("expected one-element CircBufU32 to have an average of 2.\n")
- }
- cbuf.Append(10)
- if cbuf.Average() != 6 {
- t.Fatalf("expected two-element CircBufU32 to have an average of 6.\n")
- }
- cbuf.Append(12)
- if cbuf.Average() != 8 {
- t.Fatalf("expected three-element CircBufU32 to have an average of 8.\n")
- }
- cbuf.Append(14)
- // The 14 overwrites the original 2 element.
- if cbuf.Average() != 12 {
- t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n")
- }
- cbuf.Append(1)
- // The 1 overwrites the original 10 element.
- if cbuf.Average() != 9 {
- t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n")
- }
- if cbuf.Max() != 14 {
- t.Fatalf("expected three-element CircBufU32 to have a max of 14.\n")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
deleted file mode 100644
index cf7ef67..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "fmt"
- "io/ioutil"
- "net"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "os"
- "strings"
-)
-
-//
-// MiniHTraceD is used in unit tests to set up a daemon with certain settings.
-// It takes care of things like creating and cleaning up temporary directories.
-//
-
-// The default number of managed data directories to use.
-const DEFAULT_NUM_DATA_DIRS = 2
-
-// Builds a MiniHTraced object.
-type MiniHTracedBuilder struct {
- // The name of the MiniHTraced to build. This shows up in the test directory name and some
- // other places.
- Name string
-
- // The configuration values to use for the MiniHTraced.
- // If ths is nil, we use the default configuration for everything.
- Cnf map[string]string
-
- // The DataDirs to use. Empty entries will turn into random names.
- DataDirs []string
-
- // If true, we will keep the data dirs around after MiniHTraced#Close
- KeepDataDirsOnClose bool
-
- // If non-null, the WrittenSpans semaphore to use when creating the DataStore.
- WrittenSpans *common.Semaphore
-
- // The test hooks to use for the HRPC server
- HrpcTestHooks *hrpcTestHooks
-}
-
-type MiniHTraced struct {
- Name string
- Cnf *conf.Config
- DataDirs []string
- Store *dataStore
- Rsv *RestServer
- Hsv *HrpcServer
- Lg *common.Logger
- KeepDataDirsOnClose bool
-}
-
-func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) {
- var err error
- var store *dataStore
- var rsv *RestServer
- var hsv *HrpcServer
- if bld.Name == "" {
- bld.Name = "HTraceTest"
- }
- if bld.Cnf == nil {
- bld.Cnf = make(map[string]string)
- }
- if bld.DataDirs == nil {
- bld.DataDirs = make([]string, 2)
- }
- for idx := range bld.DataDirs {
- if bld.DataDirs[idx] == "" {
- bld.DataDirs[idx], err = ioutil.TempDir(os.TempDir(),
- fmt.Sprintf("%s%d", bld.Name, idx+1))
- if err != nil {
- return nil, err
- }
- }
- }
- // Copy the default test configuration values.
- for k, v := range conf.TEST_VALUES() {
- _, hasVal := bld.Cnf[k]
- if !hasVal {
- bld.Cnf[k] = v
- }
- }
- bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] =
- strings.Join(bld.DataDirs, conf.PATH_LIST_SEP)
- cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS}
- cnf, err := cnfBld.Build()
- if err != nil {
- return nil, err
- }
- lg := common.NewLogger("mini.htraced", cnf)
- defer func() {
- if err != nil {
- if store != nil {
- store.Close()
- }
- for idx := range bld.DataDirs {
- if !bld.KeepDataDirsOnClose {
- if bld.DataDirs[idx] != "" {
- os.RemoveAll(bld.DataDirs[idx])
- }
- }
- }
- if rsv != nil {
- rsv.Close()
- }
- lg.Infof("Failed to create MiniHTraced %s: %s\n", bld.Name, err.Error())
- lg.Close()
- }
- }()
- store, err = CreateDataStore(cnf, bld.WrittenSpans)
- if err != nil {
- return nil, err
- }
- rstListener, listenErr := net.Listen("tcp", cnf.Get(conf.HTRACE_WEB_ADDRESS))
- if listenErr != nil {
- return nil, listenErr
- }
- defer func() {
- if rstListener != nil {
- rstListener.Close()
- }
- }()
- rsv, err = CreateRestServer(cnf, store, rstListener)
- if err != nil {
- return nil, err
- }
- rstListener = nil
- hsv, err = CreateHrpcServer(cnf, store, bld.HrpcTestHooks)
- if err != nil {
- return nil, err
- }
-
- lg.Infof("Created MiniHTraced %s\n", bld.Name)
- return &MiniHTraced{
- Name: bld.Name,
- Cnf: cnf,
- DataDirs: bld.DataDirs,
- Store: store,
- Rsv: rsv,
- Hsv: hsv,
- Lg: lg,
- KeepDataDirsOnClose: bld.KeepDataDirsOnClose,
- }, nil
-}
-
-// Return a Config object that clients can use to connect to this MiniHTraceD.
-func (ht *MiniHTraced) ClientConf() *conf.Config {
- return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(),
- conf.HTRACE_HRPC_ADDRESS, ht.Hsv.Addr().String())
-}
-
-// Return a Config object that clients can use to connect to this MiniHTraceD
-// by HTTP only (no HRPC).
-func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config {
- return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(),
- conf.HTRACE_HRPC_ADDRESS, "")
-}
-
-func (ht *MiniHTraced) Close() {
- ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name)
- ht.Rsv.Close()
- ht.Hsv.Close()
- ht.Store.Close()
- if !ht.KeepDataDirsOnClose {
- for idx := range ht.DataDirs {
- ht.Lg.Infof("Removing %s...\n", ht.DataDirs[idx])
- os.RemoveAll(ht.DataDirs[idx])
- }
- }
- ht.Lg.Infof("Finished closing MiniHTraced %s\n", ht.Name)
- ht.Lg.Close()
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
deleted file mode 100644
index 2d6a76f..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "fmt"
- "math/rand"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "org/apache/htrace/test"
- "testing"
- "time"
-)
-
-func TestReapingOldSpans(t *testing.T) {
- const NUM_TEST_SPANS = 20
- testSpans := make([]*common.Span, NUM_TEST_SPANS)
- rnd := rand.New(rand.NewSource(2))
- now := common.TimeToUnixMs(time.Now().UTC())
- for i := range testSpans {
- testSpans[i] = test.NewRandomSpan(rnd, testSpans[0:i])
- testSpans[i].Begin = now - int64(NUM_TEST_SPANS-1-i)
- testSpans[i].Description = fmt.Sprintf("Span%02d", i)
- }
- htraceBld := &MiniHTracedBuilder{Name: "TestReapingOldSpans",
- Cnf: map[string]string{
- conf.HTRACE_SPAN_EXPIRY_MS: fmt.Sprintf("%d", 60*60*1000),
- conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS: "1",
- conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "1",
- },
- WrittenSpans: common.NewSemaphore(0),
- DataDirs: make([]string, 2),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create mini htraced cluster: %s\n", err.Error())
- }
- ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "")
- for spanIdx := range testSpans {
- ing.IngestSpan(testSpans[spanIdx])
- }
- ing.Close(time.Now())
- // Wait the spans to be created
- ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS)
- // Set a reaper date that will remove all the spans except final one.
- ht.Store.rpr.SetReaperDate(now)
-
- common.WaitFor(5*time.Minute, time.Millisecond, func() bool {
- for i := 0; i < NUM_TEST_SPANS-1; i++ {
- span := ht.Store.FindSpan(testSpans[i].Id)
- if span != nil {
- ht.Store.lg.Debugf("Waiting for %s to be removed...\n",
- testSpans[i].Description)
- return false
- }
- }
- span := ht.Store.FindSpan(testSpans[NUM_TEST_SPANS-1].Id)
- if span == nil {
- ht.Store.lg.Debugf("Did not expect %s to be removed\n",
- testSpans[NUM_TEST_SPANS-1].Description)
- return false
- }
- return true
- })
- defer ht.Close()
-}