You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by GitBox <gi...@apache.org> on 2020/05/19 21:16:36 UTC

[GitHub] [mynewt-newtmgr] ccollins476ad commented on a change in pull request #162: Optimize DFU

ccollins476ad commented on a change in pull request #162:
URL: https://github.com/apache/mynewt-newtmgr/pull/162#discussion_r427538275



##########
File path: nmxact/xact/image.go
##########
@@ -185,33 +207,163 @@ func nextImageUploadReq(s sesn.Sesn, upgrade bool, data []byte, off int, imageNu
 	return r, nil
 }
 
+func (t *ImageUploadIntTracker) UpdateTracker(t_off int, r_off int, status int) {
+	if status == IMAGE_UPLOAD_STATUS_MISSED {
+		t.RspMap[t_off] = IMAGE_UPLOAD_CHUNK_MISSED_WM
+		return
+	}
+
+	if _, ok := t.RspMap[t_off]; ok {
+		// send chunk@offset, increase count
+		t.RspMap[t_off] += 1
+	} else {
+		// send chunk@offset for the first time
+		t.RspMap[t_off] = 1
+	}
+
+	if _, ok := t.RspMap[r_off]; ok {
+		// chunk@offset requested, in transit
+		t.RspMap[r_off] -= 1
+	} else {
+		// chunk@offset was not sent
+		t.RspMap[r_off] = -1
+	}
+
+}
+
+func (t *ImageUploadIntTracker) CheckWindow() bool {
+	t.Mutex.Lock()
+	defer t.Mutex.Unlock()
+
+	return t.CheckWindowUL()
+}
+
+// Unlocked version, when the mutex is already held
+func (t *ImageUploadIntTracker) CheckWindowUL() bool {
+	return t.WCount < t.WCap
+}
+
+func (t *ImageUploadIntTracker) process_missed_chunks() {
+	for o, c := range t.RspMap {
+		if c < -(IMAGE_UPLOAD_MAX_WS * 2) {
+			delete(t.RspMap, o)
+			t.Off = o
+			log.Debugf("missed? off %d count %d", o, c)
+		}
+		// clean up done chunks
+		if c == 0 {
+			delete(t.RspMap, o)
+		}
+	}
+}
+
+func (t *ImageUploadIntTracker) HandleResponse(c *ImageUploadCmd, off int, rsp nmp.NmpRsp, res *ImageUploadResult) {
+	irsp := rsp.(*nmp.ImageUploadRsp)
+	res.Rsps = append(res.Rsps, irsp)
+	t.UpdateTracker(off, int(irsp.Off), 0)
+	if t.MaxRxOff < int64(irsp.Off) {
+		t.MaxRxOff = int64(irsp.Off)
+	}
+	if c.ProgressCb != nil {
+		c.ProgressCb(c, irsp)
+	}
+	if t.TuneWS && t.WCap < IMAGE_UPLOAD_MAX_WS {
+		atomic.AddInt64(&t.WCap, 1)
+	}
+	atomic.AddInt64(&t.WCount, -1)
+}
+
+func (t *ImageUploadIntTracker) HandleError(off int, err error) {
+	/*XXX: there could be an Unauthorize or EOF error  when the rate is too high
+	  due to a large window example:
+	  "failed to decrypt message: coap_sec_tunnel: decode GCM fail EOF"
+	  Since the error is sent with fmt.Errorf() API, with no code,
+	  the string may have to be parsed to know the particular error */
+	if t.WCount > IMAGE_UPLOAD_START_WS+1 {
+		atomic.AddInt64(&t.WCap, -1)
+	}
+	t.TuneWS = false
+	atomic.AddInt64(&t.WCount, -1)
+	t.UpdateTracker(off, -1, IMAGE_UPLOAD_STATUS_MISSED)
+}
+
 func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) {
 	res := newImageUploadResult()
+	done := make(chan int)
+	rsp_c := make(chan nmp.NmpRsp, IMAGE_UPLOAD_MAX_WS)
+	err_c := make(chan error, IMAGE_UPLOAD_MAX_WS)
 
-	for off := c.StartOff; off < len(c.Data); {
-		r, err := nextImageUploadReq(s, c.Upgrade, c.Data, off, c.ImageNum)
-		if err != nil {
-			return nil, err
+	t := ImageUploadIntTracker{
+		TuneWS:   true,
+		WCount:   0,
+		WCap:     IMAGE_UPLOAD_START_WS,
+		Off:      c.StartOff,
+		RspMap:   make(map[int]int),
+		MaxRxOff: 0,
+	}
+
+	var wg sync.WaitGroup
+
+	for int(atomic.LoadInt64(&t.MaxRxOff)) < len(c.Data) {
+		// Block if window is full
+		for !t.CheckWindow() {
 		}
 
-		rsp, err := txReq(s, r.Msg(), &c.CmdBase)
-		if err != nil {
-			return nil, err
+		t.Mutex.Lock()
+
+		t.process_missed_chunks()
+
+		if t.Off == len(c.Data) {
+			t.Mutex.Unlock()
+			break
 		}

Review comment:
       If I am reading correctly, this condition will break out of the loop early.  `t.Off` is increased whenever we send a request.  So we break out as soon as we send the final chunk, which is not quite right.  We don't want to break until we receive the final response.

##########
File path: nmxact/xact/image.go
##########
@@ -185,33 +207,163 @@ func nextImageUploadReq(s sesn.Sesn, upgrade bool, data []byte, off int, imageNu
 	return r, nil
 }
 
+func (t *ImageUploadIntTracker) UpdateTracker(t_off int, r_off int, status int) {
+	if status == IMAGE_UPLOAD_STATUS_MISSED {
+		t.RspMap[t_off] = IMAGE_UPLOAD_CHUNK_MISSED_WM
+		return
+	}
+
+	if _, ok := t.RspMap[t_off]; ok {
+		// send chunk@offset, increase count
+		t.RspMap[t_off] += 1
+	} else {
+		// send chunk@offset for the first time
+		t.RspMap[t_off] = 1
+	}
+
+	if _, ok := t.RspMap[r_off]; ok {
+		// chunk@offset requested, in transit
+		t.RspMap[r_off] -= 1
+	} else {
+		// chunk@offset was not sent
+		t.RspMap[r_off] = -1
+	}
+
+}
+
+func (t *ImageUploadIntTracker) CheckWindow() bool {
+	t.Mutex.Lock()
+	defer t.Mutex.Unlock()
+
+	return t.CheckWindowUL()
+}
+
+// Unlocked version, when the mutex is already held
+func (t *ImageUploadIntTracker) CheckWindowUL() bool {
+	return t.WCount < t.WCap
+}
+
+func (t *ImageUploadIntTracker) process_missed_chunks() {
+	for o, c := range t.RspMap {
+		if c < -(IMAGE_UPLOAD_MAX_WS * 2) {
+			delete(t.RspMap, o)
+			t.Off = o
+			log.Debugf("missed? off %d count %d", o, c)
+		}
+		// clean up done chunks
+		if c == 0 {
+			delete(t.RspMap, o)
+		}
+	}
+}
+
+func (t *ImageUploadIntTracker) HandleResponse(c *ImageUploadCmd, off int, rsp nmp.NmpRsp, res *ImageUploadResult) {
+	irsp := rsp.(*nmp.ImageUploadRsp)
+	res.Rsps = append(res.Rsps, irsp)
+	t.UpdateTracker(off, int(irsp.Off), 0)
+	if t.MaxRxOff < int64(irsp.Off) {
+		t.MaxRxOff = int64(irsp.Off)
+	}
+	if c.ProgressCb != nil {
+		c.ProgressCb(c, irsp)
+	}
+	if t.TuneWS && t.WCap < IMAGE_UPLOAD_MAX_WS {
+		atomic.AddInt64(&t.WCap, 1)
+	}
+	atomic.AddInt64(&t.WCount, -1)
+}
+
+func (t *ImageUploadIntTracker) HandleError(off int, err error) {
+	/*XXX: there could be an Unauthorize or EOF error  when the rate is too high
+	  due to a large window example:
+	  "failed to decrypt message: coap_sec_tunnel: decode GCM fail EOF"
+	  Since the error is sent with fmt.Errorf() API, with no code,
+	  the string may have to be parsed to know the particular error */
+	if t.WCount > IMAGE_UPLOAD_START_WS+1 {
+		atomic.AddInt64(&t.WCap, -1)
+	}
+	t.TuneWS = false
+	atomic.AddInt64(&t.WCount, -1)
+	t.UpdateTracker(off, -1, IMAGE_UPLOAD_STATUS_MISSED)
+}
+
 func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) {
 	res := newImageUploadResult()
+	done := make(chan int)
+	rsp_c := make(chan nmp.NmpRsp, IMAGE_UPLOAD_MAX_WS)
+	err_c := make(chan error, IMAGE_UPLOAD_MAX_WS)
 
-	for off := c.StartOff; off < len(c.Data); {
-		r, err := nextImageUploadReq(s, c.Upgrade, c.Data, off, c.ImageNum)
-		if err != nil {
-			return nil, err
+	t := ImageUploadIntTracker{
+		TuneWS:   true,
+		WCount:   0,
+		WCap:     IMAGE_UPLOAD_START_WS,
+		Off:      c.StartOff,
+		RspMap:   make(map[int]int),
+		MaxRxOff: 0,
+	}
+
+	var wg sync.WaitGroup
+
+	for int(atomic.LoadInt64(&t.MaxRxOff)) < len(c.Data) {
+		// Block if window is full
+		for !t.CheckWindow() {
 		}

Review comment:
       This is a bit anti-social :).  newtmgr will use 100% CPU while waiting for the device to respond.  It would be better to block on a channel rather than poll in a loop.

##########
File path: nmxact/xact/image.go
##########
@@ -185,33 +207,163 @@ func nextImageUploadReq(s sesn.Sesn, upgrade bool, data []byte, off int, imageNu
 	return r, nil
 }
 
+func (t *ImageUploadIntTracker) UpdateTracker(t_off int, r_off int, status int) {

Review comment:
       I still don't think this is quite right, but I'll admit I don't totally understand what the counts in `RspMap` represent.
   
   I think the below is a typical scenario (assume a window size of 3, request size of 100).  We start by filling the window with three requests:
   1. Tx req0, start go0 to listen for response (pass `off=0` to routine).
   2. Tx req1, start go1 to listen for response (pass `off=100` to routine).
   3. Tx req2, start go2 to listen for response (pass `off=200` to routine).
   
   Now we have three go routines running, all listening on the same two channels (`err_c` and `rsp_c`).
   
   We receive the first response:
   
   4. Rx rsp0
   
   The go routines are all listening on the same channel at the same time.  It is indeterminate which one actually processes the response.  If we get lucky and go0 gets the response, the code does the right thing: it passes `t_off=0,r_off=0` to `UpdateTracker`.
   
   If go1 receives the response, the code does the wrong thing (as I understand it).  It passes `t_off=100,r_off=0` to `t.HandleResponse`.  The response doesn't correspond to the request.  Now `t.RspMap` contains:
   ```
       [0]: -1
       [100]: 1
   ```
   
   Is that OK?  Like I said, I don't totally understand how `RspMap` gets used so maybe that is not a problem (and maybe I just need to spend some more time understanding it).  It just looked a little off.
   
   If `RspMap` needs to track each request/response pair accurately, why not increment an element at tx time and decrement at rx time?  Right now the code does both at rx time.

##########
File path: nmxact/xact/image.go
##########
@@ -185,33 +207,163 @@ func nextImageUploadReq(s sesn.Sesn, upgrade bool, data []byte, off int, imageNu
 	return r, nil
 }
 
+func (t *ImageUploadIntTracker) UpdateTracker(t_off int, r_off int, status int) {
+	if status == IMAGE_UPLOAD_STATUS_MISSED {
+		t.RspMap[t_off] = IMAGE_UPLOAD_CHUNK_MISSED_WM
+		return
+	}
+
+	if _, ok := t.RspMap[t_off]; ok {
+		// send chunk@offset, increase count
+		t.RspMap[t_off] += 1
+	} else {
+		// send chunk@offset for the first time
+		t.RspMap[t_off] = 1
+	}
+
+	if _, ok := t.RspMap[r_off]; ok {
+		// chunk@offset requested, in transit
+		t.RspMap[r_off] -= 1
+	} else {
+		// chunk@offset was not sent
+		t.RspMap[r_off] = -1
+	}
+
+}
+
+func (t *ImageUploadIntTracker) CheckWindow() bool {
+	t.Mutex.Lock()
+	defer t.Mutex.Unlock()
+
+	return t.CheckWindowUL()
+}
+
+// Unlocked version, when the mutex is already held
+func (t *ImageUploadIntTracker) CheckWindowUL() bool {
+	return t.WCount < t.WCap
+}
+
+func (t *ImageUploadIntTracker) process_missed_chunks() {
+	for o, c := range t.RspMap {
+		if c < -(IMAGE_UPLOAD_MAX_WS * 2) {
+			delete(t.RspMap, o)
+			t.Off = o
+			log.Debugf("missed? off %d count %d", o, c)
+		}
+		// clean up done chunks
+		if c == 0 {
+			delete(t.RspMap, o)
+		}
+	}
+}
+
+func (t *ImageUploadIntTracker) HandleResponse(c *ImageUploadCmd, off int, rsp nmp.NmpRsp, res *ImageUploadResult) {
+	irsp := rsp.(*nmp.ImageUploadRsp)
+	res.Rsps = append(res.Rsps, irsp)
+	t.UpdateTracker(off, int(irsp.Off), 0)
+	if t.MaxRxOff < int64(irsp.Off) {
+		t.MaxRxOff = int64(irsp.Off)
+	}
+	if c.ProgressCb != nil {
+		c.ProgressCb(c, irsp)
+	}
+	if t.TuneWS && t.WCap < IMAGE_UPLOAD_MAX_WS {
+		atomic.AddInt64(&t.WCap, 1)
+	}
+	atomic.AddInt64(&t.WCount, -1)
+}
+
+func (t *ImageUploadIntTracker) HandleError(off int, err error) {
+	/*XXX: there could be an Unauthorize or EOF error  when the rate is too high
+	  due to a large window example:
+	  "failed to decrypt message: coap_sec_tunnel: decode GCM fail EOF"
+	  Since the error is sent with fmt.Errorf() API, with no code,
+	  the string may have to be parsed to know the particular error */
+	if t.WCount > IMAGE_UPLOAD_START_WS+1 {
+		atomic.AddInt64(&t.WCap, -1)
+	}
+	t.TuneWS = false
+	atomic.AddInt64(&t.WCount, -1)
+	t.UpdateTracker(off, -1, IMAGE_UPLOAD_STATUS_MISSED)
+}
+
 func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) {
 	res := newImageUploadResult()
+	done := make(chan int)
+	rsp_c := make(chan nmp.NmpRsp, IMAGE_UPLOAD_MAX_WS)
+	err_c := make(chan error, IMAGE_UPLOAD_MAX_WS)
 
-	for off := c.StartOff; off < len(c.Data); {
-		r, err := nextImageUploadReq(s, c.Upgrade, c.Data, off, c.ImageNum)
-		if err != nil {
-			return nil, err
+	t := ImageUploadIntTracker{
+		TuneWS:   true,
+		WCount:   0,
+		WCap:     IMAGE_UPLOAD_START_WS,
+		Off:      c.StartOff,
+		RspMap:   make(map[int]int),
+		MaxRxOff: 0,
+	}
+
+	var wg sync.WaitGroup
+
+	for int(atomic.LoadInt64(&t.MaxRxOff)) < len(c.Data) {

Review comment:
       Is this condition necessary?  The code breaks out of the loop if `t.MaxRxOff >= len(c.Data)` down below, so it seems this could just be turned into an infinite loop.

##########
File path: nmxact/xact/image.go
##########
@@ -185,33 +207,163 @@ func nextImageUploadReq(s sesn.Sesn, upgrade bool, data []byte, off int, imageNu
 	return r, nil
 }
 
+func (t *ImageUploadIntTracker) UpdateTracker(t_off int, r_off int, status int) {
+	if status == IMAGE_UPLOAD_STATUS_MISSED {
+		t.RspMap[t_off] = IMAGE_UPLOAD_CHUNK_MISSED_WM
+		return
+	}
+
+	if _, ok := t.RspMap[t_off]; ok {
+		// send chunk@offset, increase count
+		t.RspMap[t_off] += 1
+	} else {
+		// send chunk@offset for the first time
+		t.RspMap[t_off] = 1
+	}
+
+	if _, ok := t.RspMap[r_off]; ok {
+		// chunk@offset requested, in transit
+		t.RspMap[r_off] -= 1
+	} else {
+		// chunk@offset was not sent
+		t.RspMap[r_off] = -1
+	}
+
+}
+
+func (t *ImageUploadIntTracker) CheckWindow() bool {
+	t.Mutex.Lock()
+	defer t.Mutex.Unlock()
+
+	return t.CheckWindowUL()
+}
+
+// Unlocked version, when the mutex is already held
+func (t *ImageUploadIntTracker) CheckWindowUL() bool {
+	return t.WCount < t.WCap
+}
+
+func (t *ImageUploadIntTracker) process_missed_chunks() {
+	for o, c := range t.RspMap {
+		if c < -(IMAGE_UPLOAD_MAX_WS * 2) {
+			delete(t.RspMap, o)
+			t.Off = o
+			log.Debugf("missed? off %d count %d", o, c)
+		}
+		// clean up done chunks
+		if c == 0 {
+			delete(t.RspMap, o)
+		}
+	}
+}
+
+func (t *ImageUploadIntTracker) HandleResponse(c *ImageUploadCmd, off int, rsp nmp.NmpRsp, res *ImageUploadResult) {
+	irsp := rsp.(*nmp.ImageUploadRsp)
+	res.Rsps = append(res.Rsps, irsp)
+	t.UpdateTracker(off, int(irsp.Off), 0)
+	if t.MaxRxOff < int64(irsp.Off) {
+		t.MaxRxOff = int64(irsp.Off)
+	}
+	if c.ProgressCb != nil {
+		c.ProgressCb(c, irsp)
+	}
+	if t.TuneWS && t.WCap < IMAGE_UPLOAD_MAX_WS {
+		atomic.AddInt64(&t.WCap, 1)
+	}
+	atomic.AddInt64(&t.WCount, -1)
+}
+
+func (t *ImageUploadIntTracker) HandleError(off int, err error) {
+	/*XXX: there could be an Unauthorize or EOF error  when the rate is too high
+	  due to a large window example:
+	  "failed to decrypt message: coap_sec_tunnel: decode GCM fail EOF"
+	  Since the error is sent with fmt.Errorf() API, with no code,
+	  the string may have to be parsed to know the particular error */
+	if t.WCount > IMAGE_UPLOAD_START_WS+1 {
+		atomic.AddInt64(&t.WCap, -1)
+	}
+	t.TuneWS = false
+	atomic.AddInt64(&t.WCount, -1)
+	t.UpdateTracker(off, -1, IMAGE_UPLOAD_STATUS_MISSED)
+}
+
 func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) {
 	res := newImageUploadResult()
+	done := make(chan int)
+	rsp_c := make(chan nmp.NmpRsp, IMAGE_UPLOAD_MAX_WS)
+	err_c := make(chan error, IMAGE_UPLOAD_MAX_WS)
 
-	for off := c.StartOff; off < len(c.Data); {
-		r, err := nextImageUploadReq(s, c.Upgrade, c.Data, off, c.ImageNum)
-		if err != nil {
-			return nil, err
+	t := ImageUploadIntTracker{
+		TuneWS:   true,
+		WCount:   0,
+		WCap:     IMAGE_UPLOAD_START_WS,
+		Off:      c.StartOff,
+		RspMap:   make(map[int]int),
+		MaxRxOff: 0,
+	}
+
+	var wg sync.WaitGroup
+
+	for int(atomic.LoadInt64(&t.MaxRxOff)) < len(c.Data) {
+		// Block if window is full
+		for !t.CheckWindow() {
 		}
 
-		rsp, err := txReq(s, r.Msg(), &c.CmdBase)
-		if err != nil {
-			return nil, err
+		t.Mutex.Lock()
+
+		t.process_missed_chunks()
+
+		if t.Off == len(c.Data) {
+			t.Mutex.Unlock()
+			break
 		}
-		irsp := rsp.(*nmp.ImageUploadRsp)
 
-		off = int(irsp.Off)
+		if int(t.MaxRxOff) == len(c.Data) {
+			t.Mutex.Unlock()
+			break
+		}
 
-		if c.ProgressCb != nil {
-			c.ProgressCb(c, irsp)
+		r, err := nextImageUploadReq(s, c.Upgrade, c.Data, t.Off, c.ImageNum)
+		t.Mutex.Unlock()
+		if err != nil {
+			return nil, err
 		}
 
-		res.Rsps = append(res.Rsps, irsp)
-		if irsp.Rc != 0 {
+		t.Off = (int(r.Off) + len(r.Data))
+
+		// Use up a chunk in window
+		atomic.AddInt64(&t.WCount, 1)
+		err = txReq_async(s, r.Msg(), &c.CmdBase, rsp_c, err_c)
+		if err != nil {
+			log.Debugf("err txReq_async %v", err)
 			break
 		}
+
+		wg.Add(1)

Review comment:
       Is this WaitGroup necessary?  It seems like the `done` channel does everything you need.

##########
File path: nmxact/xact/image.go
##########
@@ -185,33 +207,163 @@ func nextImageUploadReq(s sesn.Sesn, upgrade bool, data []byte, off int, imageNu
 	return r, nil
 }
 
+func (t *ImageUploadIntTracker) UpdateTracker(t_off int, r_off int, status int) {
+	if status == IMAGE_UPLOAD_STATUS_MISSED {
+		t.RspMap[t_off] = IMAGE_UPLOAD_CHUNK_MISSED_WM
+		return
+	}
+
+	if _, ok := t.RspMap[t_off]; ok {
+		// send chunk@offset, increase count
+		t.RspMap[t_off] += 1
+	} else {
+		// send chunk@offset for the first time
+		t.RspMap[t_off] = 1
+	}
+
+	if _, ok := t.RspMap[r_off]; ok {
+		// chunk@offset requested, in transit
+		t.RspMap[r_off] -= 1
+	} else {
+		// chunk@offset was not sent
+		t.RspMap[r_off] = -1
+	}
+
+}
+
+func (t *ImageUploadIntTracker) CheckWindow() bool {
+	t.Mutex.Lock()
+	defer t.Mutex.Unlock()
+
+	return t.CheckWindowUL()
+}
+
+// Unlocked version, when the mutex is already held
+func (t *ImageUploadIntTracker) CheckWindowUL() bool {
+	return t.WCount < t.WCap
+}
+
+func (t *ImageUploadIntTracker) process_missed_chunks() {
+	for o, c := range t.RspMap {
+		if c < -(IMAGE_UPLOAD_MAX_WS * 2) {
+			delete(t.RspMap, o)
+			t.Off = o
+			log.Debugf("missed? off %d count %d", o, c)
+		}
+		// clean up done chunks
+		if c == 0 {
+			delete(t.RspMap, o)
+		}
+	}
+}
+
+func (t *ImageUploadIntTracker) HandleResponse(c *ImageUploadCmd, off int, rsp nmp.NmpRsp, res *ImageUploadResult) {
+	irsp := rsp.(*nmp.ImageUploadRsp)
+	res.Rsps = append(res.Rsps, irsp)
+	t.UpdateTracker(off, int(irsp.Off), 0)
+	if t.MaxRxOff < int64(irsp.Off) {
+		t.MaxRxOff = int64(irsp.Off)
+	}
+	if c.ProgressCb != nil {
+		c.ProgressCb(c, irsp)
+	}
+	if t.TuneWS && t.WCap < IMAGE_UPLOAD_MAX_WS {
+		atomic.AddInt64(&t.WCap, 1)
+	}
+	atomic.AddInt64(&t.WCount, -1)
+}
+
+func (t *ImageUploadIntTracker) HandleError(off int, err error) {
+	/*XXX: there could be an Unauthorize or EOF error  when the rate is too high
+	  due to a large window example:
+	  "failed to decrypt message: coap_sec_tunnel: decode GCM fail EOF"
+	  Since the error is sent with fmt.Errorf() API, with no code,
+	  the string may have to be parsed to know the particular error */
+	if t.WCount > IMAGE_UPLOAD_START_WS+1 {
+		atomic.AddInt64(&t.WCap, -1)
+	}
+	t.TuneWS = false
+	atomic.AddInt64(&t.WCount, -1)
+	t.UpdateTracker(off, -1, IMAGE_UPLOAD_STATUS_MISSED)
+}
+
 func (c *ImageUploadCmd) Run(s sesn.Sesn) (Result, error) {
 	res := newImageUploadResult()
+	done := make(chan int)
+	rsp_c := make(chan nmp.NmpRsp, IMAGE_UPLOAD_MAX_WS)
+	err_c := make(chan error, IMAGE_UPLOAD_MAX_WS)
 
-	for off := c.StartOff; off < len(c.Data); {
-		r, err := nextImageUploadReq(s, c.Upgrade, c.Data, off, c.ImageNum)
-		if err != nil {
-			return nil, err
+	t := ImageUploadIntTracker{
+		TuneWS:   true,
+		WCount:   0,
+		WCap:     IMAGE_UPLOAD_START_WS,
+		Off:      c.StartOff,
+		RspMap:   make(map[int]int),
+		MaxRxOff: 0,
+	}
+
+	var wg sync.WaitGroup
+
+	for int(atomic.LoadInt64(&t.MaxRxOff)) < len(c.Data) {
+		// Block if window is full
+		for !t.CheckWindow() {
 		}
 
-		rsp, err := txReq(s, r.Msg(), &c.CmdBase)
-		if err != nil {
-			return nil, err
+		t.Mutex.Lock()
+
+		t.process_missed_chunks()
+
+		if t.Off == len(c.Data) {
+			t.Mutex.Unlock()
+			break
 		}
-		irsp := rsp.(*nmp.ImageUploadRsp)
 
-		off = int(irsp.Off)
+		if int(t.MaxRxOff) == len(c.Data) {
+			t.Mutex.Unlock()
+			break
+		}
 
-		if c.ProgressCb != nil {
-			c.ProgressCb(c, irsp)
+		r, err := nextImageUploadReq(s, c.Upgrade, c.Data, t.Off, c.ImageNum)
+		t.Mutex.Unlock()
+		if err != nil {
+			return nil, err
 		}
 
-		res.Rsps = append(res.Rsps, irsp)
-		if irsp.Rc != 0 {
+		t.Off = (int(r.Off) + len(r.Data))
+
+		// Use up a chunk in window
+		atomic.AddInt64(&t.WCount, 1)
+		err = txReq_async(s, r.Msg(), &c.CmdBase, rsp_c, err_c)
+		if err != nil {
+			log.Debugf("err txReq_async %v", err)
 			break
 		}
+
+		wg.Add(1)
+		go func(off int) {
+			select {
+			case err := <-err_c:
+				t.Mutex.Lock()
+				t.HandleError(off, err)
+				t.Mutex.Unlock()
+				wg.Done()
+				return
+			case rsp := <-rsp_c:

Review comment:
       I have a feeling this response map isn't quite right.  I am guilty of overcomplicating things in some of my earlier comments so I'm directing this comment at all of us!  Let's take a step back and look at this from a high level.
   
   In the below, I am assuming a reliable transport like Bluetooth.  If we want to support something like UDP, that is fine, but it will take some more work and testing.
   
   1. If we receive an error response, the upgrade has failed.  Abort the operation.
   2. At all times, we have an *expected offset* (EO) that the next response should contain.  This offset is: `oldest_request_offset + chunk_size`.
   3. Upon receiving a response:
       a. The offset should never be greater than EO since Bluetooth guarantees order of delivery.
       b. If the offset is equal to EO, great.  Free up one element in the window and set EO to `rsp.offset`.
       c. If the offset is less than EO then we have a problem.  The device must have dropped a request, causing a subsequent request to have an invalid offset.  Ignore the response since it doesn't allow us to make progress.
   
   If case c happens, we're going to have to re-send some requests.  How do we know which requests need to be re-sent and which are still in-flight?  The only way to know which requests are in-flight is to match responses to requests using the sequence number.
   
   Something more concrete:
   
   * Create a slice (`s`).  Each element is a request in flight (`*nmp.ImageUploadReq`).
   * `s` is sorted by request offset.
   * `len(s)` never exceeds the window size.
   * When we want to send a request, scan `s` for the first "hole".  That is, a place where two adjacent elements do not have consecutive offsets.  Fill the hole by creating a request and inserting it into the slice.  Then transmit the request.
   * When we receive a response, scan `s` for the request with the same sequence number.  When found, remove the entry from `s`.
   * When our window is full, we can only start sending again when we remove the first element from `s`.  That is, when we receive a response with offset equal to EO.  When this happens, keep sending until `len(s)` equals the window size (or until there are no more requests to send).
   
   I'm definitely not saying you have to do it this way.  If you prefer parts of the current implementation then I am totally fine with that.  But hopefully this helps a little bit in explaining my misgivings with the map approach.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org