You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/11/29 20:58:24 UTC

[GitHub] dubee closed pull request #61: Performance boost

dubee closed pull request #61: Performance boost
URL: https://github.com/apache/incubator-openwhisk-runtime-go/pull/61
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/examples/benchmark/Makefile b/examples/benchmark/Makefile
new file mode 100644
index 0000000..22a03f4
--- /dev/null
+++ b/examples/benchmark/Makefile
@@ -0,0 +1,23 @@
+IMG?=whisk/actionloop-golang-v1.11
+IMG2?=whisk/actionloop
+
+all: golang bash
+
+test.lua:
+	echo 'wrk.method = "POST"'>test.lua
+	echo "wrk.body = '{\"value\":{\"name\":\"Mike\"}}'">>test.lua
+	echo 'wrk.headers["Content-Type"] = "application/json"'>>test.lua
+
+golang: test.lua
+	docker run -d --name under-test --rm -p 8080:8080 $(IMG)
+	bash init.sh main.go
+	wrk -t1 -c1 -stest.lua http://localhost:8080/run
+	docker kill under-test
+
+bash: test.lua
+	docker run -d --name under-test --rm -p 8080:8080 $(IMG2)
+	bash init.sh main.sh
+	wrk -t1 -c1 -stest.lua http://localhost:8080/run
+	docker kill under-test
+
+.PHONY: all golang bash
diff --git a/examples/benchmark/init.sh b/examples/benchmark/init.sh
new file mode 100644
index 0000000..d34951f
--- /dev/null
+++ b/examples/benchmark/init.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+INIT=${1:?action}
+jq -n --rawfile file $INIT '{ "value": {"main":"main", "code":$file}}' >$INIT.json
+curl -XPOST -H "Content-Type: application/json" http://localhost:8080/init -d @$INIT.json
diff --git a/examples/benchmark/main.go b/examples/benchmark/main.go
new file mode 100644
index 0000000..5fdaf5d
--- /dev/null
+++ b/examples/benchmark/main.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import "fmt"
+
+// Main is the action entry point: it greets obj["name"], defaulting to "world"
+func Main(obj map[string]interface{}) map[string]interface{} {
+	name, ok := obj["name"].(string)
+	if !ok {
+		name = "world"
+	}
+	fmt.Printf("name=%s\n", name)
+	msg := make(map[string]interface{})
+	msg["golang-main-single"] = "Hello, " + name + "!"
+	return msg
+}
diff --git a/examples/benchmark/main.sh b/examples/benchmark/main.sh
new file mode 100755
index 0000000..d8ddb7f
--- /dev/null
+++ b/examples/benchmark/main.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+while read line
+do
+   name="$(echo $line | jq -r .value.name)"
+   echo msg="hello $name"
+   echo '{"hello": "'$name'"}' >&3
+done
diff --git a/openwhisk/_test/hello.sh b/openwhisk/_test/hello.sh
index ff21212..d8ddb7f 100755
--- a/openwhisk/_test/hello.sh
+++ b/openwhisk/_test/hello.sh
@@ -18,10 +18,6 @@
 while read line
 do
    name="$(echo $line | jq -r .value.name)"
-   if [ "$name" == "*" ]
-   then echo "Goodbye!" >&2
-        exit 0
-   fi
    echo msg="hello $name"
    echo '{"hello": "'$name'"}' >&3
 done
diff --git a/openwhisk/actionProxy.go b/openwhisk/actionProxy.go
index c61c1a8..cb6e441 100644
--- a/openwhisk/actionProxy.go
+++ b/openwhisk/actionProxy.go
@@ -24,9 +24,6 @@ import (
 	"os"
 )
 
