You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficcontrol.apache.org by ne...@apache.org on 2017/01/13 23:36:02 UTC

[06/29] incubator-trafficcontrol git commit: Refactor influxdb_tools to be a go buildable executable (e.g. go build -o influxtools) with create and sync commands. By reducing to a single main function in the package, we also allow for package level tests to get the coverage up

Refactor influxdb_tools into a single Go-buildable executable (e.g. go build -o influxtools) with create and sync commands. Reducing the package to a single main function also allows for package-level tests, which helps get the test coverage up.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/cae0dd08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/cae0dd08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/cae0dd08

Branch: refs/heads/master
Commit: cae0dd086019b16e2f030eea4308bf9b4ec444cf
Parents: 07c1077
Author: sbogacz <sb...@zvelo.com>
Authored: Fri Jan 6 19:10:50 2017 -0700
Committer: David Neuman <da...@gmail.com>
Committed: Fri Jan 13 23:33:56 2017 +0000

----------------------------------------------------------------------
 .../influxdb_tools/create_ts_databases.go       |  82 ++++++----
 traffic_stats/influxdb_tools/main.go            |  48 ++++++
 .../influxdb_tools/sync_ts_databases.go         | 150 ++++++++++++-------
 3 files changed, 197 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/cae0dd08/traffic_stats/influxdb_tools/create_ts_databases.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/create_ts_databases.go b/traffic_stats/influxdb_tools/create_ts_databases.go
index e065b30..4ee6793 100644
--- a/traffic_stats/influxdb_tools/create_ts_databases.go
+++ b/traffic_stats/influxdb_tools/create_ts_databases.go
@@ -20,45 +20,75 @@ under the License.
 package main
 
 import (
-	"flag"
 	"fmt"
-	"os"
 
 	influx "github.com/influxdata/influxdb/client/v2"
+	"github.com/pkg/errors"
+	"github.com/urfave/cli"
 )
 
