You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/10/06 10:00:41 UTC
ignite git commit: IGNITE-6570 Web Console: Move parsing of JSON to
pool of workers.
Repository: ignite
Updated Branches:
refs/heads/master a38fdec72 -> 74f04001a
IGNITE-6570 Web Console: Move parsing of JSON to pool of workers.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74f04001
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74f04001
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74f04001
Branch: refs/heads/master
Commit: 74f04001a985211c499ee4bbd73de686288684a8
Parents: a38fdec
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Fri Oct 6 17:00:39 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Oct 6 17:00:39 2017 +0700
----------------------------------------------------------------------
modules/web-console/backend/app/agentSocket.js | 21 +---
.../web-console/backend/app/browsersHandler.js | 9 +-
.../app/modules/agent/AgentManager.service.js | 18 ++-
.../app/modules/agent/decompress.worker.js | 33 +++++
.../frontend/app/utils/SimpleWorkerPool.js | 119 +++++++++++++++++++
modules/web-console/frontend/package.json | 1 +
6 files changed, 176 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/backend/app/agentSocket.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/agentSocket.js b/modules/web-console/backend/app/agentSocket.js
index 489d145..75dcd53 100644
--- a/modules/web-console/backend/app/agentSocket.js
+++ b/modules/web-console/backend/app/agentSocket.js
@@ -24,7 +24,7 @@
*/
module.exports = {
implements: 'agent-socket',
- inject: ['require(lodash)', 'require(zlib)']
+ inject: ['require(lodash)']
};
/**
@@ -79,10 +79,9 @@ class Command {
/**
* @param _
- * @param zlib
* @returns {AgentSocket}
*/
-module.exports.factory = function(_, zlib) {
+module.exports.factory = function(_) {
/**
* Connected agent descriptor.
*/
@@ -136,21 +135,7 @@ module.exports.factory = function(_, zlib) {
if (resErr)
return reject(resErr);
- if (res.zipped) {
- // TODO IGNITE-6127 Temporary solution until GZip support for socket.io-client-java.
- // See: https://github.com/socketio/socket.io-client-java/issues/312
- // We can GZip manually for now.
- zlib.gunzip(new Buffer(res.data, 'base64'), (unzipErr, unzipped) => {
- if (unzipErr)
- return reject(unzipErr);
-
- res.data = unzipped.toString();
-
- resolve(res);
- });
- }
- else
- resolve(res);
+ resolve(res);
})
);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/backend/app/browsersHandler.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/browsersHandler.js b/modules/web-console/backend/app/browsersHandler.js
index 4fb5088..f4ff23c 100644
--- a/modules/web-console/backend/app/browsersHandler.js
+++ b/modules/web-console/backend/app/browsersHandler.js
@@ -181,8 +181,12 @@ module.exports.factory = (_, socketio, configure, errors, mongo) => {
return agent
.then((agentSock) => agentSock.emitEvent('node:rest', {uri: 'ignite', demo, params}))
.then((res) => {
- if (res.status === 0)
+ if (res.status === 0) {
+ if (res.zipped)
+ return res;
+
return JSON.parse(res.data);
+ }
throw new Error(res.error);
});
@@ -250,6 +254,9 @@ module.exports.factory = (_, socketio, configure, errors, mongo) => {
this.executeOnNode(agent, demo, params)
.then((data) => {
+ if (data.zipped)
+ return cb(null, data);
+
if (data.finished)
return cb(null, data.result);
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
index 288ec94..752b4f0 100644
--- a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
+++ b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
@@ -17,6 +17,8 @@
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
+import Worker from 'worker!./decompress.worker';
+import SimpleWorkerPool from '../../utils/SimpleWorkerPool';
import maskNull from 'app/core/utils/maskNull';
const State = {
@@ -82,11 +84,9 @@ export default class IgniteAgentManager {
this.promises = new Set();
- /**
- * Connection to backend.
- * @type {Socket}
- */
- this.socket = null;
+ this.pool = new SimpleWorkerPool('decompressor', Worker, 4);
+
+ this.socket = null; // Connection to backend.
let cluster;
@@ -364,7 +364,13 @@ export default class IgniteAgentManager {
* @private
*/
_rest(event, ...args) {
- return this._emit(event, _.get(this.connectionSbj.getValue(), 'cluster.id'), ...args);
+ return this._emit(event, _.get(this.connectionSbj.getValue(), 'cluster.id'), ...args)
+ .then((data) => {
+ if (data.zipped)
+ return this.pool.postMessage(data.data);
+
+ return data;
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/frontend/app/modules/agent/decompress.worker.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/modules/agent/decompress.worker.js b/modules/web-console/frontend/app/modules/agent/decompress.worker.js
new file mode 100644
index 0000000..d8e176d
--- /dev/null
+++ b/modules/web-console/frontend/app/modules/agent/decompress.worker.js
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import _ from 'lodash';
+import pako from 'pako';
+
+/** This worker decode & decompress BASE64/Zipped data and parse to JSON. */
+// eslint-disable-next-line no-undef
+onmessage = function(e) {
+ const data = e.data;
+
+ const binaryString = atob(data); // Decode from BASE64
+
+ const unzipped = pako.inflate(binaryString, {to: 'string'});
+
+ const res = JSON.parse(unzipped);
+
+ postMessage(_.get(res, 'result', res));
+};
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/frontend/app/utils/SimpleWorkerPool.js
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/app/utils/SimpleWorkerPool.js b/modules/web-console/frontend/app/utils/SimpleWorkerPool.js
new file mode 100644
index 0000000..d8ed28b
--- /dev/null
+++ b/modules/web-console/frontend/app/utils/SimpleWorkerPool.js
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {Observable} from 'rxjs/Observable';
+import {Subject} from 'rxjs/Subject';
+import 'rxjs/add/observable/race';
+import 'rxjs/add/operator/filter';
+import 'rxjs/add/operator/pluck';
+import 'rxjs/add/operator/take';
+import 'rxjs/add/operator/toPromise';
+
+/**
+ * Simple implementation of workers pool.
+ */
+export default class SimpleWorkerPool {
+ constructor(name, WorkerClass, poolSize = (navigator.hardwareConcurrency || 4), dbg = false) {
+ this._name = name;
+ this._WorkerClass = WorkerClass;
+ this._tasks = [];
+ this._msgId = 0;
+ this.messages$ = new Subject();
+ this.errors$ = new Subject();
+ this.__dbg = dbg;
+
+ this._workers = _.range(poolSize).map(() => {
+ const worker = new this._WorkerClass();
+
+ worker.onmessage = (m) => {
+ this.messages$.next({tid: worker.tid, m});
+
+ worker.tid = null;
+
+ this._run();
+ };
+
+ worker.onerror = (e) => {
+ this.errors$.next({tid: worker.tid, e});
+
+ worker.tid = null;
+
+ this._run();
+ };
+
+ return worker;
+ });
+ }
+
+ _makeTaskID() {
+ return this._msgId++;
+ }
+
+ _getNextWorker() {
+ return this._workers.find((w) => !w.tid);
+ }
+
+ _getNextTask() {
+ return this._tasks.shift();
+ }
+
+ _run() {
+ const worker = this._getNextWorker();
+
+ if (!worker || !this._tasks.length)
+ return;
+
+ const task = this._getNextTask();
+
+ worker.tid = task.tid;
+
+ if (this.__dbg)
+ console.time(`Post message[pool=${this._name}]`);
+
+ worker.postMessage(task.data);
+
+ if (this.__dbg)
+ console.timeEnd('Post message');
+ }
+
+ terminate() {
+ this._workers.forEach((w) => w.terminate());
+
+ this.messages$.complete();
+ this.errors$.complete();
+
+ this._workers = null;
+ }
+
+ postMessage(data) {
+ const tid = this._makeTaskID();
+
+ this._tasks.push({tid, data});
+
+ if (this.__dbg)
+ console.log(`Pool: [name=${this._name}, queue=${this._tasks.length}]`);
+
+ this._run();
+
+ return Observable.race(
+ this.messages$.filter((e) => e.tid === tid).take(1).pluck('m', 'data'),
+ this.errors$.filter((e) => e.tid === tid).take(1).map((e) => {
+ throw e.e;
+ }))
+ .take(1).toPromise();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/74f04001/modules/web-console/frontend/package.json
----------------------------------------------------------------------
diff --git a/modules/web-console/frontend/package.json b/modules/web-console/frontend/package.json
index d828e17..2083640 100644
--- a/modules/web-console/frontend/package.json
+++ b/modules/web-console/frontend/package.json
@@ -80,6 +80,7 @@
"lodash": "4.17.4",
"node-sass": "4.5.3",
"nvd3": "1.8.4",
+ "pako": "1.0.6",
"progress-bar-webpack-plugin": "1.10.0",
"pug-html-loader": "1.1.0",
"pug-loader": "2.3.0",