-// OutputGuard constant string
-const OutputGuard = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
-
 // ActionProxy is the container of the data specific to a server
 type ActionProxy struct {
 
diff --git a/openwhisk/actionProxy_test.go b/openwhisk/actionProxy_test.go
index 395e811..2d767af 100644
--- a/openwhisk/actionProxy_test.go
+++ b/openwhisk/actionProxy_test.go
@@ -52,8 +52,7 @@ func TestStartLatestAction_emit1(t *testing.T) {
 	buf := []byte("#!/bin/sh\nwhile read a; do echo 1 >&3 ; done\n")
 	ap.ExtractAction(&buf, "bin")
 	ap.StartLatestAction()
-	ap.theExecutor.io <- []byte("x")
-	res := <-ap.theExecutor.io
+	res, _ := ap.theExecutor.Interact([]byte("x"))
 	assert.Equal(t, res, []byte("1\n"))
 	ap.theExecutor.Stop()
 }
@@ -77,8 +76,8 @@ func TestStartLatestAction_emit2(t *testing.T) {
 	buf := []byte("#!/bin/sh\nwhile read a; do echo 2 >&3 ; done\n")
 	ap.ExtractAction(&buf, "bin")
 	ap.StartLatestAction()
-	ap.theExecutor.io <- []byte("z")
-	assert.Equal(t, <-ap.theExecutor.io, []byte("2\n"))
+	res, _ := ap.theExecutor.Interact([]byte("z"))
+	assert.Equal(t, res, []byte("2\n"))
 	/**/
 	ap.theExecutor.Stop()
 }
diff --git a/openwhisk/executor.go b/openwhisk/executor.go
index 1658fe8..06d875c 100644
--- a/openwhisk/executor.go
+++ b/openwhisk/executor.go
@@ -23,29 +23,22 @@ import (
 	"io"
 	"os"
 	"os/exec"
-	"runtime"
 	"time"
 )
 
-// DefaultTimeoutInit to wait for a process to start
-var DefaultTimeoutInit = 5 * time.Millisecond
+// OutputGuard is the marker line written to the process stdout and stderr after each activation
+const OutputGuard = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX\n"
 
-// DefaultTimeoutDrain to wait for draining logs
-var DefaultTimeoutDrain = 5 * time.Millisecond
+// DefaultTimeoutStart is how long Start waits to see whether the process exits right away
+var DefaultTimeoutStart = 5 * time.Millisecond
 
 // Executor is the container and the guardian  of a child process
 // It starts a command, feeds input and output, read logs and control its termination
 type Executor struct {
-	io      chan []byte
-	log     chan bool
-	exit    chan error
-	_cmd    *exec.Cmd
-	_input  io.WriteCloser
-	_output *bufio.Reader
-	_logout *bufio.Reader
-	_logerr *bufio.Reader
-	_outbuf *bufio.Writer
-	_errbuf *bufio.Writer
+	cmd    *exec.Cmd
+	input  io.WriteCloser
+	output *bufio.Reader
+	exited chan bool
 }
 
 // NewExecutor creates a child subprocess using the provided command line,
@@ -53,188 +46,85 @@ type Executor struct {
 // You can then start it getting a communication channel
 func NewExecutor(logout *os.File, logerr *os.File, command string, args ...string) (proc *Executor) {
 	cmd := exec.Command(command, args...)
+	cmd.Stdout = logout
+	cmd.Stderr = logerr
 	cmd.Env = []string{
 		"__OW_API_HOST=" + os.Getenv("__OW_API_HOST"),
 	}
+	Debug("env: %v", cmd.Env)
 	if Debugging {
 		cmd.Env = append(cmd.Env, "OW_DEBUG=/tmp/action.log")
 	}
-	Debug("env: %v", cmd.Env)
-
-	stdin, err := cmd.StdinPipe()
-	if err != nil {
-		return nil
-	}
-
-	stdout, err := cmd.StdoutPipe()
+	input, err := cmd.StdinPipe()
 	if err != nil {
 		return nil
 	}
-
-	stderr, err := cmd.StderrPipe()
-	if err != nil {
-		return nil
-	}
-
 	pipeOut, pipeIn, err := os.Pipe()
 	if err != nil {
 		return nil
 	}
 	cmd.ExtraFiles = []*os.File{pipeIn}
-
-	pout := bufio.NewReader(pipeOut)
-	sout := bufio.NewReader(stdout)
-	serr := bufio.NewReader(stderr)
-	outbuf := bufio.NewWriter(logout)
-	errbuf := bufio.NewWriter(logerr)
-
+	output := bufio.NewReader(pipeOut)
 	return &Executor{
-		make(chan []byte),
-		make(chan bool),
-		make(chan error),
 		cmd,
-		stdin,
-		pout,
-		sout,
-		serr,
-		outbuf,
-		errbuf,
-	}
-}
-
-// collect log from a stream
-func _collect(ch chan string, reader *bufio.Reader) {
-	for {
-		buf, err := reader.ReadBytes('\n')
-		if err != nil {
-			break
-		}
-		ch <- string(buf)
-	}
-}
-
-// loop over the command executing
-// returning when the command exits
-func (proc *Executor) run() {
-	Debug("run: start")
-	err := proc._cmd.Start()
-	if err != nil {
-		proc.exit <- err
-		Debug("run: early exit")
-		proc._cmd = nil // do not kill
-		return
-	}
-	Debug("pid: %d", proc._cmd.Process.Pid)
-	// wait for the exit
-	proc.exit <- proc._cmd.Wait()
-	proc._cmd = nil // do not kill
-	Debug("run: end")
-}
-
-func drain(ch chan string, out *bufio.Writer) {
-	for loop := true; loop; {
-		runtime.Gosched()
-		select {
-		case buf := <-ch:
-			fmt.Fprint(out, buf)
-			out.Flush()
-		case <-time.After(DefaultTimeoutDrain):
-			loop = false
-		}
+		input,
+		output,
+		make(chan bool),
 	}
-	fmt.Fprintln(out, OutputGuard)
-	out.Flush()
 }
 
-// manage copying stdout and stder in output
-// with log guards
-func (proc *Executor) logger() {
-	Debug("logger: start")
-	// poll stdout and stderr
-	chOut := make(chan string)
-	go _collect(chOut, proc._logout)
-	chErr := make(chan string)
-	go _collect(chErr, proc._logerr)
-
-	// loop draining the loop until asked to exit
-	for <-proc.log {
-		// drain stdout
-		Debug("draining stdout")
-		drain(chOut, proc._outbuf)
-		// drain stderr
-		Debug("draining stderr")
-		drain(chErr, proc._errbuf)
-		proc.log <- true
-	}
-	Debug("logger: end")
+// Interact sends in as one line to the process, reads one line of output, then writes OutputGuard to both logs
+func (proc *Executor) Interact(in []byte) ([]byte, error) {
+	// input to the subprocess
+	proc.input.Write(in)
+	proc.input.Write([]byte("\n"))
+	out, err := proc.output.ReadBytes('\n')
+	proc.cmd.Stdout.Write([]byte(OutputGuard))
+	proc.cmd.Stderr.Write([]byte(OutputGuard))
+	return out, err
 }
 
-// main service function
-// writing in input
-// and reading in output
-// using the provide channels
-func (proc *Executor) service() {
-	Debug("service: start")
-	for {
-		in := <-proc.io
-		if len(in) == 0 {
-			Debug("terminated upon request")
-			break
-		}
-		// input to the subprocess
-		DebugLimit(">>>", in, 120)
-		proc._input.Write(in)
-		proc._input.Write([]byte("\n"))
-		Debug("done")
-
-		// ok now give a chance to run to goroutines
-		runtime.Gosched()
-
-		// input to the subprocess
-		out, err := proc._output.ReadBytes('\n')
-		if err != nil {
-			break
-		}
-		DebugLimit("<<<", out, 120)
-		proc.io <- out
-		if len(out) == 0 {
-			Debug("empty input - exiting")
-			break
-		}
+// Exited reports whether the command exited; it consumes the exit signal, so only the first call after exit returns true
+func (proc *Executor) Exited() bool {
+	select {
+	case <-proc.exited:
+		return true
+	default:
+		return false
 	}
-	Debug("service: end")
 }
 
 // Start execution of the command
+// wait a bit to check if the command exited
 // returns an error if the program fails
 func (proc *Executor) Start() error {
 	// start the underlying executable
-	// check if died
-	go proc.run()
+	Debug("Start:")
+	err := proc.cmd.Start()
+	if err != nil {
+		Debug("run: early exit")
+		proc.cmd = nil // no need to kill
+		return fmt.Errorf("command exited")
+	}
+	Debug("pid: %d", proc.cmd.Process.Pid)
+	go func() {
+		proc.cmd.Wait()
+		proc.exited <- true
+	}()
 	select {
-	case <-proc.exit:
-		// oops, it died
+	case <-proc.exited:
 		return fmt.Errorf("command exited")
-	case <-time.After(DefaultTimeoutInit):
-		// ok let's process it
-		go proc.service()
-		go proc.logger()
+	case <-time.After(DefaultTimeoutStart):
+		return nil
 	}
-	return nil
 }
 
 // Stop will kill the process
 // and close the channels
 func (proc *Executor) Stop() {
 	Debug("stopping")
-	if proc._cmd != nil {
-		proc.log <- false
-		proc.io <- []byte("")
-		proc._cmd.Process.Kill()
-		<-proc.exit
-		proc._cmd = nil
+	if proc.cmd != nil {
+		proc.cmd.Process.Kill()
+		proc.cmd = nil
 	}
-	close(proc.io)
-	close(proc.exit)
-	close(proc.log)
 }
diff --git a/openwhisk/executor_test.go b/openwhisk/executor_test.go
index 8504fd9..6ab5bd7 100644
--- a/openwhisk/executor_test.go
+++ b/openwhisk/executor_test.go
@@ -51,24 +51,13 @@ func ExampleNewExecutor_bc() {
 	proc := NewExecutor(log, log, "_test/bc.sh")
 	err := proc.Start()
 	fmt.Println(err)
-	proc.io <- []byte("2+2")
-	fmt.Printf("%s", <-proc.io)
-	proc.log <- true
-	<-proc.log
-	// and now, exit detection
-	proc.io <- []byte("quit")
-	select {
-	case in := <-proc.io:
-		fmt.Printf("%s", in)
-	case <-proc.exit:
-		fmt.Println("exit")
-	}
+	res, _ := proc.Interact([]byte("2+2"))
+	fmt.Printf("%s", res)
 	proc.Stop()
 	dump(log)
 	// Output:
 	// <nil>
 	// 4
-	// exit
 	// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
 	// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
 }
@@ -78,48 +67,14 @@ func ExampleNewExecutor_hello() {
 	proc := NewExecutor(log, log, "_test/hello.sh")
 	err := proc.Start()
 	fmt.Println(err)
-	proc.io <- []byte(`{"value":{"name":"Mike"}}`)
-	fmt.Printf("%s", <-proc.io)
-	proc.log <- true
-	<-proc.log
+	res, _ := proc.Interact([]byte(`{"value":{"name":"Mike"}}`))
+	fmt.Printf("%s", res)
 	proc.Stop()
-	_, ok := <-proc.io
-	fmt.Printf("io %v\n", ok)
 	dump(log)
-	// Unordered output:
+	// Output:
 	// <nil>
 	// {"hello": "Mike"}
 	// msg=hello Mike
 	// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
 	// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-	// io false
-}
-
-func ExampleNewExecutor_term() {
-	log, _ := ioutil.TempFile("", "log")
-	proc := NewExecutor(log, log, "_test/hello.sh")
-	err := proc.Start()
-	fmt.Println(err)
-	proc.io <- []byte(`{"value":{"name":"*"}}`)
-	var exited bool
-	select {
-	case <-proc.io:
-		exited = false
-	case <-proc.exit:
-		exited = true
-	}
-	proc.log <- true
-	<-proc.log
-	fmt.Printf("exit %v\n", exited)
-	proc.Stop()
-	_, ok := <-proc.io
-	fmt.Printf("io %v\n", ok)
-	dump(log)
-	// Unordered output:
-	// <nil>
-	// exit true
-	// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-	// Goodbye!
-	// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-	// io false
 }
diff --git a/openwhisk/initHandler_test.go b/openwhisk/initHandler_test.go
index 81f00dc..de71f6d 100644
--- a/openwhisk/initHandler_test.go
+++ b/openwhisk/initHandler_test.go
@@ -94,20 +94,18 @@ func Example_shell_nocompiler() {
 	doRun(ts, "")
 	doInit(ts, initBinary("_test/hello.sh", ""))
 	doRun(ts, "")
-	doRun(ts, `{"name":"*"}`)
-	doRun(ts, "")
+	doRun(ts, `{"name":"world"}`)
 	stopTestServer(ts, cur, log)
 	// Output:
 	// 500 {"error":"no action defined yet"}
 	// 200 {"ok":true}
 	// 200 {"hello": "Mike"}
-	// 400 {"error":"command exited"}
-	// 500 {"error":"no action defined yet"}
+	// 200 {"hello": "world"}
 	// msg=hello Mike
 	// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
 	// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+	// msg=hello world
 	// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-	// Goodbye!
 	// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
 }
 
@@ -226,4 +224,5 @@ func Example_badinit_nocompiler() {
 	// 400 {"error":"cannot start action: command exited"}
 	// 400 {"error":"cannot start action: command exited"}
 	// 500 {"error":"no action defined yet"}
+	// hi
 }
diff --git a/openwhisk/runHandler.go b/openwhisk/runHandler.go
index 2463255..3345cb9 100644
--- a/openwhisk/runHandler.go
+++ b/openwhisk/runHandler.go
@@ -59,29 +59,20 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r *http.Request) {
 		sendError(w, http.StatusInternalServerError, fmt.Sprintf("no action defined yet"))
 		return
 	}
+	// check if the process exited
+	if ap.theExecutor.Exited() {
+		sendError(w, http.StatusInternalServerError, fmt.Sprintf("command exited"))
+		return
+	}
 
 	// remove newlines
 	body = bytes.Replace(body, []byte("\n"), []byte(""), -1)
 
 	// execute the action
-	ap.theExecutor.io <- body
+	response, err := ap.theExecutor.Interact(body)
 
 	// check for early termination
-	var response []byte
-	var exited bool
-	select {
-	case response = <-ap.theExecutor.io:
-		exited = false
-	case err = <-ap.theExecutor.exit:
-		exited = true
-	}
-
-	// flush the logs sending the activation message at the end
-	ap.theExecutor.log <- true
-	<-ap.theExecutor.log
-
-	// check for early termination
-	if exited {
+	if err != nil {
 		Debug("WARNING! Command exited")
 		ap.theExecutor = nil
 		sendError(w, http.StatusBadRequest, fmt.Sprintf("command exited"))
@@ -106,7 +97,7 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r *http.Request) {
 		f.Flush()
 	}
 
-	// diagnostic when writing problems
+	// report a failure to write the response
 	if err != nil {
 		sendError(w, http.StatusInternalServerError, fmt.Sprintf("Error writing response: %v", err))
 		return
diff --git a/openwhisk/util_test.go b/openwhisk/util_test.go
index b0d8084..1593760 100644
--- a/openwhisk/util_test.go
+++ b/openwhisk/util_test.go
@@ -185,9 +185,7 @@ func TestMain(m *testing.M) {
 	}
 
 	// increase timeouts for init
-	DefaultTimeoutInit = 1000 * time.Millisecond
-	// timeout for drain - shoud less (or you can get stuck on stdout without getting the stderr)
-	DefaultTimeoutDrain = 100 * time.Millisecond
+	DefaultTimeoutStart = 1000 * time.Millisecond
 	// build some test stuff
 	sys("_test/build.sh")
 	sys("_test/zips.sh")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services