You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficcontrol.apache.org by ne...@apache.org on 2017/01/13 23:36:05 UTC
[09/29] incubator-trafficcontrol git commit: Refactor influxdb_tools
to have separate folders for the different executables (so as not to break
the current build script,
in comparison to separate commands on a single cli.App
Refactor influxdb_tools to have separate folders for the different executables (so as not to break the current build script, in comparison to separate commands on a single cli.App
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/23292650
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/23292650
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/23292650
Branch: refs/heads/master
Commit: 2329265011edf56d18639140e1cdecee0fc82050
Parents: 8bdb8be
Author: sbogacz <sb...@zvelo.com>
Authored: Fri Jan 6 23:35:59 2017 -0700
Committer: David Neuman <da...@gmail.com>
Committed: Fri Jan 13 23:33:56 2017 +0000
----------------------------------------------------------------------
.../create/create_ts_databases.go | 177 +++++++
traffic_stats/influxdb_tools/create/main.go | 37 ++
.../influxdb_tools/create_ts_databases.go | 171 ------
traffic_stats/influxdb_tools/main.go | 48 --
traffic_stats/influxdb_tools/sync/main.go | 37 ++
traffic_stats/influxdb_tools/sync/sync_test.go | 230 +++++++++
.../influxdb_tools/sync/sync_ts_databases.go | 516 +++++++++++++++++++
.../influxdb_tools/sync_ts_databases.go | 483 -----------------
8 files changed, 997 insertions(+), 702 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/23292650/traffic_stats/influxdb_tools/create/create_ts_databases.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/create/create_ts_databases.go b/traffic_stats/influxdb_tools/create/create_ts_databases.go
new file mode 100644
index 0000000..fa1260a
--- /dev/null
+++ b/traffic_stats/influxdb_tools/create/create_ts_databases.go
@@ -0,0 +1,177 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+ "fmt"
+
+ influx "github.com/influxdata/influxdb/client/v2"
+ "github.com/pkg/errors"
+ "github.com/urfave/cli"
+)
+
+const (
+ cache = "cache_stats"
+ deliveryService = "deliveryservice_stats"
+ daily = "daily_stats"
+)
+
+func createFlags() []cli.Flag {
+ return []cli.Flag{
+ cli.StringFlag{
+ Name: "url",
+ Usage: "The influxdb url and port",
+ Value: "http://localhost:8086",
+ },
+ cli.IntFlag{
+ Name: "replication",
+ Usage: "The number of nodes in the cluster",
+ Value: 3,
+ },
+ cli.StringFlag{
+ Name: "user",
+ Usage: "The influxdb username used to create DBs",
+ Value: "",
+ },
+ cli.StringFlag{
+ Name: "password",
+ Usage: "The influxdb password used to create DBs",
+ Value: "",
+ },
+ }
+}
+
+func create(c *cli.Context) error {
+ influxURL := c.String("url")
+ replication := c.Int("replication")
+ user := c.String("user")
+ password := c.String("password")
+
+ fmt.Printf("creating datbases for influxUrl: %s with a replication of %d using user %s\n", influxURL, replication, user)
+ client, err := influx.NewHTTPClient(influx.HTTPConfig{
+ Addr: influxURL,
+ Username: user,
+ Password: password,
+ })
+ if err != nil {
+ return errors.Wrap(err, "Error creating influx client")
+ }
+ _, _, err = client.Ping(10)
+ if err != nil {
+ return errors.Wrap(err, "Error creating influx client")
+ }
+
+ createCacheStats(client, replication)
+ createDailyStats(client, replication)
+ createDeliveryServiceStats(client, replication)
+ return nil
+}
+
+// queryDB takes a variadic argument for the target database so as to make
+// passing the variable optional, however, if passed, only the first db passed
+// in will be used
+func queryDB(client influx.Client, cmd string, dbs ...string) (res []influx.Result, err error) {
+ db := ""
+ if len(dbs) > 0 {
+ db = dbs[0]
+ }
+ q := influx.Query{
+ Command: cmd,
+ Database: db,
+ }
+ if response, err := client.Query(q); err == nil {
+ if response.Error() != nil {
+ return res, response.Error()
+ }
+ res = response.Results
+ }
+ return res, nil
+}
+
+func createCacheStats(client influx.Client, replication int) {
+ db := cache
+ createDatabase(client, db)
+ createRetentionPolicy(client, db, "daily", "26h", replication, true)
+ createRetentionPolicy(client, db, "monthly", "30d", replication, false)
+ createRetentionPolicy(client, db, "indefinite", "INF", replication, false)
+ createContinuousQuery(client, "bandwidth_1min", `CREATE CONTINUOUS QUERY bandwidth_1min ON cache_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "cache_stats"."monthly"."bandwidth.1min" FROM "cache_stats"."daily".bandwidth GROUP BY time(1m), * END`)
+ createContinuousQuery(client, "connections_1min", `CREATE CONTINUOUS QUERY connections_1min ON cache_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "cache_stats"."monthly"."connections.1min" FROM "cache_stats"."daily"."ats.proxy.process.http.current_client_connections" GROUP BY time(1m), * END`)
+ createContinuousQuery(client, "bandwidth_cdn_1min", `CREATE CONTINUOUS QUERY bandwidth_cdn_1min ON cache_stats RESAMPLE FOR 5m BEGIN SELECT sum(value) AS "value" INTO "cache_stats"."monthly"."bandwidth.cdn.1min" FROM "cache_stats"."monthly"."bandwidth.1min" GROUP BY time(1m), cdn END`)
+ createContinuousQuery(client, "connections_cdn_1min", `CREATE CONTINUOUS QUERY connections_cdn_1min ON cache_stats RESAMPLE FOR 5m BEGIN SELECT sum(value) AS "value" INTO "cache_stats"."monthly"."connections.cdn.1min" FROM "cache_stats"."monthly"."connections.1min" GROUP BY time(1m), cdn END`)
+ createContinuousQuery(client, "bandwidth_cdn_type_1min", `CREATE CONTINUOUS QUERY bandwidth_cdn_type_1min ON cache_stats RESAMPLE FOR 5m BEGIN SELECT sum(value) AS "value" INTO "cache_stats"."monthly"."bandwidth.cdn.type.1min" FROM "cache_stats"."monthly"."bandwidth.1min" GROUP BY time(1m), cdn, type END`)
+ createContinuousQuery(client, "connections_cdn_type_1min", `CREATE CONTINUOUS QUERY connections_cdn_type_1min ON cache_stats RESAMPLE FOR 5m BEGIN SELECT sum(value) AS "value" INTO "cache_stats"."monthly"."connections.cdn.type.1min" FROM "cache_stats"."monthly"."connections.1min" GROUP BY time(1m), cdn, type END`)
+ createContinuousQuery(client, "maxKbps_1min", `CREATE CONTINUOUS QUERY maxKbps_1min ON cache_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS value INTO cache_stats.monthly."maxkbps.1min" FROM cache_stats.daily.maxKbps GROUP BY time(1m), * END`)
+ createContinuousQuery(client, "maxkbps_cdn_1min", `CREATE CONTINUOUS QUERY maxkbps_cdn_1min ON cache_stats RESAMPLE FOR 5m BEGIN SELECT sum(value) AS value INTO cache_stats.monthly."maxkbps.cdn.1min" FROM cache_stats.monthly."maxkbps.1min" GROUP BY time(1m), cdn END`)
+ createContinuousQuery(client, "wrap_count_vol1_1m", `CREATE CONTINUOUS QUERY wrap_count_vol1_1m ON cache_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS vol1_wrap_count INTO cache_stats.monthly."wrap_count.1min" FROM cache_stats.daily."ats.proxy.process.cache.volume_1.wrap_count" GROUP BY time(1m), * END`)
+ createContinuousQuery(client, "wrap_count_vol2_1m", `CREATE CONTINUOUS QUERY wrap_count_vol2_1m ON cache_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS vol2_wrap_count INTO cache_stats.monthly."wrap_count.1min" FROM cache_stats.daily."ats.proxy.process.cache.volume_2.wrap_count" GROUP BY time(1m), * END`)
+}
+
+func createDeliveryServiceStats(client influx.Client, replication int) {
+ db := deliveryService
+ createDatabase(client, db)
+ createRetentionPolicy(client, db, "daily", "26h", replication, true)
+ createRetentionPolicy(client, db, "monthly", "30d", replication, false)
+ createRetentionPolicy(client, db, "indefinite", "INF", replication, false)
+ createContinuousQuery(client, "tps_2xx_ds_1min", `CREATE CONTINUOUS QUERY tps_2xx_ds_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."tps_2xx.ds.1min" FROM "deliveryservice_stats"."daily".tps_2xx WHERE cachegroup = 'total' GROUP BY time(1m), * END`)
+ createContinuousQuery(client, "tps_3xx_ds_1min", `CREATE CONTINUOUS QUERY tps_3xx_ds_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."tps_3xx.ds.1min" FROM "deliveryservice_stats"."daily".tps_3xx WHERE cachegroup = 'total' GROUP BY time(1m), * END`)
+ createContinuousQuery(client, "tps_4xx_ds_1min", `CREATE CONTINUOUS QUERY tps_4xx_ds_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."tps_4xx.ds.1min" FROM "deliveryservice_stats"."daily".tps_4xx WHERE cachegroup = 'total' GROUP BY time(1m), * END`)
+ createContinuousQuery(client, "tps_5xx_ds_1min", `CREATE CONTINUOUS QUERY tps_5xx_ds_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."tps_5xx.ds.1min" FROM "deliveryservice_stats"."daily".tps_5xx WHERE cachegroup = 'total' GROUP BY time(1m), * END`)
+ createContinuousQuery(client, "tps_total_ds_1min", `CREATE CONTINUOUS QUERY tps_total_ds_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."tps_total.ds.1min" FROM "deliveryservice_stats"."daily".tps_total WHERE cachegroup = 'total' GROUP BY time(1m), * END`)
+ createContinuousQuery(client, "kbps_ds_1min", `CREATE CONTINUOUS QUERY kbps_ds_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."kbps.ds.1min" FROM "deliveryservice_stats"."daily".kbps WHERE cachegroup = 'total' GROUP BY time(1m), * END`)
+ createContinuousQuery(client, "kbps_cg_1min", `CREATE CONTINUOUS QUERY kbps_cg_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."kbps.cg.1min" FROM "deliveryservice_stats"."daily".kbps WHERE cachegroup != 'total' GROUP BY time(1m), * END`)
+ createContinuousQuery(client, "max_kbps_ds_1day", `CREATE CONTINUOUS QUERY max_kbps_ds_1day ON deliveryservice_stats RESAMPLE FOR 2d BEGIN SELECT max(value) AS "value" INTO "deliveryservice_stats"."indefinite"."max.kbps.ds.1day" FROM "deliveryservice_stats"."monthly"."kbps.ds.1min" GROUP BY time(1d), deliveryservice, cdn END`)
+}
+
+func createDailyStats(client influx.Client, replication int) {
+ db := daily
+ createDatabase(client, db)
+ createRetentionPolicy(client, db, "indefinite", "INF", replication, true)
+}
+
+func createDatabase(client influx.Client, db string) {
+ _, err := queryDB(client, fmt.Sprintf("CREATE DATABASE %s", db))
+ if err != nil {
+ fmt.Printf("An error occured creating the %v database: %v\n", db, err)
+ return
+ }
+ fmt.Println("Successfully created database: ", db)
+}
+
+func createRetentionPolicy(client influx.Client, db string, name string, duration string, replication int, isDefault bool) {
+ qString := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION %s REPLICATION %d", name, db, duration, replication)
+ if isDefault {
+ qString += " DEFAULT"
+ }
+ _, err := queryDB(client, qString)
+ if err != nil {
+ fmt.Printf("An error occured creating the retention policy %s on database: %s: %v\n", name, db, err)
+ return
+ }
+ fmt.Printf("Successfully created retention policy %s for database: %s\n", name, db)
+}
+
+func createContinuousQuery(client influx.Client, name string, query string) {
+ _, err := queryDB(client, query)
+ if err != nil {
+ fmt.Printf("An error occured creating continuous query %s: %v\n", name, err)
+ return
+ }
+ fmt.Println("Successfully created continuous query ", name)
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/23292650/traffic_stats/influxdb_tools/create/main.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/create/main.go b/traffic_stats/influxdb_tools/create/main.go
new file mode 100644
index 0000000..986901b
--- /dev/null
+++ b/traffic_stats/influxdb_tools/create/main.go
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+ "os"
+
+ "github.com/urfave/cli"
+)
+
+func main() {
+ app := cli.NewApp()
+ app.Name = "create_ts_databases"
+ app.Version = "0.1.0"
+ app.Usage = "create_ts_databases provides a cli tool for creating the requisite influxdb databases"
+ app.Flags = createFlags()
+ app.Action = create
+
+ app.Run(os.Args)
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/23292650/traffic_stats/influxdb_tools/create_ts_databases.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/create_ts_databases.go b/traffic_stats/influxdb_tools/create_ts_databases.go
deleted file mode 100644
index 4ee6793..0000000
--- a/traffic_stats/influxdb_tools/create_ts_databases.go
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package main
-
-import (
- "fmt"
-
- influx "github.com/influxdata/influxdb/client/v2"
- "github.com/pkg/errors"
- "github.com/urfave/cli"
-)
-
-func createFlags() []cli.Flag {
- return []cli.Flag{
- cli.StringFlag{
- Name: "url",
- Usage: "The influxdb url and port",
- Value: "http://localhost:8086",
- },
- cli.IntFlag{
- Name: "replication",
- Usage: "The number of nodes in the cluster",
- Value: 3,
- },
- cli.StringFlag{
- Name: "user",
- Usage: "The influxdb username used to create DBs",
- Value: "",
- },
- cli.StringFlag{
- Name: "password",
- Usage: "The influxdb password used to create DBs",
- Value: "",
- },
- }
-}
-
-func create(c *cli.Context) error {
- influxURL := c.String("url")
- replication := c.Int("replication")
- user := c.String("user")
- password := c.String("password")
-
- fmt.Printf("creating datbases for influxUrl: %s with a replication of %d using user %s\n", influxURL, replication, user)
- client, err := influx.NewHTTPClient(influx.HTTPConfig{
- Addr: influxURL,
- Username: user,
- Password: password,
- })
- if err != nil {
- return errors.Wrap(err, "Error creating influx client")
- }
- _, _, err = client.Ping(10)
- if err != nil {
- return errors.Wrap(err, "Error creating influx client")
- }
-
- createCacheStats(client, replication)
- createDailyStats(client, replication)
- createDeliveryServiceStats(client, replication)
- return nil
-}
-
-// queryDB takes a variadic argument for the target database so as to make
-// passing the variable optional, however, if passed, only the first db passed
-// in will be used
-func queryDB(client influx.Client, cmd string, dbs ...string) (res []influx.Result, err error) {
- db := ""
- if len(dbs) > 0 {
- db = dbs[0]
- }
- q := influx.Query{
- Command: cmd,
- Database: db,
- }
- if response, err := client.Query(q); err == nil {
- if response.Error() != nil {
- return res, response.Error()
- }
- res = response.Results
- }
- return res, nil
-}
-
-func createCacheStats(client influx.Client, replication int) {
- db := cache
- createDatabase(client, db)
- createRetentionPolicy(client, db, "daily", "26h", replication, true)
- createRetentionPolicy(client, db, "monthly", "30d", replication, false)
- createRetentionPolicy(client, db, "indefinite", "INF", replication, false)
- createContinuousQuery(client, "bandwidth_1min", `CREATE CONTINUOUS QUERY bandwidth_1min ON cache_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "cache_stats"."monthly"."bandwidth.1min" FROM "cache_stats"."daily".bandwidth GROUP BY time(1m), * END`)
- createContinuousQuery(client, "connections_1min", `CREATE CONTINUOUS QUERY connections_1min ON cache_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "cache_stats"."monthly"."connections.1min" FROM "cache_stats"."daily"."ats.proxy.process.http.current_client_connections" GROUP BY time(1m), * END`)
- createContinuousQuery(client, "bandwidth_cdn_1min", `CREATE CONTINUOUS QUERY bandwidth_cdn_1min ON cache_stats RESAMPLE FOR 5m BEGIN SELECT sum(value) AS "value" INTO "cache_stats"."monthly"."bandwidth.cdn.1min" FROM "cache_stats"."monthly"."bandwidth.1min" GROUP BY time(1m), cdn END`)
- createContinuousQuery(client, "connections_cdn_1min", `CREATE CONTINUOUS QUERY connections_cdn_1min ON cache_stats RESAMPLE FOR 5m BEGIN SELECT sum(value) AS "value" INTO "cache_stats"."monthly"."connections.cdn.1min" FROM "cache_stats"."monthly"."connections.1min" GROUP BY time(1m), cdn END`)
- createContinuousQuery(client, "bandwidth_cdn_type_1min", `CREATE CONTINUOUS QUERY bandwidth_cdn_type_1min ON cache_stats RESAMPLE FOR 5m BEGIN SELECT sum(value) AS "value" INTO "cache_stats"."monthly"."bandwidth.cdn.type.1min" FROM "cache_stats"."monthly"."bandwidth.1min" GROUP BY time(1m), cdn, type END`)
- createContinuousQuery(client, "connections_cdn_type_1min", `CREATE CONTINUOUS QUERY connections_cdn_type_1min ON cache_stats RESAMPLE FOR 5m BEGIN SELECT sum(value) AS "value" INTO "cache_stats"."monthly"."connections.cdn.type.1min" FROM "cache_stats"."monthly"."connections.1min" GROUP BY time(1m), cdn, type END`)
- createContinuousQuery(client, "maxKbps_1min", `CREATE CONTINUOUS QUERY maxKbps_1min ON cache_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS value INTO cache_stats.monthly."maxkbps.1min" FROM cache_stats.daily.maxKbps GROUP BY time(1m), * END`)
- createContinuousQuery(client, "maxkbps_cdn_1min", `CREATE CONTINUOUS QUERY maxkbps_cdn_1min ON cache_stats RESAMPLE FOR 5m BEGIN SELECT sum(value) AS value INTO cache_stats.monthly."maxkbps.cdn.1min" FROM cache_stats.monthly."maxkbps.1min" GROUP BY time(1m), cdn END`)
- createContinuousQuery(client, "wrap_count_vol1_1m", `CREATE CONTINUOUS QUERY wrap_count_vol1_1m ON cache_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS vol1_wrap_count INTO cache_stats.monthly."wrap_count.1min" FROM cache_stats.daily."ats.proxy.process.cache.volume_1.wrap_count" GROUP BY time(1m), * END`)
- createContinuousQuery(client, "wrap_count_vol2_1m", `CREATE CONTINUOUS QUERY wrap_count_vol2_1m ON cache_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS vol2_wrap_count INTO cache_stats.monthly."wrap_count.1min" FROM cache_stats.daily."ats.proxy.process.cache.volume_2.wrap_count" GROUP BY time(1m), * END`)
-}
-
-func createDeliveryServiceStats(client influx.Client, replication int) {
- db := deliveryService
- createDatabase(client, db)
- createRetentionPolicy(client, db, "daily", "26h", replication, true)
- createRetentionPolicy(client, db, "monthly", "30d", replication, false)
- createRetentionPolicy(client, db, "indefinite", "INF", replication, false)
- createContinuousQuery(client, "tps_2xx_ds_1min", `CREATE CONTINUOUS QUERY tps_2xx_ds_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."tps_2xx.ds.1min" FROM "deliveryservice_stats"."daily".tps_2xx WHERE cachegroup = 'total' GROUP BY time(1m), * END`)
- createContinuousQuery(client, "tps_3xx_ds_1min", `CREATE CONTINUOUS QUERY tps_3xx_ds_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."tps_3xx.ds.1min" FROM "deliveryservice_stats"."daily".tps_3xx WHERE cachegroup = 'total' GROUP BY time(1m), * END`)
- createContinuousQuery(client, "tps_4xx_ds_1min", `CREATE CONTINUOUS QUERY tps_4xx_ds_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."tps_4xx.ds.1min" FROM "deliveryservice_stats"."daily".tps_4xx WHERE cachegroup = 'total' GROUP BY time(1m), * END`)
- createContinuousQuery(client, "tps_5xx_ds_1min", `CREATE CONTINUOUS QUERY tps_5xx_ds_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."tps_5xx.ds.1min" FROM "deliveryservice_stats"."daily".tps_5xx WHERE cachegroup = 'total' GROUP BY time(1m), * END`)
- createContinuousQuery(client, "tps_total_ds_1min", `CREATE CONTINUOUS QUERY tps_total_ds_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."tps_total.ds.1min" FROM "deliveryservice_stats"."daily".tps_total WHERE cachegroup = 'total' GROUP BY time(1m), * END`)
- createContinuousQuery(client, "kbps_ds_1min", `CREATE CONTINUOUS QUERY kbps_ds_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."kbps.ds.1min" FROM "deliveryservice_stats"."daily".kbps WHERE cachegroup = 'total' GROUP BY time(1m), * END`)
- createContinuousQuery(client, "kbps_cg_1min", `CREATE CONTINUOUS QUERY kbps_cg_1min ON deliveryservice_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS "value" INTO "deliveryservice_stats"."monthly"."kbps.cg.1min" FROM "deliveryservice_stats"."daily".kbps WHERE cachegroup != 'total' GROUP BY time(1m), * END`)
- createContinuousQuery(client, "max_kbps_ds_1day", `CREATE CONTINUOUS QUERY max_kbps_ds_1day ON deliveryservice_stats RESAMPLE FOR 2d BEGIN SELECT max(value) AS "value" INTO "deliveryservice_stats"."indefinite"."max.kbps.ds.1day" FROM "deliveryservice_stats"."monthly"."kbps.ds.1min" GROUP BY time(1d), deliveryservice, cdn END`)
-}
-
-func createDailyStats(client influx.Client, replication int) {
- db := daily
- createDatabase(client, db)
- createRetentionPolicy(client, db, "indefinite", "INF", replication, true)
-}
-
-func createDatabase(client influx.Client, db string) {
- _, err := queryDB(client, fmt.Sprintf("CREATE DATABASE %s", db))
- if err != nil {
- fmt.Printf("An error occured creating the %v database: %v\n", db, err)
- return
- }
- fmt.Println("Successfully created database: ", db)
-}
-
-func createRetentionPolicy(client influx.Client, db string, name string, duration string, replication int, isDefault bool) {
- qString := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION %s REPLICATION %d", name, db, duration, replication)
- if isDefault {
- qString += " DEFAULT"
- }
- _, err := queryDB(client, qString)
- if err != nil {
- fmt.Printf("An error occured creating the retention policy %s on database: %s: %v\n", name, db, err)
- return
- }
- fmt.Printf("Successfully created retention policy %s for database: %s\n", name, db)
-}
-
-func createContinuousQuery(client influx.Client, name string, query string) {
- _, err := queryDB(client, query)
- if err != nil {
- fmt.Printf("An error occured creating continuous query %s: %v\n", name, err)
- return
- }
- fmt.Println("Successfully created continuous query ", name)
-}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/23292650/traffic_stats/influxdb_tools/main.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/main.go b/traffic_stats/influxdb_tools/main.go
deleted file mode 100644
index 06fc3ef..0000000
--- a/traffic_stats/influxdb_tools/main.go
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package main
-
-import (
- "os"
-
- "github.com/urfave/cli"
-)
-
-func main() {
- app := cli.NewApp()
- app.Name = "influx-tools"
- app.Version = "0.1.0"
- app.Usage = "influx-tools provides cli methods for creating and syncing the requisite influxdb databases"
- app.Commands = []cli.Command{
- cli.Command{
- Name: "create",
- Usage: "create the influxDB tables",
- Action: create,
- Flags: createFlags(),
- },
- cli.Command{
- Name: "sync",
- Usage: "sync the influxDB tables",
- Action: sync,
- Flags: syncFlags(),
- },
- }
- app.Run(os.Args)
-}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/23292650/traffic_stats/influxdb_tools/sync/main.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/sync/main.go b/traffic_stats/influxdb_tools/sync/main.go
new file mode 100644
index 0000000..717f655
--- /dev/null
+++ b/traffic_stats/influxdb_tools/sync/main.go
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+ "os"
+
+ "github.com/urfave/cli"
+)
+
+func main() {
+ app := cli.NewApp()
+ app.Name = "sync_ts_databases"
+ app.Version = "0.1.0"
+ app.Usage = "sync_ts_databases provides a cli tool syncing the requisite influxdb databases"
+ app.Flags = syncFlags()
+ app.Action = sync
+
+ app.Run(os.Args)
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/23292650/traffic_stats/influxdb_tools/sync/sync_test.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/sync/sync_test.go b/traffic_stats/influxdb_tools/sync/sync_test.go
new file mode 100644
index 0000000..943e7a5
--- /dev/null
+++ b/traffic_stats/influxdb_tools/sync/sync_test.go
@@ -0,0 +1,230 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "testing"
+
+ influx "github.com/influxdata/influxdb/client/v2"
+ "github.com/influxdata/influxdb/models"
+ . "github.com/smartystreets/goconvey/convey"
+)
+
+func TestSync(t *testing.T) {
+ Convey("getDailyStats should work as expected", t, func() {
+ Convey("getDailyStats should work with no results passed", func() {
+ dailyStatsMap := getDailyStats(nil)
+ So(dailyStatsMap, ShouldNotBeNil)
+ So(dailyStatsMap, ShouldBeEmpty)
+ })
+ Convey("getDailyStats should work with empty results passed", func() {
+ results := []influx.Result{}
+ dailyStatsMap := getDailyStats(results)
+ So(dailyStatsMap, ShouldNotBeNil)
+ So(dailyStatsMap, ShouldBeEmpty)
+ })
+ Convey("getDailyStats should work as expected on a slice of results", func() {
+ results := []influx.Result{
+ influx.Result{
+ Series: []models.Row{
+ models.Row{
+ Values: generateDailyValues(0),
+ },
+ },
+ },
+ influx.Result{
+ Series: []models.Row{
+ models.Row{
+ Values: generateDailyValues(1),
+ },
+ models.Row{
+ Values: generateDailyValues(2),
+ },
+ },
+ },
+ influx.Result{
+ Series: []models.Row{
+ models.Row{
+ Values: generateDailyValues(3),
+ },
+ models.Row{
+ Values: generateDailyValues(4),
+ },
+ models.Row{
+ Values: generateDailyValues(5),
+ },
+ },
+ },
+ }
+ dailyStatsMap := getDailyStats(results)
+ So(dailyStatsMap, ShouldNotBeNil)
+ So(dailyStatsMap, ShouldNotBeEmpty)
+ So(len(dailyStatsMap), ShouldEqual, 6) // we get one dailyStats object per Values entry
+ })
+ })
+ Convey("getDeliveryServiceStats should work as expected", t, func() {
+ Convey("getDeliveryServiceStats should work with no results passed", func() {
+ getDeliveryServiceStatsMap := getDeliveryServiceStats(nil)
+ So(getDeliveryServiceStatsMap, ShouldNotBeNil)
+ So(getDeliveryServiceStatsMap, ShouldBeEmpty)
+ })
+ Convey("getDeliveryServiceStats should work with empty results passed", func() {
+ results := []influx.Result{}
+ getDeliveryServiceStatsMap := getDeliveryServiceStats(results)
+ So(getDeliveryServiceStatsMap, ShouldNotBeNil)
+ So(getDeliveryServiceStatsMap, ShouldBeEmpty)
+ })
+ Convey("getDeliveryServiceStats should work as expected on a slice of results", func() {
+ results := []influx.Result{
+ influx.Result{
+ Series: []models.Row{
+ models.Row{
+ Values: generateDeliveryServiceValues(0),
+ },
+ },
+ },
+ influx.Result{
+ Series: []models.Row{
+ models.Row{
+ Values: generateDeliveryServiceValues(1),
+ },
+ models.Row{
+ Values: generateDeliveryServiceValues(2),
+ },
+ },
+ },
+ influx.Result{
+ Series: []models.Row{
+ models.Row{
+ Values: generateDeliveryServiceValues(3),
+ },
+ models.Row{
+ Values: generateDeliveryServiceValues(4),
+ },
+ models.Row{
+ Values: generateDeliveryServiceValues(5),
+ },
+ },
+ },
+ }
+ getDeliveryServiceStatsMap := getDeliveryServiceStats(results)
+ So(getDeliveryServiceStatsMap, ShouldNotBeNil)
+ So(getDeliveryServiceStatsMap, ShouldNotBeEmpty)
+ So(len(getDeliveryServiceStatsMap), ShouldEqual, 6) // we get one dailyStats object per Values entry
+ })
+ })
+ Convey("getCacheStats should work as expected", t, func() {
+ Convey("getCacheStats should work with no results passed", func() {
+ getCacheStatsMap := getCacheStats(nil)
+ So(getCacheStatsMap, ShouldNotBeNil)
+ So(getCacheStatsMap, ShouldBeEmpty)
+ })
+ Convey("getCacheStats should work with empty results passed", func() {
+ results := []influx.Result{}
+ getCacheStatsMap := getCacheStats(results)
+ So(getCacheStatsMap, ShouldNotBeNil)
+ So(getCacheStatsMap, ShouldBeEmpty)
+ })
+ Convey("getCacheStats should work as expected on a slice of results", func() {
+ results := []influx.Result{
+ influx.Result{
+ Series: []models.Row{
+ models.Row{
+ Values: generateCacheValues(0),
+ },
+ },
+ },
+ influx.Result{
+ Series: []models.Row{
+ models.Row{
+ Values: generateCacheValues(1),
+ },
+ models.Row{
+ Values: generateCacheValues(2),
+ },
+ },
+ },
+ influx.Result{
+ Series: []models.Row{
+ models.Row{
+ Values: generateCacheValues(3),
+ },
+ models.Row{
+ Values: generateCacheValues(4),
+ },
+ models.Row{
+ Values: generateCacheValues(5),
+ },
+ },
+ },
+ }
+ getCacheStatsMap := getCacheStats(results)
+ So(getCacheStatsMap, ShouldNotBeNil)
+ So(getCacheStatsMap, ShouldNotBeEmpty)
+ So(len(getCacheStatsMap), ShouldEqual, 6) // we get one dailyStats object per Values entry
+ })
+ })
+}
+
+func generateDailyValues(i int) [][]interface{} {
+ ret := make([][]interface{}, 1)
+ startIdx := i * 4 // this is to have the right amount of difference between the test numbers used
+ for j := startIdx; j < startIdx+3; j++ {
+ n := startIdx + j
+ ret[0] = make([]interface{}, 4)
+ ret[0][0] = fmt.Sprintf("t%d", n)
+ ret[0][1] = fmt.Sprintf("test.cdn-%d", n)
+ ret[0][2] = fmt.Sprintf("testDeliveryService-%d", n)
+ ret[0][3] = json.Number(fmt.Sprintf("%d.%d%d", n, n, n))
+ }
+ return ret
+}
+
+func generateDeliveryServiceValues(i int) [][]interface{} {
+ ret := make([][]interface{}, 1)
+ startIdx := i * 4 // this is to have the right amount of difference between the test numbers used
+ for j := startIdx; j < startIdx+3; j++ {
+ n := startIdx + j
+ ret[0] = make([]interface{}, 5)
+ ret[0][0] = fmt.Sprintf("t%d", n)
+ ret[0][1] = fmt.Sprintf("cache-group%d", n)
+ ret[0][2] = fmt.Sprintf("test.cdn-%d", n)
+ ret[0][3] = fmt.Sprintf("testDeliveryService-%d", n)
+ ret[0][4] = json.Number(fmt.Sprintf("%d.%d%d", n, n, n))
+ }
+ return ret
+}
+
+func generateCacheValues(i int) [][]interface{} {
+ ret := make([][]interface{}, 1)
+ startIdx := i * 4 // this is to have the right amount of difference between the test numbers used
+ for j := startIdx; j < startIdx+3; j++ {
+ n := startIdx + j
+ ret[0] = make([]interface{}, 5)
+ ret[0][0] = fmt.Sprintf("t%d", n)
+ ret[0][1] = fmt.Sprintf("test.cdn-%d", n)
+ ret[0][2] = fmt.Sprintf("test.hostname-%d", n)
+ ret[0][3] = fmt.Sprintf("test.cacheType-%d", n)
+ ret[0][4] = json.Number(fmt.Sprintf("%d.%d%d", n, n, n))
+ }
+ return ret
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/23292650/traffic_stats/influxdb_tools/sync/sync_ts_databases.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/sync/sync_ts_databases.go b/traffic_stats/influxdb_tools/sync/sync_ts_databases.go
new file mode 100644
index 0000000..02a8249
--- /dev/null
+++ b/traffic_stats/influxdb_tools/sync/sync_ts_databases.go
@@ -0,0 +1,516 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+
+ influx "github.com/influxdata/influxdb/client/v2"
+ "github.com/pkg/errors"
+ "github.com/urfave/cli"
+)
+
+const (
+ cache = "cache_stats"
+ deliveryService = "deliveryservice_stats"
+ daily = "daily_stats"
+)
+
+type cacheStats struct {
+ t string //time
+ value float64
+ cdn string
+ hostname string
+ cacheType string
+}
+
+type deliveryServiceStats struct {
+ t string //time
+ value float64
+ cdn string
+ deliveryService string
+ cacheGroup string
+}
+type dailyStats struct {
+ t string //time
+ cdn string
+ deliveryService string
+ value float64
+}
+
+func syncFlags() []cli.Flag {
+ return []cli.Flag{
+ cli.StringFlag{
+ Name: "sourceUrl",
+ Usage: "The source influxdb url and port",
+ Value: "http://server1.kabletown.net:8086",
+ },
+ cli.StringFlag{
+ Name: "targetUrl",
+ Usage: "The target influxdb url and port",
+ Value: "http://server2.kabletown.net:8086",
+ },
+ cli.StringFlag{
+ Name: "database",
+ Usage: "Sync a specific database",
+ Value: "all",
+ },
+ cli.IntFlag{
+ Name: "days",
+ Usage: "Number of days in the past to sync (today - x days), 0 is all",
+ Value: 0,
+ },
+ cli.StringFlag{
+ Name: "sourceUser",
+ Usage: "The source influxdb username",
+ Value: "",
+ },
+ cli.StringFlag{
+ Name: "sourcePass",
+ Usage: "The source influxdb password",
+ Value: "",
+ },
+ cli.StringFlag{
+ Name: "targetUser",
+ Usage: "The target influxdb username",
+ Value: "",
+ },
+ cli.StringFlag{
+ Name: "targetPass",
+ Usage: "The target influxdb password",
+ Value: "",
+ },
+ }
+}
+
+func sync(c *cli.Context) error {
+
+ sourceURL := c.String("sourceUrl")
+ targetURL := c.String("targetUrl")
+ database := c.String("database")
+ days := c.Int("days")
+ sourceUser := c.String("sourceUser")
+ sourcePass := c.String("sourcePass")
+ targetUser := c.String("targetUser")
+ targetPass := c.String("targetPass")
+ fmt.Printf("syncing %s to %s for %s database(s) for the past %d day(s)\n", sourceURL, targetURL, database, days)
+ sourceClient, err := influx.NewHTTPClient(influx.HTTPConfig{
+ Addr: sourceURL,
+ Username: sourceUser,
+ Password: sourcePass,
+ })
+ if err != nil {
+ return errors.Wrap(err, "Error creating influx sourceClient")
+ }
+
+ if _, _, err = sourceClient.Ping(10); err != nil {
+ return errors.Wrap(err, "Error creating influx sourceClient")
+ }
+
+ targetClient, err := influx.NewHTTPClient(influx.HTTPConfig{
+ Addr: targetURL,
+ Username: targetUser,
+ Password: targetPass,
+ })
+ if err != nil {
+ return errors.Wrap(err, "Error creating influx targetClient")
+ }
+
+ if _, _, err = targetClient.Ping(10); err != nil {
+ return errors.Wrap(err, "Error creating influx targetClient")
+ }
+
+ chSize := 1
+ if database == "all" {
+ chSize = 3
+ }
+
+ ch := make(chan string)
+
+ switch database {
+ case "all":
+ go syncDailyDb(ch, sourceClient, targetClient, days)
+ go syncCsDb(ch, sourceClient, targetClient, days)
+ go syncDsDb(ch, sourceClient, targetClient, days)
+ case cache:
+ go syncCsDb(ch, sourceClient, targetClient, days)
+ case deliveryService:
+ go syncDsDb(ch, sourceClient, targetClient, days)
+ case daily:
+ go syncDailyDb(ch, sourceClient, targetClient, days)
+ default:
+ return errors.New("No database selected")
+ }
+
+ for i := 1; i <= chSize; i++ {
+ fmt.Println(<-ch)
+ }
+
+ fmt.Println("Traffic Stats have been synced!")
+ return nil
+}
+
+func syncCsDb(ch chan string, sourceClient influx.Client, targetClient influx.Client, days int) {
+ db := cache
+ fmt.Printf("Syncing %s database...\n", db)
+ stats := [...]string{
+ "bandwidth.cdn.1min",
+ "connections.cdn.1min",
+ "connections.cdn.type.1min",
+ "bandwidth.cdn.type.1min",
+ //these take a long time so do them last
+ "bandwidth.1min",
+ "connections.1min",
+ //
+ }
+ for _, statName := range stats {
+ fmt.Printf("Syncing %s database with %s \n", db, statName)
+ syncCacheStat(sourceClient, targetClient, statName, days)
+ fmt.Printf("Done syncing %s\n", statName)
+ }
+ ch <- fmt.Sprintf("Done syncing %s!\n", db)
+}
+
+func syncDsDb(ch chan string, sourceClient influx.Client, targetClient influx.Client, days int) {
+ db := deliveryService
+ fmt.Printf("Syncing %s database...\n", db)
+ stats := [...]string{
+ "kbps.ds.1min",
+ "max.kbps.ds.1day",
+ "kbps.cg.1min",
+ "tps_2xx.ds.1min",
+ "tps_3xx.ds.1min",
+ "tps_4xx.ds.1min",
+ "tps_5xx.ds.1min",
+ "tps_total.ds.1min",
+ }
+ for _, statName := range stats {
+ fmt.Printf("Syncing %s database with %s\n", db, statName)
+ syncDeliveryServiceStat(sourceClient, targetClient, statName, days)
+ }
+ ch <- fmt.Sprintf("Done syncing %s!\n", db)
+}
+
+func syncDailyDb(ch chan string, sourceClient influx.Client, targetClient influx.Client, days int) {
+ db := daily
+ fmt.Printf("Syncing %s database...\n", db)
+ stats := [...]string{
+ "daily_bytesserved",
+ "daily_maxgbps",
+ }
+
+ for _, statName := range stats {
+ fmt.Printf("Syncing %s database with %s\n", db, statName)
+ syncDailyStat(sourceClient, targetClient, statName, days)
+ }
+ ch <- fmt.Sprintf("Done syncing %s!\n", db)
+
+}
+
+func syncCacheStat(sourceClient influx.Client, targetClient influx.Client, statName string, days int) {
+ //get records from source DB
+ db := cache
+ bps, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
+ Database: db,
+ Precision: "ms",
+ RetentionPolicy: "monthly",
+ })
+
+ queryString := fmt.Sprintf("select time, cdn, hostname, type, value from \"monthly\".\"%s\"", statName)
+ if days > 0 {
+ queryString = fmt.Sprintf("%s where time > now() - %dd", queryString, days)
+ }
+ fmt.Println("queryString ", queryString)
+ res, err := queryDB(sourceClient, queryString, db)
+ if err != nil {
+ fmt.Printf("An error occured getting %s records from sourceDb\n", statName)
+ return
+ }
+ sourceStats := getCacheStats(res)
+
+ //get values from target DB
+ targetRes, err := queryDB(targetClient, queryString, db)
+ if err != nil {
+ fmt.Printf("An error occured getting %s record from target db: %v\n", statName, err)
+ return
+ }
+ targetStats := getCacheStats(targetRes)
+
+ for ssKey := range sourceStats {
+ ts := targetStats[ssKey]
+ ss := sourceStats[ssKey]
+ if ts.value > ss.value {
+ //fmt.Printf("target value %v is at least equal to source value %v\n", ts.value, ss.value)
+ continue //target value is bigger so leave it
+ }
+ statTime, _ := time.Parse(time.RFC3339, ss.t)
+ tags := map[string]string{"cdn": ss.cdn}
+ if ss.hostname != "" {
+ tags["hostname"] = ss.hostname
+ }
+ if ss.cacheType != "" {
+ tags["type"] = ss.cacheType
+ }
+ fields := map[string]interface{}{
+ "value": ss.value,
+ }
+ pt, err := influx.NewPoint(
+ statName,
+ tags,
+ fields,
+ statTime,
+ )
+ if err != nil {
+ fmt.Printf("error adding creating point for %v...%v\n", statName, err)
+ continue
+ }
+ bps.AddPoint(pt)
+ }
+ targetClient.Write(bps)
+}
+
+func syncDeliveryServiceStat(sourceClient influx.Client, targetClient influx.Client, statName string, days int) {
+
+ db := deliveryService
+ bps, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
+ Database: db,
+ Precision: "ms",
+ RetentionPolicy: "monthly",
+ })
+
+ queryString := fmt.Sprintf("select time, cachegroup, cdn, deliveryservice, value from \"monthly\".\"%s\"", statName)
+ if days > 0 {
+ queryString = fmt.Sprintf("%s where time > now() - %dd", queryString, days)
+ }
+ fmt.Println("queryString ", queryString)
+ res, err := queryDB(sourceClient, queryString, db)
+ if err != nil {
+ fmt.Printf("An error occured getting %s records from sourceDb: %v\n", statName, err)
+ return
+ }
+ sourceStats := getDeliveryServiceStats(res)
+ // get value from target DB
+ targetRes, err := queryDB(targetClient, queryString, db)
+ if err != nil {
+ fmt.Printf("An error occured getting %s record from target db: %v\n", statName, err)
+ return
+ }
+ targetStats := getDeliveryServiceStats(targetRes)
+
+ for ssKey := range sourceStats {
+ ts := targetStats[ssKey]
+ ss := sourceStats[ssKey]
+ if ts.value > ss.value {
+ //fmt.Printf("target value %v is at least equal to source value %v\n", ts.value, ss.value)
+ continue //target value is bigger so leave it
+ }
+ statTime, _ := time.Parse(time.RFC3339, ss.t)
+ tags := map[string]string{
+ "cdn": ss.cdn,
+ "cachegroup": ss.cacheGroup,
+ "deliveryservice": ss.deliveryService,
+ }
+ fields := map[string]interface{}{
+ "value": ss.value,
+ }
+ pt, err := influx.NewPoint(
+ statName,
+ tags,
+ fields,
+ statTime,
+ )
+ if err != nil {
+ fmt.Printf("error adding creating point for %v...%v\n", statName, err)
+ continue
+ }
+ bps.AddPoint(pt)
+ }
+ targetClient.Write(bps)
+}
+
+func syncDailyStat(sourceClient influx.Client, targetClient influx.Client, statName string, days int) {
+
+ db := daily
+ bps, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
+ Database: db,
+ Precision: "s",
+ })
+ //get records from source DB
+ queryString := fmt.Sprintf("select time, cdn, deliveryservice, value from \"%s\"", statName)
+ if days > 0 {
+ queryString = fmt.Sprintf("%s where time > now() - %dd", queryString, days)
+ }
+ res, err := queryDB(sourceClient, queryString, db)
+ if err != nil {
+ fmt.Printf("An error occured getting %s records from sourceDb: %v\n", statName, err)
+ return
+ }
+ sourceStats := getDailyStats(res)
+ // get value from target DB
+ targetRes, err := queryDB(targetClient, queryString, db)
+ if err != nil {
+ fmt.Printf("An error occured getting %s record from target db: %v\n", statName, err)
+ return
+ }
+ targetStats := getDailyStats(targetRes)
+
+ for ssKey := range sourceStats {
+ ts := targetStats[ssKey]
+ ss := sourceStats[ssKey]
+ if ts.value >= ss.value {
+ //fmt.Printf("target value %v is at least equal to source value %v\n", ts.value, ss.value)
+ continue //target value is bigger or equal so leave it
+ }
+ statTime, _ := time.Parse(time.RFC3339, ss.t)
+ tags := map[string]string{
+ "cdn": ss.cdn,
+ "deliveryservice": ss.deliveryService,
+ }
+ fields := map[string]interface{}{
+ "value": ss.value,
+ }
+ pt, err := influx.NewPoint(
+ statName,
+ tags,
+ fields,
+ statTime,
+ )
+ if err != nil {
+ fmt.Printf("error adding creating point for %v...%v\n", statName, err)
+ continue
+ }
+ bps.AddPoint(pt)
+ }
+ targetClient.Write(bps)
+}
+
+func getCacheStats(res []influx.Result) map[string]cacheStats {
+ response := make(map[string]cacheStats)
+ if len(res) == 0 {
+ return response
+ }
+ for i := range res {
+ for _, row := range res[i].Series {
+ for _, record := range row.Values {
+ data := new(cacheStats)
+ t := record[0].(string)
+ data.t = t
+ data.cdn = record[1].(string)
+ if record[2] != nil {
+ data.hostname = record[2].(string)
+ }
+ if record[3] != nil {
+ data.cacheType = record[3].(string)
+ }
+ var err error
+ data.value, err = record[4].(json.Number).Float64()
+ if err != nil {
+ fmt.Printf("Couldn't parse value from record %v\n", record)
+ continue
+ }
+ key := data.t + data.cdn + data.hostname
+ response[key] = *data
+ }
+ }
+ }
+ return response
+}
+
+func getDeliveryServiceStats(res []influx.Result) map[string]deliveryServiceStats {
+ response := make(map[string]deliveryServiceStats)
+ // if the slice is empty, just return
+ if len(res) == 0 {
+ return response
+ }
+ for i := range res {
+ for _, row := range res[i].Series {
+ for _, record := range row.Values {
+ data := new(deliveryServiceStats)
+ data.t = record[0].(string)
+ if record[1] != nil {
+ data.cacheGroup = record[1].(string)
+ }
+ data.cdn = record[2].(string)
+ if record[3] != nil {
+ data.deliveryService = record[3].(string)
+ }
+ var err error
+ data.value, err = record[4].(json.Number).Float64()
+ if err != nil {
+ fmt.Printf("Couldn't parse value from record %v\n", record)
+ continue
+ }
+ key := data.t + data.cacheGroup + data.cdn + data.deliveryService
+ response[key] = *data
+ }
+ }
+ }
+ return response
+}
+
+func getDailyStats(res []influx.Result) map[string]dailyStats {
+ response := make(map[string]dailyStats)
+ // if the slice is empty, just return
+ if len(res) == 0 {
+ return response
+ }
+ for i := range res {
+ for _, row := range res[i].Series {
+ for _, record := range row.Values {
+ data := new(dailyStats)
+ data.t = record[0].(string)
+ data.cdn = record[1].(string)
+ data.deliveryService = record[2].(string)
+ var err error
+ data.value, err = record[3].(json.Number).Float64()
+ if err != nil {
+ fmt.Printf("Couldn't parse value from record %v\n", record)
+ continue
+ }
+ key := data.t + data.cdn + data.deliveryService
+ response[key] = *data
+ }
+ }
+ }
+ return response
+}
+
+// queryDB takes a variadic argument for the target database so as to make
+// passing the variable optional, however, if passed, only the first db passed
+// in will be used
+func queryDB(client influx.Client, cmd string, dbs ...string) (res []influx.Result, err error) {
+ db := ""
+ if len(dbs) > 0 {
+ db = dbs[0]
+ }
+ q := influx.Query{
+ Command: cmd,
+ Database: db,
+ }
+ if response, err := client.Query(q); err == nil {
+ if response.Error() != nil {
+ return res, response.Error()
+ }
+ res = response.Results
+ }
+ return res, nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/23292650/traffic_stats/influxdb_tools/sync_ts_databases.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/sync_ts_databases.go b/traffic_stats/influxdb_tools/sync_ts_databases.go
deleted file mode 100644
index 55e6878..0000000
--- a/traffic_stats/influxdb_tools/sync_ts_databases.go
+++ /dev/null
@@ -1,483 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package main
-
-import (
- "encoding/json"
- "fmt"
- "time"
-
- influx "github.com/influxdata/influxdb/client/v2"
- "github.com/pkg/errors"
- "github.com/urfave/cli"
-)
-
-const (
- cache = "cache_stats"
- deliveryService = "deliveryservice_stats"
- daily = "daily_stats"
-)
-
-type cacheStats struct {
- t string //time
- value float64
- cdn string
- hostname string
- cacheType string
-}
-type deliveryServiceStats struct {
- t string //time
- value float64
- cdn string
- deliveryService string
- cacheGroup string
-}
-type dailyStats struct {
- t string //time
- cdn string
- deliveryService string
- value float64
-}
-
-func syncFlags() []cli.Flag {
- return []cli.Flag{
- cli.StringFlag{
- Name: "sourceUrl",
- Usage: "The source influxdb url and port",
- Value: "http://server1.kabletown.net:8086",
- },
- cli.StringFlag{
- Name: "targetUrl",
- Usage: "The target influxdb url and port",
- Value: "http://server2.kabletown.net:8086",
- },
- cli.StringFlag{
- Name: "database",
- Usage: "Sync a specific database",
- Value: "all",
- },
- cli.IntFlag{
- Name: "days",
- Usage: "Number of days in the past to sync (today - x days), 0 is all",
- Value: 0,
- },
- cli.StringFlag{
- Name: "sourceUser",
- Usage: "The source influxdb username",
- Value: "",
- },
- cli.StringFlag{
- Name: "sourcePass",
- Usage: "The source influxdb password",
- Value: "",
- },
- cli.StringFlag{
- Name: "targetUser",
- Usage: "The target influxdb username",
- Value: "",
- },
- cli.StringFlag{
- Name: "targetPass",
- Usage: "The target influxdb password",
- Value: "",
- },
- }
-}
-
-func sync(c *cli.Context) error {
-
- sourceURL := c.String("sourceUrl")
- targetURL := c.String("targetUrl")
- database := c.String("database")
- days := c.Int("days")
- sourceUser := c.String("sourceUser")
- sourcePass := c.String("sourcePass")
- targetUser := c.String("targetUser")
- targetPass := c.String("targetPass")
- fmt.Printf("syncing %s to %s for %s database(s) for the past %d day(s)\n", sourceURL, targetURL, database, days)
- sourceClient, err := influx.NewHTTPClient(influx.HTTPConfig{
- Addr: sourceURL,
- Username: sourceUser,
- Password: sourcePass,
- })
- if err != nil {
- return errors.Wrap(err, "Error creating influx sourceClient")
- }
-
- if _, _, err = sourceClient.Ping(10); err != nil {
- return errors.Wrap(err, "Error creating influx sourceClient")
- }
-
- targetClient, err := influx.NewHTTPClient(influx.HTTPConfig{
- Addr: targetURL,
- Username: targetUser,
- Password: targetPass,
- })
- if err != nil {
- return errors.Wrap(err, "Error creating influx targetClient")
- }
-
- if _, _, err = targetClient.Ping(10); err != nil {
- return errors.Wrap(err, "Error creating influx targetClient")
- }
-
- chSize := 1
- if database == "all" {
- chSize = 3
- }
-
- ch := make(chan string)
-
- switch database {
- case "all":
- go syncDailyDb(ch, sourceClient, targetClient, days)
- go syncCsDb(ch, sourceClient, targetClient, days)
- go syncDsDb(ch, sourceClient, targetClient, days)
- case cache:
- go syncCsDb(ch, sourceClient, targetClient, days)
- case deliveryService:
- go syncDsDb(ch, sourceClient, targetClient, days)
- case daily:
- go syncDailyDb(ch, sourceClient, targetClient, days)
- default:
- return errors.New("No database selected")
- }
-
- for i := 1; i <= chSize; i++ {
- fmt.Println(<-ch)
- }
-
- fmt.Println("Traffic Stats have been synced!")
- return nil
-}
-
-func syncCsDb(ch chan string, sourceClient influx.Client, targetClient influx.Client, days int) {
- db := cache
- fmt.Printf("Syncing %s database...\n", db)
- stats := [...]string{
- "bandwidth.cdn.1min",
- "connections.cdn.1min",
- "connections.cdn.type.1min",
- "bandwidth.cdn.type.1min",
- //these take a long time so do them last
- "bandwidth.1min",
- "connections.1min",
- //
- }
- for _, statName := range stats {
- fmt.Printf("Syncing %s database with %s \n", db, statName)
- syncCacheStat(sourceClient, targetClient, statName, days)
- fmt.Printf("Done syncing %s\n", statName)
- }
- ch <- fmt.Sprintf("Done syncing %s!\n", db)
-}
-
-func syncDsDb(ch chan string, sourceClient influx.Client, targetClient influx.Client, days int) {
- db := deliveryService
- fmt.Printf("Syncing %s database...\n", db)
- stats := [...]string{
- "kbps.ds.1min",
- "max.kbps.ds.1day",
- "kbps.cg.1min",
- "tps_2xx.ds.1min",
- "tps_3xx.ds.1min",
- "tps_4xx.ds.1min",
- "tps_5xx.ds.1min",
- "tps_total.ds.1min",
- }
- for _, statName := range stats {
- fmt.Printf("Syncing %s database with %s\n", db, statName)
- syncDeliveryServiceStat(sourceClient, targetClient, statName, days)
- }
- ch <- fmt.Sprintf("Done syncing %s!\n", db)
-}
-
-func syncDailyDb(ch chan string, sourceClient influx.Client, targetClient influx.Client, days int) {
- db := daily
- fmt.Printf("Syncing %s database...\n", db)
- stats := [...]string{
- "daily_bytesserved",
- "daily_maxgbps",
- }
-
- for _, statName := range stats {
- fmt.Printf("Syncing %s database with %s\n", db, statName)
- syncDailyStat(sourceClient, targetClient, statName, days)
- }
- ch <- fmt.Sprintf("Done syncing %s!\n", db)
-
-}
-
-func syncCacheStat(sourceClient influx.Client, targetClient influx.Client, statName string, days int) {
- //get records from source DB
- db := cache
- bps, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
- Database: db,
- Precision: "ms",
- RetentionPolicy: "monthly",
- })
-
- queryString := fmt.Sprintf("select time, cdn, hostname, type, value from \"monthly\".\"%s\"", statName)
- if days > 0 {
- queryString = fmt.Sprintf("%s where time > now() - %dd", queryString, days)
- }
- fmt.Println("queryString ", queryString)
- res, err := queryDB(sourceClient, queryString, db)
- if err != nil {
- fmt.Printf("An error occured getting %s records from sourceDb\n", statName)
- return
- }
- sourceStats := getCacheStats(res)
-
- //get values from target DB
- targetRes, err := queryDB(targetClient, queryString, db)
- if err != nil {
- fmt.Printf("An error occured getting %s record from target db: %v\n", statName, err)
- return
- }
- targetStats := getCacheStats(targetRes)
-
- for ssKey := range sourceStats {
- ts := targetStats[ssKey]
- ss := sourceStats[ssKey]
- if ts.value > ss.value {
- //fmt.Printf("target value %v is at least equal to source value %v\n", ts.value, ss.value)
- continue //target value is bigger so leave it
- }
- statTime, _ := time.Parse(time.RFC3339, ss.t)
- tags := map[string]string{"cdn": ss.cdn}
- if ss.hostname != "" {
- tags["hostname"] = ss.hostname
- }
- if ss.cacheType != "" {
- tags["type"] = ss.cacheType
- }
- fields := map[string]interface{}{
- "value": ss.value,
- }
- pt, err := influx.NewPoint(
- statName,
- tags,
- fields,
- statTime,
- )
- if err != nil {
- fmt.Printf("error adding creating point for %v...%v\n", statName, err)
- continue
- }
- bps.AddPoint(pt)
- }
- targetClient.Write(bps)
-}
-
-func syncDeliveryServiceStat(sourceClient influx.Client, targetClient influx.Client, statName string, days int) {
-
- db := deliveryService
- bps, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
- Database: db,
- Precision: "ms",
- RetentionPolicy: "monthly",
- })
-
- queryString := fmt.Sprintf("select time, cachegroup, cdn, deliveryservice, value from \"monthly\".\"%s\"", statName)
- if days > 0 {
- queryString = fmt.Sprintf("%s where time > now() - %dd", queryString, days)
- }
- fmt.Println("queryString ", queryString)
- res, err := queryDB(sourceClient, queryString, db)
- if err != nil {
- fmt.Printf("An error occured getting %s records from sourceDb: %v\n", statName, err)
- return
- }
- sourceStats := getDeliveryServiceStats(res)
- // get value from target DB
- targetRes, err := queryDB(targetClient, queryString, db)
- if err != nil {
- fmt.Printf("An error occured getting %s record from target db: %v\n", statName, err)
- return
- }
- targetStats := getDeliveryServiceStats(targetRes)
-
- for ssKey := range sourceStats {
- ts := targetStats[ssKey]
- ss := sourceStats[ssKey]
- if ts.value > ss.value {
- //fmt.Printf("target value %v is at least equal to source value %v\n", ts.value, ss.value)
- continue //target value is bigger so leave it
- }
- statTime, _ := time.Parse(time.RFC3339, ss.t)
- tags := map[string]string{
- "cdn": ss.cdn,
- "cachegroup": ss.cacheGroup,
- "deliveryservice": ss.deliveryService,
- }
- fields := map[string]interface{}{
- "value": ss.value,
- }
- pt, err := influx.NewPoint(
- statName,
- tags,
- fields,
- statTime,
- )
- if err != nil {
- fmt.Printf("error adding creating point for %v...%v\n", statName, err)
- continue
- }
- bps.AddPoint(pt)
- }
- targetClient.Write(bps)
-}
-
-func syncDailyStat(sourceClient influx.Client, targetClient influx.Client, statName string, days int) {
-
- db := daily
- bps, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
- Database: db,
- Precision: "s",
- })
- //get records from source DB
- queryString := fmt.Sprintf("select time, cdn, deliveryservice, value from \"%s\"", statName)
- if days > 0 {
- queryString = fmt.Sprintf("%s where time > now() - %dd", queryString, days)
- }
- res, err := queryDB(sourceClient, queryString, db)
- if err != nil {
- fmt.Printf("An error occured getting %s records from sourceDb: %v\n", statName, err)
- return
- }
- sourceStats := getDailyStats(res)
- // get value from target DB
- targetRes, err := queryDB(targetClient, queryString, db)
- if err != nil {
- fmt.Printf("An error occured getting %s record from target db: %v\n", statName, err)
- return
- }
- targetStats := getDailyStats(targetRes)
-
- for ssKey := range sourceStats {
- ts := targetStats[ssKey]
- ss := sourceStats[ssKey]
- if ts.value >= ss.value {
- //fmt.Printf("target value %v is at least equal to source value %v\n", ts.value, ss.value)
- continue //target value is bigger or equal so leave it
- }
- statTime, _ := time.Parse(time.RFC3339, ss.t)
- tags := map[string]string{
- "cdn": ss.cdn,
- "deliveryservice": ss.deliveryService,
- }
- fields := map[string]interface{}{
- "value": ss.value,
- }
- pt, err := influx.NewPoint(
- statName,
- tags,
- fields,
- statTime,
- )
- if err != nil {
- fmt.Printf("error adding creating point for %v...%v\n", statName, err)
- continue
- }
- bps.AddPoint(pt)
- }
- targetClient.Write(bps)
-}
-
-func getCacheStats(res []influx.Result) map[string]cacheStats {
- response := make(map[string]cacheStats)
- if res != nil && len(res[0].Series) > 0 {
- for _, row := range res[0].Series {
- for _, record := range row.Values {
- data := new(cacheStats)
- t := record[0].(string)
- data.t = t
- data.cdn = record[1].(string)
- if record[2] != nil {
- data.hostname = record[2].(string)
- }
- if record[3] != nil {
- data.cacheType = record[3].(string)
- }
- var err error
- data.value, err = record[4].(json.Number).Float64()
- if err != nil {
- fmt.Printf("Couldn't parse value from record %v\n", record)
- continue
- }
- key := data.t + data.cdn + data.hostname
- response[key] = *data
- }
- }
- }
- return response
-}
-
-func getDeliveryServiceStats(res []influx.Result) map[string]deliveryServiceStats {
- response := make(map[string]deliveryServiceStats)
- if len(res[0].Series) > 0 {
- for _, row := range res[0].Series {
- for _, record := range row.Values {
- data := new(deliveryServiceStats)
- data.t = record[0].(string)
- if record[1] != nil {
- data.cacheGroup = record[1].(string)
- }
- data.cdn = record[2].(string)
- if record[3] != nil {
- data.deliveryService = record[3].(string)
- }
- var err error
- data.value, err = record[4].(json.Number).Float64()
- if err != nil {
- fmt.Printf("Couldn't parse value from record %v\n", record)
- continue
- }
- key := data.t + data.cacheGroup + data.cdn + data.deliveryService
- response[key] = *data
- }
- }
- }
- return response
-}
-
-func getDailyStats(res []influx.Result) map[string]dailyStats {
- response := make(map[string]dailyStats)
- if len(res) > 0 && len(res[0].Series) > 0 {
- for _, row := range res[0].Series {
- for _, record := range row.Values {
- data := new(dailyStats)
- data.t = record[0].(string)
- data.cdn = record[1].(string)
- data.deliveryService = record[2].(string)
- var err error
- data.value, err = record[3].(json.Number).Float64()
- if err != nil {
- fmt.Printf("Couldn't parse value from record %v\n", record)
- continue
- }
- key := data.t + data.cdn + data.deliveryService
- response[key] = *data
- }
- }
- }
- return response
-}