You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/08/21 07:15:26 UTC

[GitHub] daisy-ycguo closed pull request #275: Remove prototype log processing code from invoker agent

daisy-ycguo closed pull request #275: Remove prototype log processing code from invoker agent
URL: https://github.com/apache/incubator-openwhisk-deploy-kube/pull/275
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docker/README.md b/docker/README.md
index 22a2db9..25f3c54 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -29,4 +29,4 @@ The built images are:
   * whisk-script-runner - An alpine-based utility image for running
     simple bash scripts that need the `wsk` cli available to them.
   * invoker-agent - worker node invoker agent -- used to implement
-    suspend/resume and log consolidation ops for a remote invoker
+    suspend/resume on action containers for a remote invoker
diff --git a/docker/invoker-agent/main.go b/docker/invoker-agent/main.go
index e37e0bf..1a93678 100644
--- a/docker/invoker-agent/main.go
+++ b/docker/invoker-agent/main.go
@@ -18,11 +18,8 @@
 package main
 
 import (
-	"bufio"
-	"encoding/json"
 	"fmt"
 	"github.com/gorilla/mux"
-	"io/ioutil"
 	"log"
 	"net"
 	"net/http"
@@ -32,24 +29,6 @@ import (
 	"time"
 )
 
-/* JSON structure expected as request body on /logs route */
-type LogForwardInfo struct {
-	LastOffset             int64  `json:"lastOffset"`               // last offset read from this container's log
-	SizeLimit              int    `json:"sizeLimit"`                // size limit on logs read in bytes
-	SentinelledLogs        bool   `json:"sentinelledLogs"`          // does an action's log end with sentinel lines?
-	EncodedLogLineMetadata string `json:"encodedLogLineMetadata"`   // string to be injected in every log line
-	EncodedActivation      string `json:"encodedActivation"`        // extra line to injected after all log lines are read
-}
-
-/* Size threshold for individual output files written by the logWriter */
-
-/* String constants related to logging */
-const (
-	logSentinelLine        = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
-	truncatedLogMessage    = "Logs were truncated because the total bytes size exceeds the limit of %d bytes."
-	genericLogErrorMessage = "There was an issue while collecting your logs. Data might be missing."
-)
-
 /* Should we measure and report time taken for each operation? */
 const timeOps = false
 
@@ -57,160 +36,12 @@ const timeOps = false
 var (
 	dockerSock       string = "/var/run/docker.sock"
 	containerDir     string = "/containers"
-	outputLogDir     string = "/action-logs"
 	invokerAgentPort int    = 3233
-	logSinkSize      int64  = 100 * 1024 * 1024
 )
 
 /* http.Client instance bound to dockerSock */
 var client *http.Client
 
-/* channel to send log lines to the logWriter */
-var logSinkChannel chan string
-
-/*
- * Support for writing log lines to the logSink
- */
-
-// go routine that accepts log lines from the logSinkChannel and writes them to the logSink
-func logWriter() {
-	var sinkFile *os.File = nil
-	var sinkFileBytes int64 = 0
-	var err error
-
-	for {
-		line := <-logSinkChannel
-
-		if sinkFile == nil {
-			timestamp := time.Now().UnixNano() / 1000000
-			fname := fmt.Sprintf("%s/userlogs-%d.log", outputLogDir, timestamp)
-			sinkFile, err = os.Create(fname)
-			if err != nil {
-				fmt.Fprintf(os.Stderr, "Unable to create log sink: %v\n", err)
-				panic(err)
-			}
-			sinkFileBytes = 0
-		}
-
-		bytesWritten, err := fmt.Fprintln(sinkFile, line)
-		if err != nil {
-			fmt.Fprintf(os.Stderr, "Error writing to log sink: %v\n", err)
-			panic(err)
-		}
-
-		sinkFileBytes += int64(bytesWritten)
-		if sinkFileBytes > logSinkSize {
-			sinkFile.Close()
-			sinkFile = nil
-		}
-	}
-}
-
-func writeSyntheticLogLine(msg string, metadata string) {
-	now := time.Now().UTC().Format(time.RFC3339)
-	line := fmt.Sprintf("{\"log\":\"%s\", \"stream\":\"stderr\", \"time\":\"%s\",%s}", msg, now, metadata)
-	logSinkChannel <- line
-}
-
-func reportLoggingError(w http.ResponseWriter, code int, msg string, metadata string) {
-	w.WriteHeader(code)
-	fmt.Fprint(w, msg)
-	fmt.Fprintln(os.Stderr, msg)
-	if metadata != "" {
-		writeSyntheticLogLine(genericLogErrorMessage, metadata)
-	}
-}
-
-// Request handler for /logs/<container> route
-// The container was given as part of the URL; gorilla makes it available in vars["container"]
-// The JSON body of the request is expected to contain the fields specified by the
-// LogForwardInfo struct defined above.
-// If logs are successfully forwarded, the ending offset of the log file is returned
-// to be used in a subsequent call to the /logs/<container> route.
-func forwardLogsFromUserAction(w http.ResponseWriter, r *http.Request) {
-	var start time.Time
-	if timeOps {
-		start = time.Now()
-	}
-
-	vars := mux.Vars(r)
-	container := vars["container"]
-
-	var lfi LogForwardInfo
-	b, err := ioutil.ReadAll(r.Body)
-	defer r.Body.Close()
-	if err != nil {
-		reportLoggingError(w, 400, fmt.Sprintf("Error reading request body: %v", err), "")
-		return
-	}
-	err = json.Unmarshal(b, &lfi)
-	if err != nil {
-		reportLoggingError(w, 400, fmt.Sprint("Error unmarshalling request body: %v", err), "")
-		return
-	}
-
-	logFileName := containerDir + "/" + container + "/" + container + "-json.log"
-	logFile, err := os.Open(logFileName)
-	defer logFile.Close()
-	if err != nil {
-		reportLoggingError(w, 500, fmt.Sprintf("Error opening %s: %v", logFileName, err), lfi.EncodedLogLineMetadata)
-		logSinkChannel <- lfi.EncodedActivation // Write activation record before returning with error code.
-		return
-	}
-
-	offset, err := logFile.Seek(lfi.LastOffset, 0)
-	if offset != lfi.LastOffset || err != nil {
-		reportLoggingError(w, 500, fmt.Sprintf("Unable to seek to %d in log file", lfi.LastOffset), lfi.EncodedLogLineMetadata)
-		logSinkChannel <- lfi.EncodedActivation // Write activation record before returning with error code.
-		return
-	}
-
-	sentinelsLeft := 2
-	scanner := bufio.NewScanner(logFile)
-	bytesWritten := 0
-	for sentinelsLeft > 0 && scanner.Scan() {
-		logLine := scanner.Text()
-		if lfi.SentinelledLogs && strings.Contains(logLine, logSentinelLine) {
-			sentinelsLeft -= 1
-		} else {
-			logLineLen := len(logLine)
-			bytesWritten += logLineLen
-			mungedLine := fmt.Sprintf("%s,%s}", logLine[:logLineLen-1], lfi.EncodedLogLineMetadata)
-			logSinkChannel <- mungedLine
-			if bytesWritten > lfi.SizeLimit {
-				writeSyntheticLogLine(fmt.Sprintf(truncatedLogMessage, lfi.SizeLimit), lfi.EncodedLogLineMetadata)
-				logFile.Seek(0, 2) // Seek to end of logfile to skip rest of output and prepare for next action invoke
-				sentinelsLeft = 0  // Cause loop to exit now.
-			}
-		}
-	}
-
-	if lfi.SentinelledLogs && sentinelsLeft != 0 {
-		reportLoggingError(w, 500, "Failed to find expected sentinels in log file", lfi.EncodedLogLineMetadata)
-		logSinkChannel <- lfi.EncodedActivation // Write activation record before returning with error code.
-		return
-	}
-
-	// Done copying log; write the activation record.
-	logSinkChannel <- lfi.EncodedActivation
-
-	// seek 0 bytes from current position to set logFileOffset to current fpos
-	logFileOffset, err := logFile.Seek(0, 1)
-	if err != nil {
-		reportLoggingError(w, 500, fmt.Sprintf("Unable to determine current offset in log file: %v", err), lfi.EncodedLogLineMetadata)
-		return
-	}
-
-	// Success; return updated logFileOffset to invoker
-	w.WriteHeader(200)
-	fmt.Fprintf(w, "%d", logFileOffset)
-
-	if timeOps {
-		end := time.Now()
-		elapsed := end.Sub(start)
-		fmt.Fprintf(os.Stdout, "LogForward took %s\n", elapsed.String())
-	}
-}
 
 /*
  * Suppout for suspend/resume operations
@@ -287,9 +118,6 @@ func initializeFromEnv() {
 	if os.Getenv("INVOKER_AGENT_CONTAINER_DIR") != "" {
 		containerDir = os.Getenv("INVOKER_AGENT_CONTAINER_DIR")
 	}
-	if os.Getenv("INVOKER_AGENT_OUTPUT_LOG_DIR") != "" {
-		outputLogDir = os.Getenv("INVOKER_AGENT_OUTPUT_LOG_DIR")
-	}
 	if os.Getenv("INVOKER_AGENT_PORT") != "" {
 		str := os.Getenv("INVOKER_AGENT_PORT")
 		invokerAgentPort, err = strconv.Atoi(str)
@@ -298,19 +126,10 @@ func initializeFromEnv() {
 			panic(err)
 		}
 	}
-	if os.Getenv("INVOKER_AGENT_LOG_SINK_SIZE") != "" {
-		str := os.Getenv("INVOKER_AGENT_LOG_SINK_SIZE")
-		logSinkSize, err = strconv.ParseInt(str, 10, 64)
-		if err != nil {
-			fmt.Fprintf(os.Stderr, "Invalid INVOKER_AGENT_LOG_SINK_SIZE %s; error was %v\n", str, err)
-			panic(err)
-		}
-	}
 }
 
 func handleRequests() {
 	myRouter := mux.NewRouter().StrictSlash(true)
-	myRouter.HandleFunc("/logs/{container}", forwardLogsFromUserAction)
 	myRouter.HandleFunc("/suspend/{container}", suspendUserAction)
 	myRouter.HandleFunc("/resume/{container}", resumeUserAction)
 	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", invokerAgentPort), myRouter))
@@ -328,9 +147,5 @@ func main() {
 	}
 	client = &http.Client{Transport: tr}
 
-	// initialize logSink subsystem & schedule logWrite go routine
-	logSinkChannel = make(chan string)
-	go logWriter()
-
 	handleRequests()
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services