You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by cm...@apache.org on 2015/04/23 01:08:14 UTC
[20/41] incubator-htrace git commit: HTRACE-154. Move go and web to
htrace-htraced (abe via cmccabe)
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-htraced/src/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/src/go/src/org/apache/htrace/conf/config_keys.go
new file mode 100644
index 0000000..ccb09e0
--- /dev/null
+++ b/htrace-htraced/src/go/src/org/apache/htrace/conf/config_keys.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package conf
+
+import (
+ "fmt"
+ "os"
+)
+
+//
+// Configuration keys for HTrace.
+//
+
+// The platform-specific path separator. Usually slash.
+var PATH_SEP string = fmt.Sprintf("%c", os.PathSeparator)
+
+// The platform-specific path list separator. Usually colon.
+var PATH_LIST_SEP string = fmt.Sprintf("%c", os.PathListSeparator)
+
+// The name of the XML configuration file to look for.
+const CONFIG_FILE_NAME = "htraced-conf.xml"
+
+// An environment variable containing a list of paths to search for the
+// configuration file in.
+const HTRACED_CONF_DIR = "HTRACED_CONF_DIR"
+
+// The web address to start the REST server on.
+const HTRACE_WEB_ADDRESS = "web.address"
+
+// The default port for the HTrace web address.
+const HTRACE_WEB_ADDRESS_DEFAULT_PORT = 9095
+
+// The address to use for HRPC (as opposed to the REST web address).
+const HTRACE_HRPC_ADDRESS = "hrpc.address"
+
+// The default port for the HTrace HRPC address.
+const HTRACE_HRPC_ADDRESS_DEFAULT_PORT = 9075
+
+// The directories to put the data store into. Separated by PATH_LIST_SEP.
+const HTRACE_DATA_STORE_DIRECTORIES = "data.store.directories"
+
+// Boolean key which indicates whether we should clear data on startup.
+const HTRACE_DATA_STORE_CLEAR = "data.store.clear"
+
+// How many writes to buffer before applying backpressure to span senders.
+const HTRACE_DATA_STORE_SPAN_BUFFER_SIZE = "data.store.span.buffer.size"
+
+// Path to put the logs from htrace, or the empty string to use stdout.
+const HTRACE_LOG_PATH = "log.path"
+
+// The log level to use for the logs in htrace.
+const HTRACE_LOG_LEVEL = "log.level"
+
+// A host:port pair to send information to on startup. This is used in unit
+// tests to determine the (random) port of the htraced process that has been
+// started.
+const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address"
+
+// Default values for HTrace configuration keys. HTRACE_STARTUP_NOTIFICATION_ADDRESS has no entry here.
+var DEFAULTS = map[string]string{
+ HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT),
+ HTRACE_HRPC_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_HRPC_ADDRESS_DEFAULT_PORT),
+ HTRACE_DATA_STORE_DIRECTORIES: PATH_SEP + "tmp" + PATH_SEP + "htrace1" +
+ PATH_LIST_SEP + PATH_SEP + "tmp" + PATH_SEP + "htrace2", // i.e. /tmp/htrace1:/tmp/htrace2 where the separators are "/" and ":"
+ HTRACE_DATA_STORE_CLEAR: "false",
+ HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100",
+ HTRACE_LOG_PATH: "", // empty string means stdout, per the HTRACE_LOG_PATH comment
+ HTRACE_LOG_LEVEL: "INFO",
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-htraced/src/go/src/org/apache/htrace/conf/config_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/conf/config_test.go b/htrace-htraced/src/go/src/org/apache/htrace/conf/config_test.go
new file mode 100644
index 0000000..42c1c71
--- /dev/null
+++ b/htrace-htraced/src/go/src/org/apache/htrace/conf/config_test.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package conf
+
+import (
+ "bytes"
+ "os"
+ "strings"
+ "testing"
+)
+
+// Test that parsing command-line arguments of the form -Dfoo=bar works, including a bare -Dkey read as a boolean.
+func TestParseArgV(t *testing.T) {
+ t.Parallel()
+ argv := []string{"-Dfoo=bar", "-Dbaz=123", "-DsillyMode"} // the last entry has no "=value" part
+ bld := &Builder{Argv: argv}
+ cnf, err := bld.Build()
+ if err != nil {
+ t.Fatal()
+ }
+ if "bar" != cnf.Get("foo") {
+ t.Fatal()
+ }
+ if 123 != cnf.GetInt("baz") { // numeric values are read back through GetInt
+ t.Fatal()
+ }
+ if !cnf.GetBool("sillyMode") { // the bare -DsillyMode must read back as true
+ t.Fatal()
+ }
+ if cnf.GetBool("otherSillyMode") { // a key that was never supplied must read back as false
+ t.Fatal()
+ }
+}
+
+// Test that default values work.
+// Defaults are used only when the configuration option is not present or can't be parsed.
+func TestDefaults(t *testing.T) {
+ t.Parallel()
+ argv := []string{"-Dfoo=bar", "-Dbaz=invalidNumber"} // baz is deliberately not a valid integer
+ defaults := map[string]string{
+ "foo": "notbar",
+ "baz": "456",
+ "foo2": "4611686018427387904", // 2^62, which does not fit in 32 bits
+ }
+ bld := &Builder{Argv: argv, Defaults: defaults}
+ cnf, err := bld.Build()
+ if err != nil {
+ t.Fatal()
+ }
+ if "bar" != cnf.Get("foo") { // the supplied value wins over the default "notbar"
+ t.Fatal()
+ }
+ if 456 != cnf.GetInt("baz") { // the unparseable value falls back to the default
+ t.Fatal()
+ }
+ if 4611686018427387904 != cnf.GetInt64("foo2") { // a key that is present only in Defaults
+ t.Fatal()
+ }
+}
+
+// Test that we can parse our XML configuration file, and that argv, XML and Defaults are layered as expected.
+func TestXmlConfigurationFile(t *testing.T) {
+ t.Parallel()
+ xml := `
+<?xml version="1.0"?>
+<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>
+<configuration>
+ <property>
+ <name>foo.bar</name>
+ <value>123</value>
+ </property>
+ <property>
+ <name>foo.baz</name>
+ <value>xmlValue</value>
+ </property>
+ <!--<property>
+ <name>commented.out</name>
+ <value>stuff</value>
+ </property>-->
+</configuration>
+`
+ xmlReader := strings.NewReader(xml)
+ argv := []string{"-Dfoo.bar=456"} // foo.bar is also set in the XML (123) and in Defaults (789)
+ defaults := map[string]string{
+ "foo.bar": "789",
+ "cmdline.opt": "4611686018427387904", // despite its name, this key is set only in Defaults
+ }
+ bld := &Builder{Argv: argv, Defaults: defaults, Reader: xmlReader}
+ cnf, err := bld.Build()
+ if err != nil {
+ t.Fatal()
+ }
+ // The command-line argument takes precedence over the XML and the defaults.
+ if 456 != cnf.GetInt("foo.bar") {
+ t.Fatal()
+ }
+ if "xmlValue" != cnf.Get("foo.baz") { // set only in the XML
+ t.Fatalf("foo.baz = %s", cnf.Get("foo.baz"))
+ }
+ if "" != cnf.Get("commented.out") { // the property inside the XML comment must not be picked up
+ t.Fatal()
+ }
+ if 4611686018427387904 != cnf.GetInt64("cmdline.opt") {
+ t.Fatal()
+ }
+}
+
+// Test our handling of the HTRACED_CONF_DIR environment variable.
+func TestGetHTracedConfDirs(t *testing.T) {
+ os.Setenv("HTRACED_CONF_DIR", "") // modifies the process environment; this test does not call t.Parallel()
+ dlog := new(bytes.Buffer)
+ dirs := getHTracedConfDirs(dlog)
+ if len(dirs) != 1 || dirs[0] != "." { // an empty variable must yield just "."
+ t.Fatal()
+ }
+ os.Setenv("HTRACED_CONF_DIR", "/foo/bar:/baz") // NOTE(review): hard-codes ":" rather than PATH_LIST_SEP — confirm this is intended on every platform
+ dirs = getHTracedConfDirs(dlog)
+ if len(dirs) != 2 || dirs[0] != "/foo/bar" || dirs[1] != "/baz" {
+ t.Fatal()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-htraced/src/go/src/org/apache/htrace/conf/xml.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/conf/xml.go b/htrace-htraced/src/go/src/org/apache/htrace/conf/xml.go
new file mode 100644
index 0000000..de14bc5
--- /dev/null
+++ b/htrace-htraced/src/go/src/org/apache/htrace/conf/xml.go
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package conf
+
+import (
+ "encoding/xml"
+ "io"
+ "log"
+)
+
+type configuration struct { // Decoding target for the XML configuration document.
+ Properties []propertyXml `xml:"property"` // one entry per <property> element
+}
+
+type propertyXml struct { // A single <property> element.
+ Name string `xml:"name"` // text of the <name> child
+ Value string `xml:"value"` // text of the <value> child
+}
+
+// Parse an XML configuration file read from reader, copying each <property> name/value pair into m.
+func parseXml(reader io.Reader, m map[string]string) error {
+ dec := xml.NewDecoder(reader)
+ configurationXml := configuration{}
+ err := dec.Decode(&configurationXml)
+ if err != nil {
+ return err // decode failures are returned unchanged; m has not been modified yet
+ }
+ props := configurationXml.Properties
+ for p := range props { // p is the index into props
+ key := props[p].Name
+ value := props[p].Value
+ if key == "" {
+ log.Println("Warning: ignoring element with missing or empty <name>.")
+ continue
+ }
+ if value == "" {
+ log.Println("Warning: ignoring element with key " + key + " with missing or empty <value>.")
+ continue
+ }
+ //log.Printf("setting %s to %s\n", key, value)
+ m[key] = value // a later property with the same name overwrites an earlier one
+ }
+ return nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-htraced/src/go/src/org/apache/htrace/htrace/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/src/go/src/org/apache/htrace/htrace/cmd.go
new file mode 100644
index 0000000..38cdb58
--- /dev/null
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htrace/cmd.go
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bufio"
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "github.com/alecthomas/kingpin"
+ "io"
+ htrace "org/apache/htrace/client"
+ "org/apache/htrace/common"
+ "org/apache/htrace/conf"
+ "os"
+ "time"
+)
+
+var RELEASE_VERSION string // printed by the "version" command; never assigned in cmd.go (presumably set at build time — TODO confirm)
+var GIT_VERSION string // not referenced in cmd.go; presumably set at build time — TODO confirm
+
+const EXIT_SUCCESS = 0 // process exit status for a command that succeeded
+const EXIT_FAILURE = 1 // process exit status for a command that failed
+
+var verbose *bool // assigned from the --verbose flag in main; dereferenced by the command handlers
+
+const USAGE = `The Apache HTrace command-line tool. This tool retrieves and modifies settings and
+other data on a running htraced daemon.
+
+If we find an ` + conf.CONFIG_FILE_NAME + ` configuration file in the list of directories
+specified in ` + conf.HTRACED_CONF_DIR + `, we will use that configuration; otherwise,
+the defaults will be used.
+`
+
+// main is the entry point of the htrace command-line tool.
+//
+// It loads the htraced configuration, declares the flags and sub-commands
+// with kingpin, applies the --addr override to the configuration, and then
+// dispatches to the selected command. Every command handler ends the process
+// through os.Exit with EXIT_SUCCESS or EXIT_FAILURE.
+func main() {
+	// Load htraced configuration
+	cnf := common.LoadApplicationConfig()
+
+	// Parse argv
+	app := kingpin.New(os.Args[0], USAGE)
+	// This flag exists only so that the -Dkey=value syntax shows up in the
+	// usage text; its parsed value is not used here.
+	app.Flag("Dmy.key", "Set configuration key 'my.key' to 'my.value'. Replace 'my.key' "+
+		"with any key you want to set.").Default("my.value").String()
+	addr := app.Flag("addr", "Server address.").String()
+	verbose = app.Flag("verbose", "Verbose.").Default("false").Bool()
+	version := app.Command("version", "Print the version of this program.")
+	serverInfo := app.Command("serverInfo", "Print information retrieved from an htraced server.")
+	findSpan := app.Command("findSpan", "Print information about a trace span with a given ID.")
+	findSpanId := findSpan.Arg("id", "Span ID to find. Example: 0x123456789abcdef").Required().Uint64()
+	findChildren := app.Command("findChildren", "Print out the span IDs that are children of a given span ID.")
+	parentSpanId := findChildren.Arg("id", "Span ID to print children for. Example: 0x123456789abcdef").
+		Required().Uint64()
+	childLim := findChildren.Flag("lim", "Maximum number of child IDs to print.").Default("20").Int()
+	loadFile := app.Command("loadFile", "Write whitespace-separated JSON spans from a file to the server.")
+	loadFilePath := loadFile.Arg("path",
+		"A file containing whitespace-separated span JSON.").Required().String()
+	loadJson := app.Command("load", "Write JSON spans from the command-line to the server.")
+	loadJsonArg := loadJson.Arg("json", "A JSON span to write to the server.").Required().String()
+	dumpAll := app.Command("dumpAll", "Dump all spans from the htraced daemon.")
+	dumpAllOutPath := dumpAll.Arg("path", "The path to dump the trace spans to.").Default("-").String()
+	dumpAllLim := dumpAll.Flag("lim", "The number of spans to transfer from the server at once.").
+		Default("100").Int()
+	graph := app.Command("graph", "Visualize span JSON as a graph.")
+	graphJsonFile := graph.Arg("input", "The JSON file to load").Required().String()
+	graphDotFile := graph.Flag("output",
+		"The path to write a GraphViz dotfile to. This file can be used as input to "+
+			"GraphViz, in order to generate a pretty picture. See graphviz.org for more "+
+			"information about generating pictures of graphs.").Default("-").String()
+	query := app.Command("query", "Send a query to htraced.")
+	queryLim := query.Flag("lim", "Maximum number of spans to retrieve.").Default("20").Int()
+	queryArg := query.Arg("query", "The query string to send. Query strings have the format "+
+		"[TYPE] [OPERATOR] [CONST], joined by AND statements.").Required().String()
+	rawQuery := app.Command("rawQuery", "Send a raw JSON query to htraced.")
+	// The JSON argument belongs to the rawQuery command. Registering it on
+	// query would leave rawQuery without an argument and give query a second
+	// required one.
+	rawQueryArg := rawQuery.Arg("json", "The query JSON to send.").Required().String()
+	cmd := kingpin.MustParse(app.Parse(os.Args[1:]))
+
+	// Add the command-line settings into the configuration.
+	if *addr != "" {
+		cnf = cnf.Clone(conf.HTRACE_WEB_ADDRESS, *addr)
+	}
+
+	// Handle commands that don't require an HTrace client.
+	switch cmd {
+	case version.FullCommand():
+		os.Exit(printVersion())
+	case graph.FullCommand():
+		err := jsonSpanFileToDotFile(*graphJsonFile, *graphDotFile)
+		if err != nil {
+			fmt.Printf("graphing error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	}
+
+	// Create HTrace client
+	hcl, err := htrace.NewClient(cnf)
+	if err != nil {
+		fmt.Printf("Failed to create HTrace client: %s\n", err.Error())
+		os.Exit(EXIT_FAILURE)
+	}
+
+	// Handle commands that require an HTrace client. The version command is
+	// not repeated here because the switch above has already exited for it.
+	switch cmd {
+	case serverInfo.FullCommand():
+		os.Exit(printServerInfo(hcl))
+	case findSpan.FullCommand():
+		os.Exit(doFindSpan(hcl, common.SpanId(*findSpanId)))
+	case findChildren.FullCommand():
+		os.Exit(doFindChildren(hcl, common.SpanId(*parentSpanId), *childLim))
+	case loadJson.FullCommand():
+		os.Exit(doLoadSpanJson(hcl, *loadJsonArg))
+	case loadFile.FullCommand():
+		os.Exit(doLoadSpanJsonFile(hcl, *loadFilePath))
+	case dumpAll.FullCommand():
+		err := doDumpAll(hcl, *dumpAllOutPath, *dumpAllLim)
+		if err != nil {
+			fmt.Printf("dumpAll error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	case query.FullCommand():
+		err := doQueryFromString(hcl, *queryArg, *queryLim)
+		if err != nil {
+			fmt.Printf("query error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	case rawQuery.FullCommand():
+		err := doRawQuery(hcl, *rawQueryArg)
+		if err != nil {
+			fmt.Printf("raw query error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	}
+
+	// Reached only when no case above matched the parsed command.
+	app.UsageErrorf(os.Stderr, "You must supply a command to run.")
+}
+
+// Print the version of the htrace binary (RELEASE_VERSION) and return EXIT_SUCCESS.
+func printVersion() int {
+ fmt.Printf("Running htrace command version %s.\n", RELEASE_VERSION)
+ return EXIT_SUCCESS
+}
+
+// Print information retrieved from an htraced server via /server/info. Returns an exit status.
+func printServerInfo(hcl *htrace.Client) int {
+ info, err := hcl.GetServerInfo()
+ if err != nil {
+ fmt.Println(err.Error()) // the error is printed to stdout, like every other message here
+ return EXIT_FAILURE
+ }
+ fmt.Printf("HTraced server version %s (%s)\n", info.ReleaseVersion, info.GitVersion)
+ return EXIT_SUCCESS
+}
+
+// Print information about the trace span with ID sid as indented JSON. Returns an exit status.
+func doFindSpan(hcl *htrace.Client, sid common.SpanId) int {
+ span, err := hcl.FindSpan(sid)
+ if err != nil {
+ fmt.Println(err.Error())
+ return EXIT_FAILURE
+ }
+ if span == nil { // a nil span with a nil error is reported as "not found"
+ fmt.Printf("Span ID not found.\n")
+ return EXIT_FAILURE
+ }
+ pbuf, err := json.MarshalIndent(span, "", " ")
+ if err != nil {
+ fmt.Printf("Error: error pretty-printing span to JSON: %s\n", err.Error())
+ return EXIT_FAILURE
+ }
+ fmt.Printf("%s\n", string(pbuf))
+ return EXIT_SUCCESS
+}
+
+func doLoadSpanJsonFile(hcl *htrace.Client, spanFile string) int { // Load spans from the file at spanFile and write them to the server; returns an exit status.
+ if spanFile == "" {
+ fmt.Printf("You must specify the json file to load.\n")
+ return EXIT_FAILURE
+ }
+ file, err := OpenInputFile(spanFile) // OpenInputFile maps the path "-" to stdin
+ if err != nil {
+ fmt.Printf("Failed to open %s: %s\n", spanFile, err.Error())
+ return EXIT_FAILURE
+ }
+ defer file.Close()
+ return doLoadSpans(hcl, bufio.NewReader(file))
+}
+
+func doLoadSpanJson(hcl *htrace.Client, spanJson string) int { // Load spans from the JSON text in spanJson and write them to the server; returns an exit status.
+ return doLoadSpans(hcl, bytes.NewBufferString(spanJson))
+}
+
+func doLoadSpans(hcl *htrace.Client, reader io.Reader) int { // Decode a stream of JSON spans from reader and send them all in one WriteSpans request; returns an exit status.
+ dec := json.NewDecoder(reader)
+ spans := make([]*common.Span, 0, 32) // 32 is only the initial capacity; append grows the slice as needed
+ var err error
+ for {
+ var span common.Span // declared inside the loop, so &span below is a distinct pointer per span
+ if err = dec.Decode(&span); err != nil {
+ if err == io.EOF { // io.EOF is the normal end of input
+ break
+ }
+ fmt.Printf("Failed to decode JSON: %s\n", err.Error())
+ return EXIT_FAILURE // nothing is sent when any span fails to decode
+ }
+ spans = append(spans, &span)
+ }
+ if *verbose { // verbose is assigned in main before any command handler runs
+ fmt.Printf("Writing ")
+ prefix := ""
+ for i := range spans {
+ fmt.Printf("%s%s", prefix, spans[i].ToJson())
+ prefix = ", " // separator before every span except the first
+ }
+ fmt.Printf("\n")
+ }
+ err = hcl.WriteSpans(&common.WriteSpansReq{
+ Spans: spans,
+ })
+ if err != nil {
+ fmt.Println(err.Error())
+ return EXIT_FAILURE
+ }
+ return EXIT_SUCCESS
+}
+
+// Find information about the children of a span.
+//
+// Requests up to lim child span IDs of sid from the server and prints them
+// as indented JSON. Returns EXIT_SUCCESS, or EXIT_FAILURE if the request or
+// the JSON encoding fails.
+func doFindChildren(hcl *htrace.Client, sid common.SpanId, lim int) int {
+	spanIds, err := hcl.FindChildren(sid, lim)
+	if err != nil {
+		fmt.Printf("%s\n", err.Error())
+		return EXIT_FAILURE
+	}
+	pbuf, err := json.MarshalIndent(spanIds, "", " ")
+	if err != nil {
+		// Printf, not Println: the message contains a %s directive that
+		// Println would print literally.
+		fmt.Printf("Error: error pretty-printing span IDs to JSON: %s\n", err.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Printf("%s\n", string(pbuf))
+	return EXIT_SUCCESS
+}
+
+// Dump all spans from the htraced daemon.
+//
+// Spans are fetched from the server lim at a time and written to outPath as
+// one JSON document per line; CreateOutputFile maps the path "-" to stdout.
+// Returns the first write error, the error reported by the dump itself, or
+// an error from flushing or closing the output.
+func doDumpAll(hcl *htrace.Client, outPath string, lim int) error {
+	file, err := CreateOutputFile(outPath)
+	if err != nil {
+		return err
+	}
+	w := bufio.NewWriter(file)
+	// Best-effort cleanup for the early-return paths. The success path
+	// flushes and closes explicitly below and sets file to nil first.
+	defer func() {
+		if file != nil {
+			w.Flush()
+			file.Close()
+		}
+	}()
+	out := make(chan *common.Span, 50)
+	var dumpErr error
+	dumpDone := make(chan struct{})
+	go func() {
+		// dumpErr is assigned only after DumpAll has returned, which may be
+		// after the receive loop below has already seen the channel close.
+		// Closing dumpDone publishes the assignment to the waiting goroutine.
+		defer close(dumpDone)
+		dumpErr = hcl.DumpAll(lim, out)
+	}()
+	var numSpans int64
+	nextLogTime := time.Now().Add(time.Second * 5)
+	for {
+		span, channelOpen := <-out
+		if !channelOpen {
+			break
+		}
+		// After the first write error we keep draining the channel so the
+		// producer is not left blocked, but we stop writing.
+		if err == nil {
+			_, err = fmt.Fprintf(w, "%s\n", span.ToJson())
+		}
+		if *verbose {
+			numSpans++
+			now := time.Now()
+			if !now.Before(nextLogTime) {
+				nextLogTime = now.Add(time.Second * 5)
+				fmt.Printf("received %d span(s)...\n", numSpans)
+			}
+		}
+	}
+	// Wait for the dump goroutine to finish before reading dumpErr; reading
+	// it straight after the loop would race with the assignment above.
+	<-dumpDone
+	if err != nil {
+		return errors.New(fmt.Sprintf("Write error %s", err.Error()))
+	}
+	if dumpErr != nil {
+		return errors.New(fmt.Sprintf("Dump error %s", dumpErr.Error()))
+	}
+	err = w.Flush()
+	if err != nil {
+		return err
+	}
+	err = file.Close()
+	file = nil
+	if err != nil {
+		return err
+	}
+	return nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-htraced/src/go/src/org/apache/htrace/htrace/file.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htrace/file.go b/htrace-htraced/src/go/src/org/apache/htrace/htrace/file.go
new file mode 100644
index 0000000..ea214be
--- /dev/null
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htrace/file.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bufio"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "org/apache/htrace/common"
+ "os"
+)
+
+// A file used for input.
+// Transparently supports using stdin for input.
+type InputFile struct {
+ *os.File
+ path string // the path given to OpenInputFile; "-" marks stdin
+}
+
+// Open an input file. Stdin will be used when path is -
+func OpenInputFile(path string) (*InputFile, error) {
+ if path == "-" {
+ return &InputFile{File: os.Stdin, path: path}, nil
+ }
+ file, err := os.Open(path)
+ if err != nil {
+ return nil, err
+ }
+ return &InputFile{File: file, path: path}, nil
+}
+
+func (file *InputFile) Close() { // Closes the underlying file unless it is stdin; the error from os.File.Close is discarded.
+ if file.path != "-" {
+ file.File.Close()
+ }
+}
+
+// A file used for output.
+// Transparently supports using stdout for output.
+type OutputFile struct {
+ *os.File
+ path string // the path given to CreateOutputFile; "-" marks stdout
+}
+
+// Create an output file. Stdout will be used when path is -
+func CreateOutputFile(path string) (*OutputFile, error) {
+ if path == "-" {
+ return &OutputFile{File: os.Stdout, path: path}, nil
+ }
+ file, err := os.Create(path) // os.Create truncates the file if it already exists
+ if err != nil {
+ return nil, err
+ }
+ return &OutputFile{File: file, path: path}, nil
+}
+
+func (file *OutputFile) Close() error { // Closes the underlying file and returns its error; stdout is left open and nil is returned.
+ if file.path != "-" {
+ return file.File.Close()
+ }
+ return nil
+}
+
+// FailureDeferringWriter is a writer which allows us to call Printf multiple
+// times and then check if all the printfs succeeded at the very end, rather
+// than checking after each call. We will not attempt to write more data
+// after the first write failure.
+type FailureDeferringWriter struct {
+ io.Writer
+ err error // the first error returned by Writer.Write, or nil if none has occurred
+}
+
+func NewFailureDeferringWriter(writer io.Writer) *FailureDeferringWriter { // Wraps writer with no error recorded yet.
+ return &FailureDeferringWriter{writer, nil}
+}
+
+func (w *FailureDeferringWriter) Printf(format string, v ...interface{}) { // Formats and writes unless an earlier Printf already failed.
+ if w.err != nil {
+ return
+ }
+ str := fmt.Sprintf(format, v...)
+ _, err := w.Writer.Write([]byte(str)) // the byte count is ignored; only a non-nil error counts as failure
+ if err != nil {
+ w.err = err
+ }
+}
+
+func (w *FailureDeferringWriter) Error() error { // Returns the first write error seen by Printf, or nil.
+ return w.err
+}
+
+// Read a file full of whitespace-separated span JSON into a slice of spans. The path "-" is opened as stdin by OpenInputFile.
+func readSpansFile(path string) (common.SpanSlice, error) {
+ file, err := OpenInputFile(path)
+ if err != nil {
+ return nil, err
+ }
+ defer file.Close()
+ return readSpans(bufio.NewReader(file))
+}
+
+// Read whitespace-separated span JSON into a slice of spans. Any decode error other than io.EOF discards the spans read so far.
+func readSpans(reader io.Reader) (common.SpanSlice, error) {
+ spans := make(common.SpanSlice, 0)
+ dec := json.NewDecoder(reader)
+ for {
+ var span common.Span // declared inside the loop, so &span below is a distinct pointer per span
+ err := dec.Decode(&span)
+ if err != nil {
+ if err != io.EOF {
+ return nil, errors.New(fmt.Sprintf("Decode error after decoding %d "+
+ "span(s): %s", len(spans), err.Error()))
+ }
+ break // io.EOF is the normal end of input
+ }
+ spans = append(spans, &span)
+ }
+ return spans, nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-htraced/src/go/src/org/apache/htrace/htrace/file_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htrace/file_test.go b/htrace-htraced/src/go/src/org/apache/htrace/htrace/file_test.go
new file mode 100644
index 0000000..b6f9cac
--- /dev/null
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htrace/file_test.go
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "errors"
+ "io"
+ "io/ioutil"
+ "org/apache/htrace/common"
+ "org/apache/htrace/conf"
+ "org/apache/htrace/test"
+ "os"
+ "strings"
+ "testing"
+)
+
+// TestInputFileAndOutputFile writes two lines through an OutputFile wrapped
+// in a FailureDeferringWriter, closes it, and then reads the same bytes back
+// through an InputFile.
+func TestInputFileAndOutputFile(t *testing.T) {
+	tdir, err := ioutil.TempDir(os.TempDir(), "TestInputFileAndOutputFile")
+	if err != nil {
+		t.Fatalf("failed to create TempDir: %s\n", err.Error())
+	}
+	defer os.RemoveAll(tdir)
+	tpath := tdir + conf.PATH_SEP + "test"
+	var ofile *OutputFile
+	ofile, err = CreateOutputFile(tpath)
+	if err != nil {
+		t.Fatalf("failed to create OutputFile at %s: %s\n", tpath, err.Error())
+	}
+	// Cleanup for the failure paths only; the success path closes ofile
+	// explicitly below and sets it to nil.
+	defer func() {
+		if ofile != nil {
+			ofile.Close()
+		}
+	}()
+	w := NewFailureDeferringWriter(ofile)
+	w.Printf("Hello, world!\n")
+	w.Printf("2 + 2 = %d\n", 4)
+	if w.Error() != nil {
+		t.Fatalf("got unexpected error writing to %s: %s\n", tpath, w.Error().Error())
+	}
+	err = ofile.Close()
+	ofile = nil
+	if err != nil {
+		t.Fatalf("error on closing OutputFile for %s: %s\n", tpath, err.Error())
+	}
+	var ifile *InputFile
+	ifile, err = OpenInputFile(tpath)
+	// Check the error before deferring Close: on failure ifile is nil and
+	// the deferred call would dereference it.
+	if err != nil {
+		t.Fatalf("failed to open InputFile at %s: %s\n", tpath, err.Error())
+	}
+	defer ifile.Close()
+	expected := "Hello, world!\n2 + 2 = 4\n"
+	buf := make([]byte, len(expected))
+	_, err = io.ReadAtLeast(ifile, buf, len(buf))
+	if err != nil {
+		t.Fatalf("unexpected error on reading %s: %s\n", tpath, err.Error())
+	}
+	str := string(buf)
+	if str != expected {
+		t.Fatalf("Could not read back what we wrote to %s.\n"+
+			"Got:\n%s\nExpected:\n%s\n", tpath, str, expected)
+	}
+}
+
+type LimitedBufferWriter struct { // Test writer that accepts at most len(buf) bytes in total.
+ buf []byte // fixed-size destination
+ off int // index in buf where the next byte will be stored
+}
+
+const LIMITED_BUFFER_MESSAGE = "There isn't enough buffer to go around!" // error text returned by Write once buf is full
+
+func (w *LimitedBufferWriter) Write(p []byte) (int, error) { // Copies bytes of p into buf until buf is full; returns how many were stored.
+ var nwritten int
+ for i := range p {
+ if w.off >= len(w.buf) {
+ return nwritten, errors.New(LIMITED_BUFFER_MESSAGE) // bytes stored before this point stay in buf
+ }
+ w.buf[w.off] = p[i]
+ w.off = w.off + 1
+ nwritten++
+ }
+ return nwritten, nil
+}
+
+func TestFailureDeferringWriter(t *testing.T) { // Checks that a write failure is remembered and reported by Error().
+ lw := LimitedBufferWriter{buf: make([]byte, 20), off: 0} // 20 bytes: the first 18-byte line fits, plus 2 bytes of the second
+ w := NewFailureDeferringWriter(&lw)
+ w.Printf("Zippity do dah #%d\n", 1)
+ w.Printf("Zippity do dah #%d\n", 2) // this write overruns the buffer
+ if w.Error() == nil {
+ t.Fatalf("expected FailureDeferringWriter to experience a failure due to " +
+ "limited buffer size, but it did not.")
+ }
+ if w.Error().Error() != LIMITED_BUFFER_MESSAGE {
+ t.Fatalf("expected FailureDeferringWriter to have the error message %s, but "+
+ "the message was %s\n", LIMITED_BUFFER_MESSAGE, w.Error().Error())
+ }
+ expected := "Zippity do dah #1\nZi" // the whole first line and the first two bytes of the second
+ if string(lw.buf) != expected {
+ t.Fatalf("expected LimitedBufferWriter to contain %s, but it contained %s "+
+ "instead.\n", expected, string(lw.buf))
+ }
+}
+
+func TestReadSpans(t *testing.T) { // Checks that readSpans decodes two newline-separated JSON spans into the expected structs.
+ SPAN_TEST_STR := `{"i":"bdd6d4ee48de59bf","s":"c0681027d3ea4928",` +
+ `"b":1424736225037,"e":1424736225901,"d":"ClientNamenodeProtocol#getFileInfo",` +
+ `"r":"FsShell","p":["60538dfb4df91418"]}
+{"i":"bdd6d4ee48de59bf","s":"60538dfb4df91418","b":1424736224969,` +
+ `"e":1424736225960,"d":"getFileInfo","r":"FsShell","p":[],"n":{"path":"/"}}
+`
+ r := strings.NewReader(SPAN_TEST_STR)
+ spans, err := readSpans(r)
+ if err != nil {
+ t.Fatalf("Failed to read spans from string via readSpans: %s\n", err.Error())
+ }
+ SPAN_TEST_EXPECTED := common.SpanSlice{ // same order as the spans in SPAN_TEST_STR
+ &common.Span{
+ Id: test.SpanId("c0681027d3ea4928"), // JSON key "s"
+ SpanData: common.SpanData{
+ TraceId: test.SpanId("bdd6d4ee48de59bf"), // JSON key "i"
+ Begin: 1424736225037, // JSON key "b"
+ End: 1424736225901, // JSON key "e"
+ Description: "ClientNamenodeProtocol#getFileInfo", // JSON key "d"
+ ProcessId: "FsShell", // JSON key "r"
+ Parents: []common.SpanId{test.SpanId("60538dfb4df91418")}, // JSON key "p"
+ },
+ },
+ &common.Span{
+ Id: test.SpanId("60538dfb4df91418"),
+ SpanData: common.SpanData{
+ TraceId: test.SpanId("bdd6d4ee48de59bf"),
+ Begin: 1424736224969,
+ End: 1424736225960,
+ Description: "getFileInfo",
+ ProcessId: "FsShell",
+ Parents: []common.SpanId{},
+ Info: common.TraceInfoMap{ // JSON key "n"
+ "path": "/",
+ },
+ },
+ },
+ }
+ if len(spans) != len(SPAN_TEST_EXPECTED) { // checked first so the indexing below stays in range
+ t.Fatalf("Expected %d spans, but got %d\n",
+ len(SPAN_TEST_EXPECTED), len(spans))
+ }
+ for i := range SPAN_TEST_EXPECTED {
+ common.ExpectSpansEqual(t, spans[i], SPAN_TEST_EXPECTED[i])
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph.go b/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph.go
new file mode 100644
index 0000000..dabf2df
--- /dev/null
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph.go
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bufio"
+ "errors"
+ "fmt"
+ "io"
+ "org/apache/htrace/common"
+ "os"
+ "sort"
+)
+
+// jsonSpanFileToDotFile reads the spans stored in jsonFile and writes their
+// graph, in dotfile format, to dotFile.
+//
+// It returns an error if the input cannot be read, the output cannot be
+// opened, or writing, flushing or closing the output fails.
+func jsonSpanFileToDotFile(jsonFile string, dotFile string) error {
+	spans, err := readSpansFile(jsonFile)
+	if err != nil {
+		msg := fmt.Sprintf("error reading %s: %s", jsonFile, err.Error())
+		return errors.New(msg)
+	}
+	var out *OutputFile
+	if out, err = CreateOutputFile(dotFile); err != nil {
+		msg := fmt.Sprintf("error opening %s for write: %s", dotFile, err.Error())
+		return errors.New(msg)
+	}
+	// Safety net for the early-return paths.  The success path closes the file
+	// itself and clears `out` so that it is not closed twice.
+	defer func() {
+		if out != nil {
+			out.Close()
+		}
+	}()
+	buffered := bufio.NewWriter(out)
+	if err = spansToDot(spans, buffered); err != nil {
+		return err
+	}
+	if err = buffered.Flush(); err != nil {
+		return err
+	}
+	closeErr := out.Close()
+	out = nil
+	return closeErr
+}
+
+// spansToDot writes a dotfile-format digraph describing spans to writer.
+//
+// Every span becomes a node labelled with its description, and every
+// parent/child relationship becomes an edge from the parent to the child.
+// The spans slice is sorted in place.  Duplicate span IDs and references to
+// parents which are not in the slice are reported on stderr and otherwise
+// ignored.
+//
+// The returned error is the one reported by the failure-deferring writer
+// after all output has been attempted.
+func spansToDot(spans common.SpanSlice, writer io.Writer) error {
+	sort.Sort(spans)
+
+	// Index the spans by ID.  The first span seen with a given ID wins.
+	idMap := make(map[common.SpanId]*common.Span)
+	for _, span := range spans {
+		if first := idMap[span.Id]; first != nil {
+			fmt.Fprintf(os.Stderr, "There were multiple spans listed which "+
+				"had ID %s.\nFirst:%s\nOther:%s\n", span.Id.String(),
+				first.ToJson(), span.ToJson())
+			continue
+		}
+		idMap[span.Id] = span
+	}
+
+	// Group the children of each known parent.
+	childMap := make(map[common.SpanId]common.SpanSlice)
+	for _, child := range spans {
+		for _, parentId := range child.Parents {
+			parent := idMap[parentId]
+			if parent == nil {
+				fmt.Fprintf(os.Stderr, "Can't find parent id %s for %s\n",
+					parentId.String(), child.ToJson())
+				continue
+			}
+			childMap[parent.Id] = append(childMap[parent.Id], child)
+		}
+	}
+
+	w := NewFailureDeferringWriter(writer)
+	w.Printf("digraph spans {\n")
+	// Write out the nodes with their descriptions.  The values are passed as
+	// arguments rather than being pre-formatted, so that a '%' inside a
+	// description is never interpreted as a formatting verb.
+	for _, span := range spans {
+		w.Printf(` "%s" [label="%s"];`+"\n",
+			span.Id.String(), span.Description)
+	}
+	// Write out the edges between nodes... the parent/children relationships
+	for _, span := range spans {
+		children := childMap[span.Id]
+		sort.Sort(children)
+		for _, child := range children {
+			w.Printf(` "%s" -> "%s";`+"\n",
+				span.Id.String(), child.Id.String())
+		}
+	}
+	w.Printf("}\n")
+	return w.Error()
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph_test.go b/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph_test.go
new file mode 100644
index 0000000..8698a98
--- /dev/null
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htrace/graph_test.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bytes"
+ "org/apache/htrace/common"
+ "org/apache/htrace/test"
+ "testing"
+)
+
+// TestSpansToDot runs spansToDot on a chain of three spans (each one the
+// parent of the next) and compares the generated dotfile text against the
+// expected output exactly.
+func TestSpansToDot(t *testing.T) {
+ // Root span, its child, and its grandchild.
+ TEST_SPANS := common.SpanSlice{
+ &common.Span{
+ Id: test.SpanId("6af3cc058e5d829d"),
+ SpanData: common.SpanData{
+ TraceId: test.SpanId("0e4716fe911244de"),
+ Begin: 1424813349020,
+ End: 1424813349134,
+ Description: "newDFSInputStream",
+ ProcessId: "FsShell",
+ Parents: []common.SpanId{},
+ Info: common.TraceInfoMap{
+ "path": "/",
+ },
+ },
+ },
+ &common.Span{
+ Id: test.SpanId("75d16cc5b2c07d8a"),
+ SpanData: common.SpanData{
+ TraceId: test.SpanId("0e4716fe911244de"),
+ Begin: 1424813349025,
+ End: 1424813349133,
+ Description: "getBlockLocations",
+ ProcessId: "FsShell",
+ Parents: []common.SpanId{test.SpanId("6af3cc058e5d829d")},
+ },
+ },
+ &common.Span{
+ Id: test.SpanId("e2c7273efb280a8c"),
+ SpanData: common.SpanData{
+ TraceId: test.SpanId("0e4716fe911244de"),
+ Begin: 1424813349027,
+ End: 1424813349073,
+ Description: "ClientNamenodeProtocol#getBlockLocations",
+ ProcessId: "FsShell",
+ Parents: []common.SpanId{test.SpanId("75d16cc5b2c07d8a")},
+ },
+ },
+ }
+ // Capture the output in memory.
+ w := bytes.NewBuffer(make([]byte, 0, 2048))
+ err := spansToDot(TEST_SPANS, w)
+ if err != nil {
+ t.Fatalf("spansToDot failed: error %s\n", err.Error())
+ }
+ // One node line per span, followed by one edge line per parent/child pair.
+ EXPECTED_STR := `digraph spans {
+ "6af3cc058e5d829d" [label="newDFSInputStream"];
+ "75d16cc5b2c07d8a" [label="getBlockLocations"];
+ "e2c7273efb280a8c" [label="ClientNamenodeProtocol#getBlockLocations"];
+ "6af3cc058e5d829d" -> "75d16cc5b2c07d8a";
+ "75d16cc5b2c07d8a" -> "e2c7273efb280a8c";
+}
+`
+ if w.String() != EXPECTED_STR {
+ t.Fatalf("Expected to get:\n%s\nGot:\n%s\n", EXPECTED_STR, w.String())
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-htraced/src/go/src/org/apache/htrace/htrace/queries.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htrace/queries.go b/htrace-htraced/src/go/src/org/apache/htrace/htrace/queries.go
new file mode 100644
index 0000000..4ff246c
--- /dev/null
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htrace/queries.go
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ htrace "org/apache/htrace/client"
+ "org/apache/htrace/common"
+ "strings"
+ "unicode"
+)
+
+// tokenize splits str into tokens separated by runs of whitespace.
+//
+// Whitespace inside a quoted section does not split a token.  A quoted
+// section starts at any rune in unicode.Quotation_Mark and ends at the next
+// occurrence of that same rune; if the quote is never closed, the section
+// runs to the end of the string.  The quote characters themselves are kept as
+// part of the token.  An input with no tokens yields an empty slice.
+//
+// The scan is done in a single explicit left-to-right pass because the quote
+// tracking is stateful, and strings.FieldsFunc does not guarantee the order
+// in which it calls its callback.
+func tokenize(str string) []string {
+	tokens := make([]string, 0)
+	prevQuote := rune(0) // the quote rune we are currently inside, or 0
+	start := -1          // byte offset where the current token began, or -1
+	for i, c := range str {
+		isSeparator := false
+		switch {
+		case c == prevQuote:
+			// Closing quote.
+			prevQuote = rune(0)
+		case prevQuote != rune(0):
+			// Inside quotes: everything belongs to the token.
+		case unicode.In(c, unicode.Quotation_Mark):
+			// Opening quote.
+			prevQuote = c
+		default:
+			isSeparator = unicode.IsSpace(c)
+		}
+		if isSeparator {
+			if start >= 0 {
+				tokens = append(tokens, str[start:i])
+				start = -1
+			}
+		} else if start < 0 {
+			start = i
+		}
+	}
+	if start >= 0 {
+		tokens = append(tokens, str[start:])
+	}
+	return tokens
+}
+
+// predicateParser parses a query string in the format of a series of
+// [TYPE] [OPERATOR] [CONST] tuples, joined by AND statements.
+type predicateParser struct {
+	// The tokens of the query string, as produced by tokenize.
+	tokens []string
+
+	// The index of the next token to consume.
+	curToken int
+}
+
+// Parse consumes the next predicate from the token stream.
+//
+// Every predicate after the first must be preceded by an 'and' token
+// (case-insensitive).  Parse returns (nil, nil) once all tokens have been
+// consumed, and (nil, err) if the tokens at the current position do not form
+// a complete, valid predicate.
+func (ps *predicateParser) Parse() (*common.Predicate, error) {
+	// Note: all the bounds checks here must be ">=", since curToken ==
+	// len(tokens) is already past the last valid index.
+	if ps.curToken >= len(ps.tokens) {
+		return nil, nil
+	}
+	if ps.curToken > 0 {
+		if strings.ToLower(ps.tokens[ps.curToken]) != "and" {
+			return nil, fmt.Errorf("Error parsing on token %d: "+
+				"expected predicates to be joined by 'and', but found '%s'",
+				ps.curToken, ps.tokens[ps.curToken])
+		}
+		ps.curToken++
+		if ps.curToken >= len(ps.tokens) {
+			return nil, fmt.Errorf("Nothing found after 'and' at "+
+				"token %d", ps.curToken)
+		}
+	}
+	field := common.Field(ps.tokens[ps.curToken])
+	if !field.IsValid() {
+		return nil, fmt.Errorf("Invalid field specifier at token %d. "+
+			"Can't understand %s. Valid field specifiers are %v", ps.curToken,
+			ps.tokens[ps.curToken], common.ValidFields())
+	}
+	ps.curToken++
+	if ps.curToken >= len(ps.tokens) {
+		return nil, fmt.Errorf("Nothing found after field specifier "+
+			"at token %d", ps.curToken)
+	}
+	op := common.Op(ps.tokens[ps.curToken])
+	if !op.IsValid() {
+		return nil, fmt.Errorf("Invalid operation specifier at token %d. "+
+			"Can't understand %s. Valid operation specifiers are %v", ps.curToken,
+			ps.tokens[ps.curToken], common.ValidOps())
+	}
+	ps.curToken++
+	if ps.curToken >= len(ps.tokens) {
+		return nil, fmt.Errorf("Nothing found after operation specifier "+
+			"at token %d", ps.curToken)
+	}
+	val := ps.tokens[ps.curToken]
+	// Step past the value, so that the next call starts at the 'and' token
+	// (or at the end of the input).
+	ps.curToken++
+	return &common.Predicate{Op: op, Field: field, Val: val}, nil
+}
+
+// parseQueryString tokenizes str and parses it into a list of predicates.
+//
+// It returns an error if any predicate is malformed, or if the string
+// contains no predicates at all.
+func parseQueryString(str string) ([]common.Predicate, error) {
+	ps := predicateParser{tokens: tokenize(str)}
+	preds := make([]common.Predicate, 0)
+	for {
+		pred, err := ps.Parse()
+		// Check the error first: Parse returns a nil predicate both on error
+		// and at the end of the input.
+		if err != nil {
+			return nil, err
+		}
+		if pred == nil {
+			break
+		}
+		preds = append(preds, *pred)
+	}
+	if len(preds) == 0 {
+		return nil, errors.New("Empty query string")
+	}
+	return preds, nil
+}
+
+// Send a query from a query string.
+func doQueryFromString(hcl *htrace.Client, str string, lim int) error {
+ query := &common.Query{Lim: lim}
+ var err error
+ query.Predicates, err = parseQueryString(str)
+ if err != nil {
+ return err
+ }
+ return doQuery(hcl, query)
+}
+
+// Send a query from a raw JSON string.
+func doRawQuery(hcl *htrace.Client, str string) error {
+ jsonBytes := []byte(str)
+ var query common.Query
+ err := json.Unmarshal(jsonBytes, &query)
+ if err != nil {
+ return errors.New(fmt.Sprintf("Error parsing provided JSON: %s\n", err.Error()))
+ }
+ return doQuery(hcl, &query)
+}
+
+// Send a query.
+func doQuery(hcl *htrace.Client, query *common.Query) error {
+ if *verbose {
+ qbytes, err := json.Marshal(*query)
+ if err != nil {
+ qbytes = []byte("marshaling error: " + err.Error())
+ }
+ fmt.Printf("Sending query: %s\n", string(qbytes))
+ }
+ spans, err := hcl.Query(query)
+ if err != nil {
+ return err
+ }
+ if *verbose {
+ fmt.Printf("%d results...\n", len(spans))
+ }
+ for i := range spans {
+ fmt.Printf("%s\n", spans[i].ToJson())
+ }
+ return nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-htraced/src/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/client_test.go
new file mode 100644
index 0000000..218c1c8
--- /dev/null
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/client_test.go
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "fmt"
+ "math/rand"
+ htrace "org/apache/htrace/client"
+ "org/apache/htrace/common"
+ "org/apache/htrace/test"
+ "sort"
+ "testing"
+ "time"
+)
+
+func TestClientGetServerInfo(t *testing.T) {
+ htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerInfo",
+ DataDirs: make([]string, 2)}
+ ht, err := htraceBld.Build()
+ if err != nil {
+ t.Fatalf("failed to create datastore: %s", err.Error())
+ }
+ defer ht.Close()
+ var hcl *htrace.Client
+ hcl, err = htrace.NewClient(ht.ClientConf())
+ if err != nil {
+ t.Fatalf("failed to create client: %s", err.Error())
+ }
+ _, err = hcl.GetServerInfo()
+ if err != nil {
+ t.Fatalf("failed to call GetServerInfo: %s", err.Error())
+ }
+}
+
+// createRandomTestSpans builds `amount` random spans from a fixed seed.
+//
+// The first span is never offered as a candidate parent to
+// test.NewRandomSpan; instead the second span is explicitly given the first
+// span as its only parent.  amount must be at least 2.
+func createRandomTestSpans(amount int) common.SpanSlice {
+	rnd := rand.New(rand.NewSource(2))
+	generated := make(common.SpanSlice, amount)
+	generated[0] = test.NewRandomSpan(rnd, generated[0:0])
+	for idx := 1; idx < amount; idx++ {
+		candidates := generated[1:idx]
+		generated[idx] = test.NewRandomSpan(rnd, candidates)
+	}
+	root := generated[0]
+	generated[1].SpanData.Parents = []common.SpanId{root.Id}
+	return generated
+}
+
+// TestClientOperations exercises WriteSpans, FindSpan, FindChildren and Query
+// through the client, against a MiniHTraced with two data directories.
+//
+// Only the first half of the generated spans is written to the server, so
+// that lookups of the second half can be used to test the "not found" paths.
+func TestClientOperations(t *testing.T) {
+	htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations",
+		DataDirs: make([]string, 2)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	var hcl *htrace.Client
+	hcl, err = htrace.NewClient(ht.ClientConf())
+	if err != nil {
+		t.Fatalf("failed to create client: %s", err.Error())
+	}
+
+	// Create some random trace spans.
+	NUM_TEST_SPANS := 30
+	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
+
+	// Write half of the spans to htraced via the client.
+	err = hcl.WriteSpans(&common.WriteSpansReq{
+		Spans: allSpans[0 : NUM_TEST_SPANS/2],
+	})
+	if err != nil {
+		t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
+			err.Error())
+	}
+
+	// Look up the first half of the spans.  They should be found.
+	var span *common.Span
+	for i := 0; i < NUM_TEST_SPANS/2; i++ {
+		span, err = hcl.FindSpan(allSpans[i].Id)
+		if err != nil {
+			t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+		}
+		common.ExpectSpansEqual(t, allSpans[i], span)
+	}
+
+	// Look up the second half of the spans.  They should not be found.
+	for i := NUM_TEST_SPANS / 2; i < NUM_TEST_SPANS; i++ {
+		span, err = hcl.FindSpan(allSpans[i].Id)
+		if err != nil {
+			t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+		}
+		if span != nil {
+			t.Fatalf("Unexpectedly found a span we never wrote to "+
+				"the server: FindSpan(%d) succeeded\n", i)
+		}
+	}
+
+	// Test FindChildren
+	childSpan := allSpans[1]
+	parentId := childSpan.Parents[0]
+	var children []common.SpanId
+	children, err = hcl.FindChildren(parentId, 1)
+	if err != nil {
+		t.Fatalf("FindChildren(%s) failed: %s\n", parentId, err.Error())
+	}
+	if len(children) != 1 {
+		t.Fatalf("FindChildren(%s) returned an invalid number of "+
+			"children: expected %d, got %d\n", parentId, 1, len(children))
+	}
+	if children[0] != childSpan.Id {
+		t.Fatalf("FindChildren(%s) returned an invalid child id: expected %s, "+
+			"got %s\n", parentId, childSpan.Id, children[0])
+	}
+
+	// Test FindChildren on a span that has no children.  This span was never
+	// written to the server, so nothing can refer to it as a parent.
+	childlessSpan := allSpans[NUM_TEST_SPANS/2]
+	children, err = hcl.FindChildren(childlessSpan.Id, 10)
+	if err != nil {
+		t.Fatalf("FindChildren(%s) failed: %s\n", childlessSpan.Id, err.Error())
+	}
+	if len(children) != 0 {
+		t.Fatalf("FindChildren(%s) returned an invalid number of "+
+			"children: expected %d, got %d\n", childlessSpan.Id, 0, len(children))
+	}
+
+	// Test Query
+	query := common.Query{Lim: 10}
+	spans, err := hcl.Query(&query)
+	if err != nil {
+		t.Fatalf("Query({lim: %d}) failed: %s\n", 10, err.Error())
+	}
+	if len(spans) != 10 {
+		t.Fatalf("Query({lim: %d}) returned an invalid number of "+
+			"spans: expected %d, got %d\n", 10, 10, len(spans))
+	}
+}
+
+// TestDumpAll writes a sorted set of random spans to a MiniHTraced and checks
+// that DumpAll streams every one of them back, in the same order.
+func TestDumpAll(t *testing.T) {
+	htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll",
+		DataDirs: make([]string, 2)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	var hcl *htrace.Client
+	hcl, err = htrace.NewClient(ht.ClientConf())
+	if err != nil {
+		t.Fatalf("failed to create client: %s", err.Error())
+	}
+
+	NUM_TEST_SPANS := 100
+	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
+	sort.Sort(allSpans)
+	err = hcl.WriteSpans(&common.WriteSpansReq{
+		Spans: allSpans,
+	})
+	if err != nil {
+		t.Fatalf("WriteSpans failed: %s\n", err.Error())
+	}
+	out := make(chan *common.Span, 50)
+	// Hand the result of DumpAll back over a channel.  The channel receive
+	// below orders the read after the goroutine's write, which a plain shared
+	// variable would not do: the spans channel may be observed as closed
+	// before DumpAll has returned.
+	dumpErrCh := make(chan error, 1)
+	go func() {
+		dumpErrCh <- hcl.DumpAll(3, out)
+	}()
+	var numSpans int
+	nextLogTime := time.Now().Add(time.Millisecond * 5)
+	// The loop ends when the spans channel is closed.
+	for span := range out {
+		if numSpans >= len(allSpans) {
+			t.Fatalf("expected to read %d spans... but read more than that\n",
+				len(allSpans))
+		}
+		common.ExpectSpansEqual(t, allSpans[numSpans], span)
+		numSpans++
+		if testing.Verbose() {
+			now := time.Now()
+			if !now.Before(nextLogTime) {
+				nextLogTime = now.Add(time.Millisecond * 5)
+				fmt.Printf("read back %d span(s)...\n", numSpans)
+			}
+		}
+	}
+	// Report a dump failure first, since it is the most likely explanation
+	// for a short read.
+	if dumpErr := <-dumpErrCh; dumpErr != nil {
+		t.Fatalf("got dump error %s\n", dumpErr.Error())
+	}
+	if numSpans != len(allSpans) {
+		t.Fatalf("expected to read %d spans... but only read %d\n",
+			len(allSpans), numSpans)
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
new file mode 100644
index 0000000..faf23cd
--- /dev/null
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore.go
@@ -0,0 +1,929 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bytes"
+ "encoding/gob"
+ "errors"
+ "fmt"
+ "github.com/jmhodges/levigo"
+ "org/apache/htrace/common"
+ "org/apache/htrace/conf"
+ "os"
+ "strconv"
+ "strings"
+ "sync/atomic"
+)
+
+//
+// The data store code for HTraced.
+//
+// This code stores the trace spans. We use levelDB here so that we don't have to store everything
+// in memory at all times. The data is sharded across multiple levelDB databases in multiple
+// directories. Normally, these multiple directories will be on multiple disk drives.
+//
+// The main emphasis in the HTraceD data store is on quickly and efficiently storing trace span data
+// coming from many daemons. Durability is not as big a concern as in some data stores, since
+// losing a little bit of trace data if htraced goes down is not critical. We use the "gob" package
+// for serialization. We assume that there will be many more writes than reads.
+//
+// Schema
+// v -> dataStoreVersion
+// s[8-byte-big-endian-sid] -> SpanData
+// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {}
+// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {}
+// d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {}
+// p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
+//
+// Note that span IDs are unsigned 64-bit numbers.
+// Begin times, end times, and durations are signed 64-bit numbers.
+// In order to get LevelDB to properly compare the signed 64-bit quantities,
+// we flip the highest bit. This way, we can get leveldb to view negative
+// quantities as less than non-negative ones. This also means that we can do
+// all queries using unsigned 64-bit math, rather than having to special-case
+// the signed fields.
+//
+
+// Datastore layout versions.
+const (
+	// The layout version reported when a shard has no version record.
+	UNKNOWN_LAYOUT_VERSION = 0
+
+	// The layout version which this software reads and writes.
+	CURRENT_LAYOUT_VERSION = 2
+)
+
+// The empty value stored under keys which carry all of their information in
+// the key itself.
+var EMPTY_BYTE_BUF = []byte{}
+
+// The first byte of each kind of LevelDB key.
+const (
+	VERSION_KEY             = 'v'
+	SPAN_ID_INDEX_PREFIX    = 's'
+	BEGIN_TIME_INDEX_PREFIX = 'b'
+	END_TIME_INDEX_PREFIX   = 'e'
+	DURATION_INDEX_PREFIX   = 'd'
+	PARENT_ID_INDEX_PREFIX  = 'p'
+	INVALID_INDEX_PREFIX    = 0
+)
+
+// Statistics holds I/O counters.  The methods below access the counters with
+// atomic operations.
+type Statistics struct {
+	// The number of spans written so far.
+	NumSpansWritten uint64
+}
+
+// IncrementWrittenSpans atomically adds one to the written-span counter.
+func (stats *Statistics) IncrementWrittenSpans() {
+	const delta uint64 = 1
+	atomic.AddUint64(&stats.NumSpansWritten, delta)
+}
+
+// Copy returns a new Statistics holding the current counter values, which are
+// read with atomic loads.
+func (stats *Statistics) Copy() *Statistics {
+	snapshot := new(Statistics)
+	snapshot.NumSpansWritten = atomic.LoadUint64(&stats.NumSpansWritten)
+	return snapshot
+}
+
+// makeKey builds a 9-byte LevelDB key: the tag byte followed by sid encoded
+// as 8 bytes, most significant byte first.
+func makeKey(tag byte, sid uint64) []byte {
+	key := make([]byte, 9)
+	key[0] = tag
+	for i := uint(0); i < 8; i++ {
+		key[1+i] = byte(sid >> (56 - 8*i))
+	}
+	return key
+}
+
+// keyToInt decodes the first 8 bytes of key as an unsigned 64-bit integer,
+// most significant byte first.  Any bytes after the first 8 are ignored.  It
+// panics if key holds fewer than 8 bytes.
+func keyToInt(key []byte) uint64 {
+	var id uint64
+	for i := 0; i < 8; i++ {
+		id = (id << 8) | uint64(key[i])
+	}
+	return id
+}
+
+// makeSecondaryKey builds a 17-byte LevelDB key: the tag byte, then fir, then
+// sec, each encoded as 8 bytes with the most significant byte first.
+func makeSecondaryKey(tag byte, fir uint64, sec uint64) []byte {
+	key := make([]byte, 1, 17)
+	key[0] = tag
+	for _, part := range [2]uint64{fir, sec} {
+		for shift := 56; shift >= 0; shift -= 8 {
+			key = append(key, byte(part>>uint(shift)))
+		}
+	}
+	return key
+}
+
+// A single directory containing a levelDB instance.
+//
+// Spans are handed to a shard through the incoming channel and written by
+// processIncoming.
+type shard struct {
+ // The data store that this shard is part of
+ store *dataStore
+
+ // The LevelDB instance.
+ ldb *levigo.DB
+
+ // The path to the leveldb directory this shard is managing.
+ path string
+
+ // Incoming requests to write Spans. A nil span tells processIncoming to
+ // exit.
+ incoming chan *common.Span
+
+ // The channel we will send a bool to when we exit. This is nil until
+ // CreateDataStore sets it, just before starting processIncoming.
+ exited chan bool
+}
+
+// processIncoming writes every span received on the shard's incoming channel
+// to LevelDB, logging the outcome of each write.  A failed write does not
+// stop the loop.  When a nil span is received, it signals the exited channel
+// and returns.
+func (shd *shard) processIncoming() {
+	lg := shd.store.lg
+	for span := <-shd.incoming; span != nil; span = <-shd.incoming {
+		if err := shd.writeSpan(span); err != nil {
+			lg.Errorf("Shard processor for %s got fatal error %s.\n", shd.path, err.Error())
+			continue
+		}
+		lg.Tracef("Shard processor for %s wrote span %s.\n", shd.path, span.ToJson())
+	}
+	lg.Infof("Shard processor for %s exiting.\n", shd.path)
+	shd.exited <- true
+}
+
+// Convert a signed 64-bit number into an unsigned 64-bit number. We flip the
+// highest bit, so that negative input values map to unsigned numbers which are
+// less than non-negative input values.
+func s2u64(val int64) uint64 {
+ ret := uint64(val)
+ ret ^= 0x8000000000000000
+ return ret
+}
+
+func (shd *shard) writeSpan(span *common.Span) error {
+ batch := levigo.NewWriteBatch()
+ defer batch.Close()
+
+ // Add SpanData to batch.
+ spanDataBuf := new(bytes.Buffer)
+ spanDataEnc := gob.NewEncoder(spanDataBuf)
+ err := spanDataEnc.Encode(span.SpanData)
+ if err != nil {
+ return err
+ }
+ batch.Put(makeKey(SPAN_ID_INDEX_PREFIX, span.Id.Val()), spanDataBuf.Bytes())
+
+ // Add this to the parent index.
+ for parentIdx := range span.Parents {
+ batch.Put(makeSecondaryKey(PARENT_ID_INDEX_PREFIX,
+ span.Parents[parentIdx].Val(), span.Id.Val()), EMPTY_BYTE_BUF)
+ }
+
+ // Add to the other secondary indices.
+ batch.Put(makeSecondaryKey(BEGIN_TIME_INDEX_PREFIX, s2u64(span.Begin),
+ span.Id.Val()), EMPTY_BYTE_BUF)
+ batch.Put(makeSecondaryKey(END_TIME_INDEX_PREFIX, s2u64(span.End),
+ span.Id.Val()), EMPTY_BYTE_BUF)
+ batch.Put(makeSecondaryKey(DURATION_INDEX_PREFIX, s2u64(span.Duration()),
+ span.Id.Val()), EMPTY_BYTE_BUF)
+
+ err = shd.ldb.Write(shd.store.writeOpts, batch)
+ if err != nil {
+ return err
+ }
+ shd.store.stats.IncrementWrittenSpans()
+ if shd.store.WrittenSpans != nil {
+ shd.store.WrittenSpans <- span
+ }
+ return nil
+}
+
+// FindChildren looks up the children of span sid which are recorded in this
+// shard's parent index, and appends their IDs to childIds.
+//
+// At most lim children are appended; lim is decremented once per child found.
+// (The loop only stops when lim reaches exactly zero, so a negative lim is
+// never exhausted.)
+//
+// It returns the extended slice, the remaining limit, and any error reported
+// by the LevelDB iterator.  The IDs found before an error are still returned.
+func (shd *shard) FindChildren(sid common.SpanId, childIds []common.SpanId,
+	lim int32) ([]common.SpanId, int32, error) {
+	// All index entries for this parent share the 9-byte prefix
+	// [tag][parent sid]; the child sid is the 8 bytes which follow it.
+	searchKey := makeKey(PARENT_ID_INDEX_PREFIX, sid.Val())
+	iter := shd.ldb.NewIterator(shd.store.readOpts)
+	defer iter.Close()
+	for iter.Seek(searchKey); iter.Valid() && lim != 0; iter.Next() {
+		key := iter.Key()
+		if !bytes.HasPrefix(key, searchKey) {
+			// We've moved past the entries for this parent.
+			break
+		}
+		childIds = append(childIds, common.SpanId(keyToInt(key[9:])))
+		lim--
+	}
+	// An iterator which hit an I/O error simply becomes invalid, so the error
+	// has to be checked for explicitly.
+	if err := iter.GetError(); err != nil {
+		return childIds, lim, err
+	}
+	return childIds, lim, nil
+}
+
+// Close shuts down the shard: it stops the incoming-span processor, if one
+// was started, and then closes the LevelDB instance.
+func (shd *shard) Close() {
+	lg := shd.store.lg
+	// The exited channel is only set once the processor goroutine is about to
+	// be started.  If it is nil, nothing is reading from the incoming
+	// channel, and sending the nil sentinel could block forever.
+	if shd.exited != nil {
+		shd.incoming <- nil
+		lg.Infof("Waiting for %s to exit...\n", shd.path)
+		<-shd.exited
+	}
+	shd.ldb.Close()
+	lg.Infof("Closed %s...\n", shd.path)
+}
+
+// The Data Store.
+//
+// A dataStore owns a set of shards, each wrapping one LevelDB instance, along
+// with the LevelDB options and statistics they share.
+type dataStore struct {
+ // The logger used by the data store and its shards.
+ lg *common.Logger
+
+ // The shards which manage our LevelDB instances.
+ shards []*shard
+
+ // I/O statistics for all shards.
+ stats Statistics
+
+ // The read options to use for LevelDB.
+ readOpts *levigo.ReadOptions
+
+ // The write options to use for LevelDB.
+ writeOpts *levigo.WriteOptions
+
+ // If non-null, a channel we will send spans to once we finish writing them. This is only used
+ // for testing.
+ WrittenSpans chan *common.Span
+}
+
+// CreateDataStore builds a data store from the configuration: it opens one
+// shard in the "db" subdirectory of each configured data directory, and then
+// starts a span processor goroutine for every shard.
+//
+// writtenSpans may be nil; if it is not, each span is sent to it after being
+// written.  If any shard cannot be created, the partially built store is
+// closed and the error is returned.
+func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) {
+	// Get the configuration.
+	clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR)
+	dirs := strings.Split(cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES),
+		conf.PATH_LIST_SEP)
+
+	lg := common.NewLogger("datastore", cnf)
+	store := &dataStore{lg: lg, shards: []*shard{}, WrittenSpans: writtenSpans}
+
+	// If we return an error, close the store.  The deferred function looks at
+	// this variable, so failures below must be assigned to it.
+	var err error
+	defer func() {
+		if err != nil {
+			store.Close()
+			store = nil
+		}
+	}()
+
+	readOpts := levigo.NewReadOptions()
+	readOpts.SetFillCache(true)
+	store.readOpts = readOpts
+	writeOpts := levigo.NewWriteOptions()
+	writeOpts.SetSync(false)
+	store.writeOpts = writeOpts
+
+	// Open all shards
+	for _, dir := range dirs {
+		path := dir + conf.PATH_SEP + "db"
+		var shd *shard
+		if shd, err = CreateShard(store, cnf, path, clearStored); err != nil {
+			lg.Errorf("Error creating shard %s: %s\n", path, err.Error())
+			return nil, err
+		}
+		store.shards = append(store.shards, shd)
+	}
+	// Only start the processors once every shard has opened successfully.
+	for _, shd := range store.shards {
+		shd.exited = make(chan bool, 1)
+		go shd.processIncoming()
+	}
+	return store, nil
+}
+
+// CreateShard opens the LevelDB instance at path, creating it if necessary,
+// and wraps it in a shard belonging to store.
+//
+// If clearStored is set, anything which already exists at path is removed
+// first.  A newly created instance is stamped with the current layout
+// version; an existing instance must already carry the current layout
+// version, or an error is returned.  The LevelDB handle is closed again on
+// every failure path after it was opened.
+func CreateShard(store *dataStore, cnf *conf.Config, path string,
+	clearStored bool) (*shard, error) {
+	lg := store.lg
+	if clearStored {
+		fi, statErr := os.Stat(path)
+		if statErr != nil && !os.IsNotExist(statErr) {
+			lg.Errorf("Failed to stat %s: %s\n", path, statErr.Error())
+			return nil, statErr
+		}
+		if fi != nil {
+			if rmErr := os.RemoveAll(path); rmErr != nil {
+				lg.Errorf("Failed to clear existing datastore directory %s: %s\n",
+					path, rmErr.Error())
+				return nil, rmErr
+			}
+			lg.Infof("Cleared existing datastore directory %s\n", path)
+		}
+	}
+	if mkErr := os.MkdirAll(path, 0777); mkErr != nil {
+		lg.Errorf("Failed to MkdirAll(%s): %s\n", path, mkErr.Error())
+		return nil, mkErr
+	}
+
+	// First try to open an existing database.  If that fails, try again with
+	// create-if-missing set, and remember that the database is new.
+	openOpts := levigo.NewOptions()
+	defer openOpts.Close()
+	newlyCreated := false
+	ldb, err := levigo.Open(path, openOpts)
+	if err != nil {
+		lg.Debugf("LevelDB failed to open %s: %s\n", path, err.Error())
+		openOpts.SetCreateIfMissing(true)
+		if ldb, err = levigo.Open(path, openOpts); err != nil {
+			lg.Errorf("LevelDB failed to create %s: %s\n", path, err.Error())
+			return nil, err
+		}
+		lg.Infof("Created new LevelDB instance in %s\n", path)
+		newlyCreated = true
+	} else {
+		lg.Infof("LevelDB opened %s\n", path)
+	}
+
+	// From here on, close the database unless we end up returning a shard.
+	var shd *shard
+	defer func() {
+		if shd == nil {
+			ldb.Close()
+		}
+	}()
+
+	lv, err := readLayoutVersion(store, ldb)
+	if err != nil {
+		lg.Errorf("Got error while reading datastore version for %s: %s\n",
+			path, err.Error())
+		return nil, err
+	}
+	switch {
+	case newlyCreated && (lv == UNKNOWN_LAYOUT_VERSION):
+		if err = writeDataStoreVersion(store, ldb, CURRENT_LAYOUT_VERSION); err != nil {
+			lg.Errorf("Got error while writing datastore version for %s: %s\n",
+				path, err.Error())
+			return nil, err
+		}
+		lg.Tracef("Wrote layout version %d to shard at %s.\n",
+			CURRENT_LAYOUT_VERSION, path)
+	case lv != CURRENT_LAYOUT_VERSION:
+		versionName := "unknown"
+		if lv != UNKNOWN_LAYOUT_VERSION {
+			versionName = fmt.Sprintf("%d", lv)
+		}
+		lg.Errorf("Can't read old datastore. Its layout version is %s, but this "+
+			"software is at layout version %d. Please set %s to clear the datastore "+
+			"on startup, or clear it manually.\n", versionName,
+			CURRENT_LAYOUT_VERSION, conf.HTRACE_DATA_STORE_CLEAR)
+		return nil, errors.New(fmt.Sprintf("Invalid layout version: got %s, expected %d.",
+			versionName, CURRENT_LAYOUT_VERSION))
+	default:
+		lg.Tracef("Found layout version %d in %s.\n", lv, path)
+	}
+
+	spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
+	shd = &shard{
+		store:    store,
+		ldb:      ldb,
+		path:     path,
+		incoming: make(chan *common.Span, spanBufferSize),
+	}
+	return shd, nil
+}
+
+// readLayoutVersion reads the gob-encoded layout version stored under the
+// version key of ldb.  It returns 0 if no version record is present.
+func readLayoutVersion(store *dataStore, ldb *levigo.DB) (uint32, error) {
+	raw, getErr := ldb.Get(store.readOpts, []byte{VERSION_KEY})
+	switch {
+	case getErr != nil:
+		return 0, getErr
+	case len(raw) == 0:
+		return 0, nil
+	}
+	var version uint32
+	if err := gob.NewDecoder(bytes.NewBuffer(raw)).Decode(&version); err != nil {
+		return 0, err
+	}
+	return version, nil
+}
+
+// Write the datastore version to a shard.
+func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error {
+ w := new(bytes.Buffer)
+ encoder := gob.NewEncoder(w)
+ err := encoder.Encode(&v)
+ if err != nil {
+ return err
+ }
+ return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes())
+}
+
+// GetStatistics returns a copy of the store's current I/O statistics.
+func (store *dataStore) GetStatistics() *Statistics {
+	snapshot := store.stats.Copy()
+	return snapshot
+}
+
+// Close shuts down the data store: it closes every shard, then releases the
+// LevelDB read and write options and the logger.
+//
+// Each resource is set to nil once it has been released, and resources which
+// are already nil are skipped, so calling Close more than once is harmless.
+func (store *dataStore) Close() {
+	for idx, shd := range store.shards {
+		if shd == nil {
+			// Already closed by an earlier call.
+			continue
+		}
+		shd.Close()
+		store.shards[idx] = nil
+	}
+	if store.readOpts != nil {
+		store.readOpts.Close()
+		store.readOpts = nil
+	}
+	if store.writeOpts != nil {
+		store.writeOpts.Close()
+		store.writeOpts = nil
+	}
+	// The logger goes last, since closing the shards logs.
+	if store.lg != nil {
+		store.lg.Close()
+		store.lg = nil
+	}
+}
+
+// getShardIndex returns the index of the shard responsible for span sid: the
+// numeric span ID modulo the number of shards.
+func (store *dataStore) getShardIndex(sid common.SpanId) int {
+	numShards := uint64(len(store.shards))
+	return int(sid.Val() % numShards)
+}
+
+// WriteSpan queues span on the incoming channel of the shard responsible for
+// its ID.
+func (store *dataStore) WriteSpan(span *common.Span) {
+	target := store.shards[store.getShardIndex(span.Id)]
+	target.incoming <- span
+}
+
+// FindSpan looks up span sid in the shard responsible for that ID, and
+// returns whatever that shard's FindSpan returns.
+func (store *dataStore) FindSpan(sid common.SpanId) *common.Span {
+	owner := store.shards[store.getShardIndex(sid)]
+	return owner.FindSpan(sid)
+}
+
+func (shd *shard) FindSpan(sid common.SpanId) *common.Span {
+ lg := shd.store.lg
+ buf, err := shd.ldb.Get(shd.store.readOpts, makeKey('s', sid.Val()))
+ if err != nil {
+ if strings.Index(err.Error(), "NotFound:") != -1 {
+ return nil
+ }
+ lg.Warnf("Shard(%s): FindSpan(%016x) error: %s\n",
+ shd.path, sid, err.Error())
+ return nil
+ }
+ var span *common.Span
+ span, err = shd.decodeSpan(sid, buf)
+ if err != nil {
+ lg.Errorf("Shard(%s): FindSpan(%016x) decode error: %s\n",
+ shd.path, sid, err.Error())
+ return nil
+ }
+ return span
+}
+
+func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, error) {
+ r := bytes.NewBuffer(buf)
+ decoder := gob.NewDecoder(r)
+ data := common.SpanData{}
+ err := decoder.Decode(&data)
+ if err != nil {
+ return nil, err
+ }
+ // Gob encoding translates empty slices to nil. Reverse this so that we're always dealing with
+ // non-nil slices.
+ if data.Parents == nil {
+ data.Parents = []common.SpanId{}
+ }
+ return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil
+}
+
+// FindChildren collects the ids of children of the given span.  Shards are
+// visited once each, starting with the shard selected by sid and wrapping
+// around; the walk stops early once the remaining limit reaches zero.  A
+// shard that reports an error is logged and the walk continues.
+func (store *dataStore) FindChildren(sid common.SpanId, lim int32) []common.SpanId {
+	childIds := make([]common.SpanId, 0)
+	startIdx := store.getShardIndex(sid)
+	numShards := len(store.shards)
+	for visited := 0; visited < numShards && lim != 0; visited++ {
+		shd := store.shards[(startIdx+visited)%numShards]
+		var err error
+		childIds, lim, err = shd.FindChildren(sid, childIds, lim)
+		if err != nil {
+			store.lg.Errorf("Shard(%s): FindChildren(%016x) error: %s\n",
+				shd.path, sid, err.Error())
+		}
+	}
+	return childIds
+}
+
+// predicateData wraps a query predicate together with its value parsed into
+// the form used for comparisons (filled in by loadPredicateData).
+type predicateData struct {
+ *common.Predicate
+ uintKey uint64 // parsed value for SPAN_ID, BEGIN_TIME, END_TIME and DURATION predicates
+ strKey string // raw value for DESCRIPTION predicates
+}
+
+// loadPredicateData validates a predicate and parses its value into the form
+// used for comparisons.  Span ids are parsed from their string form, time and
+// duration fields as base-10 signed integers, and descriptions are taken
+// verbatim.  An error is returned for an unknown field, an unparseable value,
+// an unknown operation, or CONTAINS applied to a numeric field.
+func loadPredicateData(pred *common.Predicate) (*predicateData, error) {
+	p := &predicateData{Predicate: pred}
+
+	// Make sure the supplied value matches the type of the field.
+	switch pred.Field {
+	case common.SPAN_ID:
+		// Span IDs are sent as hex strings.
+		var id common.SpanId
+		if err := id.FromString(pred.Val); err != nil {
+			return nil, fmt.Errorf("Unable to parse span id '%s': %s",
+				pred.Val, err.Error())
+		}
+		p.uintKey = id.Val()
+	case common.DESCRIPTION:
+		// Any string is valid for a description.
+		p.strKey = pred.Val
+	case common.BEGIN_TIME, common.END_TIME, common.DURATION:
+		// Parse a base-10 signed numeric field.
+		v, err := strconv.ParseInt(pred.Val, 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("Unable to parse %s '%s': %s",
+				pred.Field, pred.Val, err.Error())
+		}
+		p.uintKey = s2u64(v)
+	default:
+		return nil, fmt.Errorf("Unknown field %s", pred.Field)
+	}
+
+	// Make sure the operation is known and applicable to the field.
+	switch pred.Op {
+	case common.CONTAINS:
+		if p.fieldIsNumeric() {
+			return nil, fmt.Errorf("Can't use CONTAINS on a numeric field like '%s'",
+				pred.Field)
+		}
+	case common.EQUALS, common.LESS_THAN_OR_EQUALS,
+		common.GREATER_THAN_OR_EQUALS, common.GREATER_THAN:
+		// Accepted for every field type.
+	default:
+		return nil, fmt.Errorf("Unknown predicate operation '%s'", pred.Op)
+	}
+
+	return p, nil
+}
+
+// Get the index prefix for this predicate, or 0 if it is not indexed.
+func (pred *predicateData) getIndexPrefix() byte {
+ switch pred.Field {
+ case common.SPAN_ID:
+ return SPAN_ID_INDEX_PREFIX
+ case common.BEGIN_TIME:
+ return BEGIN_TIME_INDEX_PREFIX
+ case common.END_TIME:
+ return END_TIME_INDEX_PREFIX
+ case common.DURATION:
+ return DURATION_INDEX_PREFIX
+ default:
+ return INVALID_INDEX_PREFIX
+ }
+}
+
+// fieldIsNumeric reports whether the predicate's field is compared as a
+// number (span id, begin time, end time or duration).
+func (pred *predicateData) fieldIsNumeric() bool {
+	switch pred.Field {
+	case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, common.DURATION:
+		return true
+	}
+	return false
+}
+
+// Get the values that this predicate cares about for a given span.
+func (pred *predicateData) extractRelevantSpanData(span *common.Span) (uint64, string) {
+ switch pred.Field {
+ case common.SPAN_ID:
+ return span.Id.Val(), ""
+ case common.DESCRIPTION:
+ return 0, span.Description
+ case common.BEGIN_TIME:
+ return s2u64(span.Begin), ""
+ case common.END_TIME:
+ return s2u64(span.End), ""
+ case common.DURATION:
+ return s2u64(span.Duration()), ""
+ default:
+ panic(fmt.Sprintf("Field type %s isn't a 64-bit integer.", pred.Field))
+ }
+}
+
+// spanPtrIsBefore reports whether span a sorts before span b under this
+// predicate.  A nil span sorts after everything: nil is never before
+// anything, and any non-nil span is before nil.  Otherwise the predicate's
+// field is compared, in descending order when the operation is descending
+// and in ascending order when it is not.
+func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool {
+	if a == nil {
+		return false
+	}
+	if b == nil {
+		return true
+	}
+	aInt, aStr := pred.extractRelevantSpanData(a)
+	bInt, bStr := pred.extractRelevantSpanData(b)
+	switch numeric, descending := pred.fieldIsNumeric(), pred.Op.IsDescending(); {
+	case numeric && descending:
+		return aInt > bInt
+	case numeric:
+		return aInt < bInt
+	case descending:
+		return aStr > bStr
+	default:
+		return aStr < bStr
+	}
+}
+
+// satisfiedBy reports whether the given span satisfies this predicate.
+// String fields support CONTAINS plus the comparison operations; numeric
+// fields support only the comparison operations.  It panics on an operation
+// that is not handled for the field's type.
+func (pred *predicateData) satisfiedBy(span *common.Span) bool {
+	intVal, strVal := pred.extractRelevantSpanData(span)
+	if !pred.fieldIsNumeric() {
+		switch pred.Op {
+		case common.CONTAINS:
+			return strings.Contains(strVal, pred.strKey)
+		case common.EQUALS:
+			return strVal == pred.strKey
+		case common.LESS_THAN_OR_EQUALS:
+			return strVal <= pred.strKey
+		case common.GREATER_THAN_OR_EQUALS:
+			return strVal >= pred.strKey
+		case common.GREATER_THAN:
+			return strVal > pred.strKey
+		}
+		panic(fmt.Sprintf("unknown Op type %s should have been caught during normalization",
+			pred.Op))
+	}
+	switch pred.Op {
+	case common.EQUALS:
+		return intVal == pred.uintKey
+	case common.LESS_THAN_OR_EQUALS:
+		return intVal <= pred.uintKey
+	case common.GREATER_THAN_OR_EQUALS:
+		return intVal >= pred.uintKey
+	case common.GREATER_THAN:
+		return intVal > pred.uintKey
+	}
+	panic(fmt.Sprintf("unknown Op type %s should have been caught during normalization",
+		pred.Op))
+}
+
+func (pred *predicateData) createSource(store *dataStore) (*source, error) {
+ var ret *source
+ src := source{store: store,
+ pred: pred,
+ iters: make([]*levigo.Iterator, 0, len(store.shards)),
+ nexts: make([]*common.Span, len(store.shards)),
+ numRead: make([]int, len(store.shards)),
+ keyPrefix: pred.getIndexPrefix(),
+ }
+ if src.keyPrefix == INVALID_INDEX_PREFIX {
+ return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+
+ "predicate on field %s", pred.Field))
+ }
+ defer func() {
+ if ret == nil {
+ src.Close()
+ }
+ }()
+ for shardIdx := range store.shards {
+ shd := store.shards[shardIdx]
+ src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
+ }
+ searchKey := makeKey(src.keyPrefix, pred.uintKey)
+ for i := range src.iters {
+ src.iters[i].Seek(searchKey)
+ }
+ ret = &src
+ return ret, nil
+}
+
+// A source of spans.  It reads the shards of a datastore through one index
+// and hands out the spans that satisfy its predicate, one at a time.
+type source struct {
+ store *dataStore // datastore whose shards are read
+ pred *predicateData // predicate that selects the index and filters the spans
+ iters []*levigo.Iterator // one iterator per shard; an entry is nil once populateNextFromShard has closed it
+ nexts []*common.Span // per-shard buffered span; nil when nothing is buffered for that shard
+ numRead []int // per-shard count of index keys examined so far
+ keyPrefix byte // index key prefix for pred, as returned by getIndexPrefix
+}
+
+// mayRequireOneSkip reports whether the first result read back from leveldb
+// for this operation may have to be skipped.
+//
+// LESS_THAN_OR_EQUALS: with a descending predicate the first span read might
+// not satisfy the predicate even though later ones will.  iter.Seek() "moves
+// the iterator the position of the key given or, if the key doesn't exist,
+// the next key that does exist in the database."  If we land on that "next
+// key" it fails the predicate, but the keys before it might pass.
+//
+// GREATER_THAN: iter.Seek() takes us to the first key that is "greater than
+// or equal to" the value.  Since we want strictly greater, the first key may
+// have to be skipped.
+func mayRequireOneSkip(op common.Op) bool {
+	return op == common.LESS_THAN_OR_EQUALS || op == common.GREATER_THAN
+}
+
+// Fill in the entry in the 'next' array for a specific shard.
+//
+// This is a no-op when the shard's iterator has already been closed (nil) or
+// when nexts[shardIdx] already holds a span.  Otherwise it walks the shard's
+// iterator, backwards for descending operations and forwards for the rest,
+// until it finds a span that satisfies the predicate, and stores that span
+// in nexts[shardIdx].  When the iterator becomes invalid, leaves the keys
+// carrying keyPrefix, yields a span that cannot be loaded, or yields a span
+// that fails the predicate (beyond the single skip allowed by
+// mayRequireOneSkip), the iterator is closed and its slot is set to nil.
+func (src *source) populateNextFromShard(shardIdx int) {
+ lg := src.store.lg
+ var err error
+ iter := src.iters[shardIdx]
+ if iter == nil {
+ lg.Debugf("Can't populate: No more entries in shard %d\n", shardIdx)
+ return // There are no more entries in this shard.
+ }
+ if src.nexts[shardIdx] != nil {
+ lg.Debugf("No need to populate shard %d\n", shardIdx)
+ return // We already have a valid entry for this shard.
+ }
+ for {
+ if !iter.Valid() {
+ lg.Debugf("Can't populate: Iterator for shard %d is no longer valid.\n", shardIdx)
+ break // Can't read past end of DB
+ }
+ src.numRead[shardIdx]++ // counts every key examined, including ones rejected below
+ key := iter.Key()
+ if !bytes.HasPrefix(key, []byte{src.keyPrefix}) {
+ lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s\n",
+ shardIdx, string(src.keyPrefix))
+ break // Can't read past end of indexed section
+ }
+ var span *common.Span
+ var sid common.SpanId
+ if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
+ // The span id maps to the span itself.
+ sid = common.SpanId(keyToInt(key[1:])) // the id follows the one-byte prefix
+ span, err = src.store.shards[shardIdx].decodeSpan(sid, iter.Value())
+ if err != nil {
+ lg.Debugf("Internal error decoding span %s in shard %d: %s\n",
+ sid.String(), shardIdx, err.Error())
+ break
+ }
+ } else {
+ // With a secondary index, we have to look up the span by id.
+ // NOTE(review): key[9:] presumably skips the one-byte prefix plus an
+ // 8-byte indexed value -- confirm against the code that writes the index.
+ sid = common.SpanId(keyToInt(key[9:]))
+ span = src.store.shards[shardIdx].FindSpan(sid)
+ if span == nil {
+ lg.Debugf("Internal error rehydrating span %s in shard %d\n",
+ sid.String(), shardIdx)
+ break
+ }
+ }
+ // Step the iterator before testing the predicate, so that it has already
+ // moved past this key both when we return and when we skip.
+ if src.pred.Op.IsDescending() {
+ iter.Prev()
+ } else {
+ iter.Next()
+ }
+ if src.pred.satisfiedBy(span) {
+ lg.Debugf("Populated valid span %016x from shard %d.\n", sid, shardIdx)
+ src.nexts[shardIdx] = span // Found valid entry
+ return
+ } else {
+ lg.Debugf("Span %016x from shard %d does not satisfy the predicate.\n",
+ sid, shardIdx)
+ // Only the very first key read from this shard may be skipped.
+ if src.numRead[shardIdx] <= 1 && mayRequireOneSkip(src.pred.Op) {
+ continue
+ }
+ // This and subsequent entries don't satisfy predicate
+ break
+ }
+ }
+ // Every break above lands here: this shard has nothing more to offer.
+ lg.Debugf("Closing iterator for shard %d.\n", shardIdx)
+ iter.Close()
+ src.iters[shardIdx] = nil
+}
+
+// next returns the next span from this source, or nil when no shard has
+// anything left.  It first makes sure every shard has a buffered candidate
+// where one is available, then picks the candidate that sorts first
+// according to the predicate and removes it from its shard's buffer.
+func (src *source) next() *common.Span {
+	for i := range src.iters {
+		src.populateNextFromShard(i)
+	}
+	var winner *common.Span
+	winnerIdx := -1
+	for i := range src.iters {
+		if candidate := src.nexts[i]; src.pred.spanPtrIsBefore(candidate, winner) {
+			winner, winnerIdx = candidate, i
+		}
+	}
+	if winnerIdx < 0 {
+		return nil
+	}
+	src.nexts[winnerIdx] = nil
+	return winner
+}
+
+// Close closes every iterator that is still open and drops the iterator
+// slice.
+func (src *source) Close() {
+	for _, iter := range src.iters {
+		if iter == nil {
+			continue
+		}
+		iter.Close()
+	}
+	src.iters = nil
+}
+
+// obtainSource picks the source of spans for a query.  The first predicate
+// that has an index is removed from *preds and used to create the source.
+// When no predicate is indexed, *preds is left unchanged and the source reads
+// all spans in span id order (span id >= 0).
+func (store *dataStore) obtainSource(preds *[]*predicateData) (*source, error) {
+	all := *preds
+	for i, pred := range all {
+		if pred.getIndexPrefix() == INVALID_INDEX_PREFIX {
+			continue
+		}
+		*preds = append(all[:i], all[i+1:]...)
+		return pred.createSource(store)
+	}
+	// Nothing is indexed: fall back to scanning by span id.
+	fallback, err := loadPredicateData(&common.Predicate{
+		Op:    common.GREATER_THAN_OR_EQUALS,
+		Field: common.SPAN_ID,
+		Val:   "0000000000000000",
+	})
+	if err != nil {
+		return nil, err
+	}
+	return fallback.createSource(store)
+}
+
+func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) {
+ lg := store.lg
+ // Parse predicate data.
+ var err error
+ preds := make([]*predicateData, len(query.Predicates))
+ for i := range query.Predicates {
+ preds[i], err = loadPredicateData(&query.Predicates[i])
+ if err != nil {
+ return nil, err
+ }
+ }
+ // Get a source of rows.
+ var src *source
+ src, err = store.obtainSource(&preds)
+ if err != nil {
+ return nil, err
+ }
+ defer src.Close()
+ lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src)
+
+ // Filter the spans through the remaining predicates.
+ ret := make([]*common.Span, 0, 32)
+ for {
+ if len(ret) >= query.Lim {
+ break // we hit the result size limit
+ }
+ span := src.next()
+ if span == nil {
+ break // the source has no more spans to give
+ }
+ lg.Debugf("src.next returned span %s\n", span.ToJson())
+ satisfied := true
+ for predIdx := range preds {
+ if !preds[predIdx].satisfiedBy(span) {
+ satisfied = false
+ break
+ }
+ }
+ if satisfied {
+ ret = append(ret, span)
+ }
+ }
+ return ret, nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/39e89ea0/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
new file mode 100644
index 0000000..79a7c4f
--- /dev/null
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "math/rand"
+ htrace "org/apache/htrace/client"
+ "org/apache/htrace/common"
+ "org/apache/htrace/conf"
+ "org/apache/htrace/test"
+ "os"
+ "sort"
+ "strings"
+ "testing"
+)
+
+// TestCreateDatastore builds a MiniHTraced with three data directories and
+// closes it again.
+func TestCreateDatastore(t *testing.T) {
+	bld := &MiniHTracedBuilder{
+		Name:     "TestCreateDatastore",
+		DataDirs: make([]string, 3),
+	}
+	ht, buildErr := bld.Build()
+	if buildErr != nil {
+		t.Fatalf("failed to create datastore: %s", buildErr.Error())
+	}
+	defer ht.Close()
+}
+
+// SIMPLE_TEST_SPANS is a small fixed trace used by the datastore tests: one
+// root span (id 1) and two spans (ids 2 and 3) that list it as their parent.
+var SIMPLE_TEST_SPANS = []common.Span{
+	{
+		Id: 1,
+		SpanData: common.SpanData{
+			Begin:       123,
+			End:         456,
+			Description: "getFileDescriptors",
+			TraceId:     999,
+			Parents:     []common.SpanId{},
+			ProcessId:   "firstd",
+		},
+	},
+	{
+		Id: 2,
+		SpanData: common.SpanData{
+			Begin:       125,
+			End:         200,
+			Description: "openFd",
+			TraceId:     999,
+			Parents:     []common.SpanId{1},
+			ProcessId:   "secondd",
+		},
+	},
+	{
+		Id: 3,
+		SpanData: common.SpanData{
+			Begin:       200,
+			End:         456,
+			Description: "passFd",
+			TraceId:     999,
+			Parents:     []common.SpanId{1},
+			ProcessId:   "thirdd",
+		},
+	},
+}
+
+// createSpans writes every span in spans to the store, then blocks until it
+// has received one value from store.WrittenSpans for each span submitted.
+func createSpans(spans []common.Span, store *dataStore) {
+	for idx := range spans {
+		store.WriteSpan(&spans[idx])
+	}
+	// Wait for as many notifications as spans were submitted.  A hard-coded
+	// count would hang for shorter inputs and return early for longer ones.
+	for i := 0; i < len(spans); i++ {
+		<-store.WrittenSpans
+	}
+}
+
+// TestDatastoreWriteAndRead writes the simple test spans to a fresh datastore
+// and checks the write statistics, FindSpan, and FindChildren with limits of
+// one and two.
+func TestDatastoreWriteAndRead(t *testing.T) {
+	t.Parallel()
+	htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead",
+		WrittenSpans: make(chan *common.Span, 100)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		// Fail only this test; a panic in a parallel test aborts the whole run.
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	createSpans(SIMPLE_TEST_SPANS, ht.Store)
+	if written := ht.Store.GetStatistics().NumSpansWritten; written < uint64(len(SIMPLE_TEST_SPANS)) {
+		t.Fatalf("expected at least %d spans to be written, but got %d\n",
+			len(SIMPLE_TEST_SPANS), written)
+	}
+	span := ht.Store.FindSpan(1)
+	if span == nil {
+		t.Fatalf("FindSpan(1) returned nil\n")
+	}
+	if span.Id != 1 {
+		t.Fatalf("FindSpan(1) returned a span with id %v\n", span.Id)
+	}
+	common.ExpectSpansEqual(t, &SIMPLE_TEST_SPANS[0], span)
+	children := ht.Store.FindChildren(1, 1)
+	if len(children) != 1 {
+		t.Fatalf("expected 1 child, but got %d\n", len(children))
+	}
+	children = ht.Store.FindChildren(1, 2)
+	if len(children) != 2 {
+		t.Fatalf("expected 2 children, but got %d\n", len(children))
+	}
+	// The order in which shards report children is not fixed, so sort first.
+	sort.Sort(common.SpanIdSlice(children))
+	if children[0] != 2 {
+		t.Fatalf("expected first child to be span 2, but got %v\n", children[0])
+	}
+	if children[1] != 3 {
+		t.Fatalf("expected second child to be span 3, but got %v\n", children[1])
+	}
+}
+
+func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query,
+ expectedSpans []common.Span) {
+ spans, err := ht.Store.HandleQuery(query)
+ if err != nil {
+ t.Fatalf("First query failed: %s\n", err.Error())
+ }
+ expectedBuf := new(bytes.Buffer)
+ dec := json.NewEncoder(expectedBuf)
+ err = dec.Encode(expectedSpans)
+ if err != nil {
+ t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", err.Error())
+ }
+ spansBuf := new(bytes.Buffer)
+ dec = json.NewEncoder(spansBuf)
+ err = dec.Encode(spans)
+ if err != nil {
+ t.Fatalf("Failed to encode result spans to JSON: %s\n", err.Error())
+ }
+ t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans),
+ len(expectedSpans))
+ common.ExpectStrEqual(t, string(expectedBuf.Bytes()), string(spansBuf.Bytes()))
+}
+
+// TestSimpleQuery checks a single begin-time >= query against the simple
+// test spans.
+func TestSimpleQuery(t *testing.T) {
+	t.Parallel()
+	htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery",
+		WrittenSpans: make(chan *common.Span, 100)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		// Fail only this test; a panic in a parallel test aborts the whole run.
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	createSpans(SIMPLE_TEST_SPANS, ht.Store)
+	if written := ht.Store.GetStatistics().NumSpansWritten; written < uint64(len(SIMPLE_TEST_SPANS)) {
+		t.Fatalf("expected at least %d spans to be written, but got %d\n",
+			len(SIMPLE_TEST_SPANS), written)
+	}
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.GREATER_THAN_OR_EQUALS,
+				Field: common.BEGIN_TIME,
+				Val:   "125",
+			},
+		},
+		Lim: 5,
+	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+}
+
+// TestQueries2 checks a descending begin-time query, the same query combined
+// with a description filter, and a query on the description alone.
+func TestQueries2(t *testing.T) {
+	t.Parallel()
+	htraceBld := &MiniHTracedBuilder{Name: "TestQueries2",
+		WrittenSpans: make(chan *common.Span, 100)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		// Fail only this test; a panic in a parallel test aborts the whole run.
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	createSpans(SIMPLE_TEST_SPANS, ht.Store)
+	if written := ht.Store.GetStatistics().NumSpansWritten; written < uint64(len(SIMPLE_TEST_SPANS)) {
+		t.Fatalf("expected at least %d spans to be written, but got %d\n",
+			len(SIMPLE_TEST_SPANS), written)
+	}
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.LESS_THAN_OR_EQUALS,
+				Field: common.BEGIN_TIME,
+				Val:   "125",
+			},
+		},
+		Lim: 5,
+	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
+
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.LESS_THAN_OR_EQUALS,
+				Field: common.BEGIN_TIME,
+				Val:   "125",
+			},
+			{
+				Op:    common.EQUALS,
+				Field: common.DESCRIPTION,
+				Val:   "getFileDescriptors",
+			},
+		},
+		Lim: 2,
+	}, []common.Span{SIMPLE_TEST_SPANS[0]})
+
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.EQUALS,
+				Field: common.DESCRIPTION,
+				Val:   "getFileDescriptors",
+			},
+		},
+		Lim: 2,
+	}, []common.Span{SIMPLE_TEST_SPANS[0]})
+}
+
+// TestQueries3 checks a CONTAINS filter combined with a begin-time query, and
+// two descending span-id queries, one of which matches nothing.
+func TestQueries3(t *testing.T) {
+	t.Parallel()
+	htraceBld := &MiniHTracedBuilder{Name: "TestQueries3",
+		WrittenSpans: make(chan *common.Span, 100)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		// Fail only this test; a panic in a parallel test aborts the whole run.
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	createSpans(SIMPLE_TEST_SPANS, ht.Store)
+	if written := ht.Store.GetStatistics().NumSpansWritten; written < uint64(len(SIMPLE_TEST_SPANS)) {
+		t.Fatalf("expected at least %d spans to be written, but got %d\n",
+			len(SIMPLE_TEST_SPANS), written)
+	}
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.CONTAINS,
+				Field: common.DESCRIPTION,
+				Val:   "Fd",
+			},
+			{
+				Op:    common.GREATER_THAN_OR_EQUALS,
+				Field: common.BEGIN_TIME,
+				Val:   "100",
+			},
+		},
+		Lim: 5,
+	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.LESS_THAN_OR_EQUALS,
+				Field: common.SPAN_ID,
+				Val:   "0",
+			},
+		},
+		Lim: 200,
+	}, []common.Span{})
+
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.LESS_THAN_OR_EQUALS,
+				Field: common.SPAN_ID,
+				Val:   "2",
+			},
+		},
+		Lim: 200,
+	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]})
+}
+
+// TestQueries4 checks a strict greater-than query on begin time, and >= and
+// > queries on the description.
+func TestQueries4(t *testing.T) {
+	t.Parallel()
+	htraceBld := &MiniHTracedBuilder{Name: "TestQueries4",
+		WrittenSpans: make(chan *common.Span, 100)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		// Fail only this test; a panic in a parallel test aborts the whole run.
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	createSpans(SIMPLE_TEST_SPANS, ht.Store)
+	if written := ht.Store.GetStatistics().NumSpansWritten; written < uint64(len(SIMPLE_TEST_SPANS)) {
+		t.Fatalf("expected at least %d spans to be written, but got %d\n",
+			len(SIMPLE_TEST_SPANS), written)
+	}
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.GREATER_THAN,
+				Field: common.BEGIN_TIME,
+				Val:   "125",
+			},
+		},
+		Lim: 5,
+	}, []common.Span{SIMPLE_TEST_SPANS[2]})
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.GREATER_THAN_OR_EQUALS,
+				Field: common.DESCRIPTION,
+				Val:   "openFd",
+			},
+		},
+		Lim: 2,
+	}, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]})
+	testQuery(t, ht, &common.Query{
+		Predicates: []common.Predicate{
+			{
+				Op:    common.GREATER_THAN,
+				Field: common.DESCRIPTION,
+				Val:   "openFd",
+			},
+		},
+		Lim: 2,
+	}, []common.Span{SIMPLE_TEST_SPANS[2]})
+}
+
+// BenchmarkDatastoreWrites writes b.N random spans to a fresh datastore,
+// waits for one value on WrittenSpans per span, and checks that the
+// statistics report at least b.N written spans.
+func BenchmarkDatastoreWrites(b *testing.B) {
+	htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites",
+		WrittenSpans: make(chan *common.Span, b.N)}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		b.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	defer ht.Close()
+	rnd := rand.New(rand.NewSource(1))
+	allSpans := make([]*common.Span, b.N)
+	// Write many random spans.
+	for n := 0; n < b.N; n++ {
+		span := test.NewRandomSpan(rnd, allSpans[0:n])
+		ht.Store.WriteSpan(span)
+		allSpans[n] = span
+	}
+	// Wait for all the spans to be written.
+	for n := 0; n < b.N; n++ {
+		<-ht.Store.WrittenSpans
+	}
+	spansWritten := ht.Store.GetStatistics().NumSpansWritten
+	if spansWritten < uint64(b.N) {
+		// Fatalf, not Fatal: the message contains formatting verbs.
+		b.Fatalf("incorrect statistics: expected %d spans to be written, but only got %d",
+			b.N, spansWritten)
+	}
+}
+
+// TestReloadDataStore checks that a datastore can be closed and reopened on
+// the same data directories without losing spans, that loading fails once an
+// old layout version has been written to the shards, and that loading
+// succeeds again when conf.HTRACE_DATA_STORE_CLEAR is set to "true".
+func TestReloadDataStore(t *testing.T) {
+	htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore",
+		DataDirs: make([]string, 2), KeepDataDirsOnClose: true}
+	ht, err := htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create datastore: %s", err.Error())
+	}
+	dataDirs := make([]string, len(ht.DataDirs))
+	copy(dataDirs, ht.DataDirs)
+	// The data directories are kept on Close, so remove them ourselves once
+	// the test is over.
+	defer func() {
+		if ht != nil {
+			ht.Close()
+		}
+		for i := range dataDirs {
+			os.RemoveAll(dataDirs[i])
+		}
+	}()
+	var hcl *htrace.Client
+	hcl, err = htrace.NewClient(ht.ClientConf())
+	if err != nil {
+		t.Fatalf("failed to create client: %s", err.Error())
+	}
+
+	// Create some random trace spans.
+	NUM_TEST_SPANS := 5
+	allSpans := createRandomTestSpans(NUM_TEST_SPANS)
+	err = hcl.WriteSpans(&common.WriteSpansReq{
+		Spans: allSpans,
+	})
+	if err != nil {
+		t.Fatalf("WriteSpans failed: %s\n", err.Error())
+	}
+
+	// Look up the spans we wrote.
+	var span *common.Span
+	for i := 0; i < NUM_TEST_SPANS; i++ {
+		span, err = hcl.FindSpan(allSpans[i].Id)
+		if err != nil {
+			t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+		}
+		common.ExpectSpansEqual(t, allSpans[i], span)
+	}
+
+	ht.Close()
+	ht = nil
+
+	htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore2",
+		DataDirs: dataDirs, KeepDataDirsOnClose: true}
+	ht, err = htraceBld.Build()
+	if err != nil {
+		t.Fatalf("failed to re-create datastore: %s", err.Error())
+	}
+	hcl, err = htrace.NewClient(ht.ClientConf())
+	if err != nil {
+		t.Fatalf("failed to re-create client: %s", err.Error())
+	}
+
+	// Look up the spans we wrote earlier.
+	for i := 0; i < NUM_TEST_SPANS; i++ {
+		span, err = hcl.FindSpan(allSpans[i].Id)
+		if err != nil {
+			t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+		}
+		common.ExpectSpansEqual(t, allSpans[i], span)
+	}
+
+	// Set an old datastore version number.  If the write fails, the reload
+	// below would succeed for the wrong reason, so check it explicitly.
+	for i := range ht.Store.shards {
+		shd := ht.Store.shards[i]
+		err = writeDataStoreVersion(ht.Store, shd.ldb, CURRENT_LAYOUT_VERSION-1)
+		if err != nil {
+			t.Fatalf("failed to write old layout version to shard %d: %s\n",
+				i, err.Error())
+		}
+	}
+	ht.Close()
+	ht = nil
+
+	htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore3",
+		DataDirs: dataDirs, KeepDataDirsOnClose: true}
+	ht, err = htraceBld.Build()
+	if err == nil {
+		t.Fatalf("expected the datastore to fail to load after setting an " +
+			"incorrect version.\n")
+	}
+	if !strings.Contains(err.Error(), "Invalid layout version") {
+		t.Fatalf("expected the loading error to contain \"Invalid layout version\", "+
+			"but got: %s\n", err.Error())
+	}
+
+	// It should work with data.store.clear set.
+	htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore4",
+		DataDirs: dataDirs, KeepDataDirsOnClose: true,
+		Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"}}
+	ht, err = htraceBld.Build()
+	if err != nil {
+		t.Fatalf("expected the datastore loading to succeed after setting an "+
+			"incorrect version.  But it failed with error %s\n", err.Error())
+	}
+}