You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by cc...@apache.org on 2017/03/31 03:22:31 UTC
[1/2] incubator-mynewt-newt git commit: unixchild additions
Repository: incubator-mynewt-newt
Updated Branches:
refs/heads/develop 9797af33e -> df674371e
unixchild additions
* User specifies socket accept timeout
* New error type indicating accept error
* Don't allow double start or stop
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/commit/df674371
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/tree/df674371
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/diff/df674371
Branch: refs/heads/develop
Commit: df674371e341eeb3761c734aa596cc37b6c67ef9
Parents: aa7aa13
Author: Christopher Collins <cc...@apache.org>
Authored: Fri Mar 10 11:10:40 2017 -0800
Committer: Christopher Collins <cc...@apache.org>
Committed: Thu Mar 30 20:23:55 2017 -0700
----------------------------------------------------------------------
util/unixchild/unixchild.go | 160 +++++++++++++++++++++++++++++++--------
1 file changed, 128 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/blob/df674371/util/unixchild/unixchild.go
----------------------------------------------------------------------
diff --git a/util/unixchild/unixchild.go b/util/unixchild/unixchild.go
index bc311fc..b8be35d 100644
--- a/util/unixchild/unixchild.go
+++ b/util/unixchild/unixchild.go
@@ -22,7 +22,6 @@ package unixchild
import (
"bufio"
"encoding/binary"
- "errors"
"fmt"
"io"
"net"
@@ -35,38 +34,73 @@ import (
log "github.com/Sirupsen/logrus"
)
+type UcAcceptError struct {
+ Text string
+}
+
+func (err *UcAcceptError) Error() string {
+ return err.Text
+}
+
+func NewUcAcceptError(text string) *UcAcceptError {
+ return &UcAcceptError{
+ Text: text,
+ }
+}
+
+func IsUcAcceptError(err error) bool {
+ _, ok := err.(*UcAcceptError)
+ return ok
+}
+
type Config struct {
- SockPath string
- ChildPath string
- ChildArgs []string
- Depth int
- MaxMsgSz int
+ SockPath string
+ ChildPath string
+ ChildArgs []string
+ Depth int
+ MaxMsgSz int
+ AcceptTimeout time.Duration
+ Restart bool
}
+type clientState uint32
+
+const (
+ CLIENT_STATE_STOPPED clientState = iota
+ CLIENT_STATE_STARTING
+ CLIENT_STATE_STARTED
+ CLIENT_STATE_STOPPING
+)
+
type Client struct {
- FromChild chan []byte
- ToChild chan []byte
- ErrChild chan error
- childPath string
- sockPath string
- childArgs []string
- maxMsgSz int
- stopping bool
- stop chan bool
- stopped chan bool
+ FromChild chan []byte
+ ToChild chan []byte
+ ErrChild chan error
+ childPath string
+ sockPath string
+ childArgs []string
+ maxMsgSz int
+ acceptTimeout time.Duration
+ restart bool
+ stop chan bool
+ stopped chan bool
+ state clientState
+ stateMutex sync.Mutex
}
func New(conf Config) *Client {
c := &Client{
- childPath: conf.ChildPath,
- sockPath: conf.SockPath,
- childArgs: conf.ChildArgs,
- maxMsgSz: conf.MaxMsgSz,
- FromChild: make(chan []byte, conf.Depth),
- ToChild: make(chan []byte, conf.Depth),
- ErrChild: make(chan error),
- stop: make(chan bool),
- stopped: make(chan bool),
+ childPath: conf.ChildPath,
+ sockPath: conf.SockPath,
+ childArgs: conf.ChildArgs,
+ maxMsgSz: conf.MaxMsgSz,
+ FromChild: make(chan []byte, conf.Depth),
+ ToChild: make(chan []byte, conf.Depth),
+ ErrChild: make(chan error),
+ acceptTimeout: conf.AcceptTimeout,
+ restart: conf.Restart,
+ stop: make(chan bool),
+ stopped: make(chan bool),
}
if c.maxMsgSz == 0 {
@@ -76,6 +110,42 @@ func New(conf Config) *Client {
return c
}
+func (c *Client) getState() clientState {
+ c.stateMutex.Lock()
+ defer c.stateMutex.Unlock()
+
+ return c.state
+}
+
+func (c *Client) setState(toState clientState) {
+ c.stateMutex.Lock()
+ defer c.stateMutex.Unlock()
+
+ c.state = toState
+}
+
+func (c *Client) setStateIf(toState clientState,
+ pred func(st clientState) bool) (bool, clientState) {
+
+ c.stateMutex.Lock()
+ defer c.stateMutex.Unlock()
+
+ if pred(c.state) {
+ c.state = toState
+ return true, toState
+ }
+
+ return false, c.state
+
+}
+
+func (c *Client) setStateFrom(fromState clientState,
+ toState clientState) (bool, clientState) {
+
+ return c.setStateIf(toState,
+ func(st clientState) bool { return st == fromState })
+}
+
func (c *Client) startChild() (*exec.Cmd, error) {
subProcess := exec.Command(c.childPath, c.childArgs...)
@@ -179,10 +249,14 @@ func (c *Client) handleChild(con net.Conn) {
}
func (c *Client) Stop() {
- if c.stopping {
+ ok, _ := c.setStateIf(CLIENT_STATE_STOPPING,
+ func(st clientState) bool {
+ return st != CLIENT_STATE_STOPPING
+ })
+ if !ok {
return
}
- c.stopping = true
+
log.Debugf("Stopping client")
c.stop <- true
@@ -194,10 +268,24 @@ func (c *Client) Stop() {
}
}
+func (c *Client) acceptDeadline() *time.Time {
+ if c.acceptTimeout == 0 {
+ return nil
+ }
+
+ t := time.Now().Add(c.acceptTimeout)
+ return &t
+}
+
func (c *Client) Start() error {
+ ok, state := c.setStateFrom(CLIENT_STATE_STOPPED, CLIENT_STATE_STARTING)
+ if !ok {
+ return fmt.Errorf("client in invalid state for stating: %d", state)
+ }
l, err := net.Listen("unix", c.sockPath)
if err != nil {
+ c.setState(CLIENT_STATE_STOPPED)
return err
}
@@ -211,16 +299,21 @@ func (c *Client) Start() error {
log.Debugf("unixchild start error: %s", err.Error())
c.ErrChild <- fmt.Errorf("Child start error: %s", err.Error())
} else {
+ if t := c.acceptDeadline(); t != nil {
+ l.(*net.UnixListener).SetDeadline(*t)
+ }
fd, err := l.Accept()
if err != nil {
- log.Debugf("unixchild accept error: %s", err.Error())
+ text := fmt.Sprintf("unixchild accept error: %s",
+ err.Error())
+ c.ErrChild <- NewUcAcceptError(text)
} else {
+ c.setState(CLIENT_STATE_STARTED)
c.handleChild(fd)
+ c.ErrChild <- fmt.Errorf("Child exited")
}
- cmd.Process.Kill()
- c.ErrChild <- errors.New("Child exited")
}
- if c.stopping {
+ if c.getState() == CLIENT_STATE_STOPPING {
log.Debugf("unixchild exit loop")
return
}
@@ -231,10 +324,13 @@ func (c *Client) Start() error {
go func() {
select {
case <-c.stop:
- l.Close()
+ if c.getState() == CLIENT_STATE_STARTED {
+ l.Close()
+ }
if cmd != nil {
cmd.Process.Kill()
}
+ log.Debugf("deleting socket")
os.Remove(c.sockPath)
c.stopped <- true
}
[2/2] incubator-mynewt-newt git commit: unixchild util library
Posted by cc...@apache.org.
unixchild util library
Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/commit/aa7aa137
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/tree/aa7aa137
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/diff/aa7aa137
Branch: refs/heads/develop
Commit: aa7aa137139fd41b2c0cd22220c802314e21889b
Parents: 9797af3
Author: Christopher Collins <cc...@apache.org>
Authored: Fri Mar 10 18:09:13 2017 -0800
Committer: Christopher Collins <cc...@apache.org>
Committed: Thu Mar 30 20:23:55 2017 -0700
----------------------------------------------------------------------
util/unixchild/unixchild.go | 244 +++++++++++++++++++++++++++++++++++++++
1 file changed, 244 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newt/blob/aa7aa137/util/unixchild/unixchild.go
----------------------------------------------------------------------
diff --git a/util/unixchild/unixchild.go b/util/unixchild/unixchild.go
new file mode 100644
index 0000000..bc311fc
--- /dev/null
+++ b/util/unixchild/unixchild.go
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package unixchild
+
+import (
+ "bufio"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "os"
+ "os/exec"
+ "strings"
+ "sync"
+ "time"
+
+ log "github.com/Sirupsen/logrus"
+)
+
+type Config struct {
+ SockPath string
+ ChildPath string
+ ChildArgs []string
+ Depth int
+ MaxMsgSz int
+}
+
+type Client struct {
+ FromChild chan []byte
+ ToChild chan []byte
+ ErrChild chan error
+ childPath string
+ sockPath string
+ childArgs []string
+ maxMsgSz int
+ stopping bool
+ stop chan bool
+ stopped chan bool
+}
+
+func New(conf Config) *Client {
+ c := &Client{
+ childPath: conf.ChildPath,
+ sockPath: conf.SockPath,
+ childArgs: conf.ChildArgs,
+ maxMsgSz: conf.MaxMsgSz,
+ FromChild: make(chan []byte, conf.Depth),
+ ToChild: make(chan []byte, conf.Depth),
+ ErrChild: make(chan error),
+ stop: make(chan bool),
+ stopped: make(chan bool),
+ }
+
+ if c.maxMsgSz == 0 {
+ c.maxMsgSz = 1024
+ }
+
+ return c
+}
+
+func (c *Client) startChild() (*exec.Cmd, error) {
+ subProcess := exec.Command(c.childPath, c.childArgs...)
+
+ stdin, err := subProcess.StdinPipe()
+ if err != nil {
+ return nil, err
+ }
+ stdin.Close()
+
+ stdout, _ := subProcess.StdoutPipe()
+ stderr, _ := subProcess.StderrPipe()
+
+ if err = subProcess.Start(); err != nil {
+ return nil, err
+ }
+
+ go func() {
+ br := bufio.NewReader(stdout)
+ for {
+ s, err := br.ReadString('\n')
+ if err != nil {
+ return
+ }
+ log.Debugf("child stdout: %s", strings.TrimSuffix(s, "\n"))
+ }
+ }()
+
+ go func() {
+ br := bufio.NewReader(stderr)
+ for {
+ s, err := br.ReadString('\n')
+ if err != nil {
+ return
+ }
+ log.Debugf("child stderr: %s", strings.TrimSuffix(s, "\n"))
+ }
+ }()
+
+ go subProcess.Wait() // reap dead children
+
+ return subProcess, nil
+}
+
+func (c *Client) handleChild(con net.Conn) {
+ var wg sync.WaitGroup
+
+ bail := make(chan bool)
+
+ fromDataPump := func() {
+ defer wg.Done()
+ for {
+ var mlen uint16
+
+ err := binary.Read(con, binary.BigEndian, &mlen)
+ if err != nil {
+ log.Debugln("fromDataPump error: ", err)
+ bail <- true
+ return
+ }
+
+ buf := make([]byte, mlen)
+ _, err = io.ReadFull(con, buf)
+ if err != nil {
+ log.Debugln("fromDataPump error: ", err)
+ bail <- true
+ return
+ }
+
+ c.FromChild <- buf
+ }
+ }
+
+ toDataPump := func() {
+ defer wg.Done()
+ for {
+ select {
+ case buf := <-c.ToChild:
+ mlen := uint16(len(buf))
+ err := binary.Write(con, binary.BigEndian, mlen)
+ if err != nil {
+ log.Debugln("toDataPump error: ", err)
+ return
+ }
+ _, err = con.Write(buf)
+ if err != nil {
+ log.Debugln("toDataPump error: ", err)
+ return
+ }
+ case <-bail:
+ log.Debugln("toDataPump bail")
+ return
+ }
+ }
+ }
+
+ wg.Add(1)
+ go fromDataPump()
+ wg.Add(1)
+ go toDataPump()
+ wg.Wait()
+}
+
+func (c *Client) Stop() {
+ if c.stopping {
+ return
+ }
+ c.stopping = true
+ log.Debugf("Stopping client")
+
+ c.stop <- true
+
+ select {
+ case <-c.stopped:
+ log.Debugf("Stopped client")
+ return
+ }
+}
+
+func (c *Client) Start() error {
+
+ l, err := net.Listen("unix", c.sockPath)
+ if err != nil {
+ return err
+ }
+
+ var cmd *exec.Cmd
+
+ go func() {
+ for {
+ var err error
+ cmd, err = c.startChild()
+ if err != nil {
+ log.Debugf("unixchild start error: %s", err.Error())
+ c.ErrChild <- fmt.Errorf("Child start error: %s", err.Error())
+ } else {
+ fd, err := l.Accept()
+ if err != nil {
+ log.Debugf("unixchild accept error: %s", err.Error())
+ } else {
+ c.handleChild(fd)
+ }
+ cmd.Process.Kill()
+ c.ErrChild <- errors.New("Child exited")
+ }
+ if c.stopping {
+ log.Debugf("unixchild exit loop")
+ return
+ }
+ time.Sleep(time.Second)
+ }
+ }()
+
+ go func() {
+ select {
+ case <-c.stop:
+ l.Close()
+ if cmd != nil {
+ cmd.Process.Kill()
+ }
+ os.Remove(c.sockPath)
+ c.stopped <- true
+ }
+ }()
+
+ return nil
+}