You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/10/06 10:00:41 UTC

ignite git commit: IGNITE-6570 Web Console: Move parsing of JSON to pool of workers.

Repository: ignite
Updated Branches:
  refs/heads/master a38fdec72 -> 74f04001a


IGNITE-6570 Web Console: Move parsing of JSON to pool of workers.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74f04001
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74f04001
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74f04001

Branch: refs/heads/master
Commit: 74f04001a985211c499ee4bbd73de686288684a8
Parents: a38fdec
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Fri Oct 6 17:00:39 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Oct 6 17:00:39 2017 +0700

----------------------------------------------------------------------
 modules/web-console/backend/app/agentSocket.js  |  21 +---
 .../web-console/backend/app/browsersHandler.js  |   9 +-
 .../app/modules/agent/AgentManager.service.js   |  18 ++-
 .../app/modules/agent/decompress.worker.js      |  33 +++++
 .../frontend/app/utils/SimpleWorkerPool.js      | 119 +++++++++++++++++++
 modules/web-console/frontend/package.json       |   1 +
 6 files changed, 176 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/backend/app/agentSocket.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/agentSocket.js b/modules/web-console/backend/app/agentSocket.js
index 489d145..75dcd53 100644
--- a/modules/web-console/backend/app/agentSocket.js
+++ b/modules/web-console/backend/app/agentSocket.js
@@ -24,7 +24,7 @@
  */
 module.exports = {
     implements: 'agent-socket',
-    inject: ['require(lodash)', 'require(zlib)']
+    inject: ['require(lodash)']
 };
 
 /**
@@ -79,10 +79,9 @@ class Command {
 
 /**
  * @param _
- * @param zlib
  * @returns {AgentSocket}
  */
