You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by da...@apache.org on 2018/08/21 07:15:26 UTC

[incubator-openwhisk-deploy-kube] branch master updated: Remove prototype log processing code from invoker agent (#275)

This is an automated email from the ASF dual-hosted git repository.

daisyguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-deploy-kube.git


The following commit(s) were added to refs/heads/master by this push:
     new 63cd5e3  Remove prototype log processing code from invoker agent (#275)
63cd5e3 is described below

commit 63cd5e3d239eb35edc6b03eeb035b8cedd97c7d6
Author: David Grove <dg...@users.noreply.github.com>
AuthorDate: Tue Aug 21 03:15:24 2018 -0400

    Remove prototype log processing code from invoker agent (#275)
    
    A better design is to focus on enhancing the upstream logging
    infrastructure so that user container logs can be fed directly
    into the platform logging service.  This prototype had unresolved
    issues with log rotation (which it did not support), so this commit
    removes the partially completed functionality.
---
 docker/README.md             |   2 +-
 docker/invoker-agent/main.go | 185 -------------------------------------------
 2 files changed, 1 insertion(+), 186 deletions(-)

diff --git a/docker/README.md b/docker/README.md
index 22a2db9..25f3c54 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -29,4 +29,4 @@ The built images are:
   * whisk-script-runner - An alpine-based utility image for running
     simple bash scripts that need the `wsk` cli available to them.
   * invoker-agent - worker node invoker agent -- used to implement
-    suspend/resume and log consolidation ops for a remote invoker
+    suspend/resume on action containers for a remote invoker
diff --git a/docker/invoker-agent/main.go b/docker/invoker-agent/main.go
index e37e0bf..1a93678 100644
--- a/docker/invoker-agent/main.go
+++ b/docker/invoker-agent/main.go
@@ -18,11 +18,8 @@
 package main
 
 import (
-	"bufio"
-	"encoding/json"
 	"fmt"
 	"github.com/gorilla/mux"
-	"io/ioutil"
 	"log"
 	"net"
 	"net/http"
@@ -32,24 +29,6 @@ import (
 	"time"
 )
 
-/* JSON structure expected as request body on /logs route */
-type LogForwardInfo struct {
-	LastOffset             int64  `json:"lastOffset"`               // last offset read from this container's log
-	SizeLimit              int    `json:"sizeLimit"`                // size limit on logs read in bytes
-	SentinelledLogs        bool   `json:"sentinelledLogs"`          // does an action's log end with sentinel lines?
-	EncodedLogLineMetadata string `json:"encodedLogLineMetadata"`   // string to be injected in every log line
-	EncodedActivation      string `json:"encodedActivation"`        // extra line to injected after all log lines are read
-}
-
-/* Size threshold for individual output files written by the logWriter */
-
-/* String constants related to logging */
-const (
-	logSentinelLine        = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
-	truncatedLogMessage    = "Logs were truncated because the total bytes size exceeds the limit of %d bytes."
-	genericLogErrorMessage = "There was an issue while collecting your logs. Data might be missing."
-)
-
 /* Should we measure and report time taken for each operation? */
 const timeOps = false
 
@@ -57,160 +36,12 @@ const timeOps = false
 var (
 	dockerSock       string = "/var/run/docker.sock"
 	containerDir     string = "/containers"
-	outputLogDir     string = "/action-logs"
 	invokerAgentPort int    = 3233
-	logSinkSize      int64  = 100 * 1024 * 1024
 )
 
 /* http.Client instance bound to dockerSock */
 var client *http.Client
 
-/* channel to send log lines to the logWriter */
-var logSinkChannel chan string
-
-/*
- * Support for writing log lines to the logSink
- */
-
-// go routine that accepts log lines from the logSinkChannel and writes them to the logSink
-func logWriter() {
-	var sinkFile *os.File = nil
-	var sinkFileBytes int64 = 0
-	var err error
-
-	for {
-		line := <-logSinkChannel
-
-		if sinkFile == nil {
-			timestamp := time.Now().UnixNano() / 1000000
-			fname := fmt.Sprintf("%s/userlogs-%d.log", outputLogDir, timestamp)
-			sinkFile, err = os.Create(fname)
-			if err != nil {
-				fmt.Fprintf(os.Stderr, "Unable to create log sink: %v\n", err)
-				panic(err)
-			}
-			sinkFileBytes = 0
-		}
-
-		bytesWritten, err := fmt.Fprintln(sinkFile, line)
-		if err != nil {
-			fmt.Fprintf(os.Stderr, "Error writing to log sink: %v\n", err)
-			panic(err)
-		}
-
-		sinkFileBytes += int64(bytesWritten)
-		if sinkFileBytes > logSinkSize {
-			sinkFile.Close()
-			sinkFile = nil
-		}
-	}
-}
-
-func writeSyntheticLogLine(msg string, metadata string) {
-	now := time.Now().UTC().Format(time.RFC3339)
-	line := fmt.Sprintf("{\"log\":\"%s\", \"stream\":\"stderr\", \"time\":\"%s\",%s}", msg, now, metadata)
-	logSinkChannel <- line
-}
-
-func reportLoggingError(w http.ResponseWriter, code int, msg string, metadata string) {
-	w.WriteHeader(code)
-	fmt.Fprint(w, msg)
-	fmt.Fprintln(os.Stderr, msg)
-	if metadata != "" {
-		writeSyntheticLogLine(genericLogErrorMessage, metadata)
-	}
-}
-
-// Request handler for /logs/<container> route
-// The container was given as part of the URL; gorilla makes it available in vars["container"]
-// The JSON body of the request is expected to contain the fields specified by the
-// LogForwardInfo struct defined above.
-// If logs are successfully forwarded, the ending offset of the log file is returned
-// to be used in a subsequent call to the /logs/<container> route.
-func forwardLogsFromUserAction(w http.ResponseWriter, r *http.Request) {
-	var start time.Time
-	if timeOps {
-		start = time.Now()
-	}
-
-	vars := mux.Vars(r)
-	container := vars["container"]
-
-	var lfi LogForwardInfo
-	b, err := ioutil.ReadAll(r.Body)
-	defer r.Body.Close()
-	if err != nil {
-		reportLoggingError(w, 400, fmt.Sprintf("Error reading request body: %v", err), "")
-		return
-	}
-	err = json.Unmarshal(b, &lfi)
-	if err != nil {
-		reportLoggingError(w, 400, fmt.Sprint("Error unmarshalling request body: %v", err), "")
-		return
-	}
-
-	logFileName := containerDir + "/" + container + "/" + container + "-json.log"
-	logFile, err := os.Open(logFileName)
-	defer logFile.Close()
-	if err != nil {
-		reportLoggingError(w, 500, fmt.Sprintf("Error opening %s: %v", logFileName, err), lfi.EncodedLogLineMetadata)
-		logSinkChannel <- lfi.EncodedActivation // Write activation record before returning with error code.
-		return
-	}
-
-	offset, err := logFile.Seek(lfi.LastOffset, 0)
-	if offset != lfi.LastOffset || err != nil {
-		reportLoggingError(w, 500, fmt.Sprintf("Unable to seek to %d in log file", lfi.LastOffset), lfi.EncodedLogLineMetadata)
-		logSinkChannel <- lfi.EncodedActivation // Write activation record before returning with error code.
-		return
-	}
-
-	sentinelsLeft := 2
-	scanner := bufio.NewScanner(logFile)
-	bytesWritten := 0
-	for sentinelsLeft > 0 && scanner.Scan() {
-		logLine := scanner.Text()
-		if lfi.SentinelledLogs && strings.Contains(logLine, logSentinelLine) {
-			sentinelsLeft -= 1
-		} else {
-			logLineLen := len(logLine)
-			bytesWritten += logLineLen
-			mungedLine := fmt.Sprintf("%s,%s}", logLine[:logLineLen-1], lfi.EncodedLogLineMetadata)
-			logSinkChannel <- mungedLine
-			if bytesWritten > lfi.SizeLimit {
-				writeSyntheticLogLine(fmt.Sprintf(truncatedLogMessage, lfi.SizeLimit), lfi.EncodedLogLineMetadata)
-				logFile.Seek(0, 2) // Seek to end of logfile to skip rest of output and prepare for next action invoke
-				sentinelsLeft = 0  // Cause loop to exit now.
-			}
-		}
-	}
-
-	if lfi.SentinelledLogs && sentinelsLeft != 0 {
-		reportLoggingError(w, 500, "Failed to find expected sentinels in log file", lfi.EncodedLogLineMetadata)
-		logSinkChannel <- lfi.EncodedActivation // Write activation record before returning with error code.
-		return
-	}
-
-	// Done copying log; write the activation record.
-	logSinkChannel <- lfi.EncodedActivation
-
-	// seek 0 bytes from current position to set logFileOffset to current fpos
-	logFileOffset, err := logFile.Seek(0, 1)
-	if err != nil {
-		reportLoggingError(w, 500, fmt.Sprintf("Unable to determine current offset in log file: %v", err), lfi.EncodedLogLineMetadata)
-		return
-	}
-
-	// Success; return updated logFileOffset to invoker
-	w.WriteHeader(200)
-	fmt.Fprintf(w, "%d", logFileOffset)
-
-	if timeOps {
-		end := time.Now()
-		elapsed := end.Sub(start)
-		fmt.Fprintf(os.Stdout, "LogForward took %s\n", elapsed.String())
-	}
-}
 
 /*
  * Support for suspend/resume operations
@@ -287,9 +118,6 @@ func initializeFromEnv() {
 	if os.Getenv("INVOKER_AGENT_CONTAINER_DIR") != "" {
 		containerDir = os.Getenv("INVOKER_AGENT_CONTAINER_DIR")
 	}
-	if os.Getenv("INVOKER_AGENT_OUTPUT_LOG_DIR") != "" {
-		outputLogDir = os.Getenv("INVOKER_AGENT_OUTPUT_LOG_DIR")
-	}
 	if os.Getenv("INVOKER_AGENT_PORT") != "" {
 		str := os.Getenv("INVOKER_AGENT_PORT")
 		invokerAgentPort, err = strconv.Atoi(str)
@@ -298,19 +126,10 @@ func initializeFromEnv() {
 			panic(err)
 		}
 	}
-	if os.Getenv("INVOKER_AGENT_LOG_SINK_SIZE") != "" {
-		str := os.Getenv("INVOKER_AGENT_LOG_SINK_SIZE")
-		logSinkSize, err = strconv.ParseInt(str, 10, 64)
-		if err != nil {
-			fmt.Fprintf(os.Stderr, "Invalid INVOKER_AGENT_LOG_SINK_SIZE %s; error was %v\n", str, err)
-			panic(err)
-		}
-	}
 }
 
 func handleRequests() {
 	myRouter := mux.NewRouter().StrictSlash(true)
-	myRouter.HandleFunc("/logs/{container}", forwardLogsFromUserAction)
 	myRouter.HandleFunc("/suspend/{container}", suspendUserAction)
 	myRouter.HandleFunc("/resume/{container}", resumeUserAction)
 	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", invokerAgentPort), myRouter))
@@ -328,9 +147,5 @@ func main() {
 	}
 	client = &http.Client{Transport: tr}
 
-	// initialize logSink subsystem & schedule logWrite go routine
-	logSinkChannel = make(chan string)
-	go logWriter()
-
 	handleRequests()
 }