You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@trafficcontrol.apache.org by GitBox <gi...@apache.org> on 2022/01/12 16:26:46 UTC

[GitHub] [trafficcontrol] rawlinp commented on a change in pull request #6451: Adding Support for Kafka to Traffic Stats

rawlinp commented on a change in pull request #6451:
URL: https://github.com/apache/trafficcontrol/pull/6451#discussion_r782601633



##########
File path: traffic_stats/traffic_stats.go
##########
@@ -280,6 +317,131 @@ func main() {
 	}
 }
 
+func startShutdown(c *KafkaCluster) {
+	if c == nil {
+		return
+	}
+	go func() {

Review comment:
       Since this runs in a goroutine, it's possible that the main goroutine will exit before this cleanup code is ever run -- normally you would use a mechanism to make the main goroutine wait until this cleanup goroutine is completed. Is there even a reason for this to be run in a separate goroutine? 

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -20,10 +20,13 @@ under the License.
 package main
 
 import (
+	"crypto/tls"
+	"crypto/x509"
 	"encoding/json"
 	"errors"
 	"flag"
 	"fmt"
+	"github.com/Shopify/sarama"

Review comment:
       nit: we generally try to group imports so this one would go into the 3rd group (external dependencies)

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -243,11 +273,18 @@ func main() {
 		case <-termChan:
 			info("Shutdown Request Received - Sending stored metrics then quitting")
 			for _, val := range Bps {
+				if c != nil {
+					publishToKafka(config, val, c)

Review comment:
       This line ignores any returned error

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -280,6 +317,131 @@ func main() {
 	}
 }
 
+func startShutdown(c *KafkaCluster) {
+	if c == nil {
+		return
+	}
+	go func() {
+		log.Infof("Starting cluster shutdown, closing producer and client")
+		if err := (*c.producer).Close(); err != nil {
+			log.Warnf("Error closing producer for cluster:  %v", err)
+		}
+		if err := (*c.client).Close(); err != nil {
+			log.Warnf("Error closing client for cluster:  %v", err)
+		}
+		c.producer = nil
+		log.Infof("Finished cluster shutdown")
+	}()
+}
+
+func TlsConfig(config KafkaConfig) (*tls.Config, error) {
+	if config.EnableTls == false {
+		return nil, nil
+	}
+	c := &tls.Config{}
+	if config.RootCA != "" {
+		log.Infof("Loading TLS root CA certificate from %s", config.RootCA)
+		caCert, err := ioutil.ReadFile(config.RootCA)
+		if err != nil {
+			return nil, err
+		}
+		caCertPool := x509.NewCertPool()
+		caCertPool.AppendCertsFromPEM(caCert)
+		c.RootCAs = caCertPool
+	} else {
+		log.Warnf("No TLS root CA defined")
+	}
+	if config.ClientCert != "" {
+		log.Infof("Loading TLS client certificate")
+		if config.ClientCertKey == "" {
+			return nil, fmt.Errorf("client cert path defined without key path")
+		}
+		certPEM, err := ioutil.ReadFile(config.ClientCert)
+		if err != nil {
+			return nil, err
+		}
+		keyPEM, err := ioutil.ReadFile(config.ClientCertKey)
+		if err != nil {
+			return nil, err
+		}
+		cert, err := tls.X509KeyPair(certPEM, keyPEM)
+		if err != nil {
+			return nil, err
+		}
+		c.Certificates = []tls.Certificate{cert}
+	}
+	return c, nil
+}
+
+func newKakfaCluster(config KafkaConfig) *KafkaCluster {
+
+	if config.Enable == false {
+		return nil
+	}
+
+	brokers := strings.Split(config.Brokers, ",")
+	sc := sarama.NewConfig()
+
+	tlsConfig, err := TlsConfig(config)
+	if err != nil {
+		log.Errorln("Unable to create TLS config", err)
+		return nil
+	}
+
+	sc.Producer.RequiredAcks = sarama.RequiredAcks(config.RequiredAcks)
+	if config.EnableTls && tlsConfig != nil {
+		sc.Net.TLS.Enable = true
+		sc.Net.TLS.Config = tlsConfig
+	}
+
+	cl, err := sarama.NewClient(brokers, sc)
+
+	if err != nil {
+		log.Errorln("Unable to create client", err)
+		return nil
+	}
+
+	p, err := sarama.NewAsyncProducerFromClient(cl)
+
+	if err != nil {
+		log.Errorln("Unable to create producer", err)
+		return nil
+	}
+	c := &KafkaCluster{
+		producer: &p,
+		client:   &cl,
+	}
+
+	return c
+}
+
+func publishToKafka(config StartupConfig, bps influx.BatchPoints, c *KafkaCluster) error {
+
+	input := (*c.producer).Input()
+
+	for _, point := range bps.Points() {
+
+		var KafkaJSON KafkaJSON
+		KafkaJSON.Name = point.Name()
+		KafkaJSON.Tags = point.Tags()
+		KafkaJSON.Fields, _ = point.Fields()
+		KafkaJSON.Time = point.Time()
+
+		message, err := json.Marshal(KafkaJSON)

Review comment:
       Why is this error left unchecked until L438?

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -243,11 +273,18 @@ func main() {
 		case <-termChan:
 			info("Shutdown Request Received - Sending stored metrics then quitting")
 			for _, val := range Bps {
+				if c != nil {
+					publishToKafka(config, val, c)
+				}
 				sendMetrics(config, val, false)
 			}
+			startShutdown(c)
 			os.Exit(0)
 		case <-tickers.Publish:
 			for key, val := range Bps {
+				if c != nil {

Review comment:
       So, rather than checking if the kafka client isn't nil and if InfluxDB is enabled, I think it would make sense to add some kind of `type DataExporter interface { ExportData(bps influx.BatchPoints) }` of which there are two concrete implementations -- one for InfluxDB and one for Kafka. Then, we would have a `dataExporters := []DataExporter{}` which we would add the InfluxDB and Kafka instances to (if enabled). Then, the main goroutine would iterate through the `[]DataExporter` and call `ExportData(...)` on each one. This way, we don't have to constantly check all over the code if certain data exporters are enabled or not.
   
   That said, for `calcDailySummary()`, if that's just going to be an influxDB thing only, then it might make sense to still check if influxDB is enabled in that function.
   
   In the future, if someone wanted to add another data exporter implementation (e.g. Prometheus), this would make it a lot easier. What do you think of this idea?

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -243,11 +273,18 @@ func main() {
 		case <-termChan:
 			info("Shutdown Request Received - Sending stored metrics then quitting")
 			for _, val := range Bps {
+				if c != nil {
+					publishToKafka(config, val, c)
+				}
 				sendMetrics(config, val, false)
 			}
+			startShutdown(c)
 			os.Exit(0)
 		case <-tickers.Publish:
 			for key, val := range Bps {
+				if c != nil {
+					go publishToKafka(config, val, c)

Review comment:
       This line ignores any returned error

##########
File path: traffic_stats/traffic_stats.go
##########
@@ -92,6 +95,7 @@ type StartupConfig struct {
 	ToUser                      string   `json:"toUser"`
 	ToPasswd                    string   `json:"toPasswd"`
 	ToURL                       string   `json:"toUrl"`
+	EnableInflux                bool     `json:"enableInflux"`

Review comment:
       Since influx is enabled by default and should remain so for backwards-compatibility, it might make sense to call this `DisableInflux` (and reverse the logic) since it will default to `false` if left out of existing configs (which is what we want). We don't want to require preexisting config files to have to add `enableInflux: true` in order to continue working as is.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@trafficcontrol.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org