You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@trafficcontrol.apache.org by GitBox <gi...@apache.org> on 2018/11/05 21:18:23 UTC

[GitHub] rawlinp closed pull request #2931: reuse single riak cluster connection, up riak heartbeat timeout

rawlinp closed pull request #2931: reuse single riak cluster connection, up riak heartbeat timeout
URL: https://github.com/apache/trafficcontrol/pull/2931
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2dbde00db..4a7653290 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
   - /api/1.1/deliveryservices/hostname/:hostname/sslkeys `GET`
   - /api/1.1/deliveryservices/sslkeys/add `POST`
   - /api/1.1/deliveryservices/xmlId/:xmlid/sslkeys/delete `GET`
+- To support reusing a single riak cluster connection, an optional parameter is added to riak.conf: "HealthCheckInterval". This options takes a 'Duration' value (ie: 10s, 5m) which affects how often the riak cluster is health checked.  Default is currently set to: "HealthCheckInterval": "5s".
   
 ### Changed
 - Issue 2821: Fixed "Traffic Router may choose wrong certificate when SNI names overlap"
diff --git a/traffic_ops/traffic_ops_golang/cdn/dnssec.go b/traffic_ops/traffic_ops_golang/cdn/dnssec.go
index 69bb065f7..a9f89989a 100644
--- a/traffic_ops/traffic_ops_golang/cdn/dnssec.go
+++ b/traffic_ops/traffic_ops_golang/cdn/dnssec.go
@@ -208,20 +208,15 @@ func DeleteDNSSECKeys(w http.ResponseWriter, r *http.Request) {
 	}
 	defer inf.Close()
 
-	key := inf.Params["name"]
-
-	riakCluster, err := riaksvc.GetRiakClusterTx(inf.Tx.Tx, inf.Config.RiakAuthOptions)
+	cluster, err := riaksvc.GetPooledCluster(inf.Tx.Tx, inf.Config.RiakAuthOptions)
 	if err != nil {
 		api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, errors.New("getting riak cluster: "+err.Error()))
 		return
 	}
-	if err := riakCluster.Start(); err != nil {
-		api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, errors.New("starting riak cluster: "+err.Error()))
-		return
-	}
-	defer riaksvc.StopCluster(riakCluster)
 
-	if err := riaksvc.DeleteObject(key, CDNDNSSECKeyType, riakCluster); err != nil {
+	key := inf.Params["name"]
+
+	if err := riaksvc.DeleteObject(key, CDNDNSSECKeyType, cluster); err != nil {
 		api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, errors.New("deleting cdn dnssec keys: "+err.Error()))
 		return
 	}
