You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/04/06 11:09:32 UTC
[incubator-openwhisk-runtime-go] branch master updated: The
foundation of an openwhisk runtime proxy in Go (#5)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-runtime-go.git
The following commit(s) were added to refs/heads/master by this push:
new c16d4d3 The foundation of an openwhisk runtime proxy in Go (#5)
c16d4d3 is described below
commit c16d4d3c0aa5e3f70470bc826b4adbb4ffc8c39e
Author: Sciabarra.com ltd <30...@users.noreply.github.com>
AuthorDate: Fri Apr 6 13:09:31 2018 +0200
The foundation of an openwhisk runtime proxy in Go (#5)
This is a runtime proxy for OpenWhisk actions authored in Go. It is richer than the Node.js and Python proxies in that it allows re-initialization and provides a socket interface for exchanging inputs and outputs with the function handlers.
---
.gitignore | 7 +-
README.md | 6 +-
main/proxy.go | 15 +-
openwhisk/_test/bc.sh | 3 +
openwhisk/_test/build.sh | 5 +
openwhisk/_test/dir/etc | 0
openwhisk/_test/etc | 0
openwhisk/_test/exec.go | 2 +
openwhisk/_test/first/3/.gitkeep | 1 +
openwhisk/_test/hello.sh | 12 ++
openwhisk/_test/second/17/.gitkeep | 1 +
openwhisk/_test/second/3/.gitkeep | 1 +
openwhisk/_test/second/8/.gitkeep | 1 +
openwhisk/actionProxy.go | 84 +++++++++++
openwhisk/actionProxy_test.go | 44 ++++++
openwhisk/executor.go | 208 ++++++++++++++++++++++++++++
openwhisk/executor_test.go | 121 ++++++++++++++++
openwhisk/extractor.go | 131 ++++++++++++++++++
openwhisk/extractor_test.go | 95 +++++++++++++
openwhisk/initHandler.go | 96 +++++++++++++
openwhisk/runHandler.go | 126 +++++++++++++++++
test/.gitignore | 4 +
test/README.md | 9 ++
test/bin/.gitignore | 3 +
test/bin/build.sh | 10 ++
test/bin/init.sh | 10 ++
test/bin/post.sh | 3 +
test/bin/run.sh | 6 +
test/etc/hello.js | 5 +
test/etc/hello.sh | 12 ++
main/proxy.go => test/src/empty.go | 9 +-
test/src/hello/hello.go | 30 ++++
test/src/hello/hello_test.go | 28 ++++
main/proxy.go => test/src/hello_greeting.go | 9 +-
main/proxy.go => test/src/hello_message.go | 21 ++-
main/proxy.go => test/src/hi.go | 7 +-
test/start.sh | 8 ++
test/test.t | 93 +++++++++++++
test/zip/.gitignore | 1 +
39 files changed, 1203 insertions(+), 24 deletions(-)
diff --git a/.gitignore b/.gitignore
index 21e05c2..6e7acee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,7 @@
-core/proxy
.gradle/
.gogradle/
-
+vendor/
+core/proxy
+openwhisk/_test/exec
+openwhisk/_test/exec.zip
+openwhisk/action/
diff --git a/README.md b/README.md
index 70d2067..8eacd6c 100644
--- a/README.md
+++ b/README.md
@@ -2,9 +2,5 @@
This is an OpenWhisk runtime for Golang.
-Local development
-
-`./gradle :docker:distDocker`
-
-This will produce the image `whisk/golang`
+Check [test documentation](./test/README.md) for testing.
diff --git a/main/proxy.go b/main/proxy.go
index 8def703..fe8c0bc 100644
--- a/main/proxy.go
+++ b/main/proxy.go
@@ -17,9 +17,20 @@
package main
import (
- "fmt"
+ "flag"
+ "io/ioutil"
+ "log"
+
+ "github.com/sciabarracom/incubator-openwhisk-runtime-go/openwhisk"
)
+// disable stderr except when debugging
+var debug = flag.Bool("debug", false, "enable debug output")
+
func main() {
- fmt.Println("Just a placeholder")
+ flag.Parse()
+ if !*debug {
+ log.SetOutput(ioutil.Discard)
+ }
+ openwhisk.Start()
}
diff --git a/openwhisk/_test/bc.sh b/openwhisk/_test/bc.sh
new file mode 100755
index 0000000..ff78878
--- /dev/null
+++ b/openwhisk/_test/bc.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+bc -q >&3
+
diff --git a/openwhisk/_test/build.sh b/openwhisk/_test/build.sh
new file mode 100755
index 0000000..82f3378
--- /dev/null
+++ b/openwhisk/_test/build.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+cd "$(dirname $0)"
+test -e exec || GOARCH=amd64 GOOS=linux go build -o exec exec.go
+test -e exec.zip || zip -q -r exec.zip exec etc dir
+cd -
diff --git a/openwhisk/_test/dir/etc b/openwhisk/_test/dir/etc
new file mode 100644
index 0000000..e69de29
diff --git a/openwhisk/_test/etc b/openwhisk/_test/etc
new file mode 100644
index 0000000..e69de29
diff --git a/openwhisk/_test/exec.go b/openwhisk/_test/exec.go
new file mode 100644
index 0000000..792a517
--- /dev/null
+++ b/openwhisk/_test/exec.go
@@ -0,0 +1,2 @@
+package main
+func main() { }
diff --git a/openwhisk/_test/first/3/.gitkeep b/openwhisk/_test/first/3/.gitkeep
new file mode 100644
index 0000000..72e8ffc
--- /dev/null
+++ b/openwhisk/_test/first/3/.gitkeep
@@ -0,0 +1 @@
+*
diff --git a/openwhisk/_test/hello.sh b/openwhisk/_test/hello.sh
new file mode 100755
index 0000000..cc38507
--- /dev/null
+++ b/openwhisk/_test/hello.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+while read line
+do
+ name="$(echo $line | jq -r .name)"
+ if [ "$name" == "*" ]
+ then echo "Goodbye!" >&2
+ exit 0
+ fi
+ echo msg="hello $name"
+ echo '{"hello": "'$name'"}' >&3
+done
+
diff --git a/openwhisk/_test/second/17/.gitkeep b/openwhisk/_test/second/17/.gitkeep
new file mode 100644
index 0000000..72e8ffc
--- /dev/null
+++ b/openwhisk/_test/second/17/.gitkeep
@@ -0,0 +1 @@
+*
diff --git a/openwhisk/_test/second/3/.gitkeep b/openwhisk/_test/second/3/.gitkeep
new file mode 100644
index 0000000..72e8ffc
--- /dev/null
+++ b/openwhisk/_test/second/3/.gitkeep
@@ -0,0 +1 @@
+*
diff --git a/openwhisk/_test/second/8/.gitkeep b/openwhisk/_test/second/8/.gitkeep
new file mode 100644
index 0000000..72e8ffc
--- /dev/null
+++ b/openwhisk/_test/second/8/.gitkeep
@@ -0,0 +1 @@
+*
diff --git a/openwhisk/actionProxy.go b/openwhisk/actionProxy.go
new file mode 100644
index 0000000..5889b01
--- /dev/null
+++ b/openwhisk/actionProxy.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package openwhisk
+
+import (
+ "fmt"
+ "log"
+ "net/http"
+ "os"
+)
+
+// theServer is the current server
+var theServer http.Server
+
+// theExecutor is the executor communicating with the current action
+var theExecutor *Executor
+
+// StartLatestAction tries to start
+// the most recently uploaded
+// action if valid; otherwise it removes it
+// and falls back to the previous one, if any
+func StartLatestAction() error {
+
+ // find the action if any
+ highestDir := highestDir("./action")
+ if highestDir == 0 {
+ log.Println("no action found")
+ theExecutor = nil
+ return fmt.Errorf("no valid actions available")
+ }
+
+ // save the current executor
+ curExecutor := theExecutor
+
+ // try to launch the action
+ executable := fmt.Sprintf("./action/%d/exec", highestDir)
+ newExecutor := NewExecutor(executable)
+ log.Printf("starting %s", executable)
+ err := newExecutor.Start()
+ if err == nil {
+ theExecutor = newExecutor
+ if curExecutor != nil {
+ log.Println("stopping old executor")
+ curExecutor.Stop()
+ }
+ return nil
+ }
+
+ // cannot start, removing the action
+ // and leaving the current executor running
+
+ exeDir := fmt.Sprintf("./action/%d/", highestDir)
+ log.Printf("removing the failed action in %s", exeDir)
+ os.RemoveAll(exeDir)
+ return err
+}
+
+// Start creates a proxy to execute actions
+func Start() {
+ // handle initialization
+ http.HandleFunc("/init", initHandler)
+ // handle execution
+ http.HandleFunc("/run", runHandler)
+
+ // start
+ log.Println("Started!")
+ theServer.Addr = ":8080"
+ log.Fatal(theServer.ListenAndServe())
+}
diff --git a/openwhisk/actionProxy_test.go b/openwhisk/actionProxy_test.go
new file mode 100644
index 0000000..b46ca3d
--- /dev/null
+++ b/openwhisk/actionProxy_test.go
@@ -0,0 +1,44 @@
+package openwhisk
+
+import (
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestStartLatestAction(t *testing.T) {
+
+ // cleanup
+ os.RemoveAll("./action")
+ theExecutor = nil
+
+ // start an action that terminate immediately
+ buf := []byte("#!/bin/sh\ntrue\n")
+ extractAction(&buf, true)
+ StartLatestAction()
+ assert.Nil(t, theExecutor)
+
+ // start the action that emits 1
+ buf = []byte("#!/bin/sh\nwhile read a; do echo 1 >&3 ; done\n")
+ extractAction(&buf, true)
+ StartLatestAction()
+ theExecutor.io <- "x"
+ assert.Equal(t, <-theExecutor.io, "1")
+
+ // now start an action that terminate immediately
+ buf = []byte("#!/bin/sh\ntrue\n")
+ extractAction(&buf, true)
+ StartLatestAction()
+ theExecutor.io <- "y"
+ assert.Equal(t, <-theExecutor.io, "1")
+
+ // start the action that emits 2
+ buf = []byte("#!/bin/sh\nwhile read a; do echo 2 >&3 ; done\n")
+ extractAction(&buf, true)
+ StartLatestAction()
+ theExecutor.io <- "z"
+ assert.Equal(t, <-theExecutor.io, "2")
+ /**/
+ theExecutor.Stop()
+}
diff --git a/openwhisk/executor.go b/openwhisk/executor.go
new file mode 100644
index 0000000..6a48f4b
--- /dev/null
+++ b/openwhisk/executor.go
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package openwhisk
+
+import (
+ "bufio"
+ "fmt"
+ "log"
+ "os"
+ "os/exec"
+ "runtime"
+ "time"
+)
+
+// TIMEOUT to wait for process to start
+// and log to be produced
+const TIMEOUT = 5 * time.Millisecond
+
+// Executor is the container and the guardian of a child process
+// It starts a command, feeds input and output, reads logs and controls its termination
+type Executor struct {
+ io chan string
+ log chan bool
+ exit chan error
+ _cmd *exec.Cmd
+ _input *bufio.Writer
+ _output *bufio.Scanner
+ _logout *bufio.Scanner
+ _logerr *bufio.Scanner
+}
+
+// NewExecutor prepares a child process for the provided command line without starting it.
+// Call Start to launch it; you then talk to it through the executor's channels
+func NewExecutor(command string, args ...string) (proc *Executor) {
+ cmd := exec.Command(command, args...)
+
+ stdin, err := cmd.StdinPipe()
+ if err != nil {
+ return nil
+ }
+
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ return nil
+ }
+
+ stderr, err := cmd.StderrPipe()
+ if err != nil {
+ return nil
+ }
+
+ pipeOut, pipeIn, err := os.Pipe()
+ if err != nil {
+ return nil
+ }
+ cmd.ExtraFiles = []*os.File{pipeIn}
+
+ return &Executor{
+ make(chan string),
+ make(chan bool),
+ make(chan error),
+ cmd,
+ bufio.NewWriter(stdin),
+ bufio.NewScanner(pipeOut),
+ bufio.NewScanner(stdout),
+ bufio.NewScanner(stderr),
+ }
+}
+
+// collect log from a stream
+func _collect(ch chan string, scan *bufio.Scanner) {
+ for scan.Scan() {
+ ch <- scan.Text()
+ }
+}
+
+// loop over the command executing
+// returning when the command exits
+func (proc *Executor) run() {
+ log.Println("run: start")
+ err := proc._cmd.Start()
+ if err != nil {
+ proc.exit <- err
+ log.Println("run: early exit")
+ proc._cmd = nil // do not kill
+ return
+ }
+ // wait for the exit
+ proc.exit <- proc._cmd.Wait()
+ proc._cmd = nil // do not kill
+ log.Println("run: end")
+}
+
+// manage copying stdout and stderr to the output
+// with log guards
+func (proc *Executor) logger() {
+ log.Println("logger: start")
+ // poll stdout and stderr
+ chOut := make(chan string)
+ go _collect(chOut, proc._logout)
+ chErr := make(chan string)
+ go _collect(chErr, proc._logerr)
+
+ // wait for the signal
+ for <-proc.log {
+ // flush stdout
+ runtime.Gosched()
+ for loop := true; loop; {
+ select {
+ case buf := <-chOut:
+ fmt.Println(buf)
+ case <-time.After(TIMEOUT):
+ loop = false
+ }
+ }
+ fmt.Println("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+
+ // flush stderr
+ runtime.Gosched()
+ for loop := true; loop; {
+ select {
+ case buf := <-chErr:
+ fmt.Println(buf)
+ case <-time.After(TIMEOUT):
+ loop = false
+ }
+ }
+ fmt.Println("XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+ }
+ log.Printf("logger: end")
+}
+
+// main service function:
+// writes each request to the process input
+// and reads the reply from its output,
+// using the provided channels
+func (proc *Executor) service() {
+ log.Println("service: start")
+ for {
+ in := <-proc.io
+ if in == "" {
+ log.Println("terminated upon request")
+ break
+ }
+ // input/output with the process
+ log.Printf(">>>%s\n", in)
+ proc._input.WriteString(in + "\n")
+ proc._input.Flush()
+ if proc._output.Scan() {
+ out := proc._output.Text()
+ log.Printf("<<<%s\n", out)
+ proc.io <- out
+ if out == "" {
+ break
+ }
+ }
+ }
+ log.Printf("service: end")
+}
+
+// Start execution of the command
+// returns an error if the program fails
+func (proc *Executor) Start() error {
+ // start the underlying executable
+ // check if died
+ go proc.run()
+ select {
+ case <-proc.exit:
+ // oops, it died
+ return fmt.Errorf("command exited")
+ case <-time.After(TIMEOUT):
+ // ok let's process it
+ go proc.service()
+ go proc.logger()
+ }
+ return nil
+}
+
+// Stop will kill the process
+// and close the channels
+func (proc *Executor) Stop() {
+ log.Println("stopping")
+ if proc._cmd != nil {
+ proc.log <- false
+ proc.io <- ""
+ proc._cmd.Process.Kill()
+ <-proc.exit
+ proc._cmd = nil
+ }
+ close(proc.io)
+ close(proc.exit)
+ close(proc.log)
+}
diff --git a/openwhisk/executor_test.go b/openwhisk/executor_test.go
new file mode 100644
index 0000000..b904cb3
--- /dev/null
+++ b/openwhisk/executor_test.go
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package openwhisk
+
+import (
+ "fmt"
+ "time"
+)
+
+func ExampleNewExecutor_failed() {
+ proc := NewExecutor("true")
+ err := proc.Start()
+ fmt.Println(err)
+ proc.Stop()
+ proc = NewExecutor("/bin/pwd")
+ err = proc.Start()
+ fmt.Println(err)
+ proc.Stop()
+ proc = NewExecutor("donotexist")
+ err = proc.Start()
+ fmt.Println(err)
+ proc.Stop()
+ proc = NewExecutor("/etc/passwd")
+ err = proc.Start()
+ fmt.Println(err)
+ proc.Stop()
+ // Output:
+ // command exited
+ // command exited
+ // command exited
+ // command exited
+}
+
+func ExampleNewExecutor_bc() {
+ proc := NewExecutor("_test/bc.sh")
+ err := proc.Start()
+ fmt.Println(err)
+ //proc.log <- true
+ proc.io <- "2+2"
+ fmt.Println(<-proc.io)
+ // and now, exit detection
+ proc.io <- "quit"
+ proc.log <- true
+ select {
+ case in := <-proc.io:
+ fmt.Println(in)
+ case <-proc.exit:
+ fmt.Println("exit")
+ }
+ time.Sleep(100 * time.Millisecond)
+ proc.Stop()
+ // Output:
+ // <nil>
+ // 4
+ // exit
+ // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+ // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+}
+
+func ExampleNewExecutor_hello() {
+ proc := NewExecutor("_test/hello.sh")
+ err := proc.Start()
+ fmt.Println(err)
+ proc.io <- `{"name":"Mike"}`
+ fmt.Println(<-proc.io)
+ proc.log <- true
+ time.Sleep(100 * time.Millisecond)
+ proc.Stop()
+ time.Sleep(100 * time.Millisecond)
+ _, ok := <-proc.io
+ fmt.Printf("io %v\n", ok)
+ // Unordered output:
+ // <nil>
+ // {"hello": "Mike"}
+ // msg=hello Mike
+ // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+ // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+ // io false
+}
+
+func ExampleNewExecutor_term() {
+ proc := NewExecutor("_test/hello.sh")
+ err := proc.Start()
+ fmt.Println(err)
+ proc.io <- `{"name":"*"}`
+ var exited bool
+ select {
+ case <-proc.io:
+ exited = false
+ case <-proc.exit:
+ exited = true
+ }
+ proc.log <- true
+ fmt.Printf("exit %v\n", exited)
+ time.Sleep(100 * time.Millisecond)
+ proc.Stop()
+ time.Sleep(100 * time.Millisecond)
+ _, ok := <-proc.io
+ fmt.Printf("io %v\n", ok)
+ // Unordered output:
+ // <nil>
+ // exit true
+ // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+ // Goodbye!
+ // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+ // io false
+}
diff --git a/openwhisk/extractor.go b/openwhisk/extractor.go
new file mode 100644
index 0000000..17965bb
--- /dev/null
+++ b/openwhisk/extractor.go
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package openwhisk
+
+import (
+ "archive/zip"
+ "bytes"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "os"
+ "path/filepath"
+ "strconv"
+
+ "github.com/h2non/filetype"
+)
+
+func unzip(src []byte, dest string) error {
+ reader := bytes.NewReader(src)
+ r, err := zip.NewReader(reader, int64(len(src)))
+ if err != nil {
+ return err
+ }
+
+ os.MkdirAll(dest, 0755)
+
+ // Closure to address file descriptors issue with all the deferred .Close() methods
+ extractAndWriteFile := func(f *zip.File) error {
+ rc, err := f.Open()
+ if err != nil {
+ return err
+ }
+ defer func() {
+ rc.Close()
+ }()
+
+ path := filepath.Join(dest, f.Name)
+
+ if f.FileInfo().IsDir() {
+ os.MkdirAll(path, f.Mode())
+ } else {
+ os.MkdirAll(filepath.Dir(path), f.Mode())
+ f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
+ if err != nil {
+ return err
+ }
+ defer func() {
+ f.Close()
+ }()
+
+ _, err = io.Copy(f, rc)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+
+ for _, f := range r.File {
+ err := extractAndWriteFile(f)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// highestDir finds the highest numeric name among the entries of dir;
+// it returns 0 if no numeric names are found or dir cannot be read
+func highestDir(dir string) int {
+ files, err := ioutil.ReadDir(dir)
+ if err != nil {
+ return 0
+ }
+ max := 0
+ for _, file := range files {
+ n, err := strconv.Atoi(file.Name())
+ if err == nil {
+ if n > max {
+ max = n
+ }
+ }
+ }
+ return max
+}
+
+var currentDir = highestDir("./action")
+
+// extractAction accepts a byte array and writes it into a new ./action/<n> directory, unzipping it if it is a zip archive
+func extractAction(buf *[]byte, isScript bool) error {
+ if buf == nil || len(*buf) == 0 {
+ return fmt.Errorf("no file")
+ }
+ currentDir++
+ newDir := fmt.Sprintf("./action/%d", currentDir)
+ os.MkdirAll(newDir, 0755)
+ kind, err := filetype.Match(*buf)
+ if err != nil {
+ return err
+ }
+ if kind.Extension == "zip" {
+ log.Println("Extract Action, assuming a zip")
+ return unzip(*buf, newDir)
+ }
+ if kind.Extension == "elf" || isScript {
+ if isScript {
+ log.Println("Extract Action, assuming a script")
+ } else {
+ log.Println("Extract Action, assuming a binary")
+ }
+ return ioutil.WriteFile(newDir+"/exec", *buf, 0755)
+ }
+ log.Println("No valid action found")
+ return fmt.Errorf("unknown filetype %s", kind)
+}
diff --git a/openwhisk/extractor_test.go b/openwhisk/extractor_test.go
new file mode 100644
index 0000000..ddc6522
--- /dev/null
+++ b/openwhisk/extractor_test.go
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package openwhisk
+
+import (
+ "fmt"
+ "io/ioutil"
+ "log"
+ "os"
+ "os/exec"
+ "testing"
+
+ "github.com/h2non/filetype"
+ "github.com/stretchr/testify/assert"
+)
+
+func sys(cli string) {
+ cmd := exec.Command(cli)
+ out, err := cmd.CombinedOutput()
+ if err != nil {
+ log.Print(err)
+ } else {
+ fmt.Print(string(out))
+ }
+}
+
+func TestExtractActionTest_exec(t *testing.T) {
+ sys("_test/build.sh")
+ // cleanup
+ assert.Nil(t, os.RemoveAll("./action"))
+ file, _ := ioutil.ReadFile("_test/exec")
+ extractAction(&file, false)
+ assert.Nil(t, exists("./action", "exec"))
+}
+
+func exists(dir, filename string) error {
+ path := fmt.Sprintf("%s/%d/%s", dir, highestDir(dir), filename)
+ _, err := os.Stat(path)
+ return err
+}
+
+func detect(dir, filename string) string {
+ path := fmt.Sprintf("%s/%d/%s", dir, highestDir(dir), filename)
+ file, _ := ioutil.ReadFile(path)
+ kind, _ := filetype.Match(file)
+ return kind.Extension
+}
+func TestExtractActionTest_exe(t *testing.T) {
+ sys("_test/build.sh")
+ // cleanup
+ assert.Nil(t, os.RemoveAll("./action"))
+ // match exe
+ file, _ := ioutil.ReadFile("_test/exec")
+ extractAction(&file, false)
+ assert.Equal(t, detect("./action", "exec"), "elf")
+}
+
+func TestExtractActionTest_zip(t *testing.T) {
+ sys("_test/build.sh")
+ // cleanup
+ assert.Nil(t, os.RemoveAll("./action"))
+ // match exe
+ file, _ := ioutil.ReadFile("_test/exec.zip")
+ extractAction(&file, false)
+ assert.Equal(t, detect("./action", "exec"), "elf")
+ assert.Nil(t, exists("./action", "etc"))
+ assert.Nil(t, exists("./action", "dir/etc"))
+}
+
+func TestExtractAction_script(t *testing.T) {
+ buf := []byte("#!/bin/sh\necho ok")
+ assert.NotNil(t, extractAction(&buf, false))
+ assert.Nil(t, extractAction(&buf, true))
+}
+
+func TestHighestDir(t *testing.T) {
+ assert.Equal(t, highestDir("./_test"), 0)
+ assert.Equal(t, highestDir("./_test/first"), 3)
+ assert.Equal(t, highestDir("./_test/second"), 17)
+}
diff --git a/openwhisk/initHandler.go b/openwhisk/initHandler.go
new file mode 100644
index 0000000..59dce64
--- /dev/null
+++ b/openwhisk/initHandler.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package openwhisk
+
+import (
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+)
+
+type initRequest struct {
+ Value struct {
+ Code string `json:",omitempty"`
+ Binary bool `json:",omitempty"`
+ } `json:",omitempty"`
+}
+
+func sendOK(w http.ResponseWriter) {
+ // answer OK
+ w.Header().Set("Content-Type", "application/json")
+ w.Header().Set("Content-Length", "12")
+ w.Write([]byte("{\"ok\":true}\n"))
+ if f, ok := w.(http.Flusher); ok {
+ f.Flush()
+ }
+}
+
+func initHandler(w http.ResponseWriter, r *http.Request) {
+
+ // read body of the request
+ // log.Println("init: reading")
+ body, err := ioutil.ReadAll(r.Body)
+ defer r.Body.Close()
+ if err != nil {
+ sendError(w, http.StatusBadRequest, fmt.Sprintf("%v", err))
+ }
+
+ // decode request parameters
+ //log.Println("init: decoding")
+ var request initRequest
+ err = json.Unmarshal(body, &request)
+
+ if err != nil {
+ sendError(w, http.StatusBadRequest, fmt.Sprintf("Error unmarshaling request: %v", err))
+ return
+ }
+
+ if request.Value.Code == "" {
+ sendOK(w)
+ return
+ }
+
+ // check if it is a binary
+ if request.Value.Binary {
+ var decoded []byte
+ decoded, err = base64.StdEncoding.DecodeString(request.Value.Code)
+ if err != nil {
+ sendError(w, http.StatusBadRequest, "cannot decode the request: "+err.Error())
+ return
+ }
+ // extract the replacement, stopping and then starting the action
+ err = extractAction(&decoded, false)
+ } else {
+ buf := []byte(request.Value.Code)
+ err = extractAction(&buf, true)
+ }
+ if err != nil {
+ sendError(w, http.StatusBadRequest, "invalid action: "+err.Error())
+ return
+ }
+
+ // stop and start
+ err = StartLatestAction()
+ if err != nil {
+ sendError(w, http.StatusBadRequest, "cannot start action: "+err.Error())
+ return
+ }
+ sendOK(w)
+}
diff --git a/openwhisk/runHandler.go b/openwhisk/runHandler.go
new file mode 100644
index 0000000..027a9d8
--- /dev/null
+++ b/openwhisk/runHandler.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package openwhisk
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "strings"
+)
+
+// Params are the parameters sent to the action
+type Params struct {
+ Value json.RawMessage `json:"value"`
+}
+
+// ErrResponse is the response when there are errors
+type ErrResponse struct {
+ Error string `json:"error"`
+}
+
+func sendError(w http.ResponseWriter, code int, cause string) {
+ errResponse := ErrResponse{Error: cause}
+ b, err := json.Marshal(errResponse)
+ if err != nil {
+ b = []byte("error marshalling error response")
+ fmt.Println(b, err)
+ }
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(code)
+ w.Write(b)
+ w.Write([]byte("\n"))
+}
+
+func runHandler(w http.ResponseWriter, r *http.Request) {
+
+ // parse the request
+ params := Params{}
+ body, err := ioutil.ReadAll(r.Body)
+ defer r.Body.Close()
+ if err != nil {
+ sendError(w, http.StatusBadRequest, fmt.Sprintf("Error reading request body: %v", err))
+ return
+ }
+
+ // decode request parameters
+ err = json.Unmarshal(body, ¶ms)
+ if err != nil {
+ sendError(w, http.StatusBadRequest, fmt.Sprintf("Error unmarshaling request: %v", err))
+ return
+ }
+
+ // check if you have an action
+ if theExecutor == nil {
+ sendError(w, http.StatusBadRequest, fmt.Sprintf("no action defined yet"))
+ return
+ }
+
+ // execute the action
+ // and check for early termination
+ theExecutor.io <- string(params.Value)
+ var response string
+ var exited bool
+ select {
+ case response = <-theExecutor.io:
+ exited = false
+ case err = <-theExecutor.exit:
+ exited = true
+ }
+
+ // check for early termination
+ if exited {
+ theExecutor = nil
+ sendError(w, http.StatusBadRequest, fmt.Sprintf("command exited"))
+ return
+ }
+
+ // flush the logs sending the activation message at the end
+ theExecutor.log <- true
+
+ // check response
+ if response == "" {
+ sendError(w, http.StatusBadRequest, fmt.Sprintf("%v", err))
+ return
+ }
+
+ // return the response
+ if !strings.HasSuffix(response, "\n") {
+ response = response + "\n"
+ }
+ log.Print(response)
+ w.Header().Set("Content-Type", "application/json")
+ numBytesWritten, err := w.Write([]byte(response))
+
+ // flush output
+ if f, ok := w.(http.Flusher); ok {
+ f.Flush()
+ }
+
+ // diagnostic when writing problems
+ if err != nil {
+ sendError(w, http.StatusInternalServerError, fmt.Sprintf("Error writing response: %v", err))
+ return
+ }
+ if numBytesWritten != len(response) {
+ sendError(w, http.StatusInternalServerError, fmt.Sprintf("Only wrote %d of %d bytes to response", numBytesWritten, len(response)))
+ return
+ }
+}
diff --git a/test/.gitignore b/test/.gitignore
new file mode 100644
index 0000000..4499418
--- /dev/null
+++ b/test/.gitignore
@@ -0,0 +1,4 @@
+action/
+*.json
+*.err
+
diff --git a/test/README.md b/test/README.md
new file mode 100644
index 0000000..3a601c9
--- /dev/null
+++ b/test/README.md
@@ -0,0 +1,9 @@
+# Test
+
+How to run tests:
+
+- ensure you are running them under Linux
+- install `cram`: `sudo pip install cram`
+- start the tester: `cd test ; ./start.sh`
+- in another terminal `cd test ; cram test.t`
+
diff --git a/test/bin/.gitignore b/test/bin/.gitignore
new file mode 100644
index 0000000..b374f99
--- /dev/null
+++ b/test/bin/.gitignore
@@ -0,0 +1,3 @@
+hello_*
+empty
+hi
diff --git a/test/bin/build.sh b/test/bin/build.sh
new file mode 100755
index 0000000..4ed5b0a
--- /dev/null
+++ b/test/bin/build.sh
@@ -0,0 +1,10 @@
+#!/bin/sh
+FILE=${1:?go file}
+OUT=$(basename $FILE)
+BIN=${OUT%%.go}
+ZIP=${BIN}.zip
+go build -i -o bin/$BIN $FILE
+GOOS=linux GOARCH=amd64 go build -o exec $FILE
+zip zip/$ZIP exec
+rm exec
+echo "built bin/$BIN zip/$ZIP"
diff --git a/test/bin/init.sh b/test/bin/init.sh
new file mode 100755
index 0000000..e700bb4
--- /dev/null
+++ b/test/bin/init.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+FILE=${1:?file}
+JSON=${2:-/tmp/json$$}
+if file -i $FILE | grep text/ >/dev/null
+then echo '{"value":{"main":"main","code":'$(cat $FILE | jq -R -s .)'}}' >/tmp/json$$
+else echo '{"value":{"binary":true,"code":"'$(base64 -w 0 $FILE)'"}}' >/tmp/json$$
+fi
+#cat $JSON | jq .
+curl -H "Content-Type: application/json" -XPOST -w "%{http_code}\n" http://localhost:${PORT:-8080}/init -d @$JSON 2>/dev/null
+rm /tmp/json$$ 2>/dev/null
diff --git a/test/bin/post.sh b/test/bin/post.sh
new file mode 100755
index 0000000..6255f39
--- /dev/null
+++ b/test/bin/post.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+FILE=${1:?file}
+curl -H "Content-Type: application/json" -XPOST -w "%{http_code}\n" http://localhost:${PORT:-8080}/init -d @$FILE 2>/dev/null
diff --git a/test/bin/run.sh b/test/bin/run.sh
new file mode 100755
index 0000000..0df821e
--- /dev/null
+++ b/test/bin/run.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+DEFAULT='{"name": "Mike"}'
+JSON=${1:-$DEFAULT}
+DATA='{"value":'$JSON'}'
+curl -H "Content-Type: application/json" -w "%{http_code}\n" -XPOST http://localhost:${PORT:-8080}/run -d "$DATA" 2>/dev/null
+
diff --git a/test/etc/hello.js b/test/etc/hello.js
new file mode 100644
index 0000000..a1bf768
--- /dev/null
+++ b/test/etc/hello.js
@@ -0,0 +1,5 @@
+function main(args) {
+ return {
+ "result": "Hello, "+args.name
+ }
+}
diff --git a/test/etc/hello.sh b/test/etc/hello.sh
new file mode 100755
index 0000000..9b7f318
--- /dev/null
+++ b/test/etc/hello.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+while read line
+do
+ name="$(echo $line | jq -r .name)"
+ if test "$name" == ""
+ then exit
+ fi
+ echo "name=$name"
+ hello="Hello, $name"
+ echo '{"hello":"'$hello'"}' >&3
+done
+
diff --git a/main/proxy.go b/test/src/empty.go
similarity index 92%
copy from main/proxy.go
copy to test/src/empty.go
index 8def703..35a2611 100644
--- a/main/proxy.go
+++ b/test/src/empty.go
@@ -14,12 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package main
-import (
- "fmt"
-)
+package main
-func main() {
- fmt.Println("Just a placeholder")
-}
+func main() {}
diff --git a/test/src/hello/hello.go b/test/src/hello/hello.go
new file mode 100644
index 0000000..b0592d0
--- /dev/null
+++ b/test/src/hello/hello.go
@@ -0,0 +1,30 @@
+package hello
+
+import (
+ "encoding/json"
+ "fmt"
+ "log"
+)
+
+// Hello receives an event in the format
+// { "name": "Mike"}
+// and returns a greeting in the format
+// { "greetings": "Hello, Mike"}
+func Hello(event json.RawMessage) (json.RawMessage, error) {
+ // input and output
+ var input struct {
+ Name string
+ }
+ var output struct {
+ Greetings string `json:"greetings"`
+ }
+ // read the input event
+ json.Unmarshal(event, &input)
+ if input.Name != "" {
+ // handle the event
+ output.Greetings = "Hello, " + input.Name
+ log.Println(output.Greetings)
+ return json.Marshal(output)
+ }
+ return nil, fmt.Errorf("no name specified")
+}
diff --git a/test/src/hello/hello_test.go b/test/src/hello/hello_test.go
new file mode 100644
index 0000000..0624cad
--- /dev/null
+++ b/test/src/hello/hello_test.go
@@ -0,0 +1,28 @@
+package hello
+
+import (
+ "fmt"
+)
+
+func ExampleHello() {
+ name := []byte(`{ "name": "Mike"}`)
+ data, _ := Hello(name)
+ fmt.Printf("%s", data)
+ // Output:
+ // {"greetings":"Hello, Mike"}
+}
+
+func ExampleHello_noName() {
+ name := []byte(`{ "noname": "Mike"}`)
+ _, err := Hello(name)
+ fmt.Print(err)
+ // Output:
+ // no name specified
+}
+func ExampleHello_badJson() {
+ name := []byte(`{{`)
+ _, err := Hello(name)
+ fmt.Print(err)
+ // Output:
+ // no name specified
+}
diff --git a/main/proxy.go b/test/src/hello_greeting.go
similarity index 78%
copy from main/proxy.go
copy to test/src/hello_greeting.go
index 8def703..f70bd2c 100644
--- a/main/proxy.go
+++ b/test/src/hello_greeting.go
@@ -17,9 +17,14 @@
package main
import (
- "fmt"
+ "log"
+ "os"
+
+ "github.com/apache/incubator-openwhisk-client-go/whisk"
+ "github.com/sciabarracom/incubator-openwhisk-runtime-go/test/src/hello"
)
func main() {
- fmt.Println("Just a placeholder")
+ log.SetPrefix("hello_greeting: ")
+ whisk.StartWithArgs(hello.Hello, os.Args[1:])
}
diff --git a/main/proxy.go b/test/src/hello_message.go
similarity index 63%
copy from main/proxy.go
copy to test/src/hello_message.go
index 8def703..c29170d 100644
--- a/main/proxy.go
+++ b/test/src/hello_message.go
@@ -17,9 +17,26 @@
package main
import (
- "fmt"
+ "encoding/json"
+ "log"
+ "os"
+
+ "github.com/apache/incubator-openwhisk-client-go/whisk"
)
+func sayHello(event json.RawMessage) (json.RawMessage, error) {
+ var obj map[string]interface{}
+ json.Unmarshal(event, &obj)
+ name, ok := obj["name"].(string)
+ if !ok {
+ name = "Stranger"
+ }
+ log.Printf("name=%s\n", name)
+ msg := map[string]string{"message": ("Hello, " + name + "!")}
+ return json.Marshal(msg)
+}
+
func main() {
- fmt.Println("Just a placeholder")
+ log.SetPrefix("hello_message: ")
+ whisk.StartWithArgs(sayHello, os.Args[1:])
}
diff --git a/main/proxy.go b/test/src/hi.go
similarity index 94%
copy from main/proxy.go
copy to test/src/hi.go
index 8def703..6a07be8 100644
--- a/main/proxy.go
+++ b/test/src/hi.go
@@ -14,12 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package main
-import (
- "fmt"
-)
+import "fmt"
func main() {
- fmt.Println("Just a placeholder")
+ fmt.Println("hi")
}
diff --git a/test/start.sh b/test/start.sh
new file mode 100755
index 0000000..5310d7f
--- /dev/null
+++ b/test/start.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+cd "$(dirname $0)"
+bin/build.sh src/hello_greeting.go
+bin/build.sh src/hello_message.go
+bin/build.sh src/empty.go
+bin/build.sh src/hi.go
+rm -Rvf action
+go run ../main/proxy.go -debug
diff --git a/test/test.t b/test/test.t
new file mode 100644
index 0000000..a85b86c
--- /dev/null
+++ b/test/test.t
@@ -0,0 +1,93 @@
+ $ export T=$TESTDIR
+
+ $ $T/bin/run.sh
+ {"error":"no action defined yet"}
+ 400
+
+ $ $T/bin/post.sh $T/etc/empty.json
+ {"ok":true}
+ 200
+
+ $ $T/bin/init.sh $T/test.t
+ {"error":"cannot start action: command exited"}
+ 400
+
+ $ $T/bin/init.sh $T/bin/empty
+ {"error":"cannot start action: command exited"}
+ 400
+
+ $ $T/bin/init.sh $T/bin/hi
+ {"error":"cannot start action: command exited"}
+ 400
+
+ $ $T/bin/run.sh
+ {"error":"no action defined yet"}
+ 400
+
+ $ $T/bin/init.sh $T/bin/hello_message
+ {"ok":true}
+ 200
+
+ $ $T/bin/run.sh
+ {"message":"Hello, Mike!"}
+ 200
+
+ $ $T/bin/init.sh $T/bin/hello_greeting
+ {"ok":true}
+ 200
+
+ $ $T/bin/run.sh
+ {"greetings":"Hello, Mike"}
+ 200
+
+ $ $T/bin/init.sh $T/zip/hello_message.zip
+ {"ok":true}
+ 200
+
+ $ $T/bin/run.sh
+ {"message":"Hello, Mike!"}
+ 200
+
+ $ $T/bin/init.sh $T/zip/hello_greeting.zip
+ {"ok":true}
+ 200
+
+ $ $T/bin/run.sh
+ {"greetings":"Hello, Mike"}
+ 200
+
+ $ $T/bin/init.sh $T/test.t
+ {"error":"cannot start action: command exited"}
+ 400
+
+ $ $T/bin/run.sh
+ {"greetings":"Hello, Mike"}
+ 200
+
+ $ $T/bin/init.sh $T/bin/empty
+ {"error":"cannot start action: command exited"}
+ 400
+
+ $ $T/bin/run.sh
+ {"greetings":"Hello, Mike"}
+ 200
+
+ $ $T/bin/init.sh $T/bin/hi
+ {"error":"cannot start action: command exited"}
+ 400
+
+ $ $T/bin/init.sh $T/etc/hello.sh
+ {"ok":true}
+ 200
+
+ $ $T/bin/run.sh
+ {"hello":"Hello, Mike"}
+ 200
+
+ $ $T/bin/run.sh '{"name": ""}'
+ {"error":"command exited"}
+ 400
+
+ $ $T/bin/run.sh
+ {"error":"no action defined yet"}
+ 400
\ No newline at end of file
diff --git a/test/zip/.gitignore b/test/zip/.gitignore
new file mode 100644
index 0000000..c4c4ffc
--- /dev/null
+++ b/test/zip/.gitignore
@@ -0,0 +1 @@
+*.zip
--
To stop receiving notification emails like this one, please contact
rabbah@apache.org.