You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficcontrol.apache.org by mi...@apache.org on 2020/08/21 18:04:09 UTC
[trafficcontrol] branch master updated: Treat io.EOF as an error
and retry up to 10 times for files with that (#4978)
This is an automated email from the ASF dual-hosted git repository.
mitchell852 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficcontrol.git
The following commit(s) were added to refs/heads/master by this push:
new e1eb046 Treat io.EOF as an error and retry up to 10 times for files with that (#4978)
e1eb046 is described below
commit e1eb0462b25e2567e41ed29bac5d55de80bc16e6
Author: Zach Hoffman <za...@zrhoffman.net>
AuthorDate: Fri Aug 21 18:03:55 2020 +0000
Treat io.EOF as an error and retry up to 10 times for files with that (#4978)
error
---
infrastructure/cdn-in-a-box/enroller/enroller.go | 57 +++++++++++++++++-------
1 file changed, 40 insertions(+), 17 deletions(-)
diff --git a/infrastructure/cdn-in-a-box/enroller/enroller.go b/infrastructure/cdn-in-a-box/enroller/enroller.go
index f3a9cbd..0beecb6 100644
--- a/infrastructure/cdn-in-a-box/enroller/enroller.go
+++ b/infrastructure/cdn-in-a-box/enroller/enroller.go
@@ -27,6 +27,7 @@ import (
"net/url"
"os"
"path/filepath"
+ "regexp"
"strings"
"time"
@@ -82,7 +83,7 @@ func enrollType(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var s tc.Type
err := dec.Decode(&s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding Type: %s\n", err)
return err
}
@@ -109,7 +110,7 @@ func enrollCDN(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var s tc.CDN
err := dec.Decode(&s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding CDN: %s\n", err)
return err
}
@@ -135,7 +136,7 @@ func enrollASN(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var s tc.ASN
err := dec.Decode(&s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding ASN: %s\n", err)
return err
}
@@ -162,7 +163,7 @@ func enrollCachegroup(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var s tc.CacheGroupNullable
err := dec.Decode(&s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding Cachegroup: %s\n", err)
return err
}
@@ -188,7 +189,7 @@ func enrollDeliveryService(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var s tc.DeliveryServiceNullable
err := dec.Decode(&s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding DeliveryService: %s\n", err)
return err
}
@@ -216,7 +217,7 @@ func enrollDeliveryServiceServer(toSession *session, r io.Reader) error {
// DeliveryServiceServers lists ds xmlid and array of server names. Use that to create multiple DeliveryServiceServer objects
var dss tc.DeliveryServiceServers
err := dec.Decode(&dss)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding DeliveryServiceServer: %s\n", err)
return err
}
@@ -258,7 +259,7 @@ func enrollDivision(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var s tc.Division
err := dec.Decode(&s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding Division: %s\n", err)
return err
}
@@ -284,7 +285,7 @@ func enrollOrigin(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var s tc.Origin
err := dec.Decode(&s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding Origin: %s\n", err)
return err
}
@@ -310,7 +311,7 @@ func enrollParameter(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var params []tc.Parameter
err := dec.Decode(¶ms)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding Parameter: %s\n", err)
return err
}
@@ -375,7 +376,7 @@ func enrollPhysLocation(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var s tc.PhysLocation
err := dec.Decode(&s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding PhysLocation: %s\n", err)
return err
}
@@ -401,7 +402,7 @@ func enrollRegion(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var s tc.Region
err := dec.Decode(&s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding Region: %s\n", err)
return err
}
@@ -427,7 +428,7 @@ func enrollStatus(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var s tc.StatusNullable
err := dec.Decode(&s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding Status: %s\n", err)
return err
}
@@ -453,7 +454,7 @@ func enrollTenant(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var s tc.Tenant
err := dec.Decode(&s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding Tenant: %s\n", err)
return err
}
@@ -480,7 +481,7 @@ func enrollUser(toSession *session, r io.Reader) error {
var s tc.User
err := dec.Decode(&s)
log.Infof("User is %++v\n", s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding User: %s\n", err)
return err
}
@@ -508,7 +509,7 @@ func enrollProfile(toSession *session, r io.Reader) error {
var profile tc.Profile
err := dec.Decode(&profile)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding Profile: %s\n", err)
return err
}
@@ -623,7 +624,7 @@ func enrollServer(toSession *session, r io.Reader) error {
dec := json.NewDecoder(r)
var s tc.ServerNullable
err := dec.Decode(&s)
- if err != nil && err != io.EOF {
+ if err != nil {
log.Infof("error decoding Server: %s\n", err)
return err
}
@@ -659,7 +660,12 @@ func newDirWatcher(toSession *session) (*dirWatcher, error) {
const (
processed = ".processed"
rejected = ".rejected"
+ retry = ".retry"
)
+ originalNameRegex := regexp.MustCompile(`(\.retry)*$`)
+
+ emptyCount := map[string]int{}
+ const maxEmptyTries = 10
for {
select {
@@ -692,10 +698,27 @@ func newDirWatcher(toSession *session) (*dirWatcher, error) {
if f, ok := dw.watched[dir]; ok {
t := filepath.Base(dir)
log.Infoln("creating " + t + " from " + event.Name)
- // TODO: ensure file content is there before attempting to read. For now, this does the trick..
+ // Sleep for 100 milliseconds so that the file content is probably there when the directory watcher
+ // sees the file
time.Sleep(100 * time.Millisecond)
err := f(toSession, event.Name)
+ // If a file is empty, try reading from it 10 times before giving up on that file
+ if err == io.EOF {
+ originalName := originalNameRegex.ReplaceAllString(event.Name, "")
+ if _, exists := emptyCount[originalName]; !exists {
+ emptyCount[originalName] = 0
+ }
+ emptyCount[originalName]++
+ log.Infof("empty json object %s: %s\ntried file %d out of %d times", originalName, err.Error(), emptyCount[originalName], maxEmptyTries)
+ if emptyCount[originalName] < maxEmptyTries {
+ newName := event.Name + retry
+ if err := os.Rename(event.Name, newName); err != nil {
+ log.Infof("error renaming %s to %s: %s", event.Name, newName, err)
+ }
+ continue
+ }
+ }
if err != nil {
log.Infof("error creating %s from %s: %s\n", dir, event.Name, err.Error())
} else {