You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by cm...@apache.org on 2015/01/28 01:51:49 UTC
incubator-htrace git commit: HTRACE-88. Add REST query API to htraced
(cmccabe)
Repository: incubator-htrace
Updated Branches:
refs/heads/master 0ca3f8f23 -> af2c084ed
HTRACE-88. Add REST query API to htraced (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/af2c084e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/af2c084e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/af2c084e
Branch: refs/heads/master
Commit: af2c084ed8fe5af6a4cee350d80f154f9e328aa5
Parents: 0ca3f8f
Author: Colin P. Mccabe <cm...@apache.org>
Authored: Wed Jan 14 19:41:10 2015 -0800
Committer: Colin P. Mccabe <cm...@apache.org>
Committed: Tue Jan 27 16:51:28 2015 -0800
----------------------------------------------------------------------
.../go/src/org/apache/htrace/client/client.go | 19 +
.../go/src/org/apache/htrace/common/query.go | 87 ++++
.../src/go/src/org/apache/htrace/common/span.go | 11 +-
.../src/org/apache/htrace/htraced/datastore.go | 417 ++++++++++++++++++-
.../org/apache/htrace/htraced/datastore_test.go | 155 +++++++
.../go/src/org/apache/htrace/htraced/rest.go | 43 +-
6 files changed, 718 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/client/client.go b/htrace-core/src/go/src/org/apache/htrace/client/client.go
index fbcdcc6..52fe78e 100644
--- a/htrace-core/src/go/src/org/apache/htrace/client/client.go
+++ b/htrace-core/src/go/src/org/apache/htrace/client/client.go
@@ -106,6 +106,25 @@ func (hcl *Client) FindChildren(sid common.SpanId, lim int) ([]common.SpanId, er
return spanIds, nil
}
+// Make a query
+func (hcl *Client) Query(query *common.Query) ([]common.Span, error) {
+ in, err := json.Marshal(query)
+ if err != nil {
+ return nil, errors.New(fmt.Sprintf("Error marshalling query: %s", err.Error()))
+ }
+ var out []byte
+ out, _, err = hcl.makeRestRequest("GET", "query", bytes.NewReader(in))
+ if err != nil {
+ return nil, err
+ }
+ var spans []common.Span
+ err = json.Unmarshal(out, &spans)
+ if err != nil {
+ return nil, errors.New(fmt.Sprintf("Error unmarshalling results: %s", err.Error()))
+ }
+ return spans, nil
+}
+
func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) {
return hcl.makeRestRequest("GET", reqName, nil)
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/common/query.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/common/query.go b/htrace-core/src/go/src/org/apache/htrace/common/query.go
new file mode 100644
index 0000000..0c909a1
--- /dev/null
+++ b/htrace-core/src/go/src/org/apache/htrace/common/query.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+ "encoding/json"
+)
+
+//
+// Represents queries that can be sent to htraced.
+//
+// Each query consists of set of predicates that will be 'AND'ed together to
+// return a set of spans. Predicates contain an operation, a field, and a
+// value.
+//
+// For example, a query might be "return the first 100 spans between 5:00pm
+// and 5:01pm" This query would have two predicates: time greater than or
+// equal to 5:00pm, and time less than or equal to 5:01pm.
+// In HTrace, times are always expressed in milliseconds since the Epoch.
+// So this would become:
+// { "lim" : 100, "pred" : [
+// { "op" : "ge", "field" : "begin", "val" : 1234 },
+// { "op" : "le", "field" : "begin", "val" : 5678 },
+// ] }
+//
+// Where '1234' and '5678' were replaced by times since the epoch in
+// milliseconds.
+//
+
+type Op string
+
+const (
+ CONTAINS Op = "cn"
+ EQUALS Op = "eq"
+ LESS_THAN_OR_EQUALS Op = "le"
+ GREATER_THAN_OR_EQUALS Op = "ge"
+)
+
+func (op Op) IsDescending() bool {
+ return op == LESS_THAN_OR_EQUALS
+}
+
+type Field string
+
+const (
+ SPAN_ID Field = "spanid"
+ DESCRIPTION Field = "description"
+ BEGIN_TIME Field = "begin"
+ END_TIME Field = "end"
+ DURATION Field = "duration"
+)
+
+type Predicate struct {
+ Op Op `json:"op"`
+ Field Field `json:"field"`
+ Val string `val:"val"`
+}
+
+type Query struct {
+ Predicates []Predicate `json:"pred"`
+ Lim int `json:"lim"`
+}
+
+func (query *Query) String() string {
+ buf, err := json.Marshal(query)
+ if err != nil {
+ panic(err)
+ }
+ return string(buf)
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/common/span.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/common/span.go b/htrace-core/src/go/src/org/apache/htrace/common/span.go
index 36e716a..64975d2 100644
--- a/htrace-core/src/go/src/org/apache/htrace/common/span.go
+++ b/htrace-core/src/go/src/org/apache/htrace/common/span.go
@@ -80,7 +80,11 @@ func (id *SpanId) UnmarshalJSON(b []byte) error {
if b[len(b)-1] != DOUBLE_QUOTE {
return errors.New("Expected spanID to end with a string quote.")
}
- v, err := strconv.ParseUint(string(b[1:len(b)-1]), 16, 64)
+ return id.FromString(string(b[1 : len(b)-1]))
+}
+
+func (id *SpanId) FromString(str string) error {
+ v, err := strconv.ParseUint(str, 16, 64)
if err != nil {
return err
}
@@ -111,3 +115,8 @@ func (span *Span) ToJson() []byte {
}
return jbytes
}
+
+// Compute the span duration. We ignore overflow since we never deal with negative times.
+func (span *Span) Duration() int64 {
+ return span.End - span.Begin
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
index 40678bd..523b7ab 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
@@ -22,10 +22,13 @@ package main
import (
"bytes"
"encoding/gob"
+ "errors"
+ "fmt"
"github.com/jmhodges/levigo"
"org/apache/htrace/common"
"org/apache/htrace/conf"
"os"
+ "strconv"
"strings"
"sync/atomic"
"syscall"
@@ -49,14 +52,23 @@ import (
// Schema
// m -> dataStoreMetadata
// s[8-byte-big-endian-sid] -> SpanData
+// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {}
+// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {}
+// d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {}
// p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
-// t[8-byte-big-endian-time][8-byte-big-endian-child-sid] -> {}
//
const DATA_STORE_VERSION = 1
var EMPTY_BYTE_BUF []byte = []byte{}
+const SPAN_ID_INDEX_PREFIX = 's'
+const BEGIN_TIME_INDEX_PREFIX = 'b'
+const END_TIME_INDEX_PREFIX = 'e'
+const DURATION_INDEX_PREFIX = 'd'
+const PARENT_ID_INDEX_PREFIX = 'p'
+const INVALID_INDEX_PREFIX = 0
+
type Statistics struct {
NumSpansWritten uint64
}
@@ -190,15 +202,21 @@ func (shd *shard) writeSpan(span *common.Span) error {
if err != nil {
return err
}
- batch.Put(makeKey('s', span.Id.Val()), spanDataBuf.Bytes())
+ batch.Put(makeKey(SPAN_ID_INDEX_PREFIX, span.Id.Val()), spanDataBuf.Bytes())
// Add this to the parent index.
for parentIdx := range span.Parents {
- batch.Put(makeSecondaryKey('p', span.Parents[parentIdx].Val(), span.Id.Val()), EMPTY_BYTE_BUF)
+ batch.Put(makeSecondaryKey(PARENT_ID_INDEX_PREFIX,
+ span.Parents[parentIdx].Val(), span.Id.Val()), EMPTY_BYTE_BUF)
}
- // Add this to the timeline index.
- batch.Put(makeSecondaryKey('t', span.Begin, span.Id.Val()), EMPTY_BYTE_BUF)
+ // Add to the other secondary indices.
+ batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, span.Begin,
+ span.Id.Val()), EMPTY_BYTE_BUF)
+ batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, span.End,
+ span.Id.Val()), EMPTY_BYTE_BUF)
+ batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, span.Duration(),
+ span.Id.Val()), EMPTY_BYTE_BUF)
err = shd.ldb.Write(shd.store.writeOpts, batch)
if err != nil {
@@ -407,21 +425,30 @@ func (shd *shard) FindSpan(sid int64) *common.Span {
shd.path, sid, err.Error())
return nil
}
- r := bytes.NewBuffer(buf)
- decoder := gob.NewDecoder(r)
- data := common.SpanData{}
- err = decoder.Decode(&data)
+ var span *common.Span
+ span, err = shd.decodeSpan(sid, buf)
if err != nil {
lg.Errorf("Shard(%s): FindSpan(%016x) decode error: %s\n",
shd.path, sid, err.Error())
return nil
}
+ return span
+}
+
+func (shd *shard) decodeSpan(sid int64, buf []byte) (*common.Span, error) {
+ r := bytes.NewBuffer(buf)
+ decoder := gob.NewDecoder(r)
+ data := common.SpanData{}
+ err := decoder.Decode(&data)
+ if err != nil {
+ return nil, err
+ }
// Gob encoding translates empty slices to nil. Reverse this so that we're always dealing with
// non-nil slices.
if data.Parents == nil {
data.Parents = []common.SpanId{}
}
- return &common.Span{Id: common.SpanId(sid), SpanData: data}
+ return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil
}
// Find the children of a given span id.
@@ -453,5 +480,371 @@ func (store *dataStore) FindChildren(sid int64, lim int32) []common.SpanId {
return childIds
}
-//func (store *dataStore) FindByTimeRange(startTime int64, endTime int64, lim int32) []int64 {
-//}
+type predicateData struct {
+ *common.Predicate
+ intKey int64
+ strKey string
+}
+
+func loadPredicateData(pred *common.Predicate) (*predicateData, error) {
+ p := predicateData{Predicate: pred}
+
+ // Parse the input value given to make sure it matches up with the field
+ // type.
+ switch pred.Field {
+ case common.SPAN_ID:
+ // Span IDs are sent as hex strings.
+ var id common.SpanId
+ if err := id.FromString(pred.Val); err != nil {
+ return nil, errors.New(fmt.Sprintf("Unable to parse span id '%s': %s",
+ pred.Val, err.Error()))
+ }
+ p.intKey = id.Val()
+ break
+ case common.DESCRIPTION:
+ // Any string is valid for a description.
+ p.strKey = pred.Val
+ break
+ case common.BEGIN_TIME, common.END_TIME, common.DURATION:
+ // Base-10 numeric fields.
+ v, err := strconv.ParseInt(pred.Val, 10, 64)
+ if err != nil {
+ return nil, errors.New(fmt.Sprintf("Unable to parse %s '%s': %s",
+ pred.Field, pred.Val, err.Error()))
+ }
+ p.intKey = v
+ break
+ default:
+ return nil, errors.New(fmt.Sprintf("Unknown field %s", pred.Field))
+ }
+
+ // Validate the predicate operation.
+ switch pred.Op {
+ case common.EQUALS, common.LESS_THAN_OR_EQUALS, common.GREATER_THAN_OR_EQUALS:
+ break
+ case common.CONTAINS:
+ if p.fieldIsNumeric() {
+ return nil, errors.New(fmt.Sprintf("Can't use CONTAINS on a "+
+ "numeric field like '%s'", pred.Field))
+ }
+ default:
+ return nil, errors.New(fmt.Sprintf("Unknown predicate operation '%s'",
+ pred.Op))
+ }
+
+ return &p, nil
+}
+
+// Get the index prefix for this predicate, or 0 if it is not indexed.
+func (pred *predicateData) getIndexPrefix() byte {
+ switch pred.Field {
+ case common.SPAN_ID:
+ return SPAN_ID_INDEX_PREFIX
+ case common.BEGIN_TIME:
+ return BEGIN_TIME_INDEX_PREFIX
+ case common.END_TIME:
+ return END_TIME_INDEX_PREFIX
+ case common.DURATION:
+ return DURATION_INDEX_PREFIX
+ default:
+ return INVALID_INDEX_PREFIX
+ }
+}
+
+// Returns true if the predicate type is numeric.
+func (pred *predicateData) fieldIsNumeric() bool {
+ switch pred.Field {
+ case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, common.DURATION:
+ return true
+ default:
+ return false
+ }
+}
+
+// Get the values that this predicate cares about for a given span.
+func (pred *predicateData) extractRelevantSpanData(span *common.Span) (int64, string) {
+ switch pred.Field {
+ case common.SPAN_ID:
+ return span.Id.Val(), ""
+ case common.DESCRIPTION:
+ return 0, span.Description
+ case common.BEGIN_TIME:
+ return span.Begin, ""
+ case common.END_TIME:
+ return span.End, ""
+ case common.DURATION:
+ return span.Duration(), ""
+ default:
+ panic(fmt.Sprintf("Field type %s isn't a 64-bit integer.", pred.Field))
+ }
+}
+
+func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool {
+ // nil is after everything.
+ if a == nil {
+ if b == nil {
+ return false
+ }
+ return false
+ } else if b == nil {
+ return true
+ }
+ // Compare the spans according to this predicate.
+ aInt, aStr := pred.extractRelevantSpanData(a)
+ bInt, bStr := pred.extractRelevantSpanData(b)
+ if pred.fieldIsNumeric() {
+ if pred.Op.IsDescending() {
+ return aInt > bInt
+ } else {
+ return aInt < bInt
+ }
+ } else {
+ if pred.Op.IsDescending() {
+ return aStr > bStr
+ } else {
+ return aStr < bStr
+ }
+ }
+}
+
+// Returns true if the predicate is satisfied by the given span.
+func (pred *predicateData) satisfiedBy(span *common.Span) bool {
+ intVal, strVal := pred.extractRelevantSpanData(span)
+ if pred.fieldIsNumeric() {
+ switch pred.Op {
+ case common.EQUALS:
+ return intVal == pred.intKey
+ case common.LESS_THAN_OR_EQUALS:
+ return intVal <= pred.intKey
+ case common.GREATER_THAN_OR_EQUALS:
+ return intVal >= pred.intKey
+ default:
+ panic(fmt.Sprintf("unknown Op type %s should have been caught "+
+ "during normalization", pred.Op))
+ }
+ } else {
+ switch pred.Op {
+ case common.CONTAINS:
+ return strings.Contains(strVal, pred.strKey)
+ case common.EQUALS:
+ return strVal == pred.strKey
+ case common.LESS_THAN_OR_EQUALS:
+ return strVal <= pred.strKey
+ case common.GREATER_THAN_OR_EQUALS:
+ return strVal >= pred.strKey
+ default:
+ panic(fmt.Sprintf("unknown Op type %s should have been caught "+
+ "during normalization", pred.Op))
+ }
+ }
+}
+
+func (pred *predicateData) createSource(store *dataStore) (*source, error) {
+ var ret *source
+ src := source{store: store,
+ pred: pred,
+ iters: make([]*levigo.Iterator, 0, len(store.shards)),
+ nexts: make([]*common.Span, len(store.shards)),
+ numRead: make([]int, len(store.shards)),
+ keyPrefix: pred.getIndexPrefix(),
+ }
+ if src.keyPrefix == INVALID_INDEX_PREFIX {
+ return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+
+ "predicate on field %s", pred.Field))
+ }
+ defer func() {
+ if ret == nil {
+ src.Close()
+ }
+ }()
+ for shardIdx := range store.shards {
+ shd := store.shards[shardIdx]
+ src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
+ }
+ searchKey := makeKey(src.keyPrefix, pred.intKey)
+ for i := range src.iters {
+ src.iters[i].Seek(searchKey)
+ }
+ ret = &src
+ return ret, nil
+}
+
+// A source of spans.
+type source struct {
+ store *dataStore
+ pred *predicateData
+ iters []*levigo.Iterator
+ nexts []*common.Span
+ numRead []int
+ keyPrefix byte
+}
+
+// Fill in the entry in the 'next' array for a specific shard.
+func (src *source) populateNextFromShard(shardIdx int) {
+ lg := src.store.lg
+ var err error
+ iter := src.iters[shardIdx]
+ if iter == nil {
+ lg.Debugf("Can't populate: No more entries in shard %d\n", shardIdx)
+ return // There are no more entries in this shard.
+ }
+ if src.nexts[shardIdx] != nil {
+ lg.Debugf("No need to populate shard %d\n", shardIdx)
+ return // We already have a valid entry for this shard.
+ }
+ for {
+ if !iter.Valid() {
+ lg.Debugf("Can't populate: Iterator for shard %d is no longer valid.\n", shardIdx)
+ break // Can't read past end of DB
+ }
+ src.numRead[shardIdx]++
+ key := iter.Key()
+ if !bytes.HasPrefix(key, []byte{src.keyPrefix}) {
+ lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s",
+ shardIdx, string(src.keyPrefix))
+ break // Can't read past end of indexed section
+ }
+ var span *common.Span
+ var sid int64
+ if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
+ // The span id maps to the span itself.
+ sid = keyToInt(key[1:])
+ span, err = src.store.shards[shardIdx].decodeSpan(sid, iter.Value())
+ if err != nil {
+ lg.Debugf("Internal error decoding span %016x in shard %d: %s\n",
+ sid, shardIdx, err.Error())
+ break
+ }
+ } else {
+ // With a secondary index, we have to look up the span by id.
+ sid = keyToInt(key[9:])
+ span = src.store.shards[shardIdx].FindSpan(sid)
+ if span == nil {
+ lg.Debugf("Internal error rehydrating span %016x in shard %d\n",
+ sid, shardIdx)
+ break
+ }
+ }
+ if src.pred.Op.IsDescending() {
+ iter.Prev()
+ } else {
+ iter.Next()
+ }
+ if src.pred.satisfiedBy(span) {
+ lg.Debugf("Populated valid span %016x from shard %d.\n", sid, shardIdx)
+ src.nexts[shardIdx] = span // Found valid entry
+ return
+ } else {
+ lg.Debugf("Span %016x from shard %d does not satisfy the predicate.\n",
+ sid, shardIdx)
+ if src.numRead[shardIdx] <= 1 && src.pred.Op.IsDescending() {
+ // When dealing with descending predicates, the first span we read might not satisfy
+ // the predicate, even though subsequent ones will. This is because the iter.Seek()
+ // function "moves the iterator the position of the key given or, if the key doesn't
+ // exist, the next key that does exist in the database." So if we're on that "next
+ // key" it will not satisfy the predicate, but the keys previous to it might.
+ continue
+ }
+ // This and subsequent entries don't satisfy predicate
+ break
+ }
+ }
+ lg.Debugf("Closing iterator for shard %d.\n", shardIdx)
+ iter.Close()
+ src.iters[shardIdx] = nil
+}
+
+func (src *source) next() *common.Span {
+ for shardIdx := range src.iters {
+ src.populateNextFromShard(shardIdx)
+ }
+ var best *common.Span
+ bestIdx := -1
+ for shardIdx := range src.iters {
+ span := src.nexts[shardIdx]
+ if src.pred.spanPtrIsBefore(span, best) {
+ best = span
+ bestIdx = shardIdx
+ }
+ }
+ if bestIdx >= 0 {
+ src.nexts[bestIdx] = nil
+ }
+ return best
+}
+
+func (src *source) Close() {
+ for i := range src.iters {
+ if src.iters[i] != nil {
+ src.iters[i].Close()
+ }
+ }
+ src.iters = nil
+}
+
+func (store *dataStore) obtainSource(preds *[]*predicateData) (*source, error) {
+ // Read spans from the first predicate that is indexed.
+ p := *preds
+ for i := range p {
+ pred := p[i]
+ if pred.getIndexPrefix() != INVALID_INDEX_PREFIX {
+ *preds = append(p[0:i], p[i+1:]...)
+ return pred.createSource(store)
+ }
+ }
+ // If there are no predicates that are indexed, read rows in order of span id.
+ spanIdPred := common.Predicate{Op: common.GREATER_THAN_OR_EQUALS,
+ Field: common.SPAN_ID,
+ Val: "0000000000000000",
+ }
+ spanIdPredData, err := loadPredicateData(&spanIdPred)
+ if err != nil {
+ return nil, err
+ }
+ return spanIdPredData.createSource(store)
+}
+
+func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) {
+ lg := store.lg
+ // Parse predicate data.
+ var err error
+ preds := make([]*predicateData, len(query.Predicates))
+ for i := range query.Predicates {
+ preds[i], err = loadPredicateData(&query.Predicates[i])
+ if err != nil {
+ return nil, err
+ }
+ }
+ // Get a source of rows.
+ var src *source
+ src, err = store.obtainSource(&preds)
+ if err != nil {
+ return nil, err
+ }
+ defer src.Close()
+ lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src)
+
+ // Filter the spans through the remaining predicates.
+ ret := make([]*common.Span, 0, 32)
+ for {
+ if len(ret) >= query.Lim {
+ break // we hit the result size limit
+ }
+ span := src.next()
+ if span == nil {
+ break // the source has no more spans to give
+ }
+ lg.Debugf("src.next returned span %s\n", span.ToJson())
+ satisfied := true
+ for predIdx := range preds {
+ if !preds[predIdx].satisfiedBy(span) {
+ satisfied = false
+ break
+ }
+ }
+ if satisfied {
+ ret = append(ret, span)
+ }
+ }
+ return ret, nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
index f0449fe..3330723 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -20,6 +20,8 @@
package main
import (
+ "bytes"
+ "encoding/json"
"math/rand"
"org/apache/htrace/common"
"org/apache/htrace/test"
@@ -116,6 +118,159 @@ func TestDatastoreWriteAndRead(t *testing.T) {
}
}
+func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query,
+ expectedSpans []common.Span) {
+ spans, err := ht.Store.HandleQuery(query)
+ if err != nil {
+ t.Fatalf("First query failed: %s\n", err.Error())
+ }
+ expectedBuf := new(bytes.Buffer)
+ dec := json.NewEncoder(expectedBuf)
+ err = dec.Encode(expectedSpans)
+ if err != nil {
+ t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", err.Error())
+ }
+ spansBuf := new(bytes.Buffer)
+ dec = json.NewEncoder(spansBuf)
+ err = dec.Encode(spans)
+ if err != nil {
+ t.Fatalf("Failed to encode result spans to JSON: %s\n", err.Error())
+ }
+ t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans),
+ len(expectedSpans))
+ common.ExpectStrEqual(t, string(expectedBuf.Bytes()), string(spansBuf.Bytes()))
+}
+
+// Test queries on the datastore.
+func TestSimpleQuery(t *testing.T) {
+ t.Parallel()
+ htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
+ WrittenSpans: make(chan *common.Span, 100)}
+ ht, err := htraceBld.Build()
+ if err != nil {
+ panic(err)
+ }
+ defer ht.Close()
+ createSpans(SIMPLE_TEST_SPANS, ht.Store)
+ if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
+ t.Fatal()
+ }
+ testQuery(t, ht, &common.Query{
+ Predicates: []common.Predicate{
+ common.Predicate{
+ Op: common.GREATER_THAN_OR_EQUALS,
+ Field: common.BEGIN_TIME,
+ Val: "125",
+ },
+ },
+ Lim: 5,
+ }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+}
+
+func TestQueries2(t *testing.T) {
+ t.Parallel()
+ htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
+ WrittenSpans: make(chan *common.Span, 100)}
+ ht, err := htraceBld.Build()
+ if err != nil {
+ panic(err)
+ }
+ defer ht.Close()
+ createSpans(SIMPLE_TEST_SPANS, ht.Store)
+ if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
+ t.Fatal()
+ }
+ testQuery(t, ht, &common.Query{
+ Predicates: []common.Predicate{
+ common.Predicate{
+ Op: common.LESS_THAN_OR_EQUALS,
+ Field: common.BEGIN_TIME,
+ Val: "125",
+ },
+ },
+ Lim: 5,
+ }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
+
+ testQuery(t, ht, &common.Query{
+ Predicates: []common.Predicate{
+ common.Predicate{
+ Op: common.LESS_THAN_OR_EQUALS,
+ Field: common.BEGIN_TIME,
+ Val: "125",
+ },
+ common.Predicate{
+ Op: common.EQUALS,
+ Field: common.DESCRIPTION,
+ Val: "getFileDescriptors",
+ },
+ },
+ Lim: 2,
+ }, []common.Span{SIMPLE_TEST_SPANS[0]})
+
+ testQuery(t, ht, &common.Query{
+ Predicates: []common.Predicate{
+ common.Predicate{
+ Op: common.EQUALS,
+ Field: common.DESCRIPTION,
+ Val: "getFileDescriptors",
+ },
+ },
+ Lim: 2,
+ }, []common.Span{SIMPLE_TEST_SPANS[0]})
+}
+
+func TestQueries3(t *testing.T) {
+ t.Parallel()
+ htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
+ WrittenSpans: make(chan *common.Span, 100)}
+ ht, err := htraceBld.Build()
+ if err != nil {
+ panic(err)
+ }
+ defer ht.Close()
+ createSpans(SIMPLE_TEST_SPANS, ht.Store)
+ if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
+ t.Fatal()
+ }
+ testQuery(t, ht, &common.Query{
+ Predicates: []common.Predicate{
+ common.Predicate{
+ Op: common.CONTAINS,
+ Field: common.DESCRIPTION,
+ Val: "Fd",
+ },
+ common.Predicate{
+ Op: common.GREATER_THAN_OR_EQUALS,
+ Field: common.BEGIN_TIME,
+ Val: "100",
+ },
+ },
+ Lim: 5,
+ }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+
+ testQuery(t, ht, &common.Query{
+ Predicates: []common.Predicate{
+ common.Predicate{
+ Op: common.LESS_THAN_OR_EQUALS,
+ Field: common.SPAN_ID,
+ Val: "0",
+ },
+ },
+ Lim: 200,
+ }, []common.Span{})
+
+ testQuery(t, ht, &common.Query{
+ Predicates: []common.Predicate{
+ common.Predicate{
+ Op: common.LESS_THAN_OR_EQUALS,
+ Field: common.SPAN_ID,
+ Val: "2",
+ },
+ },
+ Lim: 200,
+ }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
+}
+
func BenchmarkDatastoreWrites(b *testing.B) {
htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
WrittenSpans: make(chan *common.Span, b.N)}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/af2c084e/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
index efc89e1..39e5744 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go
@@ -141,7 +141,9 @@ func (hand *findChildrenHandler) ServeHTTP(w http.ResponseWriter, req *http.Requ
children := hand.store.FindChildren(sid, lim)
jbytes, err := json.Marshal(children)
if err != nil {
- panic(err)
+ writeError(hand.lg, w, http.StatusInternalServerError,
+ fmt.Sprintf("Error marshalling children: %s", err.Error()))
+ return
}
w.Write(jbytes)
}
@@ -173,6 +175,42 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
}
}
+type queryHandler struct {
+ dataStoreHandler
+}
+
+func (hand *queryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ setResponseHeaders(w.Header())
+ _, ok := hand.getReqField32("lim", w, req)
+ if !ok {
+ return
+ }
+ var query common.Query
+ dec := json.NewDecoder(req.Body)
+ err := dec.Decode(&query)
+ if err != nil {
+ writeError(hand.lg, w, http.StatusBadRequest,
+ fmt.Sprintf("Error parsing query: %s", err.Error()))
+ return
+ }
+ var results []*common.Span
+ results, err = hand.store.HandleQuery(&query)
+ if err != nil {
+ writeError(hand.lg, w, http.StatusInternalServerError,
+ fmt.Sprintf("Internal error processing query %s: %s",
+ query.String(), err.Error()))
+ return
+ }
+ var jbytes []byte
+ jbytes, err = json.Marshal(results)
+ if err != nil {
+ writeError(hand.lg, w, http.StatusInternalServerError,
+ fmt.Sprintf("Error marshalling results: %s", err.Error()))
+ return
+ }
+ w.Write(jbytes)
+}
+
type defaultServeHandler struct {
lg *common.Logger
}
@@ -225,6 +263,9 @@ func CreateRestServer(cnf *conf.Config, store *dataStore) (*RestServer, error) {
store: store, lg: rsv.lg}}
r.Handle("/writeSpans", writeSpansH).Methods("POST")
+ queryH := &queryHandler{dataStoreHandler: dataStoreHandler{store: store}}
+ r.Handle("/query", queryH).Methods("GET")
+
span := r.PathPrefix("/span").Subrouter()
findSidH := &findSidHandler{dataStoreHandler: dataStoreHandler{store: store, lg: rsv.lg}}
span.Handle("/{id}", findSidH).Methods("GET")