Posted to issues@trafficcontrol.apache.org by GitBox <gi...@apache.org> on 2022/01/07 03:31:45 UTC

[GitHub] [trafficcontrol] henryluimcit opened a new pull request #6451: Adding Support for Kafka to Traffic Stats

henryluimcit opened a new pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451


   This PR closes [#5118](https://github.com/apache/trafficcontrol/issues/5118)
   
   <hr>
   
   ## Which Traffic Control components are affected by this PR?
   
   - Traffic Stats
   
   ## What is the best way to verify this PR?
   
   Within the `traffic_stats.cfg` file, change the `kafkaConfig` to connect to the appropriate local or TLS authenticated server:
   
   
   ```cfg
   	"kafkaConfig": {
   	    "enable": false,
   	    "brokers": "localhost:9092",
   	    "topic": "traffic-stats",
   	    "requiredAcks": 1,
   	    "enableTls": false,
   	    "rootCA": "",
   	    "clientCert": "",
   	    "clientCertKey": ""
   	},
   ```
   
   To enable sending data to Kafka, set `enable` to `true`. Add the broker(s) as a comma-separated list, and the topic. If using TLS authentication, set `enableTls` to `true`, and add the paths to the CA certificate, client certificate, and client certificate key in `rootCA`, `clientCert`, and `clientCertKey` respectively.
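
As a rough illustration of how such a fragment might be read, the sketch below decodes a `kafkaConfig` object with Go's `encoding/json`. The struct mirrors the `KafkaConfig` type quoted in this PR's review diff; the `parseKafkaConfig` helper and the sample values are assumptions made for the example and are not part of Traffic Stats.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// KafkaConfig mirrors the struct (and JSON tags) quoted in the review diff.
type KafkaConfig struct {
	Enable        bool   `json:"enable"`
	Brokers       string `json:"brokers"`
	Topic         string `json:"topic"`
	RequiredAcks  int    `json:"requiredAcks"`
	EnableTls     bool   `json:"enableTls"`
	RootCA        string `json:"rootCA"`
	ClientCert    string `json:"clientCert"`
	ClientCertKey string `json:"clientCertKey"`
}

// parseKafkaConfig is a hypothetical helper: it decodes the JSON object and
// splits the comma-separated broker list, as the diff does with strings.Split.
func parseKafkaConfig(raw []byte) (KafkaConfig, []string, error) {
	var cfg KafkaConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return KafkaConfig{}, nil, err
	}
	return cfg, strings.Split(cfg.Brokers, ","), nil
}

func main() {
	// Sample values are made up for the example.
	raw := []byte(`{"enable": true, "brokers": "localhost:9092,localhost:9093", "topic": "traffic-stats", "requiredAcks": 1}`)
	cfg, brokers, err := parseKafkaConfig(raw)
	if err != nil {
		fmt.Println("bad config:", err)
		return
	}
	fmt.Println(cfg.Topic, brokers, cfg.EnableTls)
}
```

Keys that are left out simply keep their zero value, so TLS stays off unless `enableTls` is set explicitly.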
   
   Make sure that the Kafka server is running before starting Traffic Stats. If starting a Kafka environment for the first time, follow steps 1-3 of the [Kafka quickstart](https://kafka.apache.org/quickstart).
   
   To see the messages pushed by the Kafka producer locally, a consumer can fetch the data. Here's an example local consumer using the broker and topic from above:
   
   ```
   bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic traffic-stats --group my-traffic-stats-app --from-beginning
   ```
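
For anyone writing their own consumer, here is a minimal sketch of decoding one message body. It assumes each message is the JSON encoding of the `KafkaJSON` struct quoted in the review diff; the `decodeMessage` helper and the sample payload are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// KafkaJSON mirrors the message struct quoted in the review diff.
type KafkaJSON struct {
	Name   string                 `json:"name"`
	Tags   map[string]string      `json:"tags"`
	Fields map[string]interface{} `json:"fields"`
	Time   time.Time              `json:"time"`
}

// decodeMessage is a hypothetical helper that turns a raw message value
// into a KafkaJSON struct.
func decodeMessage(raw []byte) (KafkaJSON, error) {
	var m KafkaJSON
	err := json.Unmarshal(raw, &m)
	return m, err
}

func main() {
	// Made-up payload; real stat names, tags, and fields will differ.
	raw := []byte(`{"name":"bandwidth","tags":{"cachegroup":"example"},"fields":{"value":12.5},"time":"2022-01-07T03:31:45Z"}`)
	m, err := decodeMessage(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(m.Name, m.Tags["cachegroup"], m.Fields["value"], m.Time.Year())
}
```

Because `Fields` is a `map[string]interface{}`, numeric values arrive as `float64` after decoding.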
   
   InfluxDB can also be disabled in `traffic_stats.cfg`: set `enableInflux` to `false` if InfluxDB is not needed. Note that with InfluxDB disabled, daily stats cannot be written, because Traffic Stats queries InfluxDB to compute them, and Traffic Portal will not display charts, because Grafana reads its data from InfluxDB.
   
   ## PR submission checklist
   - [x] This PR does not have tests. A Kafka environment must be set up to test manually.
   - [x] This PR has documentation
   - [x] This PR has a CHANGELOG.md entry
   - [x] This PR **DOES NOT FIX A SERIOUS SECURITY VULNERABILITY** (see [the Apache Software Foundation's security guidelines](https://apache.org/security) for details)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@trafficcontrol.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [trafficcontrol] henryluimcit commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
henryluimcit commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r787198249



##########
File path: traffic_stats/traffic_stats.go
##########
@@ -280,6 +317,131 @@ func main() {
 	}
 }
 
+func startShutdown(c *KafkaCluster) {
+	if c == nil {
+		return
+	}
+	go func() {
+		log.Infof("Starting cluster shutdown, closing producer and client")
+		if err := (*c.producer).Close(); err != nil {
+			log.Warnf("Error closing producer for cluster:  %v", err)
+		}
+		if err := (*c.client).Close(); err != nil {
+			log.Warnf("Error closing client for cluster:  %v", err)
+		}
+		c.producer = nil
+		log.Infof("Finished cluster shutdown")
+	}()
+}
+
+func TlsConfig(config KafkaConfig) (*tls.Config, error) {
+	if config.EnableTls == false {
+		return nil, nil
+	}
+	c := &tls.Config{}
+	if config.RootCA != "" {
+		log.Infof("Loading TLS root CA certificate from %s", config.RootCA)
+		caCert, err := ioutil.ReadFile(config.RootCA)
+		if err != nil {
+			return nil, err
+		}
+		caCertPool := x509.NewCertPool()
+		caCertPool.AppendCertsFromPEM(caCert)
+		c.RootCAs = caCertPool
+	} else {
+		log.Warnf("No TLS root CA defined")
+	}
+	if config.ClientCert != "" {
+		log.Infof("Loading TLS client certificate")
+		if config.ClientCertKey == "" {
+			return nil, fmt.Errorf("client cert path defined without key path")
+		}
+		certPEM, err := ioutil.ReadFile(config.ClientCert)
+		if err != nil {
+			return nil, err
+		}
+		keyPEM, err := ioutil.ReadFile(config.ClientCertKey)
+		if err != nil {
+			return nil, err
+		}
+		cert, err := tls.X509KeyPair(certPEM, keyPEM)
+		if err != nil {
+			return nil, err
+		}
+		c.Certificates = []tls.Certificate{cert}
+	}
+	return c, nil
+}
+
+func newKakfaCluster(config KafkaConfig) *KafkaCluster {
+
+	if config.Enable == false {
+		return nil
+	}
+
+	brokers := strings.Split(config.Brokers, ",")
+	sc := sarama.NewConfig()
+
+	tlsConfig, err := TlsConfig(config)
+	if err != nil {
+		log.Errorln("Unable to create TLS config", err)
+		return nil
+	}
+
+	sc.Producer.RequiredAcks = sarama.RequiredAcks(config.RequiredAcks)
+	if config.EnableTls && tlsConfig != nil {
+		sc.Net.TLS.Enable = true
+		sc.Net.TLS.Config = tlsConfig
+	}
+
+	cl, err := sarama.NewClient(brokers, sc)
+
+	if err != nil {
+		log.Errorln("Unable to create client", err)
+		return nil
+	}
+
+	p, err := sarama.NewAsyncProducerFromClient(cl)
+
+	if err != nil {
+		log.Errorln("Unable to create producer", err)
+		return nil
+	}
+	c := &KafkaCluster{
+		producer: &p,
+		client:   &cl,
+	}
+
+	return c
+}
+
+func publishToKafka(config StartupConfig, bps influx.BatchPoints, c *KafkaCluster) error {
+
+	input := (*c.producer).Input()
+
+	for _, point := range bps.Points() {
+
+		var KafkaJSON KafkaJSON
+		KafkaJSON.Name = point.Name()
+		KafkaJSON.Tags = point.Tags()
+		KafkaJSON.Fields, _ = point.Fields()
+		KafkaJSON.Time = point.Time()
+
+		message, err := json.Marshal(KafkaJSON)

Review comment:
       Good catch - I moved the error check to the correct spot







[GitHub] [trafficcontrol] rawlinp merged pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
rawlinp merged pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451


   





[GitHub] [trafficcontrol] rawlinp commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
rawlinp commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r782601633



##########
File path: traffic_stats/traffic_stats.go
##########
@@ -280,6 +317,131 @@ func main() {
 	}
 }
 
+func startShutdown(c *KafkaCluster) {
+	if c == nil {
+		return
+	}
+	go func() {

Review comment:
       Since this runs in a goroutine, it's possible that the main goroutine will exit before this cleanup code is ever run -- normally you would use a mechanism to make the main goroutine wait until this cleanup goroutine is completed. Is there even a reason for this to be run in a separate goroutine? 
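
One common way to make the caller wait is a `sync.WaitGroup`. The sketch below is only an illustration of that mechanism: the `closer` interface and `fakeResource` type are hypothetical stand-ins for the Kafka producer and client, and `shutdownAndWait` is not a function from this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical stand-in for the Kafka producer and client,
// both of which are closed via a Close method in the diff above.
type closer interface {
	Close() error
}

// fakeResource records whether Close was called.
type fakeResource struct {
	closed bool
}

func (f *fakeResource) Close() error {
	f.closed = true
	return nil
}

// shutdownAndWait runs the cleanup in a separate goroutine but blocks
// until it has finished, so the caller cannot exit before it completes.
func shutdownAndWait(resources ...closer) []error {
	var wg sync.WaitGroup
	var errs []error
	wg.Add(1)
	go func() {
		defer wg.Done()
		for _, r := range resources {
			if err := r.Close(); err != nil {
				errs = append(errs, err)
			}
		}
	}()
	wg.Wait() // without this, a following os.Exit could run first
	return errs
}

func main() {
	producer := &fakeResource{}
	client := &fakeResource{}
	errs := shutdownAndWait(producer, client)
	fmt.Println(producer.closed, client.closed, len(errs))
}
```

When nothing else needs to run concurrently, calling the cleanup directly is simpler and has the same effect.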

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -20,10 +20,13 @@ under the License.
 package main
 
 import (
+	"crypto/tls"
+	"crypto/x509"
 	"encoding/json"
 	"errors"
 	"flag"
 	"fmt"
+	"github.com/Shopify/sarama"

Review comment:
       nit: we generally try to group imports so this one would go into the 3rd group (external dependencies)

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -243,11 +273,18 @@ func main() {
 		case <-termChan:
 			info("Shutdown Request Received - Sending stored metrics then quitting")
 			for _, val := range Bps {
+				if c != nil {
+					publishToKafka(config, val, c)

Review comment:
       This line ignores any returned error

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -280,6 +317,131 @@ func main() {
 	}
 }
 
+func startShutdown(c *KafkaCluster) {
+	if c == nil {
+		return
+	}
+	go func() {
+		log.Infof("Starting cluster shutdown, closing producer and client")
+		if err := (*c.producer).Close(); err != nil {
+			log.Warnf("Error closing producer for cluster:  %v", err)
+		}
+		if err := (*c.client).Close(); err != nil {
+			log.Warnf("Error closing client for cluster:  %v", err)
+		}
+		c.producer = nil
+		log.Infof("Finished cluster shutdown")
+	}()
+}
+
+func TlsConfig(config KafkaConfig) (*tls.Config, error) {
+	if config.EnableTls == false {
+		return nil, nil
+	}
+	c := &tls.Config{}
+	if config.RootCA != "" {
+		log.Infof("Loading TLS root CA certificate from %s", config.RootCA)
+		caCert, err := ioutil.ReadFile(config.RootCA)
+		if err != nil {
+			return nil, err
+		}
+		caCertPool := x509.NewCertPool()
+		caCertPool.AppendCertsFromPEM(caCert)
+		c.RootCAs = caCertPool
+	} else {
+		log.Warnf("No TLS root CA defined")
+	}
+	if config.ClientCert != "" {
+		log.Infof("Loading TLS client certificate")
+		if config.ClientCertKey == "" {
+			return nil, fmt.Errorf("client cert path defined without key path")
+		}
+		certPEM, err := ioutil.ReadFile(config.ClientCert)
+		if err != nil {
+			return nil, err
+		}
+		keyPEM, err := ioutil.ReadFile(config.ClientCertKey)
+		if err != nil {
+			return nil, err
+		}
+		cert, err := tls.X509KeyPair(certPEM, keyPEM)
+		if err != nil {
+			return nil, err
+		}
+		c.Certificates = []tls.Certificate{cert}
+	}
+	return c, nil
+}
+
+func newKakfaCluster(config KafkaConfig) *KafkaCluster {
+
+	if config.Enable == false {
+		return nil
+	}
+
+	brokers := strings.Split(config.Brokers, ",")
+	sc := sarama.NewConfig()
+
+	tlsConfig, err := TlsConfig(config)
+	if err != nil {
+		log.Errorln("Unable to create TLS config", err)
+		return nil
+	}
+
+	sc.Producer.RequiredAcks = sarama.RequiredAcks(config.RequiredAcks)
+	if config.EnableTls && tlsConfig != nil {
+		sc.Net.TLS.Enable = true
+		sc.Net.TLS.Config = tlsConfig
+	}
+
+	cl, err := sarama.NewClient(brokers, sc)
+
+	if err != nil {
+		log.Errorln("Unable to create client", err)
+		return nil
+	}
+
+	p, err := sarama.NewAsyncProducerFromClient(cl)
+
+	if err != nil {
+		log.Errorln("Unable to create producer", err)
+		return nil
+	}
+	c := &KafkaCluster{
+		producer: &p,
+		client:   &cl,
+	}
+
+	return c
+}
+
+func publishToKafka(config StartupConfig, bps influx.BatchPoints, c *KafkaCluster) error {
+
+	input := (*c.producer).Input()
+
+	for _, point := range bps.Points() {
+
+		var KafkaJSON KafkaJSON
+		KafkaJSON.Name = point.Name()
+		KafkaJSON.Tags = point.Tags()
+		KafkaJSON.Fields, _ = point.Fields()
+		KafkaJSON.Time = point.Time()
+
+		message, err := json.Marshal(KafkaJSON)

Review comment:
       Why is this error left unchecked until L438?
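
For illustration, the usual pattern is to check the error on the line right after the call that produced it, before the result is used. This is a sketch under assumptions: `message` is a simplified stand-in for `KafkaJSON`, and `encodeAll` is a hypothetical helper rather than code from this PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a simplified stand-in for the KafkaJSON struct in the diff.
type message struct {
	Name   string                 `json:"name"`
	Fields map[string]interface{} `json:"fields"`
}

// encodeAll marshals each message and checks the error immediately after
// json.Marshal, so a failed encoding is never passed along.
func encodeAll(msgs []message) ([][]byte, error) {
	out := make([][]byte, 0, len(msgs))
	for _, m := range msgs {
		b, err := json.Marshal(m)
		if err != nil {
			return nil, fmt.Errorf("marshaling %q: %w", m.Name, err)
		}
		out = append(out, b)
	}
	return out, nil
}

func main() {
	good := message{Name: "ok", Fields: map[string]interface{}{"value": 1}}
	// Channels cannot be encoded as JSON, so this message makes Marshal fail.
	bad := message{Name: "bad", Fields: map[string]interface{}{"value": make(chan int)}}

	if _, err := encodeAll([]message{good, bad}); err != nil {
		fmt.Println("error:", err)
	}
}
```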

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -243,11 +273,18 @@ func main() {
 		case <-termChan:
 			info("Shutdown Request Received - Sending stored metrics then quitting")
 			for _, val := range Bps {
+				if c != nil {
+					publishToKafka(config, val, c)
+				}
 				sendMetrics(config, val, false)
 			}
+			startShutdown(c)
 			os.Exit(0)
 		case <-tickers.Publish:
 			for key, val := range Bps {
+				if c != nil {

Review comment:
       So, rather than checking if the kafka client isn't nil and if InfluxDB is enabled, I think it would make sense to add some kind of `type DataExporter interface { ExportData(bps influx.BatchPoints) }` of which there are two concrete implementations -- one for InfluxDB and one for Kafka. Then, we would have a `dataExporters := []DataExporter{}` which we would add the InfluxDB and Kafka instances to (if enabled). Then, the main goroutine would iterate through the `[]DataExporter` and call `ExportData(...)` on each one. This way, we don't have to constantly check all over the code if certain data exporters are enabled or not.
   
   That said, for `calcDailySummary()`, if that's just going to be an influxDB thing only, then it might make sense to still check if influxDB is enabled in that function.
   
   In the future, if someone wanted to add another data exporter implementation (e.g. Prometheus), this would make it a lot easier. What do you think of this idea?
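
A minimal sketch of the proposed shape, to make the idea concrete. `BatchPoints` is a stand-in for `influx.BatchPoints` so the example compiles on its own, and `countingExporter` and `enabledExporters` are hypothetical names, not code from this PR.

```go
package main

import "fmt"

// BatchPoints is a stand-in for influx.BatchPoints.
type BatchPoints []string

// DataExporter follows the interface proposed in the comment above.
type DataExporter interface {
	ExportData(bps BatchPoints)
}

// countingExporter is a hypothetical implementation that only counts the
// points it receives; a real one would write to InfluxDB or Kafka.
type countingExporter struct {
	name string
	sent int
}

func (e *countingExporter) ExportData(bps BatchPoints) {
	e.sent += len(bps)
}

// enabledExporters builds the list once at startup, so the main loop never
// has to ask which backends are enabled.
func enabledExporters(influxOn, kafkaOn bool, influxExp, kafkaExp DataExporter) []DataExporter {
	exporters := []DataExporter{}
	if influxOn {
		exporters = append(exporters, influxExp)
	}
	if kafkaOn {
		exporters = append(exporters, kafkaExp)
	}
	return exporters
}

func main() {
	influxExp := &countingExporter{name: "influxdb"}
	kafkaExp := &countingExporter{name: "kafka"}

	exporters := enabledExporters(true, false, influxExp, kafkaExp)
	for _, e := range exporters {
		e.ExportData(BatchPoints{"p1", "p2"})
	}
	fmt.Println(influxExp.name, influxExp.sent, kafkaExp.name, kafkaExp.sent)
}
```

Adding another backend then means writing one more type with an `ExportData` method and appending it to the slice.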

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -243,11 +273,18 @@ func main() {
 		case <-termChan:
 			info("Shutdown Request Received - Sending stored metrics then quitting")
 			for _, val := range Bps {
+				if c != nil {
+					publishToKafka(config, val, c)
+				}
 				sendMetrics(config, val, false)
 			}
+			startShutdown(c)
 			os.Exit(0)
 		case <-tickers.Publish:
 			for key, val := range Bps {
+				if c != nil {
+					go publishToKafka(config, val, c)

Review comment:
       This line ignores any returned error
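
When the call is launched with `go`, its return value is discarded, so the error has to travel back some other way. One option, sketched here with hypothetical names (`publishAll`, `failingJob`, `errBrokerDown`), is to send each result on a channel:

```go
package main

import (
	"errors"
	"fmt"
)

// errBrokerDown is a made-up error used to simulate a failed publish.
var errBrokerDown = errors.New("broker unavailable")

func okJob() error      { return nil }
func failingJob() error { return errBrokerDown }

// publishAll starts every job in its own goroutine, as the diff does with
// `go publishToKafka(...)`, but collects each result from a channel so that
// no error is silently dropped.
func publishAll(jobs []func() error) []error {
	results := make(chan error, len(jobs))
	for _, job := range jobs {
		go func(job func() error) {
			results <- job()
		}(job)
	}
	var errs []error
	for range jobs {
		if err := <-results; err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

func main() {
	errs := publishAll([]func() error{okJob, failingJob})
	for _, err := range errs {
		fmt.Println("publish failed:", err)
	}
}
```

Logging the error inside the goroutine is a lighter alternative when the caller does not need to react to it.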

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -92,6 +95,7 @@ type StartupConfig struct {
 	ToUser                      string   `json:"toUser"`
 	ToPasswd                    string   `json:"toPasswd"`
 	ToURL                       string   `json:"toUrl"`
+	EnableInflux                bool     `json:"enableInflux"`

Review comment:
       Since influx is enabled by default and should remain so for backwards-compatibility, it might make sense to call this `DisableInflux` (and reverse the logic) since it will default to `false` if left out of existing configs (which is what we want). We don't want to require preexisting config files to have to add `enableInflux: true` in order to continue working as is.
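
The reasoning rests on Go's zero values: a `bool` field missing from the JSON stays `false` after decoding. A small sketch, using a trimmed hypothetical `startupConfig` and a made-up URL, shows the effect:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// startupConfig is a trimmed, hypothetical version of StartupConfig.
type startupConfig struct {
	ToURL         string `json:"toUrl"`
	DisableInflux bool   `json:"disableInflux"`
}

func loadConfig(raw []byte) (startupConfig, error) {
	var cfg startupConfig
	err := json.Unmarshal(raw, &cfg)
	return cfg, err
}

func main() {
	// A pre-existing config file that predates the new option.
	oldCfg, err := loadConfig([]byte(`{"toUrl": "https://to.example.invalid"}`))
	if err != nil {
		fmt.Println("bad config:", err)
		return
	}
	// The missing key decodes to false, so InfluxDB stays enabled.
	fmt.Println("influx enabled:", !oldCfg.DisableInflux)

	newCfg, err := loadConfig([]byte(`{"toUrl": "https://to.example.invalid", "disableInflux": true}`))
	if err != nil {
		fmt.Println("bad config:", err)
		return
	}
	fmt.Println("influx enabled:", !newCfg.DisableInflux)
}
```

With an `enableInflux` field instead, the same old file would decode to `false` and turn InfluxDB off unintentionally.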







[GitHub] [trafficcontrol] henryluimcit commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
henryluimcit commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r789111243



##########
File path: vendor/modules.txt
##########
@@ -10,6 +10,11 @@ github.com/GehirnInc/crypt
 github.com/GehirnInc/crypt/common

Review comment:
       Added vendor files in most recent commit







[GitHub] [trafficcontrol] rawlinp commented on pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
rawlinp commented on pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#issuecomment-1016913633


   This looks good to me, but there are merge conflicts to address.





[GitHub] [trafficcontrol] ocket8888 commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
ocket8888 commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r787267339



##########
File path: traffic_stats/traffic_stats.go
##########
@@ -229,6 +274,16 @@ func main() {
 	hupChan := make(chan os.Signal, 1)
 	signal.Notify(hupChan, syscall.SIGHUP)
 
+	dataExporters := []DataExporter{}
+
+	if config.KafkaConfig.Enable == true && c != nil {
+		dataExporters = append(dataExporters, c)
+	}
+
+	if config.DisableInflux == false {

Review comment:
       I'd like to point out once again that with this change our configuration file for TO will have both `disableInflux` and `influxEnabled` - and I don't think that's ideal







[GitHub] [trafficcontrol] zrhoffman commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
zrhoffman commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r789889432



##########
File path: go.mod
##########
@@ -51,6 +51,7 @@ require (
 	golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3

Review comment:
       Never mind, I'm seeing all of these deps in `go.mod` when regenerating it from scratch.







[GitHub] [trafficcontrol] rawlinp commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
rawlinp commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r787301657



##########
File path: traffic_stats/traffic_stats.go
##########
@@ -229,6 +274,16 @@ func main() {
 	hupChan := make(chan os.Signal, 1)
 	signal.Notify(hupChan, syscall.SIGHUP)
 
+	dataExporters := []DataExporter{}
+
+	if config.KafkaConfig.Enable == true && c != nil {
+		dataExporters = append(dataExporters, c)
+	}
+
+	if config.DisableInflux == false {

Review comment:
       This isn't the TO config file -- this is the Traffic Stats config file.







[GitHub] [trafficcontrol] henryluimcit commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
henryluimcit commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r787200435



##########
File path: traffic_stats/traffic_stats.go
##########
@@ -243,11 +273,18 @@ func main() {
 		case <-termChan:
 			info("Shutdown Request Received - Sending stored metrics then quitting")
 			for _, val := range Bps {
+				if c != nil {
+					publishToKafka(config, val, c)

Review comment:
       Fixed with ExportData method (line 147)

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -243,11 +273,18 @@ func main() {
 		case <-termChan:
 			info("Shutdown Request Received - Sending stored metrics then quitting")
 			for _, val := range Bps {
+				if c != nil {
+					publishToKafka(config, val, c)
+				}
 				sendMetrics(config, val, false)
 			}
+			startShutdown(c)
 			os.Exit(0)
 		case <-tickers.Publish:
 			for key, val := range Bps {
+				if c != nil {
+					go publishToKafka(config, val, c)

Review comment:
       Fixed with ExportData method (line 147)







[GitHub] [trafficcontrol] rawlinp commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
rawlinp commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r789058544



##########
File path: vendor/modules.txt
##########
@@ -10,6 +10,11 @@ github.com/GehirnInc/crypt
 github.com/GehirnInc/crypt/common

Review comment:
       Were there no other additions to the `vendor` directory?







[GitHub] [trafficcontrol] rawlinp commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
rawlinp commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r787223700



##########
File path: traffic_stats/traffic_stats.go
##########
@@ -229,6 +274,16 @@ func main() {
 	hupChan := make(chan os.Signal, 1)
 	signal.Notify(hupChan, syscall.SIGHUP)
 
+	dataExporters := []DataExporter{}
+
+	if config.KafkaConfig.Enable == true && c != nil {
+		dataExporters = append(dataExporters, c)
+	}
+
+	if config.DisableInflux == false {

Review comment:
       This can be simplified to `if !config.DisableInflux`

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -229,6 +274,16 @@ func main() {
 	hupChan := make(chan os.Signal, 1)
 	signal.Notify(hupChan, syscall.SIGHUP)
 
+	dataExporters := []DataExporter{}
+
+	if config.KafkaConfig.Enable == true && c != nil {

Review comment:
       This can be simplified to `if config.KafkaConfig.Enable && c != nil`

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -280,6 +340,131 @@ func main() {
 	}
 }
 
+func startShutdown(c *KafkaCluster) {
+	if c == nil {
+		return
+	}
+
+	log.Infof("Starting cluster shutdown, closing producer and client")
+	if err := (*c.producer).Close(); err != nil {
+		log.Warnf("Error closing producer for cluster:  %v", err)
+	}
+	if err := (*c.client).Close(); err != nil {
+		log.Warnf("Error closing client for cluster:  %v", err)
+	}
+	c.producer = nil
+	log.Infof("Finished cluster shutdown")
+}
+
+func TlsConfig(config KafkaConfig) (*tls.Config, error) {
+	if config.EnableTls == false {

Review comment:
       This can be simplified to `if !config.EnableTls`

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -349,6 +534,10 @@ func loadStartupConfig(configFile string, oldConfig StartupConfig) (StartupConfi
 		warn("No logging configuration found in configuration file - default logging to stderr will be used")
 	}
 
+	if config.DisableInflux == true {

Review comment:
       This can be simplified to `if config.DisableInflux`

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -370,6 +559,10 @@ func loadStartupConfig(configFile string, oldConfig StartupConfig) (StartupConfi
 }
 
 func calcDailySummary(now time.Time, config StartupConfig, runningConfig RunningConfig) {
+	if config.DisableInflux == true {

Review comment:
       This can be simplified to `if config.DisableInflux`

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -108,6 +112,45 @@ type StartupConfig struct {
 	DailySummaryRetentionPolicy string   `json:"dailySummaryRetentionPolicy"`
 	BpsChan                     chan influx.BatchPoints
 	InfluxDBs                   []*InfluxDBProps
+	KafkaConfig                 KafkaConfig `json:"kafkaConfig"`
+}
+
+type KafkaConfig struct {
+	Enable        bool   `json:"enable"`
+	Brokers       string `json:"brokers"`
+	Topic         string `json:"topic"`
+	RequiredAcks  int    `json:"requiredAcks"`
+	EnableTls     bool   `json:"enableTls"`
+	RootCA        string `json:"rootCA"`
+	ClientCert    string `json:"clientCert"`
+	ClientCertKey string `json:"clientCertKey"`
+}
+
+type KafkaCluster struct {
+	producer *sarama.AsyncProducer
+	client   *sarama.Client
+}
+
+type KafkaJSON struct {
+	Name   string                 `json:"name"`
+	Tags   map[string]string      `json:"tags"`
+	Fields map[string]interface{} `json:"fields"`
+	Time   time.Time              `json:"time"`
+}
+
+type DataExporter interface {
+	ExportData(config StartupConfig, bps influx.BatchPoints, retry bool)
+}
+
+func (c *KafkaCluster) ExportData(config StartupConfig, bps influx.BatchPoints, retry bool) {
+	err := publishToKafka(config, bps, c)
+	if err != nil {
+		log.Errorln("Unable to export to Kafka", err)
+	}
+}
+
+func (influx StartupConfig) ExportData(config StartupConfig, bps influx.BatchPoints, retry bool) {

Review comment:
       Rather than tying this method to `StartupConfig`, it might make more sense to create a new `InfluxClient` struct (or some other name) to associate this interface method with.
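
A sketch of what such a dedicated type might look like, assuming the three-argument `ExportData` signature from the diff above. `StartupConfig` and `BatchPoints` are reduced to stand-ins so the example is self-contained, and the body of `ExportData` only counts points where the real method would presumably forward to `sendMetrics`.

```go
package main

import "fmt"

// StartupConfig and BatchPoints are stand-ins for the real types.
type StartupConfig struct{}
type BatchPoints []string

// DataExporter uses the signature from the diff above.
type DataExporter interface {
	ExportData(config StartupConfig, bps BatchPoints, retry bool)
}

// InfluxClient is the hypothetical dedicated type the comment suggests,
// so the method no longer hangs off StartupConfig.
type InfluxClient struct {
	written int
}

// ExportData only counts points here; the real method would presumably
// forward to sendMetrics(config, bps, retry).
func (c *InfluxClient) ExportData(config StartupConfig, bps BatchPoints, retry bool) {
	c.written += len(bps)
}

// Compile-time check that *InfluxClient satisfies DataExporter.
var _ DataExporter = (*InfluxClient)(nil)

func main() {
	client := &InfluxClient{}
	var exporter DataExporter = client
	exporter.ExportData(StartupConfig{}, BatchPoints{"p1", "p2", "p3"}, false)
	fmt.Println(client.written)
}
```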

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -280,6 +340,131 @@ func main() {
 	}
 }
 
+func startShutdown(c *KafkaCluster) {
+	if c == nil {
+		return
+	}
+
+	log.Infof("Starting cluster shutdown, closing producer and client")
+	if err := (*c.producer).Close(); err != nil {
+		log.Warnf("Error closing producer for cluster:  %v", err)
+	}
+	if err := (*c.client).Close(); err != nil {
+		log.Warnf("Error closing client for cluster:  %v", err)
+	}
+	c.producer = nil
+	log.Infof("Finished cluster shutdown")
+}
+
+func TlsConfig(config KafkaConfig) (*tls.Config, error) {
+	if config.EnableTls == false {
+		return nil, nil
+	}
+	c := &tls.Config{}
+	if config.RootCA != "" {
+		log.Infof("Loading TLS root CA certificate from %s", config.RootCA)
+		caCert, err := ioutil.ReadFile(config.RootCA)
+		if err != nil {
+			return nil, err
+		}
+		caCertPool := x509.NewCertPool()
+		caCertPool.AppendCertsFromPEM(caCert)
+		c.RootCAs = caCertPool
+	} else {
+		log.Warnf("No TLS root CA defined")
+	}
+	if config.ClientCert != "" {
+		log.Infof("Loading TLS client certificate")
+		if config.ClientCertKey == "" {
+			return nil, fmt.Errorf("client cert path defined without key path")
+		}
+		certPEM, err := ioutil.ReadFile(config.ClientCert)
+		if err != nil {
+			return nil, err
+		}
+		keyPEM, err := ioutil.ReadFile(config.ClientCertKey)
+		if err != nil {
+			return nil, err
+		}
+		cert, err := tls.X509KeyPair(certPEM, keyPEM)
+		if err != nil {
+			return nil, err
+		}
+		c.Certificates = []tls.Certificate{cert}
+	}
+	return c, nil
+}
+
+func newKakfaCluster(config KafkaConfig) *KafkaCluster {
+
+	if config.Enable == false {

Review comment:
       This can be simplified to `if !config.Enable`







[GitHub] [trafficcontrol] henryluimcit commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
henryluimcit commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r787199902



##########
File path: traffic_stats/traffic_stats.go
##########
@@ -243,11 +273,18 @@ func main() {
 		case <-termChan:
 			info("Shutdown Request Received - Sending stored metrics then quitting")
 			for _, val := range Bps {
+				if c != nil {
+					publishToKafka(config, val, c)
+				}
 				sendMetrics(config, val, false)
 			}
+			startShutdown(c)
 			os.Exit(0)
 		case <-tickers.Publish:
 			for key, val := range Bps {
+				if c != nil {

Review comment:
       I created a `DataExporter` interface, with a couple of caveats:
   
   1. The retry (bool) parameter is only used for Influx
   2. I used the startup config object to implement the method for ExportData (line 152) for Influx (line 284). I wasn't sure what else to use since nothing was needed.







[GitHub] [trafficcontrol] henryluimcit commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
henryluimcit commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r787197759



##########
File path: traffic_stats/traffic_stats.go
##########
@@ -20,10 +20,13 @@ under the License.
 package main
 
 import (
+	"crypto/tls"
+	"crypto/x509"
 	"encoding/json"
 	"errors"
 	"flag"
 	"fmt"
+	"github.com/Shopify/sarama"

Review comment:
       Fixed in latest commit







[GitHub] [trafficcontrol] henryluimcit commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
henryluimcit commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r787198005



##########
File path: traffic_stats/traffic_stats.go
##########
@@ -280,6 +317,131 @@ func main() {
 	}
 }
 
+func startShutdown(c *KafkaCluster) {
+	if c == nil {
+		return
+	}
+	go func() {

Review comment:
       Yes, the separate goroutine is unnecessary in this case. I removed it in the latest commit.
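
A hedged sketch of why the goroutine matters here: in the diff, `startShutdown(c)` is followed directly by `os.Exit(0)`, and `os.Exit` ends the process without waiting for other goroutines, so cleanup started in the background may never finish. The version below closes the producer synchronously. The `kafkaCluster` struct, its `producer` field, and `fakeProducer` are hypothetical; the real `KafkaCluster` type in this PR is not shown in this thread.

```go
package main

import (
	"fmt"
	"io"
)

// kafkaCluster is a hypothetical stand-in for the PR's KafkaCluster type.
type kafkaCluster struct {
	producer io.Closer
}

// startShutdown closes the producer in the calling goroutine, so the caller
// can safely exit the process as soon as it returns.
func startShutdown(c *kafkaCluster) error {
	if c == nil || c.producer == nil {
		return nil
	}
	return c.producer.Close()
}

// fakeProducer records whether Close was called.
type fakeProducer struct {
	closed bool
}

func (p *fakeProducer) Close() error {
	p.closed = true
	return nil
}

func main() {
	p := &fakeProducer{}
	if err := startShutdown(&kafkaCluster{producer: p}); err != nil {
		fmt.Println("shutdown error:", err)
	}
	fmt.Println("producer closed:", p.closed)
}
```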







[GitHub] [trafficcontrol] henryluimcit commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
henryluimcit commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r787200176



##########
File path: traffic_stats/traffic_stats.go
##########
@@ -92,6 +95,7 @@ type StartupConfig struct {
 	ToUser                      string   `json:"toUser"`
 	ToPasswd                    string   `json:"toPasswd"`
 	ToURL                       string   `json:"toUrl"`
+	EnableInflux                bool     `json:"enableInflux"`

Review comment:
       That makes sense - I changed it to `DisableInflux`.







[GitHub] [trafficcontrol] ocket8888 commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
ocket8888 commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r783275909



##########
File path: traffic_stats/traffic_stats.go
##########
@@ -92,6 +95,7 @@ type StartupConfig struct {
 	ToUser                      string   `json:"toUser"`
 	ToPasswd                    string   `json:"toPasswd"`
 	ToURL                       string   `json:"toUrl"`
+	EnableInflux                bool     `json:"enableInflux"`

Review comment:
       Also, we already have `InfluxEnabled` - I don't want to think about what it should do if you have `"disableInflux": true` *and* `"influxEnabled": true`.
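
To make the ambiguity concrete, here is an illustrative sketch and not a proposal for this PR: if both flags did exist, one option would be to reject the contradictory combination when the config is loaded. The JSON keys `disableInflux` and `influxEnabled` come from the comment above; the `influxFlags` struct and `parseFlags` function are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// influxFlags is a hypothetical struct holding both flags side by side.
type influxFlags struct {
	DisableInflux bool `json:"disableInflux"`
	InfluxEnabled bool `json:"influxEnabled"`
}

// parseFlags decodes the config and refuses the contradictory combination.
func parseFlags(raw []byte) (influxFlags, error) {
	var f influxFlags
	if err := json.Unmarshal(raw, &f); err != nil {
		return f, err
	}
	if f.DisableInflux && f.InfluxEnabled {
		return f, errors.New("disableInflux and influxEnabled are both true")
	}
	return f, nil
}

func main() {
	_, err := parseFlags([]byte(`{"disableInflux": true, "influxEnabled": true}`))
	fmt.Println(err)
}
```

Keeping a single flag avoids the need for this check entirely.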







[GitHub] [trafficcontrol] zrhoffman commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
zrhoffman commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r789875307



##########
File path: go.mod
##########
@@ -51,6 +51,7 @@ require (
 	golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3

Review comment:
       The following dependencies are unused and can be removed from `go.mod`:
   * github.com/hashicorp/go-uuid
   * github.com/jcmturner/gofork
   * github.com/klauspost/compress
   * gopkg.in/jcmturner/aescts.v1
   * gopkg.in/jcmturner/dnsutils.v1
   * gopkg.in/jcmturner/gokrb5.v7
   * gopkg.in/jcmturner/rpc.v1







[GitHub] [trafficcontrol] zrhoffman commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
zrhoffman commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r789899416



##########
File path: go.mod
##########
@@ -51,6 +51,7 @@ require (
 	golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3

Review comment:
       Yep
   > Never mind, I'm seeing all of these deps in `go.mod` when regenerating it from scratch.







[GitHub] [trafficcontrol] henryluimcit commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

Posted by GitBox <gi...@apache.org>.
henryluimcit commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r789897856



##########
File path: go.mod
##########
@@ -51,6 +51,7 @@ require (
 	golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3

Review comment:
       These are required dependencies for this PR



