You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cordova.apache.org by ag...@apache.org on 2014/12/31 06:56:59 UTC

[09/13] cordova-app-harness git commit: Use chrome.sockets.tcp.update({uri:}) to more efficiently pipe request payloads to files

Use chrome.sockets.tcp.update({uri:}) to more efficiently pipe request payloads to files

20 megs new takes just a few seconds!
Added in some TODOs to tweak the code once the API can take in the
number of bytes to pipe.


Project: http://git-wip-us.apache.org/repos/asf/cordova-app-harness/repo
Commit: http://git-wip-us.apache.org/repos/asf/cordova-app-harness/commit/d3e08982
Tree: http://git-wip-us.apache.org/repos/asf/cordova-app-harness/tree/d3e08982
Diff: http://git-wip-us.apache.org/repos/asf/cordova-app-harness/diff/d3e08982

Branch: refs/heads/master
Commit: d3e0898244cf8f6a8805028e677cff9162d1df5a
Parents: e1a6069
Author: Andrew Grieve <ag...@chromium.org>
Authored: Fri Nov 14 21:33:23 2014 +0100
Committer: Andrew Grieve <ag...@chromium.org>
Committed: Wed Dec 31 00:49:12 2014 -0500

----------------------------------------------------------------------
 www/cdvah/js/HarnessServer.js |  29 +-------
 www/cdvah/js/HttpServer.js    | 147 +++++++++++++++++++++++++++++++------
 2 files changed, 128 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cordova-app-harness/blob/d3e08982/www/cdvah/js/HarnessServer.js
----------------------------------------------------------------------
diff --git a/www/cdvah/js/HarnessServer.js b/www/cdvah/js/HarnessServer.js
index d6bf873..0e6b102 100644
--- a/www/cdvah/js/HarnessServer.js
+++ b/www/cdvah/js/HarnessServer.js
@@ -79,31 +79,10 @@
         }
 
         function pipeRequestToFile(req, destUrl) {
-            var writer = null;
-            function handleChunk(arrayBuffer) {
-                var ret = $q.when();
-                if (writer == null) {
-                   ret = ResourcesLoader.createFileWriter(destUrl)
-                   .then(function(w) {
-                       writer = w;
-                   });
-                }
-                return ret.then(function() {
-                    var deferred = $q.defer();
-                    writer.onwrite = deferred.resolve;
-                    writer.onerror = function() {
-                      deferred.reject(writer.error);
-                    };
-                    writer.write(arrayBuffer);
-                    return deferred.promise;
-                })
-                .then(function() {
-                    if (req.bytesRemaining > 0) {
-                        return req.readChunk().then(handleChunk);
-                    }
-                });
-            }
-            return req.readChunk().then(handleChunk);
+            return ResourcesLoader.createFileWriter(destUrl)
+            .then(function(w) {
+                return req.pipeToUri(w, destUrl);
+            });
         }
 
         function pipeFileToResponse(srcUrl, resp) {

http://git-wip-us.apache.org/repos/asf/cordova-app-harness/blob/d3e08982/www/cdvah/js/HttpServer.js
----------------------------------------------------------------------
diff --git a/www/cdvah/js/HttpServer.js b/www/cdvah/js/HttpServer.js
index 1055008..bec8175 100644
--- a/www/cdvah/js/HttpServer.js
+++ b/www/cdvah/js/HttpServer.js
@@ -37,8 +37,13 @@
         chrome.sockets.tcp.onReceive.addListener(function(receiveInfo) {
             var socket = socketMap[receiveInfo.socketId];
             if (socket) {
-                socket._pendingReadChunks.push(receiveInfo.data);
-                socket._onReceive();
+                // data will be missing when piping to a file.
+                var numBytes = receiveInfo.bytesRead;
+                if (receiveInfo.data) {
+                    numBytes = receiveInfo.data.byteLength;
+                    socket._pendingReadChunks.push(receiveInfo.data);
+                }
+                socket._onReceive(numBytes);
             }
         });
 
@@ -126,7 +131,58 @@
             });
         };
 