-func main() {
+func createFlags() []cli.Flag {
+	return []cli.Flag{
+		cli.StringFlag{
+			Name:  "url",
+			Usage: "The influxdb url and port",
+			Value: "http://localhost:8086",
+		},
+		cli.IntFlag{
+			Name:  "replication",
+			Usage: "The number of nodes in the cluster",
+			Value: 3,
+		},
+		cli.StringFlag{
+			Name:  "user",
+			Usage: "The influxdb username used to create DBs",
+			Value: "",
+		},
+		cli.StringFlag{
+			Name:  "password",
+			Usage: "The influxdb password used to create DBs",
+			Value: "",
+		},
+	}
+}
 
-	influxURL := flag.String("url", "http://localhost:8086", "The influxdb url and port")
-	replication := flag.String("replication", "3", "The number of nodes in the cluster")
-	user := flag.String("user", "", "The influxdb username used to create DBs")
-	password := flag.String("password", "", "The influxdb password used to create DBs")
-	flag.Parse()
-	fmt.Printf("creating datbases for influxUrl: %v with a replication of %v using user %s\n", *influxURL, *replication, *user)
+func create(c *cli.Context) error {
+	influxURL := c.String("url")
+	replication := c.Int("replication")
+	user := c.String("user")
+	password := c.String("password")
+
+	fmt.Printf("creating datbases for influxUrl: %s with a replication of %d using user %s\n", influxURL, replication, user)
 	client, err := influx.NewHTTPClient(influx.HTTPConfig{
-		Addr:     *influxURL,
-		Username: *user,
-		Password: *password,
+		Addr:     influxURL,
+		Username: user,
+		Password: password,
 	})
 	if err != nil {
-		fmt.Printf("Error creating influx client: %v\n", err)
-		os.Exit(1)
+		return errors.Wrap(err, "Error creating influx client")
 	}
 	_, _, err = client.Ping(10)
 	if err != nil {
-		fmt.Printf("Error creating influx client: %v\n", err)
-		os.Exit(1)
+		return errors.Wrap(err, "Error creating influx client")
 	}
 
 	createCacheStats(client, replication)
 	createDailyStats(client, replication)
 	createDeliveryServiceStats(client, replication)
+	return nil
 }
 
-func queryDB(client influx.Client, cmd string) (res []influx.Result, err error) {
+// queryDB takes a variadic argument for the target database so as to make
+// passing the variable optional, however, if passed, only the first db passed
+// in will be used
+func queryDB(client influx.Client, cmd string, dbs ...string) (res []influx.Result, err error) {
+	db := ""
+	if len(dbs) > 0 {
+		db = dbs[0]
+	}
 	q := influx.Query{
 		Command:  cmd,
-		Database: "",
+		Database: db,
 	}
 	if response, err := client.Query(q); err == nil {
 		if response.Error() != nil {
@@ -69,8 +99,8 @@ func queryDB(client influx.Client, cmd string) (res []influx.Result, err error)
 	return res, nil
 }
 
-func createCacheStats(client influx.Client, replication *string) {
-	db := "cache_stats"
+func createCacheStats(client influx.Client, replication int) {
+	db := cache
 	createDatabase(client, db)
 	createRetentionPolicy(client, db, "daily", "26h", replication, true)
 	createRetentionPolicy(client, db, "monthly", "30d", replication, false)
@@ -87,8 +117,8 @@ func createCacheStats(client influx.Client, replication *string) {
 	createContinuousQuery(client, "wrap_count_vol2_1m", `CREATE CONTINUOUS QUERY wrap_count_vol2_1m ON cache_stats RESAMPLE FOR 2m BEGIN SELECT mean(value) AS vol2_wrap_count INTO cache_stats.monthly."wrap_count.1min" FROM cache_stats.daily."ats.proxy.process.cache.volume_2.wrap_count" GROUP BY time(1m), * END`)
 }
 
-func createDeliveryServiceStats(client influx.Client, replication *string) {
-	db := "deliveryservice_stats"
+func createDeliveryServiceStats(client influx.Client, replication int) {
+	db := deliveryService
 	createDatabase(client, db)
 	createRetentionPolicy(client, db, "daily", "26h", replication, true)
 	createRetentionPolicy(client, db, "monthly", "30d", replication, false)
@@ -103,8 +133,8 @@ func createDeliveryServiceStats(client influx.Client, replication *string) {
 	createContinuousQuery(client, "max_kbps_ds_1day", `CREATE CONTINUOUS QUERY max_kbps_ds_1day ON deliveryservice_stats RESAMPLE FOR 2d BEGIN SELECT max(value) AS "value" INTO "deliveryservice_stats"."indefinite"."max.kbps.ds.1day" FROM "deliveryservice_stats"."monthly"."kbps.ds.1min" GROUP BY time(1d), deliveryservice, cdn END`)
 }
 
-func createDailyStats(client influx.Client, replication *string) {
-	db := "daily_stats"
+func createDailyStats(client influx.Client, replication int) {
+	db := daily
 	createDatabase(client, db)
 	createRetentionPolicy(client, db, "indefinite", "INF", replication, true)
 }
@@ -118,8 +148,8 @@ func createDatabase(client influx.Client, db string) {
 	fmt.Println("Successfully created database: ", db)
 }
 
-func createRetentionPolicy(client influx.Client, db string, name string, duration string, replication *string, isDefault bool) {
-	qString := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION %s REPLICATION %s", name, db, duration, *replication)
+func createRetentionPolicy(client influx.Client, db string, name string, duration string, replication int, isDefault bool) {
+	qString := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION %s REPLICATION %d", name, db, duration, replication)
 	if isDefault {
 		qString += " DEFAULT"
 	}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/cae0dd08/traffic_stats/influxdb_tools/main.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/main.go b/traffic_stats/influxdb_tools/main.go
new file mode 100644
index 0000000..06fc3ef
--- /dev/null
+++ b/traffic_stats/influxdb_tools/main.go
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+	"os"
+
+	"github.com/urfave/cli"
+)
+
+func main() {
+	app := cli.NewApp()
+	app.Name = "influx-tools"
+	app.Version = "0.1.0"
+	app.Usage = "influx-tools provides cli methods for creating and syncing the requisite influxdb databases"
+	app.Commands = []cli.Command{
+		cli.Command{
+			Name:   "create",
+			Usage:  "create the influxDB tables",
+			Action: create,
+			Flags:  createFlags(),
+		},
+		cli.Command{
+			Name:   "sync",
+			Usage:  "sync the influxDB tables",
+			Action: sync,
+			Flags:  syncFlags(),
+		},
+	}
+	app.Run(os.Args)
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/cae0dd08/traffic_stats/influxdb_tools/sync_ts_databases.go
----------------------------------------------------------------------
diff --git a/traffic_stats/influxdb_tools/sync_ts_databases.go b/traffic_stats/influxdb_tools/sync_ts_databases.go
index 50096ec..29f8083 100644
--- a/traffic_stats/influxdb_tools/sync_ts_databases.go
+++ b/traffic_stats/influxdb_tools/sync_ts_databases.go
@@ -21,12 +21,12 @@ package main
 
 import (
 	"encoding/json"
-	"flag"
 	"fmt"
-	"os"
 	"time"
 
 	influx "github.com/influxdata/influxdb/client/v2"
+	"github.com/pkg/errors"
+	"github.com/urfave/cli"
 )
 
 const (
@@ -58,80 +58,116 @@ type dailyStats struct {
 
 var errorMessage string
 
-func main() {
-
-	sourceURL := flag.String("sourceUrl", "http://server1.kabletown.net:8086", "The influxdb url and port")
-	targetURL := flag.String("targetUrl", "http://server2.kabletown.net:8086", "The influxdb url and port")
-	database := flag.String("database", "all", "Sync a specific database")
-	days := flag.Int("days", 0, "Number of days in the past to sync (today - x days), 0 is all")
-	sourceUser := flag.String("sourceUser", "", "The source influxdb username")
-	sourcePass := flag.String("sourcePass", "", "The source influxdb password")
-	targetUser := flag.String("targetUser", "", "The target influxdb username")
-	targetPass := flag.String("targetPass", "", "The target influxdb password")
-	flag.Parse()
-	fmt.Printf("syncing %v to %v for %v database(s) for the past %v day(s)\n", *sourceURL, *targetURL, *database, *days)
+func syncFlags() []cli.Flag {
+	return []cli.Flag{
+		cli.StringFlag{
+			Name:  "sourceUrl",
+			Usage: "The source influxdb url and port",
+			Value: "http://server1.kabletown.net:8086",
+		},
+		cli.StringFlag{
+			Name:  "targetUrl",
+			Usage: "The target influxdb url and port",
+			Value: "http://server2.kabletown.net:8086",
+		},
+		cli.StringFlag{
+			Name:  "database",
+			Usage: "Sync a specific database",
+			Value: "all",
+		},
+		cli.IntFlag{
+			Name:  "days",
+			Usage: "Number of days in the past to sync (today - x days), 0 is all",
+			Value: 0,
+		},
+		cli.StringFlag{
+			Name:  "sourceUser",
+			Usage: "The source influxdb username",
+			Value: "",
+		},
+		cli.StringFlag{
+			Name:  "sourcePass",
+			Usage: "The source influxdb password",
+			Value: "",
+		},
+		cli.StringFlag{
+			Name:  "targetUser",
+			Usage: "The target influxdb username",
+			Value: "",
+		},
+		cli.StringFlag{
+			Name:  "targetPass",
+			Usage: "The target influxdb password",
+			Value: "",
+		},
+	}
+}
+
+func sync(c *cli.Context) error {
+
+	sourceURL := c.String("sourceUrl")
+	targetURL := c.String("targetUrl")
+	database := c.String("database")
+	days := c.Int("days")
+	sourceUser := c.String("sourceUser")
+	sourcePass := c.String("sourcePass")
+	targetUser := c.String("targetUser")
+	targetPass := c.String("targetPass")
+	fmt.Printf("syncing %s to %s for %s database(s) for the past %d day(s)\n", sourceURL, targetURL, database, days)
 	sourceClient, err := influx.NewHTTPClient(influx.HTTPConfig{
-		Addr:     *sourceURL,
-		Username: *sourceUser,
-		Password: *sourcePass,
+		Addr:     sourceURL,
+		Username: sourceUser,
+		Password: sourcePass,
 	})
 	if err != nil {
-		errorMessage = fmt.Sprintf("Error creating influx sourceClient: %v\n", err)
-		fmt.Println(errorMessage)
-		os.Exit(1)
+		return errors.Wrap(err, "Error creating influx sourceClient")
 	}
-	_, _, err = sourceClient.Ping(10)
-	if err != nil {
-		errorMessage = fmt.Sprintf("Error creating influx sourceClient: %v\n", err)
-		fmt.Println(errorMessage)
-		os.Exit(1)
+
+	if _, _, err = sourceClient.Ping(10); err != nil {
+		return errors.Wrap(err, "Error creating influx sourceClient")
 	}
+
 	targetClient, err := influx.NewHTTPClient(influx.HTTPConfig{
-		Addr:     *targetURL,
-		Username: *targetUser,
-		Password: *targetPass,
+		Addr:     targetURL,
+		Username: targetUser,
+		Password: targetPass,
 	})
 	if err != nil {
-		errorMessage = fmt.Sprintf("Error creating influx targetClient: %v\n", err)
-		fmt.Println(errorMessage)
-		os.Exit(1)
+		return errors.Wrap(err, "Error creating influx targetClient")
 	}
-	_, _, err = targetClient.Ping(10)
-	if err != nil {
-		errorMessage = fmt.Sprintf("Error creating influx targetClient: %v\n", err)
-		fmt.Println(errorMessage)
-		os.Exit(1)
+
+	if _, _, err = targetClient.Ping(10); err != nil {
+		return errors.Wrap(err, "Error creating influx targetClient")
 	}
+
 	chSize := 1
-	if *database == "all" {
+	if database == "all" {
 		chSize = 3
 	}
 
 	ch := make(chan string)
 
-	switch *database {
+	switch database {
 	case "all":
-		go syncDailyDb(ch, sourceClient, targetClient, *days)
-		go syncCsDb(ch, sourceClient, targetClient, *days)
-		go syncDsDb(ch, sourceClient, targetClient, *days)
+		go syncDailyDb(ch, sourceClient, targetClient, days)
+		go syncCsDb(ch, sourceClient, targetClient, days)
+		go syncDsDb(ch, sourceClient, targetClient, days)
 	case cache:
-		go syncCsDb(ch, sourceClient, targetClient, *days)
+		go syncCsDb(ch, sourceClient, targetClient, days)
 	case deliveryService:
-		go syncDsDb(ch, sourceClient, targetClient, *days)
+		go syncDsDb(ch, sourceClient, targetClient, days)
 	case daily:
-		go syncDailyDb(ch, sourceClient, targetClient, *days)
+		go syncDailyDb(ch, sourceClient, targetClient, days)
+	default:
+		return errors.New("No database selected")
 	}
 
 	for i := 1; i <= chSize; i++ {
 		fmt.Println(<-ch)
 	}
 
-	if errorMessage != "" {
-		fmt.Println(errorMessage)
-		return
-	}
-
-	fmt.Println("Traffic Stats has been synced!")
+	fmt.Println("Traffic Stats have been synced!")
+	return nil
 }
 
 func syncCsDb(ch chan string, sourceClient influx.Client, targetClient influx.Client, days int) {
@@ -191,7 +227,7 @@ func syncDailyDb(ch chan string, sourceClient influx.Client, targetClient influx
 
 }
 
-func queryDB(client influx.Client, cmd string, db string) (res []influx.Result, err error) {
+func queryDb2(client influx.Client, cmd string, db string) (res []influx.Result, err error) {
 	q := influx.Query{
 		Command:  cmd,
 		Database: db,
@@ -219,7 +255,7 @@ func syncCacheStat(sourceClient influx.Client, targetClient influx.Client, statN
 		queryString = fmt.Sprintf("%s where time > now() - %dd", queryString, days)
 	}
 	fmt.Println("queryString ", queryString)
-	res, err := queryDB(sourceClient, queryString, db)
+	res, err := queryDb2(sourceClient, queryString, db)
 	if err != nil {
 		fmt.Printf("An error occured getting %s records from sourceDb\n", statName)
 		return
@@ -227,7 +263,7 @@ func syncCacheStat(sourceClient influx.Client, targetClient influx.Client, statN
 	sourceStats := getCacheStats(res)
 
 	//get values from target DB
-	targetRes, err := queryDB(targetClient, queryString, db)
+	targetRes, err := queryDb2(targetClient, queryString, db)
 	if err != nil {
 		errorMessage = fmt.Sprintf("An error occured getting %s record from target db: %v\n", statName, err)
 		fmt.Println(errorMessage)
@@ -282,7 +318,7 @@ func syncDeliveryServiceStat(sourceClient influx.Client, targetClient influx.Cli
 		queryString = fmt.Sprintf("%s where time > now() - %dd", queryString, days)
 	}
 	fmt.Println("queryString ", queryString)
-	res, err := queryDB(sourceClient, queryString, db)
+	res, err := queryDb2(sourceClient, queryString, db)
 	if err != nil {
 		errorMessage = fmt.Sprintf("An error occured getting %s records from sourceDb: %v\n", statName, err)
 		fmt.Println(errorMessage)
@@ -290,7 +326,7 @@ func syncDeliveryServiceStat(sourceClient influx.Client, targetClient influx.Cli
 	}
 	sourceStats := getDeliveryServiceStats(res)
 	// get value from target DB
-	targetRes, err := queryDB(targetClient, queryString, db)
+	targetRes, err := queryDb2(targetClient, queryString, db)
 	if err != nil {
 		errorMessage = fmt.Sprintf("An error occured getting %s record from target db: %v\n", statName, err)
 		fmt.Println(errorMessage)
@@ -341,7 +377,7 @@ func syncDailyStat(sourceClient influx.Client, targetClient influx.Client, statN
 	if days > 0 {
 		queryString = fmt.Sprintf("%s where time > now() - %dd", queryString, days)
 	}
-	res, err := queryDB(sourceClient, queryString, db)
+	res, err := queryDb2(sourceClient, queryString, db)
 	if err != nil {
 		errorMessage = fmt.Sprintf("An error occured getting %s records from sourceDb: %v\n", statName, err)
 		fmt.Println(errorMessage)
@@ -349,7 +385,7 @@ func syncDailyStat(sourceClient influx.Client, targetClient influx.Client, statN
 	}
 	sourceStats := getDailyStats(res)
 	// get value from target DB
-	targetRes, err := queryDB(targetClient, queryString, db)
+	targetRes, err := queryDb2(targetClient, queryString, db)
 	if err != nil {
 		errorMessage = fmt.Sprintf("An error occured getting %s record from target db: %v\n", statName, err)
 		fmt.Println(errorMessage)