diff --git a/traffic_ops/traffic_ops_golang/riaksvc/dsutil.go b/traffic_ops/traffic_ops_golang/riaksvc/dsutil.go
index a91253059..2427331aa 100644
--- a/traffic_ops/traffic_ops_golang/riaksvc/dsutil.go
+++ b/traffic_ops/traffic_ops_golang/riaksvc/dsutil.go
@@ -46,7 +46,7 @@ func MakeDSSSLKeyKey(dsName, version string) string {
 func GetDeliveryServiceSSLKeysObj(xmlID string, version string, tx *sql.Tx, authOpts *riak.AuthOptions) (tc.DeliveryServiceSSLKeys, bool, error) {
 	key := tc.DeliveryServiceSSLKeys{}
 	found := false
-	err := WithClusterTx(tx, authOpts, func(cluster StorageCluster) error {
+	err := WithCluster(tx, authOpts, func(cluster StorageCluster) error {
 		// get the deliveryservice ssl keys by xmlID and version
 		ro, err := FetchObjectValues(MakeDSSSLKeyKey(xmlID, version), DeliveryServiceSSLKeysBucket, cluster)
 		if err != nil {
@@ -73,7 +73,7 @@ func PutDeliveryServiceSSLKeysObj(key tc.DeliveryServiceSSLKeys, tx *sql.Tx, aut
 	if err != nil {
 		return errors.New("marshalling key: " + err.Error())
 	}
-	err = WithClusterTx(tx, authOpts, func(cluster StorageCluster) error {
+	err = WithCluster(tx, authOpts, func(cluster StorageCluster) error {
 		obj := &riak.Object{
 			ContentType:     "application/json",
 			Charset:         "utf-8",
@@ -99,7 +99,7 @@ func Ping(tx *sql.Tx, authOpts *riak.AuthOptions) (tc.RiakPingResp, error) {
 		return tc.RiakPingResp{}, errors.New("getting riak servers: " + err.Error())
 	}
 	for _, server := range servers {
-		cluster, err := RiakServersToCluster([]ServerAddr{server}, authOpts)
+		cluster, err := GetRiakStorageCluster([]ServerAddr{server}, authOpts)
 		if err != nil {
 			log.Errorf("RiakServersToCluster error for server %+v: %+v\n", server, err.Error())
 			continue // try another server
@@ -126,7 +126,7 @@ func Ping(tx *sql.Tx, authOpts *riak.AuthOptions) (tc.RiakPingResp, error) {
 func GetDNSSECKeys(cdnName string, tx *sql.Tx, authOpts *riak.AuthOptions) (tc.DNSSECKeys, bool, error) {
 	key := tc.DNSSECKeys{}
 	found := false
-	err := WithClusterTx(tx, authOpts, func(cluster StorageCluster) error {
+	err := WithCluster(tx, authOpts, func(cluster StorageCluster) error {
 		ro, err := FetchObjectValues(cdnName, DNSSECKeysBucket, cluster)
 		if err != nil {
 			return err
@@ -151,7 +151,7 @@ func PutDNSSECKeys(keys tc.DNSSECKeys, cdnName string, tx *sql.Tx, authOpts *ria
 	if err != nil {
 		return errors.New("marshalling keys: " + err.Error())
 	}
-	err = WithClusterTx(tx, authOpts, func(cluster StorageCluster) error {
+	err = WithCluster(tx, authOpts, func(cluster StorageCluster) error {
 		obj := &riak.Object{
 			ContentType:     "application/json",
 			Charset:         "utf-8",
@@ -170,7 +170,7 @@ func PutDNSSECKeys(keys tc.DNSSECKeys, cdnName string, tx *sql.Tx, authOpts *ria
 func GetBucketKey(tx *sql.Tx, authOpts *riak.AuthOptions, bucket string, key string) ([]byte, bool, error) {
 	val := []byte{}
 	found := false
-	err := WithClusterTx(tx, authOpts, func(cluster StorageCluster) error {
+	err := WithCluster(tx, authOpts, func(cluster StorageCluster) error {
 		// get the deliveryservice ssl keys by xmlID and version
 		ro, err := FetchObjectValues(key, bucket, cluster)
 		if err != nil {
@@ -190,7 +190,7 @@ func GetBucketKey(tx *sql.Tx, authOpts *riak.AuthOptions, bucket string, key str
 }
 
 func DeleteDSSSLKeys(tx *sql.Tx, authOpts *riak.AuthOptions, xmlID string, version string) error {
-	err := WithClusterTx(tx, authOpts, func(cluster StorageCluster) error {
+	err := WithCluster(tx, authOpts, func(cluster StorageCluster) error {
 		if err := DeleteObject(MakeDSSSLKeyKey(xmlID, version), DeliveryServiceSSLKeysBucket, cluster); err != nil {
 			return errors.New("deleting SSL keys: " + err.Error())
 		}
@@ -209,7 +209,7 @@ func GetURLSigKeys(tx *sql.Tx, authOpts *riak.AuthOptions, ds tc.DeliveryService
 	val := tc.URLSigKeys{}
 	found := false
 	key := GetURLSigConfigFileName(ds)
-	err := WithClusterTx(tx, authOpts, func(cluster StorageCluster) error {
+	err := WithCluster(tx, authOpts, func(cluster StorageCluster) error {
 		ro, err := FetchObjectValues(key, URLSigKeysBucket, cluster)
 		if err != nil {
 			return err
@@ -234,7 +234,7 @@ func PutURLSigKeys(tx *sql.Tx, authOpts *riak.AuthOptions, ds tc.DeliveryService
 	if err != nil {
 		return errors.New("marshalling keys: " + err.Error())
 	}
-	err = WithClusterTx(tx, authOpts, func(cluster StorageCluster) error {
+	err = WithCluster(tx, authOpts, func(cluster StorageCluster) error {
 		obj := &riak.Object{
 			ContentType:     "application/json",
 			Charset:         "utf-8",
@@ -255,7 +255,7 @@ const CDNSSLKeysLimit = 1000 // TODO: emulates Perl; reevaluate?
 
 func GetCDNSSLKeysObj(tx *sql.Tx, authOpts *riak.AuthOptions, cdnName string) ([]tc.CDNSSLKey, error) {
 	keys := []tc.CDNSSLKey{}
-	err := WithClusterTx(tx, authOpts, func(cluster StorageCluster) error {
+	err := WithCluster(tx, authOpts, func(cluster StorageCluster) error {
 		// get the deliveryservice ssl keys by xmlID and version
 		query := `cdn:` + cdnName
 		filterQuery := `_yz_rk:*latest`
diff --git a/traffic_ops/traffic_ops_golang/riaksvc/riak_services.go b/traffic_ops/traffic_ops_golang/riaksvc/riak_services.go
index f9057e1ea..193387929 100644
--- a/traffic_ops/traffic_ops_golang/riaksvc/riak_services.go
+++ b/traffic_ops/traffic_ops_golang/riaksvc/riak_services.go
@@ -26,7 +26,11 @@ import (
 	"errors"
 	"fmt"
 	"io/ioutil"
+	"reflect"
+	"runtime"
+	"sort"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/apache/trafficcontrol/lib/go-log"
@@ -34,14 +38,21 @@ import (
 	"github.com/basho/riak-go-client"
 )
 
-// RiakPort is the port RIAK is listening on.
-const RiakPort = 8087
+const (
+	RiakPort = 8087
+	TimeOut  = time.Second * 5
 
-// 5 second timeout
-const timeOut = time.Second * 5
+	DefaultHealthCheckInterval         = time.Second * 5
+	DefaultMaxCommandExecutionAttempts = 5
+)
+
+var (
+	clusterServers []ServerAddr
+	sharedCluster  *riak.Cluster
+	clusterMutex   sync.Mutex
 
-// MaxCommandExecutionAttempts ...
-const MaxCommandExecutionAttempts = 5
+	healthCheckInterval time.Duration
+)
 
 type AuthOptions riak.AuthOptions
 
@@ -73,18 +84,42 @@ func (ri RiakStorageCluster) Execute(command riak.Command) error {
 }
 
 func GetRiakConfig(riakConfigFile string) (bool, *riak.AuthOptions, error) {
-	riakConfBytes, err := ioutil.ReadFile(riakConfigFile)
+	riakConfString, err := ioutil.ReadFile(riakConfigFile)
 	if err != nil {
 		return false, nil, fmt.Errorf("reading riak conf '%v': %v", riakConfigFile, err)
 	}
 
+	riakConfBytes := []byte(riakConfString)
+
 	rconf := &riak.AuthOptions{}
 	rconf.TlsConfig = &tls.Config{}
-	err = json.Unmarshal([]byte(riakConfBytes), &rconf)
+	err = json.Unmarshal(riakConfBytes, &rconf)
 	if err != nil {
-		return false, nil, fmt.Errorf("Unmarshalling riak conf '%v': %v", riakConfigFile, err)
+		return false, nil, fmt.Errorf("Unmarshaling riak conf '%v': %v", riakConfigFile, err)
+	}
+
+	type config struct {
+		Hci string `json:"HealthCheckInterval"`
+	}
+
+	var checkconfig config
+	err = json.Unmarshal(riakConfBytes, &checkconfig)
+	if err == nil {
+		hci, _ := time.ParseDuration(checkconfig.Hci)
+		if 0 < hci {
+			healthCheckInterval = hci
+		}
+	} else {
+		log.Infoln("Error unmarshalling riak config options: " + err.Error())
+	}
+
+	if healthCheckInterval <= 0 {
+		healthCheckInterval = DefaultHealthCheckInterval
+		log.Infoln("HeathCheckInterval override")
 	}
 
+	log.Infoln("Riak health check interval set to:", healthCheckInterval)
+
 	return true, rconf, nil
 }
 
@@ -98,19 +133,13 @@ func DeleteObject(key string, bucket string, cluster StorageCluster) error {
 	cmd, err := riak.NewDeleteValueCommandBuilder().
 		WithBucket(bucket).
 		WithKey(key).
-		WithTimeout(timeOut).
+		WithTimeout(TimeOut).
 		Build()
 	if err != nil {
 		return err
 	}
 
-	err = cluster.Execute(cmd)
-
-	if err != nil {
-		return err
-	}
-
-	return nil
+	return cluster.Execute(cmd)
 }
 
 // PingCluster pings the given Riak cluster, and returns nil on success, or any error
@@ -148,7 +177,7 @@ func FetchObjectValues(key string, bucket string, cluster StorageCluster) ([]*ri
 	cmd, err := riak.NewFetchValueCommandBuilder().
 		WithBucket(bucket).
 		WithKey(key).
-		WithTimeout(timeOut).
+		WithTimeout(TimeOut).
 		Build()
 	if err != nil {
 		return nil, err
@@ -178,17 +207,13 @@ func SaveObject(obj *riak.Object, bucket string, cluster StorageCluster) error {
 	cmd, err := riak.NewStoreValueCommandBuilder().
 		WithBucket(bucket).
 		WithContent(obj).
-		WithTimeout(timeOut).
+		WithTimeout(TimeOut).
 		Build()
 	if err != nil {
 		return err
 	}
-	err = cluster.Execute(cmd)
-	if err != nil {
-		return err
-	}
 
-	return nil
+	return cluster.Execute(cmd)
 }
 
 type ServerAddr struct {
@@ -216,18 +241,20 @@ WHERE t.name = 'RIAK' AND st.name = 'ONLINE'
 		}
 		servers = append(servers, s)
 	}
+
 	return servers, nil
 }
 
-func RiakServersToCluster(servers []ServerAddr, authOptions *riak.AuthOptions) (StorageCluster, error) {
+func GetRiakCluster(servers []ServerAddr, authOptions *riak.AuthOptions) (*riak.Cluster, error) {
 	if authOptions == nil {
 		return nil, errors.New("ERROR: no riak auth information from riak.conf, cannot authenticate to any riak servers")
 	}
 	nodes := []*riak.Node{}
 	for _, srv := range servers {
 		nodeOpts := &riak.NodeOptions{
-			RemoteAddress: srv.FQDN + ":" + srv.Port,
-			AuthOptions:   authOptions,
+			RemoteAddress:       srv.FQDN + ":" + srv.Port,
+			AuthOptions:         authOptions,
+			HealthCheckInterval: healthCheckInterval,
 		}
 		nodeOpts.AuthOptions.TlsConfig.ServerName = srv.FQDN
 		node, err := riak.NewNode(nodeOpts)
@@ -241,60 +268,88 @@ func RiakServersToCluster(servers []ServerAddr, authOptions *riak.AuthOptions) (
 	}
 	opts := &riak.ClusterOptions{
 		Nodes:             nodes,
-		ExecutionAttempts: MaxCommandExecutionAttempts,
+		ExecutionAttempts: DefaultMaxCommandExecutionAttempts,
 	}
 	cluster, err := riak.NewCluster(opts)
 	if err != nil {
 		return nil, errors.New("creating riak cluster: " + err.Error())
 	}
-	return RiakStorageCluster{Cluster: cluster}, nil
+	return cluster, err
 }
 
-func GetRiakClusterTx(tx *sql.Tx, authOptions *riak.AuthOptions) (StorageCluster, error) {
-	servers, err := GetRiakServers(tx)
-	if err != nil {
-		return nil, errors.New("getting riak servers: " + err.Error())
-	}
-	cluster, err := RiakServersToCluster(servers, authOptions)
+func GetRiakStorageCluster(servers []ServerAddr, authOptions *riak.AuthOptions) (StorageCluster, error) {
+	cluster, err := GetRiakCluster(servers, authOptions)
 	if err != nil {
-		return nil, errors.New("creating riak cluster from servers: " + err.Error())
+		return nil, err
 	}
-	return cluster, nil
+	return RiakStorageCluster{Cluster: cluster}, nil
 }
 
-func WithClusterTx(tx *sql.Tx, authOpts *riak.AuthOptions, f func(StorageCluster) error) error {
-	cluster, err := GetRiakClusterTx(tx, authOpts)
-	if err != nil {
-		return errors.New("getting riak cluster: " + err.Error())
-	}
-	if err = cluster.Start(); err != nil {
-		return errors.New("starting riak cluster: " + err.Error())
-	}
-	defer func() {
-		if err := cluster.Stop(); err != nil {
-			log.Errorln("error stopping Riak cluster: " + err.Error())
+func GetPooledCluster(tx *sql.Tx, authOptions *riak.AuthOptions) (StorageCluster, error) {
+
+	clusterMutex.Lock()
+	defer clusterMutex.Unlock()
+
+	tryLoad := false
+
+	// should we try to reload the cluster?
+	newservers, err := GetRiakServers(tx)
+
+	if err == nil {
+		if 0 < len(newservers) {
+			sort.Slice(newservers, func(ii, jj int) bool {
+				return newservers[ii].FQDN < newservers[jj].FQDN ||
+					(newservers[ii].FQDN == newservers[jj].FQDN && newservers[ii].Port < newservers[jj].Port)
+			})
+			if !reflect.DeepEqual(newservers, clusterServers) {
+				tryLoad = true
+				log.Infoln("Attempting to load a new set of riak servers")
+				log.Infoln("new riak servers")
+				for _, srv := range newservers {
+					log.Infoln(" ", srv.FQDN+":"+srv.Port)
+				}
+			}
+		}
+	} else {
+		log.Errorln("getting riak servers: " + err.Error())
+	}
+
+	if tryLoad {
+		newcluster, err := GetRiakCluster(newservers, authOptions)
+		if err == nil {
+			if err := newcluster.Start(); err == nil {
+				log.Infoln("New cluster started")
+
+				if sharedCluster != nil {
+					runtime.SetFinalizer(sharedCluster, sharedCluster.Stop())
+				}
+
+				sharedCluster = newcluster
+				clusterServers = newservers
+			} else {
+				log.Errorln("starting riak cluster, reverting to previous: " + err.Error())
+			}
+		} else {
+			log.Errorln("creating riak cluster, reverting to previous: " + err.Error())
 		}
-	}()
-	return f(cluster)
-}
-
-// StartCluster gets and starts a riak cluster, returning an error if either getting or starting fails.
-func StartCluster(tx *sql.Tx, authOptions *riak.AuthOptions) (StorageCluster, error) {
-	cluster, err := GetRiakClusterTx(tx, authOptions)
-	if err != nil {
-		return nil, errors.New("getting cluster: " + err.Error())
 	}
-	if err = cluster.Start(); err != nil {
-		return nil, errors.New("starting cluster: " + err.Error())
+
+	cluster := sharedCluster
+
+	if cluster == nil {
+		log.Errorln("GetPooledCluster failed, returning nil cluster")
+		return nil, errors.New("GetPooledClusterTX unable to return cluster")
 	}
-	return cluster, nil
+
+	return RiakStorageCluster{Cluster: cluster}, nil
 }
 
-// StopCluster stops the cluster, logging any error rather than returning it. This is designed to be called in a defer.
-func StopCluster(c StorageCluster) {
-	if err := c.Stop(); err != nil {
-		log.Errorln("stopping riak cluster: " + err.Error())
+func WithCluster(tx *sql.Tx, authOpts *riak.AuthOptions, f func(StorageCluster) error) error {
+	cluster, err := GetPooledCluster(tx, authOpts)
+	if err != nil {
+		return errors.New("getting riak pooled cluster: " + err.Error())
 	}
+	return f(cluster)
 }
 
 // Search searches Riak for the given query. Returns nil and a nil error if no object was found.
diff --git a/traffic_ops/traffic_ops_golang/riaksvc/riak_services_test.go b/traffic_ops/traffic_ops_golang/riaksvc/riak_services_test.go
index 974ef4ae6..4d6a01c03 100644
--- a/traffic_ops/traffic_ops_golang/riaksvc/riak_services_test.go
+++ b/traffic_ops/traffic_ops_golang/riaksvc/riak_services_test.go
@@ -164,7 +164,7 @@ func TestGetRiakCluster(t *testing.T) {
 	}
 	defer tx.Commit()
 
-	if _, err := GetRiakClusterTx(tx, nil); err == nil {
+	if _, err := GetRiakCluster(tx, nil); err == nil {
 		t.Errorf("expected an error due to nil RiakAuthoptions in the config but, go no error.")
 	}
 
@@ -174,14 +174,22 @@ func TestGetRiakCluster(t *testing.T) {
 		TlsConfig: &tls.Config{},
 	}
 
-	if _, err := GetRiakClusterTx(tx, &authOptions); err != nil {
+	if _, err := GetRiakCluster(tx, &authOptions); err != nil {
+		t.Errorf("expected no errors, actual: %s.", err)
+	}
+
+	if _, err := GetPooledCluster(tx, &authOptions); err != nil {
 		t.Errorf("expected no errors, actual: %s.", err)
 	}
 
 	rows2 := sqlmock.NewRows([]string{"s.host_name", "s.domain_name"})
 	mock.ExpectQuery("SELECT").WillReturnRows(rows2)
 
-	if _, err := GetRiakClusterTx(tx, &authOptions); err == nil {
+	if _, err := GetRiakCluster(tx, &authOptions); err == nil {
+		t.Errorf("expected an error due to no available riak servers.")
+	}
+
+	if _, err := GetPooledCluster(tx, &authOptions); err == nil {
 		t.Errorf("expected an error due to no available riak servers.")
 	}
 }
diff --git a/traffic_ops/traffic_ops_golang/urisigning.go b/traffic_ops/traffic_ops_golang/urisigning.go
index 74d13b62f..f9e0dd7fa 100644
--- a/traffic_ops/traffic_ops_golang/urisigning.go
+++ b/traffic_ops/traffic_ops_golang/urisigning.go
@@ -65,12 +65,11 @@ func getURIsignkeysHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	cluster, err := riaksvc.StartCluster(inf.Tx.Tx, inf.Config.RiakAuthOptions)
+	cluster, err := riaksvc.GetPooledCluster(inf.Tx.Tx, inf.Config.RiakAuthOptions)
 	if err != nil {
 		api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, errors.New("starting riak cluster: "+err.Error()))
 		return
 	}
-	defer riaksvc.StopCluster(cluster)
 
 	ro, err := riaksvc.FetchObjectValues(xmlID, CDNURIKeysBucket, cluster)
 	if err != nil {
@@ -105,12 +104,11 @@ func removeDeliveryServiceURIKeysHandler(w http.ResponseWriter, r *http.Request)
 		return
 	}
 
-	cluster, err := riaksvc.StartCluster(inf.Tx.Tx, inf.Config.RiakAuthOptions)
+	cluster, err := riaksvc.GetPooledCluster(inf.Tx.Tx, inf.Config.RiakAuthOptions)
 	if err != nil {
 		api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, errors.New("starting riak cluster: "+err.Error()))
 		return
 	}
-	defer riaksvc.StopCluster(cluster)
 
 	ro, err := riaksvc.FetchObjectValues(xmlID, CDNURIKeysBucket, cluster)
 	if err != nil {
@@ -165,12 +163,11 @@ func saveDeliveryServiceURIKeysHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	cluster, err := riaksvc.StartCluster(inf.Tx.Tx, inf.Config.RiakAuthOptions)
+	cluster, err := riaksvc.GetPooledCluster(inf.Tx.Tx, inf.Config.RiakAuthOptions)
 	if err != nil {
 		api.HandleErr(w, r, inf.Tx.Tx, http.StatusInternalServerError, nil, errors.New("starting riak cluster: "+err.Error()))
 		return
 	}
-	defer riaksvc.StopCluster(cluster)
 
 	obj := &riak.Object{
 		ContentType:     "text/json",


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services