-        HttpRequest.prototype.readChunk = function(/* optional */maxChunkSize) {
+        HttpRequest.prototype.pipeToUri = function(fileWriter, uri) {
+            var useNativePipeApi = uri && typeof cordova != 'undefined';
+            var self = this;
+            function handleChunk(arrayBuffer) {
+                var deferred = $q.defer();
+                // Only way to get a null arrayBuffer is when flushReadQueue() has been called.
+                if (!arrayBuffer) {
+                    // TODO: Change this to pass in self.bytesRemaining.
+                    return self._requestData.socket.pipeToUri(uri, true)
+                    .then(null, null, function(numBytesRead) {
+                        console.log('Piped chunk of size ' + numBytesRead);
+                        self._updateBytesRemaining(numBytesRead);
+                        if (self.bytesRemaining === 0) {
+                            // TODO: Delete stopPipeToFile() once pipeToUrl takes in a byteCount.
+                            self._requestData.socket.stopPipeToFile();
+                        }
+                    });
+                }
+                fileWriter.onwrite = deferred.resolve;
+                fileWriter.onerror = function() {
+                    deferred.reject(fileWriter.error);
+                };
+                fileWriter.write(arrayBuffer);
+                return deferred.promise
+                .then(function() {
+                    if (self.bytesRemaining > 0) {
+                        return self._readChunkInternal(0, useNativePipeApi).then(handleChunk);
+                    }
+                });
+            }
+            if (useNativePipeApi) {
+                return this._requestData.socket.flushReadQueue()
+                .then(function() {
+                    return self._readChunkInternal(0, true).then(handleChunk);
+                });
+            }
+            return self._readChunkInternal(0, false).then(handleChunk);
+        };
+
+        HttpRequest.prototype._updateBytesRemaining = function(numBytesRead) {
+            this.bytesRemaining -= numBytesRead;
+            if (this.bytesRemaining < 0) {
+                throw new Error('Bytes remaining negative: ' + this.bytesRemaining);
+            }
+            if (this.bytesRemaining === 0 && this._requestData.state === STATE_HEADERS_RECEIVED) {
+                changeState(this._requestData, STATE_REQUEST_RECEIVED);
+            }
+        };
+
+        HttpRequest.prototype._readChunkInternal = function(/* optional */maxChunkSize, sync) {
+            // Don't read past bytesRemaining.
+            maxChunkSize = maxChunkSize ? Math.min(maxChunkSize, this.bytesRemaining) : this.bytesRemaining;
             // Allow readChunk() to be called *once* after request is already received.
             // This is convenient for empty payloads.
             if (this._requestData.state === STATE_REQUEST_RECEIVED) {
@@ -139,21 +195,21 @@
                 }
             }
             var self = this;
-            return this._requestData.socket.read(maxChunkSize)
+            return (sync ? $q.when(this._requestData.socket.readSync(maxChunkSize)) : this._requestData.socket.read(maxChunkSize))
             .then(function(chunk) {
-                var chunkSize = chunk.byteLength;
-                console.log('Processing request chunk of size ' + chunkSize);
-                self.bytesRemaining -= chunkSize;
-                if (self.bytesRemaining < 0) {
-                    throw new Error('Bytes remaining negative: ' + self.bytesRemaining);
-                }
-                if (self.bytesRemaining === 0 && self._requestData.state === STATE_HEADERS_RECEIVED) {
-                    changeState(self._requestData, STATE_REQUEST_RECEIVED);
+                if (chunk) {
+                    var chunkSize = chunk.byteLength;
+                    console.log('Processing request chunk of size ' + chunkSize);
+                    self._updateBytesRemaining(chunkSize);
                 }
                 return chunk;
             });
         };
 
+        HttpRequest.prototype.readChunk = function(/* optional */maxChunkSize) {
+            return this._readChunkInternal(maxChunkSize, false);
+        };
+
         function HttpResponse(requestData) {
             this._requestData = requestData;
             this.headers = Object.create(null);
@@ -246,6 +302,7 @@
             this.onClose = null;
             this._pendingReadChunks = [];
             this._pendingReadDeferred = null;
+            this._pendingPipeToFileDeferred = null;
             this._writeQueue = [];
             this._paused = true;
         }
@@ -253,26 +310,32 @@
         Socket.prototype.unread = function(chunk) {
             if (chunk.byteLength > 0) {
                 this._pendingReadChunks.unshift(chunk);
-                this._onReceive();
+                this._onReceive(chunk.byteLength);
             }
         };
 
-        Socket.prototype.read = function(maxLength) {
+        // Returns data that's already in JS, or null if nothing is ready.
+        // Never unpauses the socket.
+        Socket.prototype.readSync = function(maxLength) {
             if (this._pendingReadDeferred) {
                 throw new Error('Read already in progress.');
             } else if (!this.alive) {
                 throw new Error('Socket.read called after socket closed.');
             }
             maxLength = maxLength || Infinity;
-            var deferred = $q.defer();
             var chunk = this._pendingReadChunks.shift();
+            if (chunk && chunk.byteLength > maxLength) {
+                this._pendingReadChunks.unshift(chunk.slice(maxLength));
+                chunk = chunk.slice(0, maxLength);
+            }
+            return chunk;
+        };
+
+        Socket.prototype.read = function(maxLength) {
+            var deferred = $q.defer();
+            var chunk = this.readSync(maxLength);
             if (chunk) {
-                if (chunk.byteLength <= maxLength) {
-                    deferred.resolve(chunk);
-                } else {
-                    this._pendingReadChunks.unshift(chunk.slice(maxLength));
-                    deferred.resolve(chunk.slice(0, maxLength));
-                }
+                deferred.resolve(chunk);
             } else {
                 this._pendingReadDeferred = deferred;
             }
@@ -309,6 +372,42 @@
             }
         };
 
+        // Pauses the socket, and then returns all pending read chunks in as an array.
+        // Fails if there's a pending call to read(), but you can call read() again
+        // after the flush.
+        Socket.prototype.flushReadQueue = function() {
+            if (this._pendingReadDeferred) {
+                throw new Error('socket.flushReadQueue() called during socket.read()');
+            }
+            var deferred = $q.defer();
+            this._paused = true;
+            chrome.sockets.tcp.setPaused(this.socketId, true, deferred.resolve);
+            return deferred.promise;
+        };
+
+        Socket.prototype.pipeToUri = function(uri, append) {
+            if (this._pendingReadDeferred) {
+                throw new Error('socket.pipeToUri() called during bad state.');
+            }
+            if (!this._paused) {
+                throw new Error('socket.pipeToUri() called without first calling flushReadQueue().');
+            }
+            if (this._pendingReadChunks.length) {
+                throw new Error('socket.pipeToUri() called when there are outstanding read chunks.');
+            }
+            this._pendingPipeToFileDeferred = $q.defer();
+            chrome.sockets.tcp.update(this.socketId, { destUri: uri, append: append });
+            // TODO: The sockets call to initiate piping should unpause automatically.
+            this._setPaused(false);
+            return this._pendingPipeToFileDeferred.promise;
+        };
+
+        Socket.prototype.stopPipeToFile = function() {
+            var deferred = this._pendingPipeToFileDeferred;
+            this._pendingPipeToFileDeferred = null;
+            chrome.sockets.tcp.update(this.socketId, { destUri: '' }, deferred.resolve);
+        };
+
         Socket.prototype._setPaused = function(value) {
             if (value != this._paused) {
                 chrome.sockets.tcp.setPaused(this.socketId, value, null);
@@ -316,8 +415,10 @@
             }
         };
 
-        Socket.prototype._onReceive = function() {
-            if (this._pendingReadDeferred) {
+        Socket.prototype._onReceive = function(numBytes) {
+            if (this._pendingPipeToFileDeferred) {
+                this._pendingPipeToFileDeferred.notify(numBytes);
+            } else if (this._pendingReadDeferred) {
                 var deferred = this._pendingReadDeferred;
                 this._pendingReadDeferred = null;
                 deferred.resolve(this._pendingReadChunks.shift());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cordova.apache.org
For additional commands, e-mail: commits-help@cordova.apache.org