You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficcontrol.apache.org by mi...@apache.org on 2020/08/21 18:04:09 UTC

[trafficcontrol] branch master updated: Treat io.EOF as an error and retry up to 10 times for files with that (#4978)

This is an automated email from the ASF dual-hosted git repository.

mitchell852 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficcontrol.git


The following commit(s) were added to refs/heads/master by this push:
     new e1eb046  Treat io.EOF as an error and retry up to 10 times for files with that (#4978)
e1eb046 is described below

commit e1eb0462b25e2567e41ed29bac5d55de80bc16e6
Author: Zach Hoffman <za...@zrhoffman.net>
AuthorDate: Fri Aug 21 18:03:55 2020 +0000

    Treat io.EOF as an error and retry up to 10 times for files with that (#4978)
    
    error
---
 infrastructure/cdn-in-a-box/enroller/enroller.go | 57 +++++++++++++++++-------
 1 file changed, 40 insertions(+), 17 deletions(-)

diff --git a/infrastructure/cdn-in-a-box/enroller/enroller.go b/infrastructure/cdn-in-a-box/enroller/enroller.go
index f3a9cbd..0beecb6 100644
--- a/infrastructure/cdn-in-a-box/enroller/enroller.go
+++ b/infrastructure/cdn-in-a-box/enroller/enroller.go
@@ -27,6 +27,7 @@ import (
 	"net/url"
 	"os"
 	"path/filepath"
+	"regexp"
 	"strings"
 	"time"
 
@@ -82,7 +83,7 @@ func enrollType(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var s tc.Type
 	err := dec.Decode(&s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding Type: %s\n", err)
 		return err
 	}
@@ -109,7 +110,7 @@ func enrollCDN(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var s tc.CDN
 	err := dec.Decode(&s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding CDN: %s\n", err)
 		return err
 	}
@@ -135,7 +136,7 @@ func enrollASN(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var s tc.ASN
 	err := dec.Decode(&s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding ASN: %s\n", err)
 		return err
 	}
@@ -162,7 +163,7 @@ func enrollCachegroup(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var s tc.CacheGroupNullable
 	err := dec.Decode(&s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding Cachegroup: %s\n", err)
 		return err
 	}
@@ -188,7 +189,7 @@ func enrollDeliveryService(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var s tc.DeliveryServiceNullable
 	err := dec.Decode(&s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding DeliveryService: %s\n", err)
 		return err
 	}
@@ -216,7 +217,7 @@ func enrollDeliveryServiceServer(toSession *session, r io.Reader) error {
 	// DeliveryServiceServers lists ds xmlid and array of server names.  Use that to create multiple DeliveryServiceServer objects
 	var dss tc.DeliveryServiceServers
 	err := dec.Decode(&dss)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding DeliveryServiceServer: %s\n", err)
 		return err
 	}
@@ -258,7 +259,7 @@ func enrollDivision(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var s tc.Division
 	err := dec.Decode(&s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding Division: %s\n", err)
 		return err
 	}
@@ -284,7 +285,7 @@ func enrollOrigin(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var s tc.Origin
 	err := dec.Decode(&s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding Origin: %s\n", err)
 		return err
 	}
@@ -310,7 +311,7 @@ func enrollParameter(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var params []tc.Parameter
 	err := dec.Decode(&params)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding Parameter: %s\n", err)
 		return err
 	}
@@ -375,7 +376,7 @@ func enrollPhysLocation(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var s tc.PhysLocation
 	err := dec.Decode(&s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding PhysLocation: %s\n", err)
 		return err
 	}
@@ -401,7 +402,7 @@ func enrollRegion(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var s tc.Region
 	err := dec.Decode(&s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding Region: %s\n", err)
 		return err
 	}
@@ -427,7 +428,7 @@ func enrollStatus(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var s tc.StatusNullable
 	err := dec.Decode(&s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding Status: %s\n", err)
 		return err
 	}
@@ -453,7 +454,7 @@ func enrollTenant(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var s tc.Tenant
 	err := dec.Decode(&s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding Tenant: %s\n", err)
 		return err
 	}
@@ -480,7 +481,7 @@ func enrollUser(toSession *session, r io.Reader) error {
 	var s tc.User
 	err := dec.Decode(&s)
 	log.Infof("User is %++v\n", s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding User: %s\n", err)
 		return err
 	}
@@ -508,7 +509,7 @@ func enrollProfile(toSession *session, r io.Reader) error {
 	var profile tc.Profile
 
 	err := dec.Decode(&profile)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding Profile: %s\n", err)
 		return err
 	}
@@ -623,7 +624,7 @@ func enrollServer(toSession *session, r io.Reader) error {
 	dec := json.NewDecoder(r)
 	var s tc.ServerNullable
 	err := dec.Decode(&s)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		log.Infof("error decoding Server: %s\n", err)
 		return err
 	}
@@ -659,7 +660,12 @@ func newDirWatcher(toSession *session) (*dirWatcher, error) {
 		const (
 			processed = ".processed"
 			rejected  = ".rejected"
+			retry  = ".retry"
 		)
+		originalNameRegex := regexp.MustCompile(`(\.retry)*$`)
+
+		emptyCount := map[string]int{}
+		const maxEmptyTries = 10
 
 		for {
 			select {
@@ -692,10 +698,27 @@ func newDirWatcher(toSession *session) (*dirWatcher, error) {
 				if f, ok := dw.watched[dir]; ok {
 					t := filepath.Base(dir)
 					log.Infoln("creating " + t + " from " + event.Name)
-					// TODO: ensure file content is there before attempting to read.  For now, this does the trick..
+					// Sleep for 100 milliseconds so that the file content is probably there when the directory watcher
+					// sees the file
 					time.Sleep(100 * time.Millisecond)
 
 					err := f(toSession, event.Name)
+					// If a file is empty, try reading from it 10 times before giving up on that file
+					if err == io.EOF {
+						originalName := originalNameRegex.ReplaceAllString(event.Name, "")
+						if _, exists := emptyCount[originalName]; !exists {
+							emptyCount[originalName] = 0
+						}
+						emptyCount[originalName]++
+						log.Infof("empty json object %s: %s\ntried file %d out of %d times", originalName, err.Error(), emptyCount[originalName], maxEmptyTries)
+						if emptyCount[originalName] < maxEmptyTries {
+							newName := event.Name + retry
+							if err := os.Rename(event.Name, newName); err != nil {
+								log.Infof("error renaming %s to %s: %s", event.Name, newName, err)
+							}
+							continue
+						}
+					}
 					if err != nil {
 						log.Infof("error creating %s from %s: %s\n", dir, event.Name, err.Error())
 					} else {