You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by cm...@apache.org on 2015/04/23 01:08:34 UTC
[40/41] incubator-htrace git commit: HTRACE-154. Move go and web to
htrace-htraced (abe via cmccabe)
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
deleted file mode 100644
index 38cdb58..0000000
--- a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "bufio"
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "github.com/alecthomas/kingpin"
- "io"
- htrace "org/apache/htrace/client"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "os"
- "time"
-)
-
-var RELEASE_VERSION string
-var GIT_VERSION string
-
-const EXIT_SUCCESS = 0
-const EXIT_FAILURE = 1
-
-var verbose *bool
-
-const USAGE = `The Apache HTrace command-line tool. This tool retrieves and modifies settings and
-other data on a running htraced daemon.
-
-If we find an ` + conf.CONFIG_FILE_NAME + ` configuration file in the list of directories
-specified in ` + conf.HTRACED_CONF_DIR + `, we will use that configuration; otherwise,
-the defaults will be used.
-`
-
-func main() {
- // Load htraced configuration
- cnf := common.LoadApplicationConfig()
-
- // Parse argv
- app := kingpin.New(os.Args[0], USAGE)
- app.Flag("Dmy.key", "Set configuration key 'my.key' to 'my.value'. Replace 'my.key' "+
- "with any key you want to set.").Default("my.value").String()
- addr := app.Flag("addr", "Server address.").String()
- verbose = app.Flag("verbose", "Verbose.").Default("false").Bool()
- version := app.Command("version", "Print the version of this program.")
- serverInfo := app.Command("serverInfo", "Print information retrieved from an htraced server.")
- findSpan := app.Command("findSpan", "Print information about a trace span with a given ID.")
- findSpanId := findSpan.Arg("id", "Span ID to find. Example: 0x123456789abcdef").Required().Uint64()
- findChildren := app.Command("findChildren", "Print out the span IDs that are children of a given span ID.")
- parentSpanId := findChildren.Arg("id", "Span ID to print children for. Example: 0x123456789abcdef").
- Required().Uint64()
- childLim := findChildren.Flag("lim", "Maximum number of child IDs to print.").Default("20").Int()
- loadFile := app.Command("loadFile", "Write whitespace-separated JSON spans from a file to the server.")
- loadFilePath := loadFile.Arg("path",
- "A file containing whitespace-separated span JSON.").Required().String()
- loadJson := app.Command("load", "Write JSON spans from the command-line to the server.")
- loadJsonArg := loadJson.Arg("json", "A JSON span to write to the server.").Required().String()
- dumpAll := app.Command("dumpAll", "Dump all spans from the htraced daemon.")
- dumpAllOutPath := dumpAll.Arg("path", "The path to dump the trace spans to.").Default("-").String()
- dumpAllLim := dumpAll.Flag("lim", "The number of spans to transfer from the server at once.").
- Default("100").Int()
- graph := app.Command("graph", "Visualize span JSON as a graph.")
- graphJsonFile := graph.Arg("input", "The JSON file to load").Required().String()
- graphDotFile := graph.Flag("output",
- "The path to write a GraphViz dotfile to. This file can be used as input to "+
- "GraphViz, in order to generate a pretty picture. See graphviz.org for more "+
- "information about generating pictures of graphs.").Default("-").String()
- query := app.Command("query", "Send a query to htraced.")
- queryLim := query.Flag("lim", "Maximum number of spans to retrieve.").Default("20").Int()
- queryArg := query.Arg("query", "The query string to send. Query strings have the format "+
- "[TYPE] [OPERATOR] [CONST], joined by AND statements.").Required().String()
- rawQuery := app.Command("rawQuery", "Send a raw JSON query to htraced.")
- rawQueryArg := query.Arg("json", "The query JSON to send.").Required().String()
- cmd := kingpin.MustParse(app.Parse(os.Args[1:]))
-
- // Add the command-line settings into the configuration.
- if *addr != "" {
- cnf = cnf.Clone(conf.HTRACE_WEB_ADDRESS, *addr)
- }
-
- // Handle commands that don't require an HTrace client.
- switch cmd {
- case version.FullCommand():
- os.Exit(printVersion())
- case graph.FullCommand():
- err := jsonSpanFileToDotFile(*graphJsonFile, *graphDotFile)
- if err != nil {
- fmt.Printf("graphing error: %s\n", err.Error())
- os.Exit(EXIT_FAILURE)
- }
- os.Exit(EXIT_SUCCESS)
- }
-
- // Create HTrace client
- hcl, err := htrace.NewClient(cnf)
- if err != nil {
- fmt.Printf("Failed to create HTrace client: %s\n", err.Error())
- os.Exit(EXIT_FAILURE)
- }
-
- // Handle commands that require an HTrace client.
- switch cmd {
- case version.FullCommand():
- os.Exit(printVersion())
- case serverInfo.FullCommand():
- os.Exit(printServerInfo(hcl))
- case findSpan.FullCommand():
- os.Exit(doFindSpan(hcl, common.SpanId(*findSpanId)))
- case findChildren.FullCommand():
- os.Exit(doFindChildren(hcl, common.SpanId(*parentSpanId), *childLim))
- case loadJson.FullCommand():
- os.Exit(doLoadSpanJson(hcl, *loadJsonArg))
- case loadFile.FullCommand():
- os.Exit(doLoadSpanJsonFile(hcl, *loadFilePath))
- case dumpAll.FullCommand():
- err := doDumpAll(hcl, *dumpAllOutPath, *dumpAllLim)
- if err != nil {
- fmt.Printf("dumpAll error: %s\n", err.Error())
- os.Exit(EXIT_FAILURE)
- }
- os.Exit(EXIT_SUCCESS)
- case query.FullCommand():
- err := doQueryFromString(hcl, *queryArg, *queryLim)
- if err != nil {
- fmt.Printf("query error: %s\n", err.Error())
- os.Exit(EXIT_FAILURE)
- }
- os.Exit(EXIT_SUCCESS)
- case rawQuery.FullCommand():
- err := doRawQuery(hcl, *rawQueryArg)
- if err != nil {
- fmt.Printf("raw query error: %s\n", err.Error())
- os.Exit(EXIT_FAILURE)
- }
- os.Exit(EXIT_SUCCESS)
- }
-
- app.UsageErrorf(os.Stderr, "You must supply a command to run.")
-}
-
-// Print the version of the htrace binary.
-func printVersion() int {
- fmt.Printf("Running htrace command version %s.\n", RELEASE_VERSION)
- return EXIT_SUCCESS
-}
-
-// Print information retrieved from an htraced server via /server/info
-func printServerInfo(hcl *htrace.Client) int {
- info, err := hcl.GetServerInfo()
- if err != nil {
- fmt.Println(err.Error())
- return EXIT_FAILURE
- }
- fmt.Printf("HTraced server version %s (%s)\n", info.ReleaseVersion, info.GitVersion)
- return EXIT_SUCCESS
-}
-
-// Print information about a trace span.
-func doFindSpan(hcl *htrace.Client, sid common.SpanId) int {
- span, err := hcl.FindSpan(sid)
- if err != nil {
- fmt.Println(err.Error())
- return EXIT_FAILURE
- }
- if span == nil {
- fmt.Printf("Span ID not found.\n")
- return EXIT_FAILURE
- }
- pbuf, err := json.MarshalIndent(span, "", " ")
- if err != nil {
- fmt.Printf("Error: error pretty-printing span to JSON: %s\n", err.Error())
- return EXIT_FAILURE
- }
- fmt.Printf("%s\n", string(pbuf))
- return EXIT_SUCCESS
-}
-
-func doLoadSpanJsonFile(hcl *htrace.Client, spanFile string) int {
- if spanFile == "" {
- fmt.Printf("You must specify the json file to load.\n")
- return EXIT_FAILURE
- }
- file, err := OpenInputFile(spanFile)
- if err != nil {
- fmt.Printf("Failed to open %s: %s\n", spanFile, err.Error())
- return EXIT_FAILURE
- }
- defer file.Close()
- return doLoadSpans(hcl, bufio.NewReader(file))
-}
-
-func doLoadSpanJson(hcl *htrace.Client, spanJson string) int {
- return doLoadSpans(hcl, bytes.NewBufferString(spanJson))
-}
-
-func doLoadSpans(hcl *htrace.Client, reader io.Reader) int {
- dec := json.NewDecoder(reader)
- spans := make([]*common.Span, 0, 32)
- var err error
- for {
- var span common.Span
- if err = dec.Decode(&span); err != nil {
- if err == io.EOF {
- break
- }
- fmt.Printf("Failed to decode JSON: %s\n", err.Error())
- return EXIT_FAILURE
- }
- spans = append(spans, &span)
- }
- if *verbose {
- fmt.Printf("Writing ")
- prefix := ""
- for i := range spans {
- fmt.Printf("%s%s", prefix, spans[i].ToJson())
- prefix = ", "
- }
- fmt.Printf("\n")
- }
- err = hcl.WriteSpans(&common.WriteSpansReq{
- Spans: spans,
- })
- if err != nil {
- fmt.Println(err.Error())
- return EXIT_FAILURE
- }
- return EXIT_SUCCESS
-}
-
-// Find information about the children of a span.
-func doFindChildren(hcl *htrace.Client, sid common.SpanId, lim int) int {
- spanIds, err := hcl.FindChildren(sid, lim)
- if err != nil {
- fmt.Printf("%s\n", err.Error())
- return EXIT_FAILURE
- }
- pbuf, err := json.MarshalIndent(spanIds, "", " ")
- if err != nil {
- fmt.Println("Error: error pretty-printing span IDs to JSON: %s", err.Error())
- return 1
- }
- fmt.Printf("%s\n", string(pbuf))
- return 0
-}
-
-// Dump all spans from the htraced daemon.
-func doDumpAll(hcl *htrace.Client, outPath string, lim int) error {
- file, err := CreateOutputFile(outPath)
- if err != nil {
- return err
- }
- w := bufio.NewWriter(file)
- defer func() {
- if file != nil {
- w.Flush()
- file.Close()
- }
- }()
- out := make(chan *common.Span, 50)
- var dumpErr error
- go func() {
- dumpErr = hcl.DumpAll(lim, out)
- }()
- var numSpans int64
- nextLogTime := time.Now().Add(time.Second * 5)
- for {
- span, channelOpen := <-out
- if !channelOpen {
- break
- }
- if err == nil {
- _, err = fmt.Fprintf(w, "%s\n", span.ToJson())
- }
- if *verbose {
- numSpans++
- now := time.Now()
- if !now.Before(nextLogTime) {
- nextLogTime = now.Add(time.Second * 5)
- fmt.Printf("received %d span(s)...\n", numSpans)
- }
- }
- }
- if err != nil {
- return errors.New(fmt.Sprintf("Write error %s", err.Error()))
- }
- if dumpErr != nil {
- return errors.New(fmt.Sprintf("Dump error %s", dumpErr.Error()))
- }
- err = w.Flush()
- if err != nil {
- return err
- }
- err = file.Close()
- file = nil
- if err != nil {
- return err
- }
- return nil
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-core/src/go/src/org/apache/htrace/htrace/file.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/file.go b/htrace-core/src/go/src/org/apache/htrace/htrace/file.go
deleted file mode 100644
index ea214be..0000000
--- a/htrace-core/src/go/src/org/apache/htrace/htrace/file.go
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "bufio"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "org/apache/htrace/common"
- "os"
-)
-
-// A file used for input.
-// Transparently supports using stdin for input.
-type InputFile struct {
- *os.File
- path string
-}
-
-// Open an input file. Stdin will be used when path is -
-func OpenInputFile(path string) (*InputFile, error) {
- if path == "-" {
- return &InputFile{File: os.Stdin, path: path}, nil
- }
- file, err := os.Open(path)
- if err != nil {
- return nil, err
- }
- return &InputFile{File: file, path: path}, nil
-}
-
-func (file *InputFile) Close() {
- if file.path != "-" {
- file.File.Close()
- }
-}
-
-// A file used for output.
-// Transparently supports using stdout for output.
-type OutputFile struct {
- *os.File
- path string
-}
-
-// Create an output file. Stdout will be used when path is -
-func CreateOutputFile(path string) (*OutputFile, error) {
- if path == "-" {
- return &OutputFile{File: os.Stdout, path: path}, nil
- }
- file, err := os.Create(path)
- if err != nil {
- return nil, err
- }
- return &OutputFile{File: file, path: path}, nil
-}
-
-func (file *OutputFile) Close() error {
- if file.path != "-" {
- return file.File.Close()
- }
- return nil
-}
-
-// FailureDeferringWriter is a writer which allows us to call Printf multiple
-// times and then check if all the printfs succeeded at the very end, rather
-// than checking after each call. We will not attempt to write more data
-// after the first write failure.
-type FailureDeferringWriter struct {
- io.Writer
- err error
-}
-
-func NewFailureDeferringWriter(writer io.Writer) *FailureDeferringWriter {
- return &FailureDeferringWriter{writer, nil}
-}
-
-func (w *FailureDeferringWriter) Printf(format string, v ...interface{}) {
- if w.err != nil {
- return
- }
- str := fmt.Sprintf(format, v...)
- _, err := w.Writer.Write([]byte(str))
- if err != nil {
- w.err = err
- }
-}
-
-func (w *FailureDeferringWriter) Error() error {
- return w.err
-}
-
-// Read a file full of whitespace-separated span JSON into a slice of spans.
-func readSpansFile(path string) (common.SpanSlice, error) {
- file, err := OpenInputFile(path)
- if err != nil {
- return nil, err
- }
- defer file.Close()
- return readSpans(bufio.NewReader(file))
-}
-
-// Read whitespace-separated span JSON into a slice of spans.
-func readSpans(reader io.Reader) (common.SpanSlice, error) {
- spans := make(common.SpanSlice, 0)
- dec := json.NewDecoder(reader)
- for {
- var span common.Span
- err := dec.Decode(&span)
- if err != nil {
- if err != io.EOF {
- return nil, errors.New(fmt.Sprintf("Decode error after decoding %d "+
- "span(s): %s", len(spans), err.Error()))
- }
- break
- }
- spans = append(spans, &span)
- }
- return spans, nil
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-core/src/go/src/org/apache/htrace/htrace/file_test.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/file_test.go b/htrace-core/src/go/src/org/apache/htrace/htrace/file_test.go
deleted file mode 100644
index b6f9cac..0000000
--- a/htrace-core/src/go/src/org/apache/htrace/htrace/file_test.go
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "errors"
- "io"
- "io/ioutil"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "org/apache/htrace/test"
- "os"
- "strings"
- "testing"
-)
-
-func TestInputFileAndOutputFile(t *testing.T) {
- tdir, err := ioutil.TempDir(os.TempDir(), "TestInputFileAndOutputFile")
- if err != nil {
- t.Fatalf("failed to create TempDir: %s\n", err.Error())
- }
- defer os.RemoveAll(tdir)
- tpath := tdir + conf.PATH_SEP + "test"
- var ofile *OutputFile
- ofile, err = CreateOutputFile(tpath)
- if err != nil {
- t.Fatalf("failed to create OutputFile at %s: %s\n", tpath, err.Error())
- }
- defer func() {
- if ofile != nil {
- ofile.Close()
- }
- }()
- w := NewFailureDeferringWriter(ofile)
- w.Printf("Hello, world!\n")
- w.Printf("2 + 2 = %d\n", 4)
- if w.Error() != nil {
- t.Fatalf("got unexpected error writing to %s: %s\n", tpath, w.Error().Error())
- }
- err = ofile.Close()
- ofile = nil
- if err != nil {
- t.Fatalf("error on closing OutputFile for %s: %s\n", tpath, err.Error())
- }
- var ifile *InputFile
- ifile, err = OpenInputFile(tpath)
- defer ifile.Close()
- expected := "Hello, world!\n2 + 2 = 4\n"
- buf := make([]byte, len(expected))
- _, err = io.ReadAtLeast(ifile, buf, len(buf))
- if err != nil {
- t.Fatalf("unexpected error on reading %s: %s\n", tpath, err.Error())
- }
- str := string(buf)
- if str != expected {
- t.Fatalf("Could not read back what we wrote to %s.\n"+
- "Got:\n%s\nExpected:\n%s\n", tpath, str, expected)
- }
-}
-
-type LimitedBufferWriter struct {
- buf []byte
- off int
-}
-
-const LIMITED_BUFFER_MESSAGE = "There isn't enough buffer to go around!"
-
-func (w *LimitedBufferWriter) Write(p []byte) (int, error) {
- var nwritten int
- for i := range p {
- if w.off >= len(w.buf) {
- return nwritten, errors.New(LIMITED_BUFFER_MESSAGE)
- }
- w.buf[w.off] = p[i]
- w.off = w.off + 1
- nwritten++
- }
- return nwritten, nil
-}
-
-func TestFailureDeferringWriter(t *testing.T) {
- lw := LimitedBufferWriter{buf: make([]byte, 20), off: 0}
- w := NewFailureDeferringWriter(&lw)
- w.Printf("Zippity do dah #%d\n", 1)
- w.Printf("Zippity do dah #%d\n", 2)
- if w.Error() == nil {
- t.Fatalf("expected FailureDeferringWriter to experience a failure due to " +
- "limited buffer size, but it did not.")
- }
- if w.Error().Error() != LIMITED_BUFFER_MESSAGE {
- t.Fatalf("expected FailureDeferringWriter to have the error message %s, but "+
- "the message was %s\n", LIMITED_BUFFER_MESSAGE, w.Error().Error())
- }
- expected := "Zippity do dah #1\nZi"
- if string(lw.buf) != expected {
- t.Fatalf("expected LimitedBufferWriter to contain %s, but it contained %s "+
- "instead.\n", expected, string(lw.buf))
- }
-}
-
-func TestReadSpans(t *testing.T) {
- SPAN_TEST_STR := `{"i":"bdd6d4ee48de59bf","s":"c0681027d3ea4928",` +
- `"b":1424736225037,"e":1424736225901,"d":"ClientNamenodeProtocol#getFileInfo",` +
- `"r":"FsShell","p":["60538dfb4df91418"]}
-{"i":"bdd6d4ee48de59bf","s":"60538dfb4df91418","b":1424736224969,` +
- `"e":1424736225960,"d":"getFileInfo","r":"FsShell","p":[],"n":{"path":"/"}}
-`
- r := strings.NewReader(SPAN_TEST_STR)
- spans, err := readSpans(r)
- if err != nil {
- t.Fatalf("Failed to read spans from string via readSpans: %s\n", err.Error())
- }
- SPAN_TEST_EXPECTED := common.SpanSlice{
- &common.Span{
- Id: test.SpanId("c0681027d3ea4928"),
- SpanData: common.SpanData{
- TraceId: test.SpanId("bdd6d4ee48de59bf"),
- Begin: 1424736225037,
- End: 1424736225901,
- Description: "ClientNamenodeProtocol#getFileInfo",
- ProcessId: "FsShell",
- Parents: []common.SpanId{test.SpanId("60538dfb4df91418")},
- },
- },
- &common.Span{
- Id: test.SpanId("60538dfb4df91418"),
- SpanData: common.SpanData{
- TraceId: test.SpanId("bdd6d4ee48de59bf"),
- Begin: 1424736224969,
- End: 1424736225960,
- Description: "getFileInfo",
- ProcessId: "FsShell",
- Parents: []common.SpanId{},
- Info: common.TraceInfoMap{
- "path": "/",
- },
- },
- },
- }
- if len(spans) != len(SPAN_TEST_EXPECTED) {
- t.Fatalf("Expected %d spans, but got %d\n",
- len(SPAN_TEST_EXPECTED), len(spans))
- }
- for i := range SPAN_TEST_EXPECTED {
- common.ExpectSpansEqual(t, spans[i], SPAN_TEST_EXPECTED[i])
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-core/src/go/src/org/apache/htrace/htrace/graph.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/graph.go b/htrace-core/src/go/src/org/apache/htrace/htrace/graph.go
deleted file mode 100644
index dabf2df..0000000
--- a/htrace-core/src/go/src/org/apache/htrace/htrace/graph.go
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "bufio"
- "errors"
- "fmt"
- "io"
- "org/apache/htrace/common"
- "os"
- "sort"
-)
-
-// Create a dotfile from a json file.
-func jsonSpanFileToDotFile(jsonFile string, dotFile string) error {
- spans, err := readSpansFile(jsonFile)
- if err != nil {
- return errors.New(fmt.Sprintf("error reading %s: %s",
- jsonFile, err.Error()))
- }
- var file *OutputFile
- file, err = CreateOutputFile(dotFile)
- if err != nil {
- return errors.New(fmt.Sprintf("error opening %s for write: %s",
- dotFile, err.Error()))
- }
- defer func() {
- if file != nil {
- file.Close()
- }
- }()
- writer := bufio.NewWriter(file)
- err = spansToDot(spans, writer)
- if err != nil {
- return err
- }
- err = writer.Flush()
- if err != nil {
- return err
- }
- err = file.Close()
- file = nil
- return err
-}
-
-// Create output in dotfile format from a set of spans.
-func spansToDot(spans common.SpanSlice, writer io.Writer) error {
- sort.Sort(spans)
- idMap := make(map[common.SpanId]*common.Span)
- for i := range spans {
- span := spans[i]
- if idMap[span.Id] != nil {
- fmt.Fprintf(os.Stderr, "There were multiple spans listed which "+
- "had ID %s.\nFirst:%s\nOther:%s\n", span.Id.String(),
- idMap[span.Id].ToJson(), span.ToJson())
- } else {
- idMap[span.Id] = span
- }
- }
- childMap := make(map[common.SpanId]common.SpanSlice)
- for i := range spans {
- child := spans[i]
- for j := range child.Parents {
- parent := idMap[child.Parents[j]]
- if parent == nil {
- fmt.Fprintf(os.Stderr, "Can't find parent id %s for %s\n",
- child.Parents[j].String(), child.ToJson())
- } else {
- children := childMap[parent.Id]
- if children == nil {
- children = make(common.SpanSlice, 0)
- }
- children = append(children, child)
- childMap[parent.Id] = children
- }
- }
- }
- w := NewFailureDeferringWriter(writer)
- w.Printf("digraph spans {\n")
- // Write out the nodes with their descriptions.
- for i := range spans {
- w.Printf(fmt.Sprintf(` "%s" [label="%s"];`+"\n",
- spans[i].Id.String(), spans[i].Description))
- }
- // Write out the edges between nodes... the parent/children relationships
- for i := range spans {
- children := childMap[spans[i].Id]
- sort.Sort(children)
- if children != nil {
- for c := range children {
- w.Printf(fmt.Sprintf(` "%s" -> "%s";`+"\n",
- spans[i].Id.String(), children[c].Id))
- }
- }
- }
- w.Printf("}\n")
- return w.Error()
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-core/src/go/src/org/apache/htrace/htrace/graph_test.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/graph_test.go b/htrace-core/src/go/src/org/apache/htrace/htrace/graph_test.go
deleted file mode 100644
index 8698a98..0000000
--- a/htrace-core/src/go/src/org/apache/htrace/htrace/graph_test.go
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "bytes"
- "org/apache/htrace/common"
- "org/apache/htrace/test"
- "testing"
-)
-
-func TestSpansToDot(t *testing.T) {
- TEST_SPANS := common.SpanSlice{
- &common.Span{
- Id: test.SpanId("6af3cc058e5d829d"),
- SpanData: common.SpanData{
- TraceId: test.SpanId("0e4716fe911244de"),
- Begin: 1424813349020,
- End: 1424813349134,
- Description: "newDFSInputStream",
- ProcessId: "FsShell",
- Parents: []common.SpanId{},
- Info: common.TraceInfoMap{
- "path": "/",
- },
- },
- },
- &common.Span{
- Id: test.SpanId("75d16cc5b2c07d8a"),
- SpanData: common.SpanData{
- TraceId: test.SpanId("0e4716fe911244de"),
- Begin: 1424813349025,
- End: 1424813349133,
- Description: "getBlockLocations",
- ProcessId: "FsShell",
- Parents: []common.SpanId{test.SpanId("6af3cc058e5d829d")},
- },
- },
- &common.Span{
- Id: test.SpanId("e2c7273efb280a8c"),
- SpanData: common.SpanData{
- TraceId: test.SpanId("0e4716fe911244de"),
- Begin: 1424813349027,
- End: 1424813349073,
- Description: "ClientNamenodeProtocol#getBlockLocations",
- ProcessId: "FsShell",
- Parents: []common.SpanId{test.SpanId("75d16cc5b2c07d8a")},
- },
- },
- }
- w := bytes.NewBuffer(make([]byte, 0, 2048))
- err := spansToDot(TEST_SPANS, w)
- if err != nil {
- t.Fatalf("spansToDot failed: error %s\n", err.Error())
- }
- EXPECTED_STR := `digraph spans {
- "6af3cc058e5d829d" [label="newDFSInputStream"];
- "75d16cc5b2c07d8a" [label="getBlockLocations"];
- "e2c7273efb280a8c" [label="ClientNamenodeProtocol#getBlockLocations"];
- "6af3cc058e5d829d" -> "75d16cc5b2c07d8a";
- "75d16cc5b2c07d8a" -> "e2c7273efb280a8c";
-}
-`
- if w.String() != EXPECTED_STR {
- t.Fatalf("Expected to get:\n%s\nGot:\n%s\n", EXPECTED_STR, w.String())
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-core/src/go/src/org/apache/htrace/htrace/queries.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/queries.go b/htrace-core/src/go/src/org/apache/htrace/htrace/queries.go
deleted file mode 100644
index 4ff246c..0000000
--- a/htrace-core/src/go/src/org/apache/htrace/htrace/queries.go
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- htrace "org/apache/htrace/client"
- "org/apache/htrace/common"
- "strings"
- "unicode"
-)
-
-// Convert a string into a whitespace-separated sequence of strings.
-func tokenize(str string) []string {
- prevQuote := rune(0)
- f := func(c rune) bool {
- switch {
- case c == prevQuote:
- prevQuote = rune(0)
- return false
- case prevQuote != rune(0):
- return false
- case unicode.In(c, unicode.Quotation_Mark):
- prevQuote = c
- return false
- default:
- return unicode.IsSpace(c)
- }
- }
- return strings.FieldsFunc(str, f)
-}
-
-// Parses a query string in the format of a series of
-// [TYPE] [OPERATOR] [CONST] tuples, joined by AND statements.
-type predicateParser struct {
- tokens []string
- curToken int
-}
-
-func (ps *predicateParser) Parse() (*common.Predicate, error) {
- if ps.curToken > len(ps.tokens) {
- return nil, nil
- }
- if ps.curToken > 0 {
- if strings.ToLower(ps.tokens[ps.curToken]) != "and" {
- return nil, errors.New(fmt.Sprintf("Error parsing on token %d: "+
- "expected predicates to be joined by 'and', but found '%s'",
- ps.curToken, ps.tokens[ps.curToken]))
- }
- ps.curToken++
- if ps.curToken > len(ps.tokens) {
- return nil, errors.New(fmt.Sprintf("Nothing found after 'and' at "+
- "token %d", ps.curToken))
- }
- }
- field := common.Field(ps.tokens[ps.curToken])
- if !field.IsValid() {
- return nil, errors.New(fmt.Sprintf("Invalid field specifier at token %d. "+
- "Can't understand %s. Valid field specifiers are %v", ps.curToken,
- ps.tokens[ps.curToken], common.ValidFields()))
- }
- ps.curToken++
- if ps.curToken > len(ps.tokens) {
- return nil, errors.New(fmt.Sprintf("Nothing found after field specifier "+
- "at token %d", ps.curToken))
- }
- op := common.Op(ps.tokens[ps.curToken])
- if !op.IsValid() {
- return nil, errors.New(fmt.Sprintf("Invalid operation specifier at token %d. "+
- "Can't understand %s. Valid operation specifiers are %v", ps.curToken,
- ps.tokens[ps.curToken], common.ValidOps()))
- }
- ps.curToken++
- if ps.curToken > len(ps.tokens) {
- return nil, errors.New(fmt.Sprintf("Nothing found after field specifier "+
- "at token %d", ps.curToken))
- }
- val := ps.tokens[ps.curToken]
- return &common.Predicate{Op: op, Field: field, Val: val}, nil
-}
-
-func parseQueryString(str string) ([]common.Predicate, error) {
- ps := predicateParser{tokens: tokenize(str)}
- preds := make([]common.Predicate, 0)
- for {
- pred, err := ps.Parse()
- if pred == nil {
- break
- }
- if err != nil {
- return nil, err
- }
- }
- if len(preds) == 0 {
- return nil, errors.New("Empty query string")
- }
- return preds, nil
-}
-
-// Send a query from a query string.
-func doQueryFromString(hcl *htrace.Client, str string, lim int) error {
- query := &common.Query{Lim: lim}
- var err error
- query.Predicates, err = parseQueryString(str)
- if err != nil {
- return err
- }
- return doQuery(hcl, query)
-}
-
-// Send a query from a raw JSON string.
-func doRawQuery(hcl *htrace.Client, str string) error {
- jsonBytes := []byte(str)
- var query common.Query
- err := json.Unmarshal(jsonBytes, &query)
- if err != nil {
- return errors.New(fmt.Sprintf("Error parsing provided JSON: %s\n", err.Error()))
- }
- return doQuery(hcl, &query)
-}
-
-// Send a query.
-func doQuery(hcl *htrace.Client, query *common.Query) error {
- if *verbose {
- qbytes, err := json.Marshal(*query)
- if err != nil {
- qbytes = []byte("marshaling error: " + err.Error())
- }
- fmt.Printf("Sending query: %s\n", string(qbytes))
- }
- spans, err := hcl.Query(query)
- if err != nil {
- return err
- }
- if *verbose {
- fmt.Printf("%d results...\n", len(spans))
- }
- for i := range spans {
- fmt.Printf("%s\n", spans[i].ToJson())
- }
- return nil
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
deleted file mode 100644
index 218c1c8..0000000
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "fmt"
- "math/rand"
- htrace "org/apache/htrace/client"
- "org/apache/htrace/common"
- "org/apache/htrace/test"
- "sort"
- "testing"
- "time"
-)
-
-func TestClientGetServerInfo(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerInfo",
- DataDirs: make([]string, 2)}
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf())
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
- _, err = hcl.GetServerInfo()
- if err != nil {
- t.Fatalf("failed to call GetServerInfo: %s", err.Error())
- }
-}
-
-func createRandomTestSpans(amount int) common.SpanSlice {
- rnd := rand.New(rand.NewSource(2))
- allSpans := make(common.SpanSlice, amount)
- allSpans[0] = test.NewRandomSpan(rnd, allSpans[0:0])
- for i := 1; i < amount; i++ {
- allSpans[i] = test.NewRandomSpan(rnd, allSpans[1:i])
- }
- allSpans[1].SpanData.Parents = []common.SpanId{common.SpanId(allSpans[0].Id)}
- return allSpans
-}
-
-func TestClientOperations(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations",
- DataDirs: make([]string, 2)}
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf())
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
-
- // Create some random trace spans.
- NUM_TEST_SPANS := 30
- allSpans := createRandomTestSpans(NUM_TEST_SPANS)
-
- // Write half of the spans to htraced via the client.
- err = hcl.WriteSpans(&common.WriteSpansReq{
- Spans: allSpans[0 : NUM_TEST_SPANS/2],
- })
- if err != nil {
- t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
- err.Error())
- }
-
- // Look up the first half of the spans. They should be found.
- var span *common.Span
- for i := 0; i < NUM_TEST_SPANS/2; i++ {
- span, err = hcl.FindSpan(allSpans[i].Id)
- if err != nil {
- t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
- }
- common.ExpectSpansEqual(t, allSpans[i], span)
- }
-
- // Look up the second half of the spans. They should not be found.
- for i := NUM_TEST_SPANS / 2; i < NUM_TEST_SPANS; i++ {
- span, err = hcl.FindSpan(allSpans[i].Id)
- if err != nil {
- t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
- }
- if span != nil {
- t.Fatalf("Unexpectedly found a span we never write to "+
- "the server: FindSpan(%d) succeeded\n", i)
- }
- }
-
- // Test FindChildren
- childSpan := allSpans[1]
- parentId := childSpan.Parents[0]
- var children []common.SpanId
- children, err = hcl.FindChildren(parentId, 1)
- if err != nil {
- t.Fatalf("FindChildren(%s) failed: %s\n", parentId, err.Error())
- }
- if len(children) != 1 {
- t.Fatalf("FindChildren(%s) returned an invalid number of "+
- "children: expected %d, got %d\n", parentId, 1, len(children))
- }
- if children[0] != childSpan.Id {
- t.Fatalf("FindChildren(%s) returned an invalid child id: expected %s, "+
- " got %s\n", parentId, childSpan.Id, children[0])
- }
-
- // Test FindChildren on a span that has no children
- childlessSpan := allSpans[NUM_TEST_SPANS/2]
- children, err = hcl.FindChildren(childlessSpan.Id, 10)
- if err != nil {
- t.Fatalf("FindChildren(%d) failed: %s\n", childlessSpan.Id, err.Error())
- }
- if len(children) != 0 {
- t.Fatalf("FindChildren(%d) returned an invalid number of "+
- "children: expected %d, got %d\n", childlessSpan.Id, 0, len(children))
- }
-
- // Test Query
- var query common.Query
- query = common.Query{Lim: 10}
- spans, err := hcl.Query(&query)
- if err != nil {
- t.Fatalf("Query({lim: %d}) failed: %s\n", 10, err.Error())
- }
- if len(spans) != 10 {
- t.Fatalf("Query({lim: %d}) returned an invalid number of "+
- "children: expected %d, got %d\n", 10, 10, len(spans))
- }
-}
-
-func TestDumpAll(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll",
- DataDirs: make([]string, 2)}
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf())
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
-
- NUM_TEST_SPANS := 100
- allSpans := createRandomTestSpans(NUM_TEST_SPANS)
- sort.Sort(allSpans)
- err = hcl.WriteSpans(&common.WriteSpansReq{
- Spans: allSpans,
- })
- if err != nil {
- t.Fatalf("WriteSpans failed: %s\n", err.Error())
- }
- out := make(chan *common.Span, 50)
- var dumpErr error
- go func() {
- dumpErr = hcl.DumpAll(3, out)
- }()
- var numSpans int
- nextLogTime := time.Now().Add(time.Millisecond * 5)
- for {
- span, channelOpen := <-out
- if !channelOpen {
- break
- }
- common.ExpectSpansEqual(t, allSpans[numSpans], span)
- numSpans++
- if testing.Verbose() {
- now := time.Now()
- if !now.Before(nextLogTime) {
- nextLogTime = now
- nextLogTime = nextLogTime.Add(time.Millisecond * 5)
- fmt.Printf("read back %d span(s)...\n", numSpans)
- }
- }
- }
- if numSpans != len(allSpans) {
- t.Fatalf("expected to read %d spans... but only read %d\n",
- len(allSpans), numSpans)
- }
- if dumpErr != nil {
- t.Fatalf("got dump error %s\n", dumpErr.Error())
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
deleted file mode 100644
index faf23cd..0000000
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go
+++ /dev/null
@@ -1,929 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "bytes"
- "encoding/gob"
- "errors"
- "fmt"
- "github.com/jmhodges/levigo"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "os"
- "strconv"
- "strings"
- "sync/atomic"
-)
-
-//
-// The data store code for HTraced.
-//
-// This code stores the trace spans. We use levelDB here so that we don't have to store everything
-// in memory at all times. The data is sharded across multiple levelDB databases in multiple
-// directories. Normally, these multiple directories will be on multiple disk drives.
-//
-// The main emphasis in the HTraceD data store is on quickly and efficiently storing trace span data
-// coming from many daemons. Durability is not as big a concern as in some data stores, since
-// losing a little bit of trace data if htraced goes down is not critical. We use the "gob" package
-// for serialization. We assume that there will be many more writes than reads.
-//
-// Schema
-// m -> dataStoreVersion
-// s[8-byte-big-endian-sid] -> SpanData
-// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {}
-// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {}
-// d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {}
-// p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
-//
-// Note that span IDs are unsigned 64-bit numbers.
-// Begin times, end times, and durations are signed 64-bit numbers.
-// In order to get LevelDB to properly compare the signed 64-bit quantities,
-// we flip the highest bit. This way, we can get leveldb to view negative
-// quantities as less than non-negative ones. This also means that we can do
-// all queries using unsigned 64-bit math, rather than having to special-case
-// the signed fields.
-//
-
-const UNKNOWN_LAYOUT_VERSION = 0
-const CURRENT_LAYOUT_VERSION = 2
-
-var EMPTY_BYTE_BUF []byte = []byte{}
-
-const VERSION_KEY = 'v'
-const SPAN_ID_INDEX_PREFIX = 's'
-const BEGIN_TIME_INDEX_PREFIX = 'b'
-const END_TIME_INDEX_PREFIX = 'e'
-const DURATION_INDEX_PREFIX = 'd'
-const PARENT_ID_INDEX_PREFIX = 'p'
-const INVALID_INDEX_PREFIX = 0
-
-type Statistics struct {
- NumSpansWritten uint64
-}
-
-func (stats *Statistics) IncrementWrittenSpans() {
- atomic.AddUint64(&stats.NumSpansWritten, 1)
-}
-
-// Make a copy of the statistics structure, using atomic operations.
-func (stats *Statistics) Copy() *Statistics {
- return &Statistics{
- NumSpansWritten: atomic.LoadUint64(&stats.NumSpansWritten),
- }
-}
-
-// Translate a span id into a leveldb key.
-func makeKey(tag byte, sid uint64) []byte {
- id := uint64(sid)
- return []byte{
- tag,
- byte(0xff & (id >> 56)),
- byte(0xff & (id >> 48)),
- byte(0xff & (id >> 40)),
- byte(0xff & (id >> 32)),
- byte(0xff & (id >> 24)),
- byte(0xff & (id >> 16)),
- byte(0xff & (id >> 8)),
- byte(0xff & (id >> 0)),
- }
-}
-
-func keyToInt(key []byte) uint64 {
- var id uint64
- id = (uint64(key[0]) << 56) |
- (uint64(key[1]) << 48) |
- (uint64(key[2]) << 40) |
- (uint64(key[3]) << 32) |
- (uint64(key[4]) << 24) |
- (uint64(key[5]) << 16) |
- (uint64(key[6]) << 8) |
- (uint64(key[7]) << 0)
- return id
-}
-
-func makeSecondaryKey(tag byte, fir uint64, sec uint64) []byte {
- return []byte{
- tag,
- byte(0xff & (fir >> 56)),
- byte(0xff & (fir >> 48)),
- byte(0xff & (fir >> 40)),
- byte(0xff & (fir >> 32)),
- byte(0xff & (fir >> 24)),
- byte(0xff & (fir >> 16)),
- byte(0xff & (fir >> 8)),
- byte(0xff & (fir >> 0)),
- byte(0xff & (sec >> 56)),
- byte(0xff & (sec >> 48)),
- byte(0xff & (sec >> 40)),
- byte(0xff & (sec >> 32)),
- byte(0xff & (sec >> 24)),
- byte(0xff & (sec >> 16)),
- byte(0xff & (sec >> 8)),
- byte(0xff & (sec >> 0)),
- }
-}
-
-// A single directory containing a levelDB instance.
-type shard struct {
- // The data store that this shard is part of
- store *dataStore
-
- // The LevelDB instance.
- ldb *levigo.DB
-
- // The path to the leveldb directory this shard is managing.
- path string
-
- // Incoming requests to write Spans.
- incoming chan *common.Span
-
- // The channel we will send a bool to when we exit.
- exited chan bool
-}
-
-// Process incoming spans for a shard.
-func (shd *shard) processIncoming() {
- lg := shd.store.lg
- for {
- span := <-shd.incoming
- if span == nil {
- lg.Infof("Shard processor for %s exiting.\n", shd.path)
- shd.exited <- true
- return
- }
- err := shd.writeSpan(span)
- if err != nil {
- lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error())
- } else {
- lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson())
- }
- }
-}
-
-// Convert a signed 64-bit number into an unsigned 64-bit number. We flip the
-// highest bit, so that negative input values map to unsigned numbers which are
-// less than non-negative input values.
-func s2u64(val int64) uint64 {
- ret := uint64(val)
- ret ^= 0x8000000000000000
- return ret
-}
-
-func (shd *shard) writeSpan(span *common.Span) error {
- batch := levigo.NewWriteBatch()
- defer batch.Close()
-
- // Add SpanData to batch.
- spanDataBuf := new(bytes.Buffer)
- spanDataEnc := gob.NewEncoder(spanDataBuf)
- err := spanDataEnc.Encode(span.SpanData)
- if err != nil {
- return err
- }
- batch.Put(makeKey(SPAN_ID_INDEX_PREFIX, span.Id.Val()), spanDataBuf.Bytes())
-
- // Add this to the parent index.
- for parentIdx := range span.Parents {
- batch.Put(makeSecondaryKey(PARENT_ID_INDEX_PREFIX,
- span.Parents[parentIdx].Val(), span.Id.Val()), EMPTY_BYTE_BUF)
- }
-
- // Add to the other secondary indices.
- batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, s2u64(span.Begin),
- span.Id.Val()), EMPTY_BYTE_BUF)
- batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, s2u64(span.End),
- span.Id.Val()), EMPTY_BYTE_BUF)
- batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, s2u64(span.Duration()),
- span.Id.Val()), EMPTY_BYTE_BUF)
-
- err = shd.ldb.Write(shd.store.writeOpts, batch)
- if err != nil {
- return err
- }
- shd.store.stats.IncrementWrittenSpans()
- if shd.store.WrittenSpans != nil {
- shd.store.WrittenSpans <- span
- }
- return nil
-}
-
-func (shd *shard) FindChildren(sid common.SpanId, childIds []common.SpanId,
- lim int32) ([]common.SpanId, int32, error) {
- searchKey := makeKey('p', sid.Val())
- iter := shd.ldb.NewIterator(shd.store.readOpts)
- defer iter.Close()
- iter.Seek(searchKey)
- for {
- if !iter.Valid() {
- break
- }
- if lim == 0 {
- break
- }
- key := iter.Key()
- if !bytes.HasPrefix(key, searchKey) {
- break
- }
- id := common.SpanId(keyToInt(key[9:]))
- childIds = append(childIds, id)
- lim--
- iter.Next()
- }
- return childIds, lim, nil
-}
-
-// Close a shard.
-func (shd *shard) Close() {
- lg := shd.store.lg
- shd.incoming <- nil
- lg.Infof("Waiting for %s to exit...\n", shd.path)
- if shd.exited != nil {
- <-shd.exited
- }
- shd.ldb.Close()
- lg.Infof("Closed %s...\n", shd.path)
-}
-
-// The Data Store.
-type dataStore struct {
- lg *common.Logger
-
- // The shards which manage our LevelDB instances.
- shards []*shard
-
- // I/O statistics for all shards.
- stats Statistics
-
- // The read options to use for LevelDB.
- readOpts *levigo.ReadOptions
-
- // The write options to use for LevelDB.
- writeOpts *levigo.WriteOptions
-
- // If non-null, a channel we will send spans to once we finish writing them. This is only used
- // for testing.
- WrittenSpans chan *common.Span
-}
-
-func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) {
- // Get the configuration.
- clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR)
- dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES)
- dirs := strings.Split(dirsStr, conf.PATH_LIST_SEP)
-
- var err error
- lg := common.NewLogger("datastore", cnf)
- store := &dataStore{lg: lg, shards: []*shard{}, WrittenSpans: writtenSpans}
-
- // If we return an error, close the store.
- defer func() {
- if err != nil {
- store.Close()
- store = nil
- }
- }()
-
- store.readOpts = levigo.NewReadOptions()
- store.readOpts.SetFillCache(true)
- store.writeOpts = levigo.NewWriteOptions()
- store.writeOpts.SetSync(false)
-
- // Open all shards
- for idx := range dirs {
- path := dirs[idx] + conf.PATH_SEP + "db"
- var shd *shard
- shd, err = CreateShard(store, cnf, path, clearStored)
- if err != nil {
- lg.Errorf("Error creating shard %s: %s\n", path, err.Error())
- return nil, err
- }
- store.shards = append(store.shards, shd)
- }
- for idx := range store.shards {
- shd := store.shards[idx]
- shd.exited = make(chan bool, 1)
- go shd.processIncoming()
- }
- return store, nil
-}
-
-func CreateShard(store *dataStore, cnf *conf.Config, path string,
- clearStored bool) (*shard, error) {
- lg := store.lg
- if clearStored {
- fi, err := os.Stat(path)
- if err != nil && !os.IsNotExist(err) {
- lg.Errorf("Failed to stat %s: %s\n", path, err.Error())
- return nil, err
- }
- if fi != nil {
- err = os.RemoveAll(path)
- if err != nil {
- lg.Errorf("Failed to clear existing datastore directory %s: %s\n",
- path, err.Error())
- return nil, err
- }
- lg.Infof("Cleared existing datastore directory %s\n", path)
- }
- }
- err := os.MkdirAll(path, 0777)
- if err != nil {
- lg.Errorf("Failed to MkdirAll(%s): %s\n", path, err.Error())
- return nil, err
- }
- var shd *shard
- openOpts := levigo.NewOptions()
- defer openOpts.Close()
- newlyCreated := false
- ldb, err := levigo.Open(path, openOpts)
- if err == nil {
- store.lg.Infof("LevelDB opened %s\n", path)
- } else {
- store.lg.Debugf("LevelDB failed to open %s: %s\n", path, err.Error())
- openOpts.SetCreateIfMissing(true)
- ldb, err = levigo.Open(path, openOpts)
- if err != nil {
- store.lg.Errorf("LevelDB failed to create %s: %s\n", path, err.Error())
- return nil, err
- }
- store.lg.Infof("Created new LevelDB instance in %s\n", path)
- newlyCreated = true
- }
- defer func() {
- if shd == nil {
- ldb.Close()
- }
- }()
- lv, err := readLayoutVersion(store, ldb)
- if err != nil {
- store.lg.Errorf("Got error while reading datastore version for %s: %s\n",
- path, err.Error())
- return nil, err
- }
- if newlyCreated && (lv == UNKNOWN_LAYOUT_VERSION) {
- err = writeDataStoreVersion(store, ldb, CURRENT_LAYOUT_VERSION)
- if err != nil {
- store.lg.Errorf("Got error while writing datastore version for %s: %s\n",
- path, err.Error())
- return nil, err
- }
- store.lg.Tracef("Wrote layout version %d to shard at %s.\n",
- CURRENT_LAYOUT_VERSION, path)
- } else if lv != CURRENT_LAYOUT_VERSION {
- versionName := "unknown"
- if lv != UNKNOWN_LAYOUT_VERSION {
- versionName = fmt.Sprintf("%d", lv)
- }
- store.lg.Errorf("Can't read old datastore. Its layout version is %s, but this "+
- "software is at layout version %d. Please set %s to clear the datastore "+
- "on startup, or clear it manually.\n", versionName,
- CURRENT_LAYOUT_VERSION, conf.HTRACE_DATA_STORE_CLEAR)
- return nil, errors.New(fmt.Sprintf("Invalid layout version: got %s, expected %d.",
- versionName, CURRENT_LAYOUT_VERSION))
- } else {
- store.lg.Tracef("Found layout version %d in %s.\n", lv, path)
- }
- spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
- shd = &shard{store: store, ldb: ldb, path: path,
- incoming: make(chan *common.Span, spanBufferSize)}
- return shd, nil
-}
-
-// Read the datastore version of a leveldb instance.
-func readLayoutVersion(store *dataStore, ldb *levigo.DB) (uint32, error) {
- buf, err := ldb.Get(store.readOpts, []byte{VERSION_KEY})
- if err != nil {
- return 0, err
- }
- if len(buf) == 0 {
- return 0, nil
- }
- r := bytes.NewBuffer(buf)
- decoder := gob.NewDecoder(r)
- var v uint32
- err = decoder.Decode(&v)
- if err != nil {
- return 0, err
- }
- return v, nil
-}
-
-// Write the datastore version to a shard.
-func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error {
- w := new(bytes.Buffer)
- encoder := gob.NewEncoder(w)
- err := encoder.Encode(&v)
- if err != nil {
- return err
- }
- return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes())
-}
-
-func (store *dataStore) GetStatistics() *Statistics {
- return store.stats.Copy()
-}
-
-// Close the DataStore.
-func (store *dataStore) Close() {
- for idx := range store.shards {
- store.shards[idx].Close()
- store.shards[idx] = nil
- }
- if store.readOpts != nil {
- store.readOpts.Close()
- store.readOpts = nil
- }
- if store.writeOpts != nil {
- store.writeOpts.Close()
- store.writeOpts = nil
- }
- if store.lg != nil {
- store.lg.Close()
- store.lg = nil
- }
-}
-
-// Get the index of the shard which stores the given spanId.
-func (store *dataStore) getShardIndex(sid common.SpanId) int {
- return int(sid.Val() % uint64(len(store.shards)))
-}
-
-func (store *dataStore) WriteSpan(span *common.Span) {
- store.shards[store.getShardIndex(span.Id)].incoming <- span
-}
-
-func (store *dataStore) FindSpan(sid common.SpanId) *common.Span {
- return store.shards[store.getShardIndex(sid)].FindSpan(sid)
-}
-
-func (shd *shard) FindSpan(sid common.SpanId) *common.Span {
- lg := shd.store.lg
- buf, err := shd.ldb.Get(shd.store.readOpts, makeKey('s', sid.Val()))
- if err != nil {
- if strings.Index(err.Error(), "NotFound:") != -1 {
- return nil
- }
- lg.Warnf("Shard(%s): FindSpan(%016x) error: %s\n",
- shd.path, sid, err.Error())
- return nil
- }
- var span *common.Span
- span, err = shd.decodeSpan(sid, buf)
- if err != nil {
- lg.Errorf("Shard(%s): FindSpan(%016x) decode error: %s\n",
- shd.path, sid, err.Error())
- return nil
- }
- return span
-}
-
-func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, error) {
- r := bytes.NewBuffer(buf)
- decoder := gob.NewDecoder(r)
- data := common.SpanData{}
- err := decoder.Decode(&data)
- if err != nil {
- return nil, err
- }
- // Gob encoding translates empty slices to nil. Reverse this so that we're always dealing with
- // non-nil slices.
- if data.Parents == nil {
- data.Parents = []common.SpanId{}
- }
- return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil
-}
-
-// Find the children of a given span id.
-func (store *dataStore) FindChildren(sid common.SpanId, lim int32) []common.SpanId {
- childIds := make([]common.SpanId, 0)
- var err error
-
- startIdx := store.getShardIndex(sid)
- idx := startIdx
- numShards := len(store.shards)
- for {
- if lim == 0 {
- break
- }
- shd := store.shards[idx]
- childIds, lim, err = shd.FindChildren(sid, childIds, lim)
- if err != nil {
- store.lg.Errorf("Shard(%s): FindChildren(%016x) error: %s\n",
- shd.path, sid, err.Error())
- }
- idx++
- if idx >= numShards {
- idx = 0
- }
- if idx == startIdx {
- break
- }
- }
- return childIds
-}
-
-type predicateData struct {
- *common.Predicate
- uintKey uint64
- strKey string
-}
-
-func loadPredicateData(pred *common.Predicate) (*predicateData, error) {
- p := predicateData{Predicate: pred}
-
- // Parse the input value given to make sure it matches up with the field
- // type.
- switch pred.Field {
- case common.SPAN_ID:
- // Span IDs are sent as hex strings.
- var id common.SpanId
- if err := id.FromString(pred.Val); err != nil {
- return nil, errors.New(fmt.Sprintf("Unable to parse span id '%s': %s",
- pred.Val, err.Error()))
- }
- p.uintKey = id.Val()
- break
- case common.DESCRIPTION:
- // Any string is valid for a description.
- p.strKey = pred.Val
- break
- case common.BEGIN_TIME, common.END_TIME, common.DURATION:
- // Parse a base-10 signed numeric field.
- v, err := strconv.ParseInt(pred.Val, 10, 64)
- if err != nil {
- return nil, errors.New(fmt.Sprintf("Unable to parse %s '%s': %s",
- pred.Field, pred.Val, err.Error()))
- }
- p.uintKey = s2u64(v)
- break
- default:
- return nil, errors.New(fmt.Sprintf("Unknown field %s", pred.Field))
- }
-
- // Validate the predicate operation.
- switch pred.Op {
- case common.EQUALS, common.LESS_THAN_OR_EQUALS,
- common.GREATER_THAN_OR_EQUALS, common.GREATER_THAN:
- break
- case common.CONTAINS:
- if p.fieldIsNumeric() {
- return nil, errors.New(fmt.Sprintf("Can't use CONTAINS on a "+
- "numeric field like '%s'", pred.Field))
- }
- default:
- return nil, errors.New(fmt.Sprintf("Unknown predicate operation '%s'",
- pred.Op))
- }
-
- return &p, nil
-}
-
-// Get the index prefix for this predicate, or 0 if it is not indexed.
-func (pred *predicateData) getIndexPrefix() byte {
- switch pred.Field {
- case common.SPAN_ID:
- return SPAN_ID_INDEX_PREFIX
- case common.BEGIN_TIME:
- return BEGIN_TIME_INDEX_PREFIX
- case common.END_TIME:
- return END_TIME_INDEX_PREFIX
- case common.DURATION:
- return DURATION_INDEX_PREFIX
- default:
- return INVALID_INDEX_PREFIX
- }
-}
-
-// Returns true if the predicate type is numeric.
-func (pred *predicateData) fieldIsNumeric() bool {
- switch pred.Field {
- case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, common.DURATION:
- return true
- default:
- return false
- }
-}
-
-// Get the values that this predicate cares about for a given span.
-func (pred *predicateData) extractRelevantSpanData(span *common.Span) (uint64, string) {
- switch pred.Field {
- case common.SPAN_ID:
- return span.Id.Val(), ""
- case common.DESCRIPTION:
- return 0, span.Description
- case common.BEGIN_TIME:
- return s2u64(span.Begin), ""
- case common.END_TIME:
- return s2u64(span.End), ""
- case common.DURATION:
- return s2u64(span.Duration()), ""
- default:
- panic(fmt.Sprintf("Field type %s isn't a 64-bit integer.", pred.Field))
- }
-}
-
-func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool {
- // nil is after everything.
- if a == nil {
- if b == nil {
- return false
- }
- return false
- } else if b == nil {
- return true
- }
- // Compare the spans according to this predicate.
- aInt, aStr := pred.extractRelevantSpanData(a)
- bInt, bStr := pred.extractRelevantSpanData(b)
- if pred.fieldIsNumeric() {
- if pred.Op.IsDescending() {
- return aInt > bInt
- } else {
- return aInt < bInt
- }
- } else {
- if pred.Op.IsDescending() {
- return aStr > bStr
- } else {
- return aStr < bStr
- }
- }
-}
-
-// Returns true if the predicate is satisfied by the given span.
-func (pred *predicateData) satisfiedBy(span *common.Span) bool {
- intVal, strVal := pred.extractRelevantSpanData(span)
- if pred.fieldIsNumeric() {
- switch pred.Op {
- case common.EQUALS:
- return intVal == pred.uintKey
- case common.LESS_THAN_OR_EQUALS:
- return intVal <= pred.uintKey
- case common.GREATER_THAN_OR_EQUALS:
- return intVal >= pred.uintKey
- case common.GREATER_THAN:
- return intVal > pred.uintKey
- default:
- panic(fmt.Sprintf("unknown Op type %s should have been caught "+
- "during normalization", pred.Op))
- }
- } else {
- switch pred.Op {
- case common.CONTAINS:
- return strings.Contains(strVal, pred.strKey)
- case common.EQUALS:
- return strVal == pred.strKey
- case common.LESS_THAN_OR_EQUALS:
- return strVal <= pred.strKey
- case common.GREATER_THAN_OR_EQUALS:
- return strVal >= pred.strKey
- case common.GREATER_THAN:
- return strVal > pred.strKey
- default:
- panic(fmt.Sprintf("unknown Op type %s should have been caught "+
- "during normalization", pred.Op))
- }
- }
-}
-
-func (pred *predicateData) createSource(store *dataStore) (*source, error) {
- var ret *source
- src := source{store: store,
- pred: pred,
- iters: make([]*levigo.Iterator, 0, len(store.shards)),
- nexts: make([]*common.Span, len(store.shards)),
- numRead: make([]int, len(store.shards)),
- keyPrefix: pred.getIndexPrefix(),
- }
- if src.keyPrefix == INVALID_INDEX_PREFIX {
- return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+
- "predicate on field %s", pred.Field))
- }
- defer func() {
- if ret == nil {
- src.Close()
- }
- }()
- for shardIdx := range store.shards {
- shd := store.shards[shardIdx]
- src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
- }
- searchKey := makeKey(src.keyPrefix, pred.uintKey)
- for i := range src.iters {
- src.iters[i].Seek(searchKey)
- }
- ret = &src
- return ret, nil
-}
-
-// A source of spans.
-type source struct {
- store *dataStore
- pred *predicateData
- iters []*levigo.Iterator
- nexts []*common.Span
- numRead []int
- keyPrefix byte
-}
-
-// Return true if this operation may require skipping the first result we get back from leveldb.
-func mayRequireOneSkip(op common.Op) bool {
- switch op {
- // When dealing with descending predicates, the first span we read might not satisfy
- // the predicate, even though subsequent ones will. This is because the iter.Seek()
- // function "moves the iterator the position of the key given or, if the key doesn't
- // exist, the next key that does exist in the database." So if we're on that "next
- // key" it will not satisfy the predicate, but the keys previous to it might.
- case common.LESS_THAN_OR_EQUALS:
- return true
- // iter.Seek basically takes us to the key which is "greater than or equal to" some
- // value. Since we want greater than (not greater than or equal to) we may have to
- // skip the first key.
- case common.GREATER_THAN:
- return true
- }
- return false
-}
-
-// Fill in the entry in the 'next' array for a specific shard.
-func (src *source) populateNextFromShard(shardIdx int) {
- lg := src.store.lg
- var err error
- iter := src.iters[shardIdx]
- if iter == nil {
- lg.Debugf("Can't populate: No more entries in shard %d\n", shardIdx)
- return // There are no more entries in this shard.
- }
- if src.nexts[shardIdx] != nil {
- lg.Debugf("No need to populate shard %d\n", shardIdx)
- return // We already have a valid entry for this shard.
- }
- for {
- if !iter.Valid() {
- lg.Debugf("Can't populate: Iterator for shard %d is no longer valid.\n", shardIdx)
- break // Can't read past end of DB
- }
- src.numRead[shardIdx]++
- key := iter.Key()
- if !bytes.HasPrefix(key, []byte{src.keyPrefix}) {
- lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s\n",
- shardIdx, string(src.keyPrefix))
- break // Can't read past end of indexed section
- }
- var span *common.Span
- var sid common.SpanId
- if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
- // The span id maps to the span itself.
- sid = common.SpanId(keyToInt(key[1:]))
- span, err = src.store.shards[shardIdx].decodeSpan(sid, iter.Value())
- if err != nil {
- lg.Debugf("Internal error decoding span %s in shard %d: %s\n",
- sid.String(), shardIdx, err.Error())
- break
- }
- } else {
- // With a secondary index, we have to look up the span by id.
- sid = common.SpanId(keyToInt(key[9:]))
- span = src.store.shards[shardIdx].FindSpan(sid)
- if span == nil {
- lg.Debugf("Internal error rehydrating span %s in shard %d\n",
- sid.String(), shardIdx)
- break
- }
- }
- if src.pred.Op.IsDescending() {
- iter.Prev()
- } else {
- iter.Next()
- }
- if src.pred.satisfiedBy(span) {
- lg.Debugf("Populated valid span %016x from shard %d.\n", sid, shardIdx)
- src.nexts[shardIdx] = span // Found valid entry
- return
- } else {
- lg.Debugf("Span %016x from shard %d does not satisfy the predicate.\n",
- sid, shardIdx)
- if src.numRead[shardIdx] <= 1 && mayRequireOneSkip(src.pred.Op) {
- continue
- }
- // This and subsequent entries don't satisfy predicate
- break
- }
- }
- lg.Debugf("Closing iterator for shard %d.\n", shardIdx)
- iter.Close()
- src.iters[shardIdx] = nil
-}
-
-func (src *source) next() *common.Span {
- for shardIdx := range src.iters {
- src.populateNextFromShard(shardIdx)
- }
- var best *common.Span
- bestIdx := -1
- for shardIdx := range src.iters {
- span := src.nexts[shardIdx]
- if src.pred.spanPtrIsBefore(span, best) {
- best = span
- bestIdx = shardIdx
- }
- }
- if bestIdx >= 0 {
- src.nexts[bestIdx] = nil
- }
- return best
-}
-
-func (src *source) Close() {
- for i := range src.iters {
- if src.iters[i] != nil {
- src.iters[i].Close()
- }
- }
- src.iters = nil
-}
-
-func (store *dataStore) obtainSource(preds *[]*predicateData) (*source, error) {
- // Read spans from the first predicate that is indexed.
- p := *preds
- for i := range p {
- pred := p[i]
- if pred.getIndexPrefix() != INVALID_INDEX_PREFIX {
- *preds = append(p[0:i], p[i+1:]...)
- return pred.createSource(store)
- }
- }
- // If there are no predicates that are indexed, read rows in order of span id.
- spanIdPred := common.Predicate{Op: common.GREATER_THAN_OR_EQUALS,
- Field: common.SPAN_ID,
- Val: "0000000000000000",
- }
- spanIdPredData, err := loadPredicateData(&spanIdPred)
- if err != nil {
- return nil, err
- }
- return spanIdPredData.createSource(store)
-}
-
-func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) {
- lg := store.lg
- // Parse predicate data.
- var err error
- preds := make([]*predicateData, len(query.Predicates))
- for i := range query.Predicates {
- preds[i], err = loadPredicateData(&query.Predicates[i])
- if err != nil {
- return nil, err
- }
- }
- // Get a source of rows.
- var src *source
- src, err = store.obtainSource(&preds)
- if err != nil {
- return nil, err
- }
- defer src.Close()
- lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src)
-
- // Filter the spans through the remaining predicates.
- ret := make([]*common.Span, 0, 32)
- for {
- if len(ret) >= query.Lim {
- break // we hit the result size limit
- }
- span := src.next()
- if span == nil {
- break // the source has no more spans to give
- }
- lg.Debugf("src.next returned span %s\n", span.ToJson())
- satisfied := true
- for predIdx := range preds {
- if !preds[predIdx].satisfiedBy(span) {
- satisfied = false
- break
- }
- }
- if satisfied {
- ret = append(ret, span)
- }
- }
- return ret, nil
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
deleted file mode 100644
index 79a7c4f..0000000
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
+++ /dev/null
@@ -1,445 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "bytes"
- "encoding/json"
- "math/rand"
- htrace "org/apache/htrace/client"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "org/apache/htrace/test"
- "os"
- "sort"
- "strings"
- "testing"
-)
-
-// Test creating and tearing down a datastore.
-func TestCreateDatastore(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestCreateDatastore",
- DataDirs: make([]string, 3)}
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
-}
-
-var SIMPLE_TEST_SPANS []common.Span = []common.Span{
- common.Span{Id: 1,
- SpanData: common.SpanData{
- Begin: 123,
- End: 456,
- Description: "getFileDescriptors",
- TraceId: 999,
- Parents: []common.SpanId{},
- ProcessId: "firstd",
- }},
- common.Span{Id: 2,
- SpanData: common.SpanData{
- Begin: 125,
- End: 200,
- Description: "openFd",
- TraceId: 999,
- Parents: []common.SpanId{1},
- ProcessId: "secondd",
- }},
- common.Span{Id: 3,
- SpanData: common.SpanData{
- Begin: 200,
- End: 456,
- Description: "passFd",
- TraceId: 999,
- Parents: []common.SpanId{1},
- ProcessId: "thirdd",
- }},
-}
-
-func createSpans(spans []common.Span, store *dataStore) {
- for idx := range spans {
- store.WriteSpan(&spans[idx])
- }
- // Wait the spans to be created
- for i := 0; i < 3; i++ {
- <-store.WrittenSpans
- }
-}
-
-// Test creating a datastore and adding some spans.
-func TestDatastoreWriteAndRead(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead",
- WrittenSpans: make(chan *common.Span, 100)}
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(SIMPLE_TEST_SPANS, ht.Store)
- if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
- t.Fatal()
- }
- span := ht.Store.FindSpan(1)
- if span == nil {
- t.Fatal()
- }
- if span.Id != 1 {
- t.Fatal()
- }
- common.ExpectSpansEqual(t, &SIMPLE_TEST_SPANS[0], span)
- children := ht.Store.FindChildren(1, 1)
- if len(children) != 1 {
- t.Fatalf("expected 1 child, but got %d\n", len(children))
- }
- children = ht.Store.FindChildren(1, 2)
- if len(children) != 2 {
- t.Fatalf("expected 2 children, but got %d\n", len(children))
- }
- sort.Sort(common.SpanIdSlice(children))
- if children[0] != 2 {
- t.Fatal()
- }
- if children[1] != 3 {
- t.Fatal()
- }
-}
-
-func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query,
- expectedSpans []common.Span) {
- spans, err := ht.Store.HandleQuery(query)
- if err != nil {
- t.Fatalf("First query failed: %s\n", err.Error())
- }
- expectedBuf := new(bytes.Buffer)
- dec := json.NewEncoder(expectedBuf)
- err = dec.Encode(expectedSpans)
- if err != nil {
- t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", err.Error())
- }
- spansBuf := new(bytes.Buffer)
- dec = json.NewEncoder(spansBuf)
- err = dec.Encode(spans)
- if err != nil {
- t.Fatalf("Failed to encode result spans to JSON: %s\n", err.Error())
- }
- t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans),
- len(expectedSpans))
- common.ExpectStrEqual(t, string(expectedBuf.Bytes()), string(spansBuf.Bytes()))
-}
-
-// Test queries on the datastore.
-func TestSimpleQuery(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
- WrittenSpans: make(chan *common.Span, 100)}
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(SIMPLE_TEST_SPANS, ht.Store)
- if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
- t.Fatal()
- }
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.GREATER_THAN_OR_EQUALS,
- Field: common.BEGIN_TIME,
- Val: "125",
- },
- },
- Lim: 5,
- }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
-}
-
-func TestQueries2(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
- WrittenSpans: make(chan *common.Span, 100)}
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(SIMPLE_TEST_SPANS, ht.Store)
- if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
- t.Fatal()
- }
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.LESS_THAN_OR_EQUALS,
- Field: common.BEGIN_TIME,
- Val: "125",
- },
- },
- Lim: 5,
- }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
-
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.LESS_THAN_OR_EQUALS,
- Field: common.BEGIN_TIME,
- Val: "125",
- },
- common.Predicate{
- Op: common.EQUALS,
- Field: common.DESCRIPTION,
- Val: "getFileDescriptors",
- },
- },
- Lim: 2,
- }, []common.Span{SIMPLE_TEST_SPANS[0]})
-
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.EQUALS,
- Field: common.DESCRIPTION,
- Val: "getFileDescriptors",
- },
- },
- Lim: 2,
- }, []common.Span{SIMPLE_TEST_SPANS[0]})
-}
-
-func TestQueries3(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
- WrittenSpans: make(chan *common.Span, 100)}
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(SIMPLE_TEST_SPANS, ht.Store)
- if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
- t.Fatal()
- }
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.CONTAINS,
- Field: common.DESCRIPTION,
- Val: "Fd",
- },
- common.Predicate{
- Op: common.GREATER_THAN_OR_EQUALS,
- Field: common.BEGIN_TIME,
- Val: "100",
- },
- },
- Lim: 5,
- }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
-
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.LESS_THAN_OR_EQUALS,
- Field: common.SPAN_ID,
- Val: "0",
- },
- },
- Lim: 200,
- }, []common.Span{})
-
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.LESS_THAN_OR_EQUALS,
- Field: common.SPAN_ID,
- Val: "2",
- },
- },
- Lim: 200,
- }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
-}
-
-func TestQueries4(t *testing.T) {
- t.Parallel()
- htraceBld := &MiniHTracedBuilder{Name: "TestQueries4",
- WrittenSpans: make(chan *common.Span, 100)}
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- createSpans(SIMPLE_TEST_SPANS, ht.Store)
- if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) {
- t.Fatal()
- }
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.GREATER_THAN,
- Field: common.BEGIN_TIME,
- Val: "125",
- },
- },
- Lim: 5,
- }, []common.Span{SIMPLE_TEST_SPANS[2]})
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.GREATER_THAN_OR_EQUALS,
- Field: common.DESCRIPTION,
- Val: "openFd",
- },
- },
- Lim: 2,
- }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
- testQuery(t, ht, &common.Query{
- Predicates: []common.Predicate{
- common.Predicate{
- Op: common.GREATER_THAN,
- Field: common.DESCRIPTION,
- Val: "openFd",
- },
- },
- Lim: 2,
- }, []common.Span{SIMPLE_TEST_SPANS[2]})
-}
-
-func BenchmarkDatastoreWrites(b *testing.B) {
- htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
- WrittenSpans: make(chan *common.Span, b.N)}
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- rnd := rand.New(rand.NewSource(1))
- allSpans := make([]*common.Span, b.N)
- // Write many random spans.
- for n := 0; n < b.N; n++ {
- span := test.NewRandomSpan(rnd, allSpans[0:n])
- ht.Store.WriteSpan(span)
- allSpans[n] = span
- }
- // Wait for all the spans to be written.
- for n := 0; n < b.N; n++ {
- <-ht.Store.WrittenSpans
- }
- spansWritten := ht.Store.GetStatistics().NumSpansWritten
- if spansWritten < uint64(b.N) {
- b.Fatal("incorrect statistics: expected %d spans to be written, but only got %d",
- b.N, spansWritten)
- }
-}
-
-func TestReloadDataStore(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore",
- DataDirs: make([]string, 2), KeepDataDirsOnClose: true}
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- dataDirs := make([]string, len(ht.DataDirs))
- copy(dataDirs, ht.DataDirs)
- defer func() {
- if ht != nil {
- ht.Close()
- }
- for i := range dataDirs {
- os.RemoveAll(dataDirs[i])
- }
- }()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf())
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
-
- // Create some random trace spans.
- NUM_TEST_SPANS := 5
- allSpans := createRandomTestSpans(NUM_TEST_SPANS)
- err = hcl.WriteSpans(&common.WriteSpansReq{
- Spans: allSpans,
- })
- if err != nil {
- t.Fatalf("WriteSpans failed: %s\n", err.Error())
- }
-
- // Look up the spans we wrote.
- var span *common.Span
- for i := 0; i < NUM_TEST_SPANS; i++ {
- span, err = hcl.FindSpan(allSpans[i].Id)
- if err != nil {
- t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
- }
- common.ExpectSpansEqual(t, allSpans[i], span)
- }
-
- ht.Close()
- ht = nil
-
- htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore2",
- DataDirs: dataDirs, KeepDataDirsOnClose: true}
- ht, err = htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to re-create datastore: %s", err.Error())
- }
- hcl, err = htrace.NewClient(ht.ClientConf())
- if err != nil {
- t.Fatalf("failed to re-create client: %s", err.Error())
- }
-
- // Look up the spans we wrote earlier.
- for i := 0; i < NUM_TEST_SPANS; i++ {
- span, err = hcl.FindSpan(allSpans[i].Id)
- if err != nil {
- t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
- }
- common.ExpectSpansEqual(t, allSpans[i], span)
- }
-
- // Set an old datastore version number.
- for i := range ht.Store.shards {
- shard := ht.Store.shards[i]
- writeDataStoreVersion(ht.Store, shard.ldb, CURRENT_LAYOUT_VERSION-1)
- }
- ht.Close()
- ht = nil
-
- htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore3",
- DataDirs: dataDirs, KeepDataDirsOnClose: true}
- ht, err = htraceBld.Build()
- if err == nil {
- t.Fatalf("expected the datastore to fail to load after setting an " +
- "incorrect version.\n")
- }
- if !strings.Contains(err.Error(), "Invalid layout version") {
- t.Fatal(`expected the loading error to contain "invalid layout version"` + "\n")
- }
-
- // It should work with data.store.clear set.
- htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore4",
- DataDirs: dataDirs, KeepDataDirsOnClose: true,
- Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"}}
- ht, err = htraceBld.Build()
- if err != nil {
- t.Fatalf("expected the datastore loading to succeed after setting an "+
- "incorrect version. But it failed with error %s\n", err.Error())
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go
deleted file mode 100644
index 9696cbc..0000000
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "bufio"
- "encoding/binary"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net"
- "net/rpc"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
-)
-
-// Handles HRPC calls
-type HrpcHandler struct {
- lg *common.Logger
- store *dataStore
-}
-
-// The HRPC server
-type HrpcServer struct {
- *rpc.Server
- hand *HrpcHandler
- listener net.Listener
-}
-
-// Codec which encodes HRPC data via JSON
-type HrpcServerCodec struct {
- rwc io.ReadWriteCloser
- length uint32
-}
-
-func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
- hdr := common.HrpcRequestHeader{}
- err := binary.Read(cdc.rwc, binary.BigEndian, &hdr)
- if err != nil {
- return errors.New(fmt.Sprintf("Error reading header bytes: %s", err.Error()))
- }
- if hdr.Magic != common.HRPC_MAGIC {
- return errors.New(fmt.Sprintf("Invalid request header: expected "+
- "magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic))
- }
- if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
- return errors.New(fmt.Sprintf("Length prefix was too long. Maximum "+
- "length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH, hdr.Length))
- }
- req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
- if req.ServiceMethod == "" {
- return errors.New(fmt.Sprintf("Unknown MethodID code 0x%04x",
- hdr.MethodId))
- }
- req.Seq = hdr.Seq
- cdc.length = hdr.Length
- return nil
-}
-
-func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
- dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length)))
- err := dec.Decode(body)
- if err != nil {
- return errors.New(fmt.Sprintf("Failed to read request body: %s",
- err.Error()))
- }
- return nil
-}
-
-var EMPTY []byte = make([]byte, 0)
-
-func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error {
- var err error
- buf := EMPTY
- if msg != nil {
- buf, err = json.Marshal(msg)
- if err != nil {
- return errors.New(fmt.Sprintf("Failed to marshal response message: %s",
- err.Error()))
- }
- }
- hdr := common.HrpcResponseHeader{}
- hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod)
- hdr.Seq = resp.Seq
- hdr.ErrLength = uint32(len(resp.Error))
- hdr.Length = uint32(len(buf))
- writer := bufio.NewWriterSize(cdc.rwc, 256)
- err = binary.Write(writer, binary.BigEndian, &hdr)
- if err != nil {
- return errors.New(fmt.Sprintf("Failed to write response header: %s",
- err.Error()))
- }
- if hdr.ErrLength > 0 {
- _, err = io.WriteString(writer, resp.Error)
- if err != nil {
- return errors.New(fmt.Sprintf("Failed to write error string: %s",
- err.Error()))
- }
- }
- if hdr.Length > 0 {
- var length int
- length, err = writer.Write(buf)
- if err != nil {
- return errors.New(fmt.Sprintf("Failed to write response "+
- "message: %s", err.Error()))
- }
- if uint32(length) != hdr.Length {
- return errors.New(fmt.Sprintf("Failed to write all of response "+
- "message: %s", err.Error()))
- }
- }
- err = writer.Flush()
- if err != nil {
- return errors.New(fmt.Sprintf("Failed to write the response bytes: "+
- "%s", err.Error()))
- }
- return nil
-}
-
-func (cdc *HrpcServerCodec) Close() error {
- return cdc.rwc.Close()
-}
-
-func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
- resp *common.WriteSpansResp) (err error) {
- hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s). "+
- "defaultPid = %s\n", len(req.Spans), req.DefaultPid)
- for i := range req.Spans {
- span := req.Spans[i]
- if span.ProcessId == "" {
- span.ProcessId = req.DefaultPid
- }
- hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
- hand.store.WriteSpan(span)
- }
- return nil
-}
-
-func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) {
- lg := common.NewLogger("hrpc", cnf)
- hsv := &HrpcServer{
- Server: rpc.NewServer(),
- hand: &HrpcHandler{
- lg: lg,
- store: store,
- },
- }
- var err error
- hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
- if err != nil {
- return nil, err
- }
- hsv.Server.Register(hsv.hand)
- go hsv.run()
- lg.Infof("Started HRPC server on %s...\n", hsv.listener.Addr().String())
- return hsv, nil
-}
-
-func (hsv *HrpcServer) run() {
- lg := hsv.hand.lg
- for {
- conn, err := hsv.listener.Accept()
- if err != nil {
- lg.Errorf("HRPC Accept error: %s\n", err.Error())
- continue
- }
- go hsv.ServeCodec(&HrpcServerCodec{
- rwc: conn,
- })
- }
-}
-
-func (hsv *HrpcServer) Addr() net.Addr {
- return hsv.listener.Addr()
-}
-
-func (hsv *HrpcServer) Close() {
- hsv.listener.Close()
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
deleted file mode 100644
index 64da457..0000000
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "encoding/json"
- "fmt"
- "net"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "os"
- "strings"
- "time"
-)
-
-var RELEASE_VERSION string
-var GIT_VERSION string
-
-const USAGE = `htraced: the HTrace server daemon.
-
-htraced receives trace spans sent from HTrace clients. It exposes a REST
-interface which others can query. It also runs a web server with a graphical
-user interface. htraced stores its span data in levelDB files on the local
-disks.
-
-Usage:
---help: this help message
-
--Dk=v: set configuration key 'k' to value 'v'
-For example -Dweb.address=127.0.0.1:8080 sets the web address to localhost,
-port 8080.
-
--Dk: set configuration key 'k' to 'true'
-
-Normally, configuration options should be set in the ` + conf.CONFIG_FILE_NAME + `
-configuration file. We find this file by searching the paths in the
-` + conf.HTRACED_CONF_DIR + `. The command-line options are just an alternate way
-of setting configuration when launching the daemon.
-`
-
-func main() {
- for idx := range os.Args {
- arg := os.Args[idx]
- if strings.HasPrefix(arg, "--h") || strings.HasPrefix(arg, "-h") {
- fmt.Fprintf(os.Stderr, USAGE)
- os.Exit(0)
- }
- }
- cnf := common.LoadApplicationConfig()
- common.InstallSignalHandlers(cnf)
- lg := common.NewLogger("main", cnf)
- defer lg.Close()
- store, err := CreateDataStore(cnf, nil)
- if err != nil {
- lg.Errorf("Error creating datastore: %s\n", err.Error())
- os.Exit(1)
- }
- var rsv *RestServer
- rsv, err = CreateRestServer(cnf, store)
- if err != nil {
- lg.Errorf("Error creating REST server: %s\n", err.Error())
- os.Exit(1)
- }
- var hsv *HrpcServer
- if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
- hsv, err = CreateHrpcServer(cnf, store)
- if err != nil {
- lg.Errorf("Error creating HRPC server: %s\n", err.Error())
- os.Exit(1)
- }
- } else {
- lg.Infof("Not starting HRPC server because no value was given for %s.\n",
- conf.HTRACE_HRPC_ADDRESS)
- }
- naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS)
- if naddr != "" {
- notif := StartupNotification{
- HttpAddr: rsv.Addr().String(),
- ProcessId: os.Getpid(),
- }
- if hsv != nil {
- notif.HrpcAddr = hsv.Addr().String()
- }
- err = sendStartupNotification(naddr, ¬if)
- if err != nil {
- fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+
- "%s\n", err.Error())
- os.Exit(1)
- }
- }
- for {
- time.Sleep(time.Duration(10) * time.Hour)
- }
-}
-
-// A startup notification message that we optionally send on startup.
-// Used by unit tests.
-type StartupNotification struct {
- HttpAddr string
- HrpcAddr string
- ProcessId int
-}
-
-func sendStartupNotification(naddr string, notif *StartupNotification) error {
- conn, err := net.Dial("tcp", naddr)
- if err != nil {
- return err
- }
- defer func() {
- if conn != nil {
- conn.Close()
- }
- }()
- var buf []byte
- buf, err = json.Marshal(notif)
- if err != nil {
- return err
- }
- _, err = conn.Write(buf)
- conn.Close()
- conn = nil
- return nil
-}