-module.exports.factory = function(_, zlib) {
+module.exports.factory = function(_) {
     /**
      * Connected agent descriptor.
      */
@@ -136,21 +135,7 @@ module.exports.factory = function(_, zlib) {
                     if (resErr)
                         return reject(resErr);
 
-                    if (res.zipped) {
-                        // TODO IGNITE-6127 Temporary solution until GZip support for socket.io-client-java.
-                        // See: https://github.com/socketio/socket.io-client-java/issues/312
-                        // We can GZip manually for now.
-                        zlib.gunzip(new Buffer(res.data, 'base64'), (unzipErr, unzipped) => {
-                            if (unzipErr)
-                                return reject(unzipErr);
-
-                            res.data = unzipped.toString();
-
-                            resolve(res);
-                        });
-                    }
-                    else
-                        resolve(res);
+                    resolve(res);
                 })
             );
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/backend/app/browsersHandler.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/browsersHandler.js b/modules/web-console/backend/app/browsersHandler.js
index 4fb5088..f4ff23c 100644
--- a/modules/web-console/backend/app/browsersHandler.js
+++ b/modules/web-console/backend/app/browsersHandler.js
@@ -181,8 +181,12 @@ module.exports.factory = (_, socketio, configure, errors, mongo) => {
             return agent
                 .then((agentSock) => agentSock.emitEvent('node:rest', {uri: 'ignite', demo, params}))
                 .then((res) => {
-                    if (res.status === 0)
+                    if (res.status === 0) {
+                        if (res.zipped)
+                            return res;
+
                         return JSON.parse(res.data);
+                    }
 
                     throw new Error(res.error);
                 });
@@ -250,6 +254,9 @@ module.exports.factory = (_, socketio, configure, errors, mongo) => {
 
                 this.executeOnNode(agent, demo, params)
                     .then((data) => {
+                        if (data.zipped)
+                            return cb(null, data);
+
                         if (data.finished)
                             return cb(null, data.result);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
index 288ec94..752b4f0 100644
--- a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
+++ b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
@@ -17,6 +17,8 @@
 
 import { BehaviorSubject } from 'rxjs/BehaviorSubject';
 
+import Worker from 'worker!./decompress.worker';
+import SimpleWorkerPool from '../../utils/SimpleWorkerPool';
 import maskNull from 'app/core/utils/maskNull';
 
 const State = {
@@ -82,11 +84,9 @@ export default class IgniteAgentManager {
 
         this.promises = new Set();
 
-        /**
-         * Connection to backend.
-         * @type {Socket}
-         */
-        this.socket = null;
+        this.pool = new SimpleWorkerPool('decompressor', Worker, 4);
+
+        this.socket = null; // Connection to backend.
 
         let cluster;
 
@@ -364,7 +364,13 @@ export default class IgniteAgentManager {
      * @private
      */
     _rest(event, ...args) {
-        return this._emit(event, _.get(this.connectionSbj.getValue(), 'cluster.id'), ...args);
+        return this._emit(event, _.get(this.connectionSbj.getValue(), 'cluster.id'), ...args)
+            .then((data) => {
+                if (data.zipped)
+                    return this.pool.postMessage(data.data);
+
+                return data;
+            });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/frontend/app/modules/agent/decompress.worker.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/agent/decompress.worker.js b/modules/web-console/frontend/app/modules/agent/decompress.worker.js
new file mode 100644
index 0000000..d8e176d
--- /dev/null
+++ b/modules/web-console/frontend/app/modules/agent/decompress.worker.js
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import _ from 'lodash';
+import pako from 'pako';
+
+/** This worker decodes and decompresses BASE64/zipped data, then parses the result as JSON. */
+// eslint-disable-next-line no-undef
+onmessage = function(e) {
+    const data = e.data;
+
+    const binaryString = atob(data); // Decode from BASE64
+
+    const unzipped = pako.inflate(binaryString, {to: 'string'});
+
+    const res = JSON.parse(unzipped);
+
+    postMessage(_.get(res, 'result', res));
+};

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/frontend/app/utils/SimpleWorkerPool.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/utils/SimpleWorkerPool.js b/modules/web-console/frontend/app/utils/SimpleWorkerPool.js
new file mode 100644
index 0000000..d8ed28b
--- /dev/null
+++ b/modules/web-console/frontend/app/utils/SimpleWorkerPool.js
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {Observable} from 'rxjs/Observable';
+import {Subject} from 'rxjs/Subject';
+import 'rxjs/add/observable/race';
+import 'rxjs/add/operator/filter';
+import 'rxjs/add/operator/pluck';
+import 'rxjs/add/operator/take';
+import 'rxjs/add/operator/toPromise';
+
+/**
+ * Simple implementation of workers pool.
+ */
+export default class SimpleWorkerPool {
+    constructor(name, WorkerClass, poolSize = (navigator.hardwareConcurrency || 4), dbg = false) {
+        this._name = name;
+        this._WorkerClass = WorkerClass;
+        this._tasks = [];
+        this._msgId = 0;
+        this.messages$ = new Subject();
+        this.errors$ = new Subject();
+        this.__dbg = dbg;
+
+        this._workers = _.range(poolSize).map(() => {
+            const worker = new this._WorkerClass();
+
+            worker.onmessage = (m) => {
+                this.messages$.next({tid: worker.tid, m});
+
+                worker.tid = null;
+
+                this._run();
+            };
+
+            worker.onerror = (e) => {
+                this.errors$.next({tid: worker.tid, e});
+
+                worker.tid = null;
+
+                this._run();
+            };
+
+            return worker;
+        });
+    }
+
+    _makeTaskID() {
+        return this._msgId++;
+    }
+
+    _getNextWorker() {
+        return this._workers.find((w) => _.isNil(w.tid)); // Idle means tid is null/undefined; task IDs start at 0, so a falsy check would treat the worker running task 0 as idle.
+    }
+
+    _getNextTask() {
+        return this._tasks.shift();
+    }
+
+    _run() { // Hands the next queued task to an idle worker; does nothing when no worker is idle or the queue is empty.
+        const worker = this._getNextWorker();
+
+        if (!worker || !this._tasks.length)
+            return;
+
+        const task = this._getNextTask();
+
+        worker.tid = task.tid; // Marks the worker busy; its onmessage/onerror handlers publish the reply under this task ID.
+
+        if (this.__dbg)
+            console.time(`Post message[pool=${this._name}]`);
+
+        worker.postMessage(task.data);
+
+        if (this.__dbg)
+            console.timeEnd(`Post message[pool=${this._name}]`); // Label must match the console.time() call above, otherwise the timer is never reported.
+    }
+
+    terminate() {
+        this._workers.forEach((w) => w.terminate());
+
+        this.messages$.complete();
+        this.errors$.complete();
+
+        this._workers = null;
+    }
+
+    postMessage(data) {
+        const tid = this._makeTaskID();
+
+        this._tasks.push({tid, data});
+
+        if (this.__dbg)
+            console.log(`Pool: [name=${this._name}, queue=${this._tasks.length}]`);
+
+        this._run();
+
+        return Observable.race(
+            this.messages$.filter((e) => e.tid === tid).take(1).pluck('m', 'data'),
+            this.errors$.filter((e) => e.tid === tid).take(1).map((e) => {
+                throw e.e;
+            }))
+            .take(1).toPromise();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/frontend/package.json
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/package.json b/modules/web-console/frontend/package.json
index d828e17..2083640 100644
--- a/modules/web-console/frontend/package.json
+++ b/modules/web-console/frontend/package.json
@@ -80,6 +80,7 @@
     "lodash": "4.17.4",
     "node-sass": "4.5.3",
     "nvd3": "1.8.4",
+    "pako": "1.0.6",
     "progress-bar-webpack-plugin": "1.10.0",
     "pug-html-loader": "1.1.0",
     "pug-loader": "2.3.0",