You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficcontrol.apache.org by ne...@apache.org on 2017/01/13 23:36:02 UTC
[06/29] incubator-trafficcontrol git commit: Refactor influxdb_tools
to be a go buildable executable (e.g. go build -o influxtools) with create
and sync commands. By reducing to a single main function in the package,
we also allow for package level tests
Refactor influxdb_tools into a Go-buildable executable (e.g. go build -o influxtools) with create and sync commands. Reducing the package to a single main function also allows for package-level tests to get the coverage up.
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/cae0dd08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/cae0dd08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/cae0dd08
Branch: refs/heads/master
Commit: cae0dd086019b16e2f030eea4308bf9b4ec444cf
Parents: 07c1077
Author: sbogacz <sb...@zvelo.com>
Authored: Fri Jan 6 19:10:50 2017 -0700
Committer: David Neuman <da...@gmail.com>
Committed: Fri Jan 13 23:33:56 2017 +0000
----------------------------------------------------------------------
.../influxdb_tools/create_ts_databases.go | 82 ++++++----
traffic_stats/influxdb_tools/main.go | 48 ++++++
.../influxdb_tools/sync_ts_databases.go | 150 ++++++++++++-------
3 files changed, 197 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/cae0dd08/traffic_stats/influxdb_tools/create_ts_databases.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/create_ts_databases.go b/traffic_stats/influxdb_tools/create_ts_databases.go
index e065b30..4ee6793 100644
--- a/traffic_stats/influxdb_tools/create_ts_databases.go
+++ b/traffic_stats/influxdb_tools/create_ts_databases.go
@@ -20,45 +20,75 @@ under the License.
package main
import (
- "flag"
"fmt"
- "os"
influx "github.com/influxdata/influxdb/client/v2"
+ "github.com/pkg/errors"
+ "github.com/urfave/cli"
)
-func main() {
+func createFlags() []cli.Flag {
+ return []cli.Flag{
+ cli.StringFlag{
+ Name: "url",
+ Usage: "The influxdb url and port",
+ Value: "http://localhost:8086",
+ },
+ cli.IntFlag{
+ Name: "replication",
+ Usage: "The number of nodes in the cluster",
+ Value: 3,
+ },
+ cli.StringFlag{
+ Name: "user",
+ Usage: "The influxdb username used to create DBs",
+ Value: "",
+ },
+ cli.StringFlag{
+ Name: "password",
+ Usage: "The influxdb password used to create DBs",
+ Value: "",
+ },
+ }
+}
- influxURL := flag.String("url", "http://localhost:8086", "The influxdb url and port")
- replication := flag.String("replication", "3", "The number of nodes in the cluster")
- user := flag.String("user", "", "The influxdb username used to create DBs")
- password := flag.String("password", "", "The influxdb password used to create DBs")
- flag.Parse()
- fmt.Printf("creating datbases for influxUrl: %v with a replication of %v using user %s\n", *influxURL, *replication, *user)
+func create(c *cli.Context) error {
+ influxURL := c.String("url")
+ replication := c.Int("replication")
+ user := c.String("user")
+ password := c.String("password")
+
+ fmt.Printf("creating datbases for influxUrl: %s with a replication of %d using user %s\n", influxURL, replication, user)
client, err := influx.NewHTTPClient(influx.HTTPConfig{
- Addr: *influxURL,
- Username: *user,
- Password: *password,
+ Addr: influxURL,
+ Username: user,
+ Password: password,
})
if err != nil {
- fmt.Printf("Error creating influx client: %v\n", err)
- os.Exit(1)
+ return errors.Wrap(err, "Error creating influx client")
}
_, _, err = client.Ping(10)
if err != nil {
- fmt.Printf("Error creating influx client: %v\n", err)
- os.Exit(1)
+ return errors.Wrap(err, "Error creating influx client")
}
createCacheStats(client, replication)
createDailyStats(client, replication)
createDeliveryServiceStats(client, replication)
+ return nil
}
-func queryDB(client influx.Client, cmd string) (res []influx.Result, err error) {
+// queryDB takes a variadic argument for the target database so that passing
+// it is optional; if any databases are passed, only the first one is used and
+// the rest are ignored.
+func queryDB(client influx.Client, cmd string, dbs ...string) (res []influx.Result, err error) {
+ db := ""
+ if len(dbs) > 0 {
+ db = dbs[0]
+ }
q := influx.Query{
Command: cmd,
- Database: "",
+ Database: db,
}
if response, err := client.Query(q); err == nil {
if response.Error() != nil {
@@ -69,8 +99,8 @@ func queryDB(client influx.Client, cmd string) (res []influx.Result, err error)
return res, nil
}
-func createCacheStats(client influx.Client, replication *string) {
- db := "cache_stats"
+func createCacheStats(client influx.Client, replication int) {
+ db := cache
createDatabase(client, db)
createRetentionPolicy(client, db, "daily", "26h", replication, true)
createRetentionPolicy(client, db, "monthly", "30d", replication, false)
@@ -87,8 +117,8 @@ func createCacheStats(client influx.Client, replication *string) {
createContinuousQuery(client, "wrap_count_vol2_1m", `CREATE CONTINUOUS QUERY wrap_count_vol2_1m ON cache_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS vol2_wrap_count INTO cache_stats.monthly."wrap_count.1min" FROM cache_stats.daily."ats.proxy.process.cache.volume_2.wrap_count" GROUP BY time(1m), * END`)
}
-func createDeliveryServiceStats(client influx.Client, replication *string) {
- db := "deliveryservice_stats"
+func createDeliveryServiceStats(client influx.Client, replication int) {
+ db := deliveryService
createDatabase(client, db)
createRetentionPolicy(client, db, "daily", "26h", replication, true)
createRetentionPolicy(client, db, "monthly", "30d", replication, false)
@@ -103,8 +133,8 @@ func createDeliveryServiceStats(client influx.Client, replication *string) {
createContinuousQuery(client, "max_kbps_ds_1day", `CREATE CONTINUOUS QUERY max_kbps_ds_1day ON deliveryservice_stats RESAMPLE FOR 2d BEGIN SELECT max(value) AS "value" INTO "deliveryservice_stats"."indefinite"."max.kbps.ds.1day" FROM "deliveryservice_stats"."monthly"."kbps.ds.1min" GROUP BY time(1d), deliveryservice, cdn END`)
}
-func createDailyStats(client influx.Client, replication *string) {
- db := "daily_stats"
+func createDailyStats(client influx.Client, replication int) {
+ db := daily
createDatabase(client, db)
createRetentionPolicy(client, db, "indefinite", "INF", replication, true)
}
@@ -118,8 +148,8 @@ func createDatabase(client influx.Client, db string) {
fmt.Println("Successfully created database: ", db)
}
-func createRetentionPolicy(client influx.Client, db string, name string, duration string, replication *string, isDefault bool) {
- qString := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION %s REPLICATION %s", name, db, duration, *replication)
+func createRetentionPolicy(client influx.Client, db string, name string, duration string, replication int, isDefault bool) {
+ qString := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION %s REPLICATION %d", name, db, duration, replication)
if isDefault {
qString += " DEFAULT"
}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/cae0dd08/traffic_stats/influxdb_tools/main.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/main.go b/traffic_stats/influxdb_tools/main.go
new file mode 100644
index 0000000..06fc3ef
--- /dev/null
+++ b/traffic_stats/influxdb_tools/main.go
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+ "os"
+
+ "github.com/urfave/cli"
+)
+
+func main() {
+ app := cli.NewApp()
+ app.Name = "influx-tools"
+ app.Version = "0.1.0"
+ app.Usage = "influx-tools provides cli methods for creating and syncing the requisite influxdb databases"
+ app.Commands = []cli.Command{
+ cli.Command{
+ Name: "create",
+ Usage: "create the influxDB tables",
+ Action: create,
+ Flags: createFlags(),
+ },
+ cli.Command{
+ Name: "sync",
+ Usage: "sync the influxDB tables",
+ Action: sync,
+ Flags: syncFlags(),
+ },
+ }
+ app.Run(os.Args)
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/cae0dd08/traffic_stats/influxdb_tools/sync_ts_databases.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/sync_ts_databases.go b/traffic_stats/influxdb_tools/sync_ts_databases.go
index 50096ec..29f8083 100644
--- a/traffic_stats/influxdb_tools/sync_ts_databases.go
+++ b/traffic_stats/influxdb_tools/sync_ts_databases.go
@@ -21,12 +21,12 @@ package main
import (
"encoding/json"
- "flag"
"fmt"
- "os"
"time"
influx "github.com/influxdata/influxdb/client/v2"
+ "github.com/pkg/errors"
+ "github.com/urfave/cli"
)
const (
@@ -58,80 +58,116 @@ type dailyStats struct {
var errorMessage string
-func main() {
-
- sourceURL := flag.String("sourceUrl", "http://server1.kabletown.net:8086", "The influxdb url and port")
- targetURL := flag.String("targetUrl", "http://server2.kabletown.net:8086", "The influxdb url and port")
- database := flag.String("database", "all", "Sync a specific database")
- days := flag.Int("days", 0, "Number of days in the past to sync (today - x days), 0 is all")
- sourceUser := flag.String("sourceUser", "", "The source influxdb username")
- sourcePass := flag.String("sourcePass", "", "The source influxdb password")
- targetUser := flag.String("targetUser", "", "The target influxdb username")
- targetPass := flag.String("targetPass", "", "The target influxdb password")
- flag.Parse()
- fmt.Printf("syncing %v to %v for %v database(s) for the past %v day(s)\n", *sourceURL, *targetURL, *database, *days)
+func syncFlags() []cli.Flag {
+ return []cli.Flag{
+ cli.StringFlag{
+ Name: "sourceUrl",
+ Usage: "The source influxdb url and port",
+ Value: "http://server1.kabletown.net:8086",
+ },
+ cli.StringFlag{
+ Name: "targetUrl",
+ Usage: "The target influxdb url and port",
+ Value: "http://server2.kabletown.net:8086",
+ },
+ cli.StringFlag{
+ Name: "database",
+ Usage: "Sync a specific database",
+ Value: "all",
+ },
+ cli.IntFlag{
+ Name: "days",
+ Usage: "Number of days in the past to sync (today - x days), 0 is all",
+ Value: 0,
+ },
+ cli.StringFlag{
+ Name: "sourceUser",
+ Usage: "The source influxdb username",
+ Value: "",
+ },
+ cli.StringFlag{
+ Name: "sourcePass",
+ Usage: "The source influxdb password",
+ Value: "",
+ },
+ cli.StringFlag{
+ Name: "targetUser",
+ Usage: "The target influxdb username",
+ Value: "",
+ },
+ cli.StringFlag{
+ Name: "targetPass",
+ Usage: "The target influxdb password",
+ Value: "",
+ },
+ }
+}
+
+func sync(c *cli.Context) error {
+
+ sourceURL := c.String("sourceUrl")
+ targetURL := c.String("targetUrl")
+ database := c.String("database")
+ days := c.Int("days")
+ sourceUser := c.String("sourceUser")
+ sourcePass := c.String("sourcePass")
+ targetUser := c.String("targetUser")
+ targetPass := c.String("targetPass")
+ fmt.Printf("syncing %s to %s for %s database(s) for the past %d day(s)\n", sourceURL, targetURL, database, days)
sourceClient, err := influx.NewHTTPClient(influx.HTTPConfig{
- Addr: *sourceURL,
- Username: *sourceUser,
- Password: *sourcePass,
+ Addr: sourceURL,
+ Username: sourceUser,
+ Password: sourcePass,
})
if err != nil {
- errorMessage = fmt.Sprintf("Error creating influx sourceClient: %v\n", err)
- fmt.Println(errorMessage)
- os.Exit(1)
+ return errors.Wrap(err, "Error creating influx sourceClient")
}
- _, _, err = sourceClient.Ping(10)
- if err != nil {
- errorMessage = fmt.Sprintf("Error creating influx sourceClient: %v\n", err)
- fmt.Println(errorMessage)
- os.Exit(1)
+
+ if _, _, err = sourceClient.Ping(10); err != nil {
+ return errors.Wrap(err, "Error creating influx sourceClient")
}
+
targetClient, err := influx.NewHTTPClient(influx.HTTPConfig{
- Addr: *targetURL,
- Username: *targetUser,
- Password: *targetPass,
+ Addr: targetURL,
+ Username: targetUser,
+ Password: targetPass,
})
if err != nil {
- errorMessage = fmt.Sprintf("Error creating influx targetClient: %v\n", err)
- fmt.Println(errorMessage)
- os.Exit(1)
+ return errors.Wrap(err, "Error creating influx targetClient")
}
- _, _, err = targetClient.Ping(10)
- if err != nil {
- errorMessage = fmt.Sprintf("Error creating influx targetClient: %v\n", err)
- fmt.Println(errorMessage)
- os.Exit(1)
+
+ if _, _, err = targetClient.Ping(10); err != nil {
+ return errors.Wrap(err, "Error creating influx targetClient")
}
+
chSize := 1
- if *database == "all" {
+ if database == "all" {
chSize = 3
}
ch := make(chan string)
- switch *database {
+ switch database {
case "all":
- go syncDailyDb(ch, sourceClient, targetClient, *days)
- go syncCsDb(ch, sourceClient, targetClient, *days)
- go syncDsDb(ch, sourceClient, targetClient, *days)
+ go syncDailyDb(ch, sourceClient, targetClient, days)
+ go syncCsDb(ch, sourceClient, targetClient, days)
+ go syncDsDb(ch, sourceClient, targetClient, days)
case cache:
- go syncCsDb(ch, sourceClient, targetClient, *days)
+ go syncCsDb(ch, sourceClient, targetClient, days)
case deliveryService:
- go syncDsDb(ch, sourceClient, targetClient, *days)
+ go syncDsDb(ch, sourceClient, targetClient, days)
case daily:
- go syncDailyDb(ch, sourceClient, targetClient, *days)
+ go syncDailyDb(ch, sourceClient, targetClient, days)
+ default:
+ return errors.New("No database selected")
}
for i := 1; i <= chSize; i++ {
fmt.Println(<-ch)
}
- if errorMessage != "" {
- fmt.Println(errorMessage)
- return
- }
-
- fmt.Println("Traffic Stats has been synced!")
+ fmt.Println("Traffic Stats have been synced!")
+ return nil
}
func syncCsDb(ch chan string, sourceClient influx.Client, targetClient influx.Client, days int) {
@@ -191,7 +227,7 @@ func syncDailyDb(ch chan string, sourceClient influx.Client, targetClient influx
}
-func queryDB(client influx.Client, cmd string, db string) (res []influx.Result, err error) {
+func queryDb2(client influx.Client, cmd string, db string) (res []influx.Result, err error) {
q := influx.Query{
Command: cmd,
Database: db,
@@ -219,7 +255,7 @@ func syncCacheStat(sourceClient influx.Client, targetClient influx.Client, statN
queryString = fmt.Sprintf("%s where time > now() - %dd", queryString, days)
}
fmt.Println("queryString ", queryString)
- res, err := queryDB(sourceClient, queryString, db)
+ res, err := queryDb2(sourceClient, queryString, db)
if err != nil {
fmt.Printf("An error occured getting %s records from sourceDb\n", statName)
return
@@ -227,7 +263,7 @@ func syncCacheStat(sourceClient influx.Client, targetClient influx.Client, statN
sourceStats := getCacheStats(res)
//get values from target DB
- targetRes, err := queryDB(targetClient, queryString, db)
+ targetRes, err := queryDb2(targetClient, queryString, db)
if err != nil {
errorMessage = fmt.Sprintf("An error occured getting %s record from target db: %v\n", statName, err)
fmt.Println(errorMessage)
@@ -282,7 +318,7 @@ func syncDeliveryServiceStat(sourceClient influx.Client, targetClient influx.Cli
queryString = fmt.Sprintf("%s where time > now() - %dd", queryString, days)
}
fmt.Println("queryString ", queryString)
- res, err := queryDB(sourceClient, queryString, db)
+ res, err := queryDb2(sourceClient, queryString, db)
if err != nil {
errorMessage = fmt.Sprintf("An error occured getting %s records from sourceDb: %v\n", statName, err)
fmt.Println(errorMessage)
@@ -290,7 +326,7 @@ func syncDeliveryServiceStat(sourceClient influx.Client, targetClient influx.Cli
}
sourceStats := getDeliveryServiceStats(res)
// get value from target DB
- targetRes, err := queryDB(targetClient, queryString, db)
+ targetRes, err := queryDb2(targetClient, queryString, db)
if err != nil {
errorMessage = fmt.Sprintf("An error occured getting %s record from target db: %v\n", statName, err)
fmt.Println(errorMessage)
@@ -341,7 +377,7 @@ func syncDailyStat(sourceClient influx.Client, targetClient influx.Client, statN
if days > 0 {
queryString = fmt.Sprintf("%s where time > now() - %dd", queryString, days)
}
- res, err := queryDB(sourceClient, queryString, db)
+ res, err := queryDb2(sourceClient, queryString, db)
if err != nil {
errorMessage = fmt.Sprintf("An error occured getting %s records from sourceDb: %v\n", statName, err)
fmt.Println(errorMessage)
@@ -349,7 +385,7 @@ func syncDailyStat(sourceClient influx.Client, targetClient influx.Client, statN
}
sourceStats := getDailyStats(res)
// get value from target DB
- targetRes, err := queryDB(targetClient, queryString, db)
+ targetRes, err := queryDb2(targetClient, queryString, db)
if err != nil {
errorMessage = fmt.Sprintf("An error occured getting %s record from target db: %v\n", statName, err)
fmt.Println(errorMessage)