You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by iw...@apache.org on 2016/04/20 01:32:46 UTC
[3/7] incubator-htrace git commit: HTRACE-357. Rename
htrace-htraced/go/src/org/apache/htrace to htrace-htraced/go/src/htrace
(Colin Patrick McCabe via iwasakims)
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/time_test.go b/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
deleted file mode 100644
index 11e2733..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/common/time_test.go
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package common
-
-import (
- "testing"
-)
-
-func testRoundTrip(t *testing.T, u int64) {
- tme := UnixMsToTime(u)
- u2 := TimeToUnixMs(tme)
- if u2 != u {
- t.Fatalf("Error taking %d on a round trip: came back as "+
- "%d instead.\n", u, u2)
- }
-}
-
-func TestTimeConversions(t *testing.T) {
- testRoundTrip(t, 0)
- testRoundTrip(t, 1445540632000)
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/conf/config.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config.go b/htrace-htraced/go/src/org/apache/htrace/conf/config.go
deleted file mode 100644
index 24170b2..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config.go
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package conf
-
-import (
- "bufio"
- "bytes"
- "fmt"
- "io"
- "log"
- "os"
- "path/filepath"
- "sort"
- "strconv"
- "strings"
- "syscall"
-)
-
-//
-// The configuration code for HTraced.
-//
-// HTraced can be configured via Hadoop-style XML configuration files, or by passing -Dkey=value
-// command line arguments. Command-line arguments without an equals sign, such as "-Dkey", will be
-// treated as setting the key to "true".
-//
-// Configuration key constants should be defined in config_keys.go. Each key should have a default,
-// which will be used if the user supplies no value, or supplies an invalid value.
-// For that reason, it is not necessary for the Get, GetInt, etc. functions to take a default value
-// argument.
-//
-// Configuration objects are immutable. However, you can make a copy of a configuration which adds
-// some changes using Configuration#Clone().
-//
-
-type Config struct {
- settings map[string]string
- defaults map[string]string
-}
-
-type Builder struct {
- // If non-nil, the XML configuration file to read.
- Reader io.Reader
-
- // If non-nil, the configuration values to use.
- Values map[string]string
-
- // If non-nil, the default configuration values to use.
- Defaults map[string]string
-
- // If non-nil, the command-line arguments to use.
- Argv []string
-
- // The name of the application. Configuration keys that start with this
- // string will be converted to their unprefixed forms.
- AppPrefix string
-}
-
-func getDefaultHTracedConfDir() string {
- return PATH_SEP + "etc" + PATH_SEP + "htraced" + PATH_SEP + "conf"
-}
-
-func getHTracedConfDirs(dlog io.Writer) []string {
- confDir := os.Getenv("HTRACED_CONF_DIR")
- paths := filepath.SplitList(confDir)
- if len(paths) < 1 {
- def := getDefaultHTracedConfDir()
- io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR defaulting to %s\n", def))
- return []string{def}
- }
- io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR=%s\n", confDir))
- return paths
-}
-
-// Load a configuration from the application's argv, configuration file, and the standard
-// defaults.
-func LoadApplicationConfig(appPrefix string) (*Config, io.Reader) {
- dlog := new(bytes.Buffer)
- reader := openFile(CONFIG_FILE_NAME, getHTracedConfDirs(dlog), dlog)
- bld := Builder{}
- if reader != nil {
- defer reader.Close()
- bld.Reader = bufio.NewReader(reader)
- }
- bld.Argv = os.Args[1:]
- bld.Defaults = DEFAULTS
- bld.AppPrefix = appPrefix
- cnf, err := bld.Build()
- if err != nil {
- log.Fatal("Error building configuration: " + err.Error())
- }
- os.Args = append(os.Args[0:1], bld.Argv...)
- keys := make(sort.StringSlice, 0, 20)
- for k, _ := range cnf.settings {
- keys = append(keys, k)
- }
- sort.Sort(keys)
- prefix := ""
- io.WriteString(dlog, "Read configuration: ")
- for i := range keys {
- io.WriteString(dlog, fmt.Sprintf(`%s%s = "%s"`,
- prefix, keys[i], cnf.settings[keys[i]]))
- prefix = ", "
- }
- return cnf, dlog
-}
-
-// Attempt to open a configuration file somewhere on the provided list of paths.
-func openFile(cnfName string, paths []string, dlog io.Writer) io.ReadCloser {
- for p := range paths {
- path := fmt.Sprintf("%s%c%s", paths[p], os.PathSeparator, cnfName)
- file, err := os.Open(path)
- if err == nil {
- io.WriteString(dlog, fmt.Sprintf("Reading configuration from %s.\n", path))
- return file
- }
- if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOENT {
- continue
- }
- io.WriteString(dlog, fmt.Sprintf("Error opening %s for read: %s\n", path, err.Error()))
- }
- return nil
-}
-
-// Try to parse a command-line element as a key=value pair.
-func parseAsConfigFlag(flag string) (string, string) {
- var confPart string
- if strings.HasPrefix(flag, "-D") {
- confPart = flag[2:]
- } else if strings.HasPrefix(flag, "--D") {
- confPart = flag[3:]
- } else {
- return "", ""
- }
- if len(confPart) == 0 {
- return "", ""
- }
- idx := strings.Index(confPart, "=")
- if idx == -1 {
- return confPart, "true"
- }
- return confPart[0:idx], confPart[idx+1:]
-}
-
-// Build a new configuration object from the provided conf.Builder.
-func (bld *Builder) Build() (*Config, error) {
- // Load values and defaults
- cnf := Config{}
- cnf.settings = make(map[string]string)
- if bld.Values != nil {
- for k, v := range bld.Values {
- cnf.settings[k] = v
- }
- }
- cnf.defaults = make(map[string]string)
- if bld.Defaults != nil {
- for k, v := range bld.Defaults {
- cnf.defaults[k] = v
- }
- }
-
- // Process the configuration file, if we have one
- if bld.Reader != nil {
- parseXml(bld.Reader, cnf.settings)
- }
-
- // Process command line arguments
- var i int
- for i < len(bld.Argv) {
- str := bld.Argv[i]
- key, val := parseAsConfigFlag(str)
- if key != "" {
- cnf.settings[key] = val
- bld.Argv = append(bld.Argv[:i], bld.Argv[i+1:]...)
- } else {
- i++
- }
- }
- cnf.settings = bld.removeApplicationPrefixes(cnf.settings)
- cnf.defaults = bld.removeApplicationPrefixes(cnf.defaults)
- return &cnf, nil
-}
-
-func (bld *Builder) removeApplicationPrefixes(in map[string]string) map[string]string {
- out := make(map[string]string)
- for k, v := range in {
- if strings.HasPrefix(k, bld.AppPrefix) {
- out[k[len(bld.AppPrefix):]] = v
- } else {
- out[k] = v
- }
- }
- return out
-}
-
-// Returns true if the configuration has a non-default value for the given key.
-func (cnf *Config) Contains(key string) bool {
- _, ok := cnf.settings[key]
- return ok
-}
-
-// Get a string configuration key.
-func (cnf *Config) Get(key string) string {
- ret, hadKey := cnf.settings[key]
- if hadKey {
- return ret
- }
- return cnf.defaults[key]
-}
-
-// Get a boolean configuration key.
-func (cnf *Config) GetBool(key string) bool {
- str := cnf.settings[key]
- ret, err := strconv.ParseBool(str)
- if err == nil {
- return ret
- }
- str = cnf.defaults[key]
- ret, err = strconv.ParseBool(str)
- if err == nil {
- return ret
- }
- return false
-}
-
-// Get an integer configuration key.
-func (cnf *Config) GetInt(key string) int {
- str := cnf.settings[key]
- ret, err := strconv.Atoi(str)
- if err == nil {
- return ret
- }
- str = cnf.defaults[key]
- ret, err = strconv.Atoi(str)
- if err == nil {
- return ret
- }
- return 0
-}
-
-// Get an int64 configuration key.
-func (cnf *Config) GetInt64(key string) int64 {
- str := cnf.settings[key]
- ret, err := strconv.ParseInt(str, 10, 64)
- if err == nil {
- return ret
- }
- str = cnf.defaults[key]
- ret, err = strconv.ParseInt(str, 10, 64)
- if err == nil {
- return ret
- }
- return 0
-}
-
-// Make a deep copy of the given configuration.
-// Optionally, you can specify particular key/value pairs to change.
-// Example:
-// cnf2 := cnf.Copy("my.changed.key", "my.new.value")
-func (cnf *Config) Clone(args ...string) *Config {
- if len(args)%2 != 0 {
- panic("The arguments to Config#copy are key1, value1, " +
- "key2, value2, and so on. You must specify an even number of arguments.")
- }
- ncnf := &Config{defaults: cnf.defaults}
- ncnf.settings = make(map[string]string)
- for k, v := range cnf.settings {
- ncnf.settings[k] = v
- }
- for i := 0; i < len(args); i += 2 {
- ncnf.settings[args[i]] = args[i+1]
- }
- return ncnf
-}
-
-// Export the configuration as a map
-func (cnf *Config) Export() map[string]string {
- m := make(map[string]string)
- for k, v := range cnf.defaults {
- m[k] = v
- }
- for k, v := range cnf.settings {
- m[k] = v
- }
- return m
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
deleted file mode 100644
index 16790d8..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package conf
-
-import (
- "fmt"
- "os"
-)
-
-//
-// Configuration keys for HTrace.
-//
-
-// The platform-specific path separator. Usually slash.
-var PATH_SEP string = fmt.Sprintf("%c", os.PathSeparator)
-
-// The platform-specific path list separator. Usually colon.
-var PATH_LIST_SEP string = fmt.Sprintf("%c", os.PathListSeparator)
-
-// The name of the XML configuration file to look for.
-const CONFIG_FILE_NAME = "htraced-conf.xml"
-
-// An environment variable containing a list of paths to search for the
-// configuration file in.
-const HTRACED_CONF_DIR = "HTRACED_CONF_DIR"
-
-// The web address to start the REST server on.
-const HTRACE_WEB_ADDRESS = "web.address"
-
-// The default port for the Htrace web address.
-const HTRACE_WEB_ADDRESS_DEFAULT_PORT = 9096
-
-// The web address to start the REST server on.
-const HTRACE_HRPC_ADDRESS = "hrpc.address"
-
-// The default port for the Htrace HRPC address.
-const HTRACE_HRPC_ADDRESS_DEFAULT_PORT = 9075
-
-// The directories to put the data store into. Separated by PATH_LIST_SEP.
-const HTRACE_DATA_STORE_DIRECTORIES = "data.store.directories"
-
-// Boolean key which indicates whether we should clear data on startup.
-const HTRACE_DATA_STORE_CLEAR = "data.store.clear"
-
-// How many writes to buffer before applying backpressure to span senders.
-const HTRACE_DATA_STORE_SPAN_BUFFER_SIZE = "data.store.span.buffer.size"
-
-// Path to put the logs from htrace, or the empty string to use stdout.
-const HTRACE_LOG_PATH = "log.path"
-
-// The log level to use for the logs in htrace.
-const HTRACE_LOG_LEVEL = "log.level"
-
-// The period between datastore heartbeats. This is the approximate interval at which we will
-// prune expired spans.
-const HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS = "datastore.heartbeat.period.ms"
-
-// The maximum number of addresses for which we will maintain metrics.
-const HTRACE_METRICS_MAX_ADDR_ENTRIES = "metrics.max.addr.entries"
-
-// The number of milliseconds we should keep spans before discarding them.
-const HTRACE_SPAN_EXPIRY_MS = "span.expiry.ms"
-
-// The period between updates to the span reaper
-const HTRACE_REAPER_HEARTBEAT_PERIOD_MS = "reaper.heartbeat.period.ms"
-
-// A host:port pair to send information to on startup. This is used in unit
-// tests to determine the (random) port of the htraced process that has been
-// started.
-const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address"
-
-// The maximum number of HRPC handler goroutines we will create at once. If
-// this is too small, we won't get enough concurrency; if it's too big, we will
-// buffer too much data in memory while waiting for the datastore to process
-// requests.
-const HTRACE_NUM_HRPC_HANDLERS = "num.hrpc.handlers"
-
-// The I/O timeout HRPC will use, in milliseconds. If it takes longer than
-// this to read or write a message, we will abort the connection.
-const HTRACE_HRPC_IO_TIMEOUT_MS = "hrpc.io.timeout.ms"
-
-// The leveldb write buffer size, or 0 to use the library default, which is 4
-// MB in leveldb 1.16. See leveldb's options.h for more details.
-const HTRACE_LEVELDB_WRITE_BUFFER_SIZE = "leveldb.write.buffer.size"
-
-// The LRU cache size for leveldb, in bytes.
-const HTRACE_LEVELDB_CACHE_SIZE = "leveldb.cache.size"
-
-// Default values for HTrace configuration keys.
-var DEFAULTS = map[string]string{
- HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT),
- HTRACE_HRPC_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_HRPC_ADDRESS_DEFAULT_PORT),
- HTRACE_DATA_STORE_DIRECTORIES: PATH_SEP + "tmp" + PATH_SEP + "htrace1" +
- PATH_LIST_SEP + PATH_SEP + "tmp" + PATH_SEP + "htrace2",
- HTRACE_DATA_STORE_CLEAR: "false",
- HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100",
- HTRACE_LOG_PATH: "",
- HTRACE_LOG_LEVEL: "INFO",
- HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 45*1000),
- HTRACE_METRICS_MAX_ADDR_ENTRIES: "100000",
- HTRACE_SPAN_EXPIRY_MS: "0",
- HTRACE_REAPER_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 90*1000),
- HTRACE_NUM_HRPC_HANDLERS: "20",
- HTRACE_HRPC_IO_TIMEOUT_MS: "60000",
- HTRACE_LEVELDB_WRITE_BUFFER_SIZE: "0",
- HTRACE_LEVELDB_CACHE_SIZE: fmt.Sprintf("%d", 100 * 1024 * 1024),
-}
-
-// Values to be used when creating test configurations
-func TEST_VALUES() map[string]string {
- return map[string]string{
- HTRACE_HRPC_ADDRESS: ":0", // use a random port for the HRPC server
- HTRACE_LOG_LEVEL: "TRACE", // show all log messages in tests
- HTRACE_WEB_ADDRESS: ":0", // use a random port for the REST server
- HTRACE_SPAN_EXPIRY_MS: "0", // never time out spans (unless testing the reaper)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/conf/config_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_test.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_test.go
deleted file mode 100644
index a681136..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_test.go
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package conf
-
-import (
- "bytes"
- "os"
- "strings"
- "testing"
-)
-
-// Test that parsing command-line arguments of the form -Dfoo=bar works.
-func TestParseArgV(t *testing.T) {
- t.Parallel()
- argv := []string{"-Dfoo=bar", "-Dbaz=123", "-DsillyMode", "-Dlog.path="}
- bld := &Builder{Argv: argv,
- Defaults:map[string]string {
- "log.path": "/log/path/default",
- }}
- cnf, err := bld.Build()
- if err != nil {
- t.Fatal()
- }
- if "bar" != cnf.Get("foo") {
- t.Fatal()
- }
- if 123 != cnf.GetInt("baz") {
- t.Fatal()
- }
- if !cnf.GetBool("sillyMode") {
- t.Fatal()
- }
- if cnf.GetBool("otherSillyMode") {
- t.Fatal()
- }
- if "" != cnf.Get("log.path") {
- t.Fatal()
- }
-}
-
-// Test that default values work.
-// Defaults are used only when the configuration option is not present or can't be parsed.
-func TestDefaults(t *testing.T) {
- t.Parallel()
- argv := []string{"-Dfoo=bar", "-Dbaz=invalidNumber"}
- defaults := map[string]string{
- "foo": "notbar",
- "baz": "456",
- "foo2": "4611686018427387904",
- }
- bld := &Builder{Argv: argv, Defaults: defaults}
- cnf, err := bld.Build()
- if err != nil {
- t.Fatal()
- }
- if "bar" != cnf.Get("foo") {
- t.Fatal()
- }
- if 456 != cnf.GetInt("baz") {
- t.Fatal()
- }
- if 4611686018427387904 != cnf.GetInt64("foo2") {
- t.Fatal()
- }
-}
-
-// Test that we can parse our XML configuration file.
-func TestXmlConfigurationFile(t *testing.T) {
- t.Parallel()
- xml := `
-<?xml version="1.0"?>
-<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>
-<configuration>
- <property>
- <name>foo.bar</name>
- <value>123</value>
- </property>
- <property>
- <name>foo.baz</name>
- <value>xmlValue</value>
- </property>
- <!--<property>
- <name>commented.out</name>
- <value>stuff</value>
- </property>-->
-</configuration>
-`
- xmlReader := strings.NewReader(xml)
- argv := []string{"-Dfoo.bar=456"}
- defaults := map[string]string{
- "foo.bar": "789",
- "cmdline.opt": "4611686018427387904",
- }
- bld := &Builder{Argv: argv, Defaults: defaults, Reader: xmlReader}
- cnf, err := bld.Build()
- if err != nil {
- t.Fatal()
- }
- // The command-line argument takes precedence over the XML and the defaults.
- if 456 != cnf.GetInt("foo.bar") {
- t.Fatal()
- }
- if "xmlValue" != cnf.Get("foo.baz") {
- t.Fatalf("foo.baz = %s", cnf.Get("foo.baz"))
- }
- if "" != cnf.Get("commented.out") {
- t.Fatal()
- }
- if 4611686018427387904 != cnf.GetInt64("cmdline.opt") {
- t.Fatal()
- }
-}
-
-// Test our handling of the HTRACE_CONF_DIR environment variable.
-func TestGetHTracedConfDirs(t *testing.T) {
- os.Setenv("HTRACED_CONF_DIR", "")
- dlog := new(bytes.Buffer)
- dirs := getHTracedConfDirs(dlog)
- if len(dirs) != 1 || dirs[0] != getDefaultHTracedConfDir() {
- t.Fatal()
- }
- os.Setenv("HTRACED_CONF_DIR", "/foo/bar:/baz")
- dirs = getHTracedConfDirs(dlog)
- if len(dirs) != 2 || dirs[0] != "/foo/bar" || dirs[1] != "/baz" {
- t.Fatal()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/conf/xml.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/xml.go b/htrace-htraced/go/src/org/apache/htrace/conf/xml.go
deleted file mode 100644
index de14bc5..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/conf/xml.go
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package conf
-
-import (
- "encoding/xml"
- "io"
- "log"
-)
-
-type configuration struct {
- Properties []propertyXml `xml:"property"`
-}
-
-type propertyXml struct {
- Name string `xml:"name"`
- Value string `xml:"value"`
-}
-
-// Parse an XML configuration file.
-func parseXml(reader io.Reader, m map[string]string) error {
- dec := xml.NewDecoder(reader)
- configurationXml := configuration{}
- err := dec.Decode(&configurationXml)
- if err != nil {
- return err
- }
- props := configurationXml.Properties
- for p := range props {
- key := props[p].Name
- value := props[p].Value
- if key == "" {
- log.Println("Warning: ignoring element with missing or empty <name>.")
- continue
- }
- if value == "" {
- log.Println("Warning: ignoring element with key " + key + " with missing or empty <value>.")
- continue
- }
- //log.Printf("setting %s to %s\n", key, value)
- m[key] = value
- }
- return nil
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
deleted file mode 100644
index 7b64914..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "fmt"
- "github.com/ugorji/go/codec"
- "math"
- "math/rand"
- htrace "org/apache/htrace/client"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "org/apache/htrace/test"
- "sort"
- "sync"
- "sync/atomic"
- "testing"
- "time"
-)
-
-func TestClientGetServerVersion(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerVersion",
- DataDirs: make([]string, 2)}
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf(), nil)
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
- defer hcl.Close()
- _, err = hcl.GetServerVersion()
- if err != nil {
- t.Fatalf("failed to call GetServerVersion: %s", err.Error())
- }
-}
-
-func TestClientGetServerDebugInfo(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerDebugInfo",
- DataDirs: make([]string, 2)}
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf(), nil)
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
- defer hcl.Close()
- debugInfo, err := hcl.GetServerDebugInfo()
- if err != nil {
- t.Fatalf("failed to call GetServerDebugInfo: %s", err.Error())
- }
- if debugInfo.StackTraces == "" {
- t.Fatalf(`debugInfo.StackTraces == ""`)
- }
- if debugInfo.GCStats == "" {
- t.Fatalf(`debugInfo.GCStats == ""`)
- }
-}
-
-func createRandomTestSpans(amount int) common.SpanSlice {
- rnd := rand.New(rand.NewSource(2))
- allSpans := make(common.SpanSlice, amount)
- allSpans[0] = test.NewRandomSpan(rnd, allSpans[0:0])
- for i := 1; i < amount; i++ {
- allSpans[i] = test.NewRandomSpan(rnd, allSpans[1:i])
- }
- allSpans[1].SpanData.Parents = []common.SpanId{common.SpanId(allSpans[0].Id)}
- return allSpans
-}
-
-func TestClientOperations(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations",
- DataDirs: make([]string, 2),
- WrittenSpans: common.NewSemaphore(0),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf(), nil)
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
- defer hcl.Close()
-
- // Create some random trace spans.
- NUM_TEST_SPANS := 30
- allSpans := createRandomTestSpans(NUM_TEST_SPANS)
-
- // Write half of the spans to htraced via the client.
- err = hcl.WriteSpans(allSpans[0 : NUM_TEST_SPANS/2])
- if err != nil {
- t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
- err.Error())
- }
- ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS / 2))
-
- // Look up the first half of the spans. They should be found.
- var span *common.Span
- for i := 0; i < NUM_TEST_SPANS/2; i++ {
- span, err = hcl.FindSpan(allSpans[i].Id)
- if err != nil {
- t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
- }
- common.ExpectSpansEqual(t, allSpans[i], span)
- }
-
- // Look up the second half of the spans. They should not be found.
- for i := NUM_TEST_SPANS / 2; i < NUM_TEST_SPANS; i++ {
- span, err = hcl.FindSpan(allSpans[i].Id)
- if err != nil {
- t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
- }
- if span != nil {
- t.Fatalf("Unexpectedly found a span we never write to "+
- "the server: FindSpan(%d) succeeded\n", i)
- }
- }
-
- // Test FindChildren
- childSpan := allSpans[1]
- parentId := childSpan.Parents[0]
- var children []common.SpanId
- children, err = hcl.FindChildren(parentId, 1)
- if err != nil {
- t.Fatalf("FindChildren(%s) failed: %s\n", parentId, err.Error())
- }
- if len(children) != 1 {
- t.Fatalf("FindChildren(%s) returned an invalid number of "+
- "children: expected %d, got %d\n", parentId, 1, len(children))
- }
- if !children[0].Equal(childSpan.Id) {
- t.Fatalf("FindChildren(%s) returned an invalid child id: expected %s, "+
- " got %s\n", parentId, childSpan.Id, children[0])
- }
-
- // Test FindChildren on a span that has no children
- childlessSpan := allSpans[NUM_TEST_SPANS/2]
- children, err = hcl.FindChildren(childlessSpan.Id, 10)
- if err != nil {
- t.Fatalf("FindChildren(%d) failed: %s\n", childlessSpan.Id, err.Error())
- }
- if len(children) != 0 {
- t.Fatalf("FindChildren(%d) returned an invalid number of "+
- "children: expected %d, got %d\n", childlessSpan.Id, 0, len(children))
- }
-
- // Test Query
- var query common.Query
- query = common.Query{Lim: 10}
- spans, err := hcl.Query(&query)
- if err != nil {
- t.Fatalf("Query({lim: %d}) failed: %s\n", 10, err.Error())
- }
- if len(spans) != 10 {
- t.Fatalf("Query({lim: %d}) returned an invalid number of "+
- "children: expected %d, got %d\n", 10, 10, len(spans))
- }
-}
-
-func TestDumpAll(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll",
- DataDirs: make([]string, 2),
- WrittenSpans: common.NewSemaphore(0),
- Cnf: map[string]string{
- conf.HTRACE_LOG_LEVEL: "INFO",
- },
- }
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf(), nil)
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
- defer hcl.Close()
-
- NUM_TEST_SPANS := 100
- allSpans := createRandomTestSpans(NUM_TEST_SPANS)
- sort.Sort(allSpans)
- err = hcl.WriteSpans(allSpans)
- if err != nil {
- t.Fatalf("WriteSpans failed: %s\n", err.Error())
- }
- ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS))
- out := make(chan *common.Span, NUM_TEST_SPANS)
- var dumpErr error
- go func() {
- dumpErr = hcl.DumpAll(3, out)
- }()
- var numSpans int
- nextLogTime := time.Now().Add(time.Millisecond * 5)
- for {
- span, channelOpen := <-out
- if !channelOpen {
- break
- }
- common.ExpectSpansEqual(t, allSpans[numSpans], span)
- numSpans++
- if testing.Verbose() {
- now := time.Now()
- if !now.Before(nextLogTime) {
- nextLogTime = now
- nextLogTime = nextLogTime.Add(time.Millisecond * 5)
- fmt.Printf("read back %d span(s)...\n", numSpans)
- }
- }
- }
- if numSpans != len(allSpans) {
- t.Fatalf("expected to read %d spans... but only read %d\n",
- len(allSpans), numSpans)
- }
- if dumpErr != nil {
- t.Fatalf("got dump error %s\n", dumpErr.Error())
- }
-}
-
-const EXAMPLE_CONF_KEY = "example.conf.key"
-const EXAMPLE_CONF_VALUE = "foo.bar.baz"
-
-func TestClientGetServerConf(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerConf",
- Cnf: map[string]string{
- EXAMPLE_CONF_KEY: EXAMPLE_CONF_VALUE,
- },
- DataDirs: make([]string, 2)}
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf(), nil)
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
- defer hcl.Close()
- serverCnf, err2 := hcl.GetServerConf()
- if err2 != nil {
- t.Fatalf("failed to call GetServerConf: %s", err2.Error())
- }
- if serverCnf[EXAMPLE_CONF_KEY] != EXAMPLE_CONF_VALUE {
- t.Fatalf("unexpected value for %s: %s",
- EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE)
- }
-}
-
-const TEST_NUM_HRPC_HANDLERS = 2
-
-const TEST_NUM_WRITESPANS = 4
-
-// Tests that HRPC limits the number of simultaneous connections being processed.
-func TestHrpcAdmissionsControl(t *testing.T) {
- var wg sync.WaitGroup
- wg.Add(TEST_NUM_WRITESPANS)
- var numConcurrentHrpcCalls int32
- testHooks := &hrpcTestHooks{
- HandleAdmission: func() {
- defer wg.Done()
- n := atomic.AddInt32(&numConcurrentHrpcCalls, 1)
- if n > TEST_NUM_HRPC_HANDLERS {
- t.Fatalf("The number of concurrent HRPC calls went above "+
- "%d: it's at %d\n", TEST_NUM_HRPC_HANDLERS, n)
- }
- time.Sleep(1 * time.Millisecond)
- n = atomic.AddInt32(&numConcurrentHrpcCalls, -1)
- if n >= TEST_NUM_HRPC_HANDLERS {
- t.Fatalf("The number of concurrent HRPC calls went above "+
- "%d: it was at %d\n", TEST_NUM_HRPC_HANDLERS, n+1)
- }
- },
- }
- htraceBld := &MiniHTracedBuilder{Name: "TestHrpcAdmissionsControl",
- DataDirs: make([]string, 2),
- Cnf: map[string]string{
- conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
- },
- WrittenSpans: common.NewSemaphore(0),
- HrpcTestHooks: testHooks,
- }
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf(), nil)
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
- // Create some random trace spans.
- allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
- for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
- go func(i int) {
- err = hcl.WriteSpans(allSpans[i : i+1])
- if err != nil {
- t.Fatalf("WriteSpans failed: %s\n", err.Error())
- }
- }(iter)
- }
- wg.Wait()
- ht.Store.WrittenSpans.Waits(int64(TEST_NUM_WRITESPANS))
-}
-
-// Tests that HRPC I/O timeouts work.
-func TestHrpcIoTimeout(t *testing.T) {
- htraceBld := &MiniHTracedBuilder{Name: "TestHrpcIoTimeout",
- DataDirs: make([]string, 2),
- Cnf: map[string]string{
- conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS),
- conf.HTRACE_HRPC_IO_TIMEOUT_MS: "1",
- },
- }
- ht, err := htraceBld.Build()
- if err != nil {
- t.Fatalf("failed to create datastore: %s", err.Error())
- }
- defer ht.Close()
- var hcl *htrace.Client
- finishClient := make(chan interface{})
- defer func() {
- // Close the finishClient channel, if it hasn't already been closed.
- defer func() { recover() }()
- close(finishClient)
- }()
- testHooks := &htrace.TestHooks{
- HandleWriteRequestBody: func() {
- <-finishClient
- },
- }
- hcl, err = htrace.NewClient(ht.ClientConf(), testHooks)
- if err != nil {
- t.Fatalf("failed to create client: %s", err.Error())
- }
- // Create some random trace spans.
- allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
- var wg sync.WaitGroup
- wg.Add(TEST_NUM_WRITESPANS)
- for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
- go func(i int) {
- defer wg.Done()
- // Ignore the error return because there are internal retries in
- // the client which will make this succeed eventually, usually.
- // Keep in mind that we only block until we have seen
- // TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that,
- // we let requests through so that the test can exit cleanly.
- hcl.WriteSpans(allSpans[i : i+1])
- }(iter)
- }
- for {
- if ht.Hsv.GetNumIoErrors() >= TEST_NUM_WRITESPANS {
- break
- }
- time.Sleep(1000 * time.Nanosecond)
- }
- close(finishClient)
- wg.Wait()
-}
-
-func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) {
- htraceBld := &MiniHTracedBuilder{Name: "doWriteSpans",
- Cnf: map[string]string{
- conf.HTRACE_LOG_LEVEL: "INFO",
- conf.HTRACE_NUM_HRPC_HANDLERS: "20",
- },
- WrittenSpans: common.NewSemaphore(int64(1 - N)),
- }
- ht, err := htraceBld.Build()
- if err != nil {
- panic(err)
- }
- defer ht.Close()
- rnd := rand.New(rand.NewSource(1))
- allSpans := make([]*common.Span, N)
- for n := 0; n < N; n++ {
- allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n])
- }
- // Determine how many calls to WriteSpans we should make. Each writeSpans
- // message should be small enough so that it doesn't exceed the max RPC
- // body length limit. TODO: a production-quality golang client would do
- // this internally rather than needing us to do it here in the unit test.
- bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5
- reqs := make([][]*common.Span, 0, 4)
- curReq := -1
- curReqLen := bodyLen
- var curReqSpans uint32
- mh := new(codec.MsgpackHandle)
- mh.WriteExt = true
- var mbuf [8192]byte
- buf := mbuf[:0]
- enc := codec.NewEncoderBytes(&buf, mh)
- for n := 0; n < N; n++ {
- span := allSpans[n]
- if (curReqSpans >= maxSpansPerRpc) ||
- (curReqLen >= bodyLen) {
- reqs = append(reqs, make([]*common.Span, 0, 16))
- curReqLen = 0
- curReq++
- curReqSpans = 0
- }
- buf = mbuf[:0]
- enc.ResetBytes(&buf)
- err := enc.Encode(span)
- if err != nil {
- panic(fmt.Sprintf("Error encoding span %s: %s\n",
- span.String(), err.Error()))
- }
- bufLen := len(buf)
- if bufLen > (bodyLen / 5) {
- panic(fmt.Sprintf("Span too long at %d bytes\n", bufLen))
- }
- curReqLen += bufLen
- reqs[curReq] = append(reqs[curReq], span)
- curReqSpans++
- }
- ht.Store.lg.Infof("num spans: %d. num WriteSpansReq calls: %d\n", N, len(reqs))
- var hcl *htrace.Client
- hcl, err = htrace.NewClient(ht.ClientConf(), nil)
- if err != nil {
- panic(fmt.Sprintf("failed to create client: %s", err.Error()))
- }
- defer hcl.Close()
-
- // Reset the timer to avoid including the time required to create new
- // random spans in the benchmark total.
- if b != nil {
- b.ResetTimer()
- }
-
- // Write many random spans.
- for reqIdx := range reqs {
- go func(i int) {
- err = hcl.WriteSpans(reqs[i])
- if err != nil {
- panic(fmt.Sprintf("failed to send WriteSpans request %d: %s",
- i, err.Error()))
- }
- }(reqIdx)
- }
- // Wait for all the spans to be written.
- ht.Store.WrittenSpans.Wait()
-}
-
-// This is a test of how quickly we can create new spans via WriteSpans RPCs.
-// Like BenchmarkDatastoreWrites, it creates b.N spans in the datastore.
-// Unlike that benchmark, it sends the spans via RPC.
-// Suggested flags for running this:
-// -tags unsafe -cpu 16 -benchtime=1m
-func BenchmarkWriteSpans(b *testing.B) {
- doWriteSpans("BenchmarkWriteSpans", b.N, math.MaxUint32, b)
-}
-
-func TestWriteSpansRpcs(t *testing.T) {
- doWriteSpans("TestWriteSpansRpcs", 3000, 1000, nil)
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
deleted file mode 100644
index 82fb7b5..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ /dev/null
@@ -1,1340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package main
-
-import (
- "bytes"
- "encoding/hex"
- "errors"
- "fmt"
- "github.com/jmhodges/levigo"
- "github.com/ugorji/go/codec"
- "org/apache/htrace/common"
- "org/apache/htrace/conf"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
-)
-
-//
-// The data store code for HTraced.
-//
-// This code stores the trace spans. We use levelDB here so that we don't have to store everything
-// in memory at all times. The data is sharded across multiple levelDB databases in multiple
-// directories. Normally, these multiple directories will be on multiple disk drives.
-//
-// The main emphasis in the HTraceD data store is on quickly and efficiently storing trace span data
-// coming from many daemons. Durability is not as big a concern as in some data stores, since
-// losing a little bit of trace data if htraced goes down is not critical. We use msgpack
-// for serialization. We assume that there will be many more writes than reads.
-//
-// Schema
-// w -> ShardInfo
-// s[8-byte-big-endian-sid] -> SpanData
-// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {}
-// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {}
-// d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {}
-// p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
-//
-// Note that span IDs are unsigned 64-bit numbers.
-// Begin times, end times, and durations are signed 64-bit numbers.
-// In order to get LevelDB to properly compare the signed 64-bit quantities,
-// we flip the highest bit. This way, we can get leveldb to view negative
-// quantities as less than non-negative ones. This also means that we can do
-// all queries using unsigned 64-bit math, rather than having to special-case
-// the signed fields.
-//
-
-var EMPTY_BYTE_BUF []byte = []byte{}
-
-const SPAN_ID_INDEX_PREFIX = 's'
-const BEGIN_TIME_INDEX_PREFIX = 'b'
-const END_TIME_INDEX_PREFIX = 'e'
-const DURATION_INDEX_PREFIX = 'd'
-const PARENT_ID_INDEX_PREFIX = 'p'
-const INVALID_INDEX_PREFIX = 0
-
-// The maximum span expiry time, in milliseconds.
-// For all practical purposes this is "never" since it's more than a million years.
-const MAX_SPAN_EXPIRY_MS = 0x7ffffffffffffff
-
-type IncomingSpan struct {
- // The address that the span was sent from.
- Addr string
-
- // The span.
- *common.Span
-
- // Serialized span data
- SpanDataBytes []byte
-}
-
-// A single directory containing a levelDB instance.
-type shard struct {
- // The data store that this shard is part of
- store *dataStore
-
- // The LevelDB instance.
- ldb *levigo.DB
-
- // The path to the leveldb directory this shard is managing.
- path string
-
- // Incoming requests to write Spans.
- incoming chan []*IncomingSpan
-
- // A channel for incoming heartbeats
- heartbeats chan interface{}
-
- // Tracks whether the shard goroutine has exited.
- exited sync.WaitGroup
-}
-
-// Process incoming spans for a shard.
-func (shd *shard) processIncoming() {
- lg := shd.store.lg
- defer func() {
- lg.Infof("Shard processor for %s exiting.\n", shd.path)
- shd.exited.Done()
- }()
- for {
- select {
- case spans := <-shd.incoming:
- if spans == nil {
- return
- }
- totalWritten := 0
- totalDropped := 0
- for spanIdx := range spans {
- err := shd.writeSpan(spans[spanIdx])
- if err != nil {
- lg.Errorf("Shard processor for %s got fatal error %s.\n",
- shd.path, err.Error())
- totalDropped++
- } else {
- if lg.TraceEnabled() {
- lg.Tracef("Shard processor for %s wrote span %s.\n",
- shd.path, spans[spanIdx].ToJson())
- }
- totalWritten++
- }
- }
- shd.store.msink.UpdatePersisted(spans[0].Addr, totalWritten, totalDropped)
- if shd.store.WrittenSpans != nil {
- lg.Debugf("Shard %s incrementing WrittenSpans by %d\n", shd.path, len(spans))
- shd.store.WrittenSpans.Posts(int64(len(spans)))
- }
- case <-shd.heartbeats:
- lg.Tracef("Shard processor for %s handling heartbeat.\n", shd.path)
- shd.pruneExpired()
- }
- }
-}
-
-func (shd *shard) pruneExpired() {
- lg := shd.store.rpr.lg
- src, err := CreateReaperSource(shd)
- if err != nil {
- lg.Errorf("Error creating reaper source for shd(%s): %s\n",
- shd.path, err.Error())
- return
- }
- var totalReaped uint64
- defer func() {
- src.Close()
- if totalReaped > 0 {
- atomic.AddUint64(&shd.store.rpr.ReapedSpans, totalReaped)
- }
- }()
- urdate := s2u64(shd.store.rpr.GetReaperDate())
- for {
- span := src.next()
- if span == nil {
- lg.Debugf("After reaping %d span(s), no more found in shard %s "+
- "to reap.\n", totalReaped, shd.path)
- return
- }
- begin := s2u64(span.Begin)
- if begin >= urdate {
- lg.Debugf("After reaping %d span(s), the remaining spans in "+
- "shard %s are new enough to be kept\n",
- totalReaped, shd.path)
- return
- }
- err = shd.DeleteSpan(span)
- if err != nil {
- lg.Errorf("Error deleting span %s from shd(%s): %s\n",
- span.String(), shd.path, err.Error())
- return
- }
- if lg.TraceEnabled() {
- lg.Tracef("Reaped span %s from shard %s\n", span.String(), shd.path)
- }
- totalReaped++
- }
-}
-
-// Delete a span from the shard. Note that leveldb may retain the data until
-// compaction(s) remove it.
-func (shd *shard) DeleteSpan(span *common.Span) error {
- batch := levigo.NewWriteBatch()
- defer batch.Close()
- primaryKey :=
- append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...)
- batch.Delete(primaryKey)
- for parentIdx := range span.Parents {
- key := append(append([]byte{PARENT_ID_INDEX_PREFIX},
- span.Parents[parentIdx].Val()...), span.Id.Val()...)
- batch.Delete(key)
- }
- beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX},
- u64toSlice(s2u64(span.Begin))...), span.Id.Val()...)
- batch.Delete(beginTimeKey)
- endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX},
- u64toSlice(s2u64(span.End))...), span.Id.Val()...)
- batch.Delete(endTimeKey)
- durationKey := append(append([]byte{DURATION_INDEX_PREFIX},
- u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...)
- batch.Delete(durationKey)
- err := shd.ldb.Write(shd.store.writeOpts, batch)
- if err != nil {
- return err
- }
- return nil
-}
-
-// Convert a signed 64-bit number into an unsigned 64-bit number. We flip the
-// highest bit, so that negative input values map to unsigned numbers which are
-// less than non-negative input values.
-func s2u64(val int64) uint64 {
- ret := uint64(val)
- ret ^= 0x8000000000000000
- return ret
-}
-
-func u64toSlice(val uint64) []byte {
- return []byte{
- byte(0xff & (val >> 56)),
- byte(0xff & (val >> 48)),
- byte(0xff & (val >> 40)),
- byte(0xff & (val >> 32)),
- byte(0xff & (val >> 24)),
- byte(0xff & (val >> 16)),
- byte(0xff & (val >> 8)),
- byte(0xff & (val >> 0))}
-}
-
-func (shd *shard) writeSpan(ispan *IncomingSpan) error {
- batch := levigo.NewWriteBatch()
- defer batch.Close()
- span := ispan.Span
- primaryKey :=
- append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...)
- batch.Put(primaryKey, ispan.SpanDataBytes)
-
- // Add this to the parent index.
- for parentIdx := range span.Parents {
- key := append(append([]byte{PARENT_ID_INDEX_PREFIX},
- span.Parents[parentIdx].Val()...), span.Id.Val()...)
- batch.Put(key, EMPTY_BYTE_BUF)
- }
-
- // Add to the other secondary indices.
- beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX},
- u64toSlice(s2u64(span.Begin))...), span.Id.Val()...)
- batch.Put(beginTimeKey, EMPTY_BYTE_BUF)
- endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX},
- u64toSlice(s2u64(span.End))...), span.Id.Val()...)
- batch.Put(endTimeKey, EMPTY_BYTE_BUF)
- durationKey := append(append([]byte{DURATION_INDEX_PREFIX},
- u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...)
- batch.Put(durationKey, EMPTY_BYTE_BUF)
-
- err := shd.ldb.Write(shd.store.writeOpts, batch)
- if err != nil {
- shd.store.lg.Errorf("Error writing span %s to leveldb at %s: %s\n",
- span.String(), shd.path, err.Error())
- return err
- }
- return nil
-}
-
-func (shd *shard) FindChildren(sid common.SpanId, childIds []common.SpanId,
- lim int32) ([]common.SpanId, int32, error) {
- searchKey := append([]byte{PARENT_ID_INDEX_PREFIX}, sid.Val()...)
- iter := shd.ldb.NewIterator(shd.store.readOpts)
- defer iter.Close()
- iter.Seek(searchKey)
- for {
- if !iter.Valid() {
- break
- }
- if lim == 0 {
- break
- }
- key := iter.Key()
- if !bytes.HasPrefix(key, searchKey) {
- break
- }
- id := common.SpanId(key[17:])
- childIds = append(childIds, id)
- lim--
- iter.Next()
- }
- return childIds, lim, nil
-}
-
-// Close a shard.
-func (shd *shard) Close() {
- lg := shd.store.lg
- shd.incoming <- nil
- lg.Infof("Waiting for %s to exit...\n", shd.path)
- shd.exited.Wait()
- shd.ldb.Close()
- lg.Infof("Closed %s...\n", shd.path)
-}
-
-type Reaper struct {
- // The logger used by the reaper
- lg *common.Logger
-
- // The number of milliseconds to keep spans around, in milliseconds.
- spanExpiryMs int64
-
- // The oldest date for which we'll keep spans.
- reaperDate int64
-
- // A channel used to send heartbeats to the reaper
- heartbeats chan interface{}
-
- // Tracks whether the reaper goroutine has exited
- exited sync.WaitGroup
-
- // The lock protecting reaper data.
- lock sync.Mutex
-
- // The reaper heartbeater
- hb *Heartbeater
-
- // The total number of spans which have been reaped.
- ReapedSpans uint64
-}
-
-func NewReaper(cnf *conf.Config) *Reaper {
- rpr := &Reaper{
- lg: common.NewLogger("reaper", cnf),
- spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS),
- heartbeats: make(chan interface{}, 1),
- }
- if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS {
- rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS
- } else if rpr.spanExpiryMs <= 0 {
- rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS
- }
- rpr.hb = NewHeartbeater("ReaperHeartbeater",
- cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS), rpr.lg)
- rpr.exited.Add(1)
- go rpr.run()
- rpr.hb.AddHeartbeatTarget(&HeartbeatTarget{
- name: "reaper",
- targetChan: rpr.heartbeats,
- })
- var when string
- if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS {
- when = "never"
- } else {
- when = "after " + time.Duration(rpr.spanExpiryMs).String()
- }
- rpr.lg.Infof("Initializing span reaper: span time out = %s.\n", when)
- return rpr
-}
-
-func (rpr *Reaper) run() {
- defer func() {
- rpr.lg.Info("Exiting Reaper goroutine.\n")
- rpr.exited.Done()
- }()
-
- for {
- _, isOpen := <-rpr.heartbeats
- if !isOpen {
- return
- }
- rpr.handleHeartbeat()
- }
-}
-
-func (rpr *Reaper) handleHeartbeat() {
- // TODO: check dataStore fullness
- now := common.TimeToUnixMs(time.Now().UTC())
- d, updated := func() (int64, bool) {
- rpr.lock.Lock()
- defer rpr.lock.Unlock()
- newReaperDate := now - rpr.spanExpiryMs
- if newReaperDate > rpr.reaperDate {
- rpr.reaperDate = newReaperDate
- return rpr.reaperDate, true
- } else {
- return rpr.reaperDate, false
- }
- }()
- if rpr.lg.DebugEnabled() {
- if updated {
- rpr.lg.Debugf("Updating UTC reaper date to %s.\n",
- common.UnixMsToTime(d).Format(time.RFC3339))
- } else {
- rpr.lg.Debugf("Not updating previous reaperDate of %s.\n",
- common.UnixMsToTime(d).Format(time.RFC3339))
- }
- }
-}
-
-func (rpr *Reaper) GetReaperDate() int64 {
- rpr.lock.Lock()
- defer rpr.lock.Unlock()
- return rpr.reaperDate
-}
-
-func (rpr *Reaper) SetReaperDate(rdate int64) {
- rpr.lock.Lock()
- defer rpr.lock.Unlock()
- rpr.reaperDate = rdate
-}
-
-func (rpr *Reaper) Shutdown() {
- rpr.hb.Shutdown()
- close(rpr.heartbeats)
-}
-
-// The Data Store.
-type dataStore struct {
- lg *common.Logger
-
- // The shards which manage our LevelDB instances.
- shards []*shard
-
- // The read options to use for LevelDB.
- readOpts *levigo.ReadOptions
-
- // The write options to use for LevelDB.
- writeOpts *levigo.WriteOptions
-
- // If non-null, a semaphore we will increment once for each span we receive.
- // Used for testing.
- WrittenSpans *common.Semaphore
-
- // The metrics sink.
- msink *MetricsSink
-
- // The heartbeater which periodically asks shards to update the MetricsSink.
- hb *Heartbeater
-
- // The reaper for this datastore
- rpr *Reaper
-
- // When this datastore was started (in UTC milliseconds since the epoch)
- startMs int64
-}
-
-func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) (*dataStore, error) {
- dld := NewDataStoreLoader(cnf)
- defer dld.Close()
- err := dld.Load()
- if err != nil {
- dld.lg.Errorf("Error loading datastore: %s\n", err.Error())
- return nil, err
- }
- store := &dataStore {
- lg: dld.lg,
- shards: make([]*shard, len(dld.shards)),
- readOpts: dld.readOpts,
- writeOpts: dld.writeOpts,
- WrittenSpans: writtenSpans,
- msink: NewMetricsSink(cnf),
- hb: NewHeartbeater("DatastoreHeartbeater",
- cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), dld.lg),
- rpr: NewReaper(cnf),
- startMs: common.TimeToUnixMs(time.Now().UTC()),
- }
- spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
- for shdIdx := range store.shards {
- shd := &shard {
- store: store,
- ldb: dld.shards[shdIdx].ldb,
- path: dld.shards[shdIdx].path,
- incoming: make(chan []*IncomingSpan, spanBufferSize),
- heartbeats: make(chan interface{}, 1),
- }
- shd.exited.Add(1)
- go shd.processIncoming()
- store.shards[shdIdx] = shd
- store.hb.AddHeartbeatTarget(&HeartbeatTarget{
- name: fmt.Sprintf("shard(%s)", shd.path),
- targetChan: shd.heartbeats,
- })
- }
- dld.DisownResources()
- return store, nil
-}
-
-// Close the DataStore.
-func (store *dataStore) Close() {
- if store.hb != nil {
- store.hb.Shutdown()
- store.hb = nil
- }
- for idx := range store.shards {
- if store.shards[idx] != nil {
- store.shards[idx].Close()
- store.shards[idx] = nil
- }
- }
- if store.rpr != nil {
- store.rpr.Shutdown()
- store.rpr = nil
- }
- if store.readOpts != nil {
- store.readOpts.Close()
- store.readOpts = nil
- }
- if store.writeOpts != nil {
- store.writeOpts.Close()
- store.writeOpts = nil
- }
- if store.lg != nil {
- store.lg.Close()
- store.lg = nil
- }
-}
-
-// Get the index of the shard which stores the given spanId.
-func (store *dataStore) getShardIndex(sid common.SpanId) int {
- return int(sid.Hash32() % uint32(len(store.shards)))
-}
-
-const WRITESPANS_BATCH_SIZE = 128
-
-// SpanIngestor is a class used internally to ingest spans from an RPC
-// endpoint. It groups spans destined for a particular shard into small
-// batches, so that we can reduce the number of objects that need to be sent
-// over the shard's "incoming" channel. Since sending objects over a channel
-// requires goroutine synchronization, this improves performance.
-//
-// SpanIngestor also allows us to reuse the same encoder object for many spans,
-// rather than creating a new encoder per span. This avoids re-doing the
-// encoder setup for each span, and also generates less garbage.
-type SpanIngestor struct {
- // The logger to use.
- lg *common.Logger
-
- // The dataStore we are ingesting spans into.
- store *dataStore
-
- // The remote address these spans are coming from.
- addr string
-
- // Default TracerId
- defaultTrid string
-
- // The msgpack handle to use to serialize the spans.
- mh codec.MsgpackHandle
-
- // The msgpack encoder to use to serialize the spans.
- // Caching this avoids generating a lot of garbage and burning CPUs
- // creating new encoder objects for each span.
- enc *codec.Encoder
-
- // The buffer which codec.Encoder is currently serializing to.
- // We have to create a new buffer for each span because once we hand it off to the shard, the
- // shard manages the buffer lifecycle.
- spanDataBytes []byte
-
- // An array mapping shard index to span batch.
- batches []*SpanIngestorBatch
-
- // The total number of spans ingested. Includes dropped spans.
- totalIngested int
-
- // The total number of spans the ingestor dropped because of a server-side error.
- serverDropped int
-}
-
-// A batch of spans destined for a particular shard.
-type SpanIngestorBatch struct {
- incoming []*IncomingSpan
-}
-
-func (store *dataStore) NewSpanIngestor(lg *common.Logger,
- addr string, defaultTrid string) *SpanIngestor {
- ing := &SpanIngestor{
- lg: lg,
- store: store,
- addr: addr,
- defaultTrid: defaultTrid,
- spanDataBytes: make([]byte, 0, 1024),
- batches: make([]*SpanIngestorBatch, len(store.shards)),
- }
- ing.mh.WriteExt = true
- ing.enc = codec.NewEncoderBytes(&ing.spanDataBytes, &ing.mh)
- for batchIdx := range ing.batches {
- ing.batches[batchIdx] = &SpanIngestorBatch{
- incoming: make([]*IncomingSpan, 0, WRITESPANS_BATCH_SIZE),
- }
- }
- return ing
-}
-
-func (ing *SpanIngestor) IngestSpan(span *common.Span) {
- ing.totalIngested++
- // Make sure the span ID is valid.
- spanIdProblem := span.Id.FindProblem()
- if spanIdProblem != "" {
- // Can't print the invalid span ID because String() might fail.
- ing.lg.Warnf("Invalid span ID: %s\n", spanIdProblem)
- ing.serverDropped++
- return
- }
-
- // Set the default tracer id, if needed.
- if span.TracerId == "" {
- span.TracerId = ing.defaultTrid
- }
-
- // Encode the span data. Doing the encoding here is better than doing it
- // in the shard goroutine, because we can achieve more parallelism.
- // There is one shard goroutine per shard, but potentially many more
- // ingestors per shard.
- err := ing.enc.Encode(span.SpanData)
- if err != nil {
- ing.lg.Warnf("Failed to encode span ID %s: %s\n",
- span.Id.String(), err.Error())
- ing.serverDropped++
- return
- }
- spanDataBytes := ing.spanDataBytes
- ing.spanDataBytes = make([]byte, 0, 1024)
- ing.enc.ResetBytes(&ing.spanDataBytes)
-
- // Determine which shard this span should go to.
- shardIdx := ing.store.getShardIndex(span.Id)
- batch := ing.batches[shardIdx]
- incomingLen := len(batch.incoming)
- if ing.lg.TraceEnabled() {
- ing.lg.Tracef("SpanIngestor#IngestSpan: spanId=%s, shardIdx=%d, "+
- "incomingLen=%d, cap(batch.incoming)=%d\n",
- span.Id.String(), shardIdx, incomingLen, cap(batch.incoming))
- }
- if incomingLen+1 == cap(batch.incoming) {
- if ing.lg.TraceEnabled() {
- ing.lg.Tracef("SpanIngestor#IngestSpan: flushing %d spans for "+
- "shard %d\n", len(batch.incoming), shardIdx)
- }
- ing.store.WriteSpans(shardIdx, batch.incoming)
- batch.incoming = make([]*IncomingSpan, 1, WRITESPANS_BATCH_SIZE)
- incomingLen = 0
- } else {
- batch.incoming = batch.incoming[0 : incomingLen+1]
- }
- batch.incoming[incomingLen] = &IncomingSpan{
- Addr: ing.addr,
- Span: span,
- SpanDataBytes: spanDataBytes,
- }
-}
-
-func (ing *SpanIngestor) Close(startTime time.Time) {
- for shardIdx := range ing.batches {
- batch := ing.batches[shardIdx]
- if len(batch.incoming) > 0 {
- if ing.lg.TraceEnabled() {
- ing.lg.Tracef("SpanIngestor#Close: flushing %d span(s) for "+
- "shard %d\n", len(batch.incoming), shardIdx)
- }
- ing.store.WriteSpans(shardIdx, batch.incoming)
- }
- batch.incoming = nil
- }
- ing.lg.Debugf("Closed span ingestor for %s. Ingested %d span(s); dropped "+
- "%d span(s).\n", ing.addr, ing.totalIngested, ing.serverDropped)
-
- endTime := time.Now()
- ing.store.msink.UpdateIngested(ing.addr, ing.totalIngested,
- ing.serverDropped, endTime.Sub(startTime))
-}
-
-func (store *dataStore) WriteSpans(shardIdx int, ispans []*IncomingSpan) {
- store.shards[shardIdx].incoming <- ispans
-}
-
-func (store *dataStore) FindSpan(sid common.SpanId) *common.Span {
- return store.shards[store.getShardIndex(sid)].FindSpan(sid)
-}
-
-func (shd *shard) FindSpan(sid common.SpanId) *common.Span {
- lg := shd.store.lg
- primaryKey := append([]byte{SPAN_ID_INDEX_PREFIX}, sid.Val()...)
- buf, err := shd.ldb.Get(shd.store.readOpts, primaryKey)
- if err != nil {
- if strings.Index(err.Error(), "NotFound:") != -1 {
- return nil
- }
- lg.Warnf("Shard(%s): FindSpan(%s) error: %s\n",
- shd.path, sid.String(), err.Error())
- return nil
- }
- var span *common.Span
- span, err = shd.decodeSpan(sid, buf)
- if err != nil {
- lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s decoding [%s]\n",
- shd.path, sid.String(), err.Error(), hex.EncodeToString(buf))
- return nil
- }
- return span
-}
-
-func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, error) {
- r := bytes.NewBuffer(buf)
- mh := new(codec.MsgpackHandle)
- mh.WriteExt = true
- decoder := codec.NewDecoder(r, mh)
- data := common.SpanData{}
- err := decoder.Decode(&data)
- if err != nil {
- return nil, err
- }
- if data.Parents == nil {
- data.Parents = []common.SpanId{}
- }
- return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil
-}
-
-// Find the children of a given span id.
-func (store *dataStore) FindChildren(sid common.SpanId, lim int32) []common.SpanId {
- childIds := make([]common.SpanId, 0)
- var err error
-
- startIdx := store.getShardIndex(sid)
- idx := startIdx
- numShards := len(store.shards)
- for {
- if lim == 0 {
- break
- }
- shd := store.shards[idx]
- childIds, lim, err = shd.FindChildren(sid, childIds, lim)
- if err != nil {
- store.lg.Errorf("Shard(%s): FindChildren(%s) error: %s\n",
- shd.path, sid.String(), err.Error())
- }
- idx++
- if idx >= numShards {
- idx = 0
- }
- if idx == startIdx {
- break
- }
- }
- return childIds
-}
-
-type predicateData struct {
- *common.Predicate
- key []byte
-}
-
-func loadPredicateData(pred *common.Predicate) (*predicateData, error) {
- p := predicateData{Predicate: pred}
-
- // Parse the input value given to make sure it matches up with the field
- // type.
- switch pred.Field {
- case common.SPAN_ID:
- // Span IDs are sent as hex strings.
- var id common.SpanId
- if err := id.FromString(pred.Val); err != nil {
- return nil, errors.New(fmt.Sprintf("Unable to parse span id '%s': %s",
- pred.Val, err.Error()))
- }
- p.key = id.Val()
- break
- case common.DESCRIPTION:
- // Any string is valid for a description.
- p.key = []byte(pred.Val)
- break
- case common.BEGIN_TIME, common.END_TIME, common.DURATION:
- // Parse a base-10 signed numeric field.
- v, err := strconv.ParseInt(pred.Val, 10, 64)
- if err != nil {
- return nil, errors.New(fmt.Sprintf("Unable to parse %s '%s': %s",
- pred.Field, pred.Val, err.Error()))
- }
- p.key = u64toSlice(s2u64(v))
- break
- case common.TRACER_ID:
- // Any string is valid for a tracer ID.
- p.key = []byte(pred.Val)
- break
- default:
- return nil, errors.New(fmt.Sprintf("Unknown field %s", pred.Field))
- }
-
- // Validate the predicate operation.
- switch pred.Op {
- case common.EQUALS, common.LESS_THAN_OR_EQUALS,
- common.GREATER_THAN_OR_EQUALS, common.GREATER_THAN:
- break
- case common.CONTAINS:
- if p.fieldIsNumeric() {
- return nil, errors.New(fmt.Sprintf("Can't use CONTAINS on a "+
- "numeric field like '%s'", pred.Field))
- }
- default:
- return nil, errors.New(fmt.Sprintf("Unknown predicate operation '%s'",
- pred.Op))
- }
-
- return &p, nil
-}
-
-// Get the index prefix for this predicate, or 0 if it is not indexed.
-func (pred *predicateData) getIndexPrefix() byte {
- switch pred.Field {
- case common.SPAN_ID:
- return SPAN_ID_INDEX_PREFIX
- case common.BEGIN_TIME:
- return BEGIN_TIME_INDEX_PREFIX
- case common.END_TIME:
- return END_TIME_INDEX_PREFIX
- case common.DURATION:
- return DURATION_INDEX_PREFIX
- default:
- return INVALID_INDEX_PREFIX
- }
-}
-
-// Returns true if the predicate type is numeric.
-func (pred *predicateData) fieldIsNumeric() bool {
- switch pred.Field {
- case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, common.DURATION:
- return true
- default:
- return false
- }
-}
-
-// Get the values that this predicate cares about for a given span.
-func (pred *predicateData) extractRelevantSpanData(span *common.Span) []byte {
- switch pred.Field {
- case common.SPAN_ID:
- return span.Id.Val()
- case common.DESCRIPTION:
- return []byte(span.Description)
- case common.BEGIN_TIME:
- return u64toSlice(s2u64(span.Begin))
- case common.END_TIME:
- return u64toSlice(s2u64(span.End))
- case common.DURATION:
- return u64toSlice(s2u64(span.Duration()))
- case common.TRACER_ID:
- return []byte(span.TracerId)
- default:
- panic(fmt.Sprintf("Unknown field type %s.", pred.Field))
- }
-}
-
-func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool {
- // nil is after everything.
- if a == nil {
- if b == nil {
- return false
- }
- return false
- } else if b == nil {
- return true
- }
- // Compare the spans according to this predicate.
- aVal := pred.extractRelevantSpanData(a)
- bVal := pred.extractRelevantSpanData(b)
- cmp := bytes.Compare(aVal, bVal)
- if pred.Op.IsDescending() {
- return cmp > 0
- } else {
- return cmp < 0
- }
-}
-
-type satisfiedByReturn int
-
-const (
- NOT_SATISFIED satisfiedByReturn = iota
- NOT_YET_SATISFIED = iota
- SATISFIED = iota
-)
-
-func (r satisfiedByReturn) String() string {
- switch (r) {
- case NOT_SATISFIED:
- return "NOT_SATISFIED"
- case NOT_YET_SATISFIED:
- return "NOT_YET_SATISFIED"
- case SATISFIED:
- return "SATISFIED"
- default:
- return "(unknown)"
- }
-}
-
-// Determine whether the predicate is satisfied by the given span.
-func (pred *predicateData) satisfiedBy(span *common.Span) satisfiedByReturn {
- val := pred.extractRelevantSpanData(span)
- switch pred.Op {
- case common.CONTAINS:
- if bytes.Contains(val, pred.key) {
- return SATISFIED
- } else {
- return NOT_SATISFIED
- }
- case common.EQUALS:
- if bytes.Equal(val, pred.key) {
- return SATISFIED
- } else {
- return NOT_SATISFIED
- }
- case common.LESS_THAN_OR_EQUALS:
- if bytes.Compare(val, pred.key) <= 0 {
- return SATISFIED
- } else {
- return NOT_YET_SATISFIED
- }
- case common.GREATER_THAN_OR_EQUALS:
- if bytes.Compare(val, pred.key) >= 0 {
- return SATISFIED
- } else {
- return NOT_SATISFIED
- }
- case common.GREATER_THAN:
- cmp := bytes.Compare(val, pred.key)
- if cmp <= 0 {
- return NOT_YET_SATISFIED
- } else {
- return SATISFIED
- }
- default:
- panic(fmt.Sprintf("unknown Op type %s should have been caught "+
- "during normalization", pred.Op))
- }
-}
-
-func (pred *predicateData) createSource(store *dataStore, prev *common.Span) (*source, error) {
- var ret *source
- src := source{store: store,
- pred: pred,
- shards: make([]*shard, len(store.shards)),
- iters: make([]*levigo.Iterator, 0, len(store.shards)),
- nexts: make([]*common.Span, len(store.shards)),
- numRead: make([]int, len(store.shards)),
- keyPrefix: pred.getIndexPrefix(),
- }
- if src.keyPrefix == INVALID_INDEX_PREFIX {
- return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+
- "predicate on field %s", pred.Field))
- }
- defer func() {
- if ret == nil {
- src.Close()
- }
- }()
- for shardIdx := range store.shards {
- shd := store.shards[shardIdx]
- src.shards[shardIdx] = shd
- src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts))
- }
- var searchKey []byte
- lg := store.lg
- if prev != nil {
- // If prev != nil, this query RPC is the continuation of a previous
- // one. The final result returned the last time is 'prev'.
- //
- // To avoid returning the same results multiple times, we adjust the
- // predicate here. If the predicate is on the span id field, we
- // simply manipulate the span ID we're looking for.
- //
- // If the predicate is on a secondary index, we also use span ID, but
- // in a slightly different way. Since the secondary indices are
- // organized as [type-code][8b-secondary-key][8b-span-id], elements
- // with the same secondary index field are ordered by span ID. So we
- // create a 17-byte key incorporating the span ID from 'prev.'
- startId := common.INVALID_SPAN_ID
- switch pred.Op {
- case common.EQUALS:
- if pred.Field == common.SPAN_ID {
- // This is an annoying corner case. There can only be one
- // result each time we do an EQUALS search for a span id.
- // Span id is the primary key for all our spans.
- // But for some reason someone is asking for another result.
- // We modify the query to search for the illegal 0 span ID,
- // which will never be present.
- if lg.DebugEnabled() {
- lg.Debugf("Attempted to use a continuation token with an EQUALS "+
- "SPAN_ID query. %s. Setting search id = 0",
- pred.Predicate.String())
- }
- startId = common.INVALID_SPAN_ID
- } else {
- // When doing an EQUALS search on a secondary index, the
- // results are sorted by span id.
- startId = prev.Id.Next()
- }
- case common.LESS_THAN_OR_EQUALS:
- // Subtract one from the previous span id. Since the previous
- // start ID will never be 0 (0 is an illegal span id), we'll never
- // wrap around when doing this.
- startId = prev.Id.Prev()
- case common.GREATER_THAN_OR_EQUALS:
- // We can't add one to the span id, since the previous span ID
- // might be the maximum value. So just switch over to using
- // GREATER_THAN.
- pred.Op = common.GREATER_THAN
- startId = prev.Id
- case common.GREATER_THAN:
- // This one is easy.
- startId = prev.Id
- default:
- str := fmt.Sprintf("Can't use a %v predicate as a source.", pred.Predicate.String())
- lg.Error(str + "\n")
- panic(str)
- }
- if pred.Field == common.SPAN_ID {
- pred.key = startId.Val()
- searchKey = append([]byte{src.keyPrefix}, startId.Val()...)
- } else {
- // Start where the previous query left off. This means adjusting
- // our uintKey.
- pred.key = pred.extractRelevantSpanData(prev)
- searchKey = append(append([]byte{src.keyPrefix}, pred.key...),
- startId.Val()...)
- }
- if lg.TraceEnabled() {
- lg.Tracef("Handling continuation token %s for %s. startId=%d, "+
- "pred.uintKey=%s\n", prev, pred.Predicate.String(), startId,
- hex.EncodeToString(pred.key))
- }
- } else {
- searchKey = append([]byte{src.keyPrefix}, pred.key...)
- }
- for i := range src.iters {
- src.iters[i].Seek(searchKey)
- }
- ret = &src
- return ret, nil
-}
-
-// A source of spans.
-type source struct {
- store *dataStore
- pred *predicateData
- shards []*shard
- iters []*levigo.Iterator
- nexts []*common.Span
- numRead []int
- keyPrefix byte
-}
-
-func CreateReaperSource(shd *shard) (*source, error) {
- store := shd.store
- p := &common.Predicate{
- Op: common.GREATER_THAN_OR_EQUALS,
- Field: common.BEGIN_TIME,
- Val: common.INVALID_SPAN_ID.String(),
- }
- pred, err := loadPredicateData(p)
- if err != nil {
- return nil, err
- }
- src := &source{
- store: store,
- pred: pred,
- shards: []*shard{shd},
- iters: make([]*levigo.Iterator, 1),
- nexts: make([]*common.Span, 1),
- numRead: make([]int, 1),
- keyPrefix: pred.getIndexPrefix(),
- }
- iter := shd.ldb.NewIterator(store.readOpts)
- src.iters[0] = iter
- searchKey := append(append([]byte{src.keyPrefix}, pred.key...),
- pred.key...)
- iter.Seek(searchKey)
- return src, nil
-}
-
-// Fill in the entry in the 'next' array for a specific shard.
-func (src *source) populateNextFromShard(shardIdx int) {
- lg := src.store.lg
- var err error
- iter := src.iters[shardIdx]
- shdPath := src.shards[shardIdx].path
- if iter == nil {
- lg.Debugf("Can't populate: No more entries in shard %s\n", shdPath)
- return // There are no more entries in this shard.
- }
- if src.nexts[shardIdx] != nil {
- lg.Debugf("No need to populate shard %s\n", shdPath)
- return // We already have a valid entry for this shard.
- }
- for {
- if !iter.Valid() {
- lg.Debugf("Can't populate: Iterator for shard %s is no longer valid.\n", shdPath)
- break // Can't read past end of DB
- }
- src.numRead[shardIdx]++
- key := iter.Key()
- if len(key) < 1 {
- lg.Warnf("Encountered invalid zero-byte key in shard %s.\n", shdPath)
- break
- }
- ret := src.checkKeyPrefix(key[0], iter)
- if ret == NOT_SATISFIED {
- break // Can't read past end of indexed section
- } else if ret == NOT_YET_SATISFIED {
- if src.pred.Op.IsDescending() {
- iter.Prev()
- } else {
- iter.Next()
- }
- continue // Try again because we are not yet at the indexed section.
- }
- var span *common.Span
- var sid common.SpanId
- if src.keyPrefix == SPAN_ID_INDEX_PREFIX {
- // The span id maps to the span itself.
- sid = common.SpanId(key[1:17])
- span, err = src.shards[shardIdx].decodeSpan(sid, iter.Value())
- if err != nil {
- if lg.DebugEnabled() {
- lg.Debugf("Internal error decoding span %s in shard %s: %s\n",
- sid.String(), shdPath, err.Error())
- }
- break
- }
- } else {
- // With a secondary index, we have to look up the span by id.
- sid = common.SpanId(key[9:25])
- span = src.shards[shardIdx].FindSpan(sid)
- if span == nil {
- if lg.DebugEnabled() {
- lg.Debugf("Internal error rehydrating span %s in shard %s\n",
- sid.String(), shdPath)
- }
- break
- }
- }
- if src.pred.Op.IsDescending() {
- iter.Prev()
- } else {
- iter.Next()
- }
- ret = src.pred.satisfiedBy(span)
- if ret == SATISFIED {
- if lg.DebugEnabled() {
- lg.Debugf("Populated valid span %v from shard %s.\n", sid, shdPath)
- }
- src.nexts[shardIdx] = span // Found valid entry
- return
- }
- if ret == NOT_SATISFIED {
- // This and subsequent entries don't satisfy predicate
- break
- }
- }
- lg.Debugf("Closing iterator for shard %s.\n", shdPath)
- iter.Close()
- src.iters[shardIdx] = nil
-}
-
-// Check the key prefix against the key prefix of the query.
-func (src *source) checkKeyPrefix(kp byte, iter *levigo.Iterator) satisfiedByReturn {
- if kp == src.keyPrefix {
- return SATISFIED
- } else if kp < src.keyPrefix {
- if src.pred.Op.IsDescending() {
- return NOT_SATISFIED
- } else {
- return NOT_YET_SATISFIED
- }
- } else {
- if src.pred.Op.IsDescending() {
- return NOT_YET_SATISFIED
- } else {
- return NOT_SATISFIED
- }
- }
-}
-
-
-func (src *source) next() *common.Span {
- for shardIdx := range src.shards {
- src.populateNextFromShard(shardIdx)
- }
- var best *common.Span
- bestIdx := -1
- for shardIdx := range src.iters {
- span := src.nexts[shardIdx]
- if src.pred.spanPtrIsBefore(span, best) {
- best = span
- bestIdx = shardIdx
- }
- }
- if bestIdx >= 0 {
- src.nexts[bestIdx] = nil
- }
- return best
-}
-
-func (src *source) Close() {
- for i := range src.iters {
- if src.iters[i] != nil {
- src.iters[i].Close()
- }
- }
- src.iters = nil
-}
-
-func (src *source) getStats() string {
- ret := fmt.Sprintf("Source stats: pred = %s", src.pred.String())
- prefix := ". "
- for shardIdx := range src.shards {
- next := fmt.Sprintf("%sRead %d spans from %s", prefix,
- src.numRead[shardIdx], src.shards[shardIdx].path)
- prefix = ", "
- ret = ret + next
- }
- return ret
-}
-
-func (store *dataStore) obtainSource(preds *[]*predicateData, span *common.Span) (*source, error) {
- // Read spans from the first predicate that is indexed.
- p := *preds
- for i := range p {
- pred := p[i]
- if pred.getIndexPrefix() != INVALID_INDEX_PREFIX {
- *preds = append(p[0:i], p[i+1:]...)
- return pred.createSource(store, span)
- }
- }
- // If there are no predicates that are indexed, read rows in order of span id.
- spanIdPred := common.Predicate{Op: common.GREATER_THAN_OR_EQUALS,
- Field: common.SPAN_ID,
- Val: common.INVALID_SPAN_ID.String(),
- }
- spanIdPredData, err := loadPredicateData(&spanIdPred)
- if err != nil {
- return nil, err
- }
- return spanIdPredData.createSource(store, span)
-}
-
-func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error, []int) {
- lg := store.lg
- // Parse predicate data.
- var err error
- preds := make([]*predicateData, len(query.Predicates))
- for i := range query.Predicates {
- preds[i], err = loadPredicateData(&query.Predicates[i])
- if err != nil {
- return nil, err, nil
- }
- }
- // Get a source of rows.
- var src *source
- src, err = store.obtainSource(&preds, query.Prev)
- if err != nil {
- return nil, err, nil
- }
- defer src.Close()
- if lg.DebugEnabled() {
- lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src)
- }
-
- // Filter the spans through the remaining predicates.
- reserved := 32
- if query.Lim < reserved {
- reserved = query.Lim
- }
- ret := make([]*common.Span, 0, reserved)
- for {
- if len(ret) >= query.Lim {
- if lg.DebugEnabled() {
- lg.Debugf("HandleQuery %s: hit query limit after obtaining " +
- "%d results. %s\n.", query, query.Lim, src.getStats())
- }
- break // we hit the result size limit
- }
- span := src.next()
- if span == nil {
- if lg.DebugEnabled() {
- lg.Debugf("HandleQuery %s: found %d result(s), which are " +
- "all that exist. %s\n", query, len(ret), src.getStats())
- }
- break // the source has no more spans to give
- }
- if lg.DebugEnabled() {
- lg.Debugf("src.next returned span %s\n", span.ToJson())
- }
- satisfied := true
- for predIdx := range preds {
- if preds[predIdx].satisfiedBy(span) != SATISFIED {
- satisfied = false
- break
- }
- }
- if satisfied {
- ret = append(ret, span)
- }
- }
- return ret, nil, src.numRead
-}
-
-func (store *dataStore) ServerStats() *common.ServerStats {
- serverStats := common.ServerStats{
- Dirs: make([]common.StorageDirectoryStats, len(store.shards)),
- }
- for shardIdx := range store.shards {
- shard := store.shards[shardIdx]
- serverStats.Dirs[shardIdx].Path = shard.path
- r := levigo.Range{
- Start: []byte{0},
- Limit: []byte{0xff},
- }
- vals := shard.ldb.GetApproximateSizes([]levigo.Range{r})
- serverStats.Dirs[shardIdx].ApproximateBytes = vals[0]
- serverStats.Dirs[shardIdx].LevelDbStats =
- shard.ldb.PropertyValue("leveldb.stats")
- store.msink.lg.Debugf("levedb.stats for %s: %s\n",
- shard.path, shard.ldb.PropertyValue("leveldb.stats"))
- }
- serverStats.LastStartMs = store.startMs
- serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC())
- serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans)
- store.msink.PopulateServerStats(&serverStats)
- return &serverStats
-}