You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2018/10/08 03:00:45 UTC

ignite git commit: IGNITE-9808 Web Console: Refactored sockets caching.

Repository: ignite
Updated Branches:
  refs/heads/master 08fff5e71 -> 76daf05fe


IGNITE-9808 Web Console: Refactored sockets caching.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/76daf05f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/76daf05f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/76daf05f

Branch: refs/heads/master
Commit: 76daf05fef1392bb77c3be77c799df28dbaea7a1
Parents: 08fff5e
Author: Andrey Novikov <an...@apache.org>
Authored: Mon Oct 8 10:00:43 2018 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Oct 8 10:00:43 2018 +0700

----------------------------------------------------------------------
 modules/web-console/backend/app/agentSocket.js  |  20 ++-
 .../web-console/backend/app/agentsHandler.js    | 142 ++++++++-----------
 .../web-console/backend/app/browsersHandler.js  |  51 +++----
 modules/web-console/backend/app/configure.js    |  22 +--
 modules/web-console/backend/app/settings.js     |   3 +-
 5 files changed, 113 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/agentSocket.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/agentSocket.js b/modules/web-console/backend/app/agentSocket.js
index aff62c4..0c7e6b2 100644
--- a/modules/web-console/backend/app/agentSocket.js
+++ b/modules/web-console/backend/app/agentSocket.js
@@ -38,21 +38,33 @@ module.exports.factory = function() {
     class AgentSocket {
         /**
          * @param {Socket} socket Socket for interaction.
+         * @param {Object} accounts Active accounts.
          * @param {Array.<String>} tokens Agent tokens.
          * @param {String} demoEnabled Demo enabled.
          */
-        constructor(socket, tokens, demoEnabled) {
+        constructor(socket, accounts, tokens, demoEnabled) {
             Object.assign(this, {
-                socket,
-                tokens,
+                accounts,
                 cluster: null,
                 demo: {
                     enabled: demoEnabled,
                     browserSockets: []
-                }
+                },
+                socket,
+                tokens
             });
         }
 
+        resetToken(oldToken) {
+            _.pull(this.tokens, oldToken);
+
+            this.emitEvent('agent:reset:token', oldToken)
+                .then(() => {
+                    if (_.isEmpty(this.tokens) && this.socket.connected)
+                        this.socket.close();
+                });
+        }
+
         /**
          * Send event to agent.
          *

http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/agentsHandler.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/agentsHandler.js b/modules/web-console/backend/app/agentsHandler.js
index 8d72ded..fd55ac3 100644
--- a/modules/web-console/backend/app/agentsHandler.js
+++ b/modules/web-console/backend/app/agentsHandler.js
@@ -50,22 +50,22 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
             this.sockets = new Map();
         }
 
-        get(token) {
-            let sockets = this.sockets.get(token);
+        get(account) {
+            let sockets = this.sockets.get(account._id.toString());
 
             if (_.isEmpty(sockets))
-                this.sockets.set(token, sockets = []);
+                this.sockets.set(account._id.toString(), sockets = []);
 
             return sockets;
         }
 
         /**
          * @param {AgentSocket} sock
-         * @param {String} token
+         * @param {String} account
          * @return {Array.<AgentSocket>}
          */
-        add(token, sock) {
-            const sockets = this.get(token);
+        add(account, sock) {
+            const sockets = this.get(account);
 
             sockets.push(sock);
         }
@@ -75,9 +75,9 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
          * @return {AgentSocket}
          */
         find(browserSocket) {
-            const token = browserSocket.request.user.token;
+            const {_id} = browserSocket.request.user;
 
-            const sockets = this.sockets.get(token);
+            const sockets = this.sockets.get(_id);
 
             return _.find(sockets, (sock) => _.includes(sock.demo.browserSockets, browserSocket));
         }
@@ -228,19 +228,26 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
          * Link agent with browsers by account.
          *
          * @param {Socket} sock
+         * @param {Array.<mongo.Account>} accounts
          * @param {Array.<String>} tokens
          * @param {boolean} demoEnabled
          *
          * @private
          */
-        onConnect(sock, tokens, demoEnabled) {
-            const agentSocket = new AgentSocket(sock, tokens, demoEnabled);
+        onConnect(sock, accounts, tokens, demoEnabled) {
+            const agentSocket = new AgentSocket(sock, accounts, tokens, demoEnabled);
+
+            _.forEach(accounts, (account) => {
+                this._agentSockets.add(account, agentSocket);
+
+                this._browsersHnd.agentStats(account);
+            });
 
             sock.on('disconnect', () => {
-                _.forEach(tokens, (token) => {
-                    _.pull(this._agentSockets.get(token), agentSocket);
+                _.forEach(accounts, (account) => {
+                    _.pull(this._agentSockets.get(account), agentSocket);
 
-                    this._browsersHnd.agentStats(token);
+                    this._browsersHnd.agentStats(account);
                 });
             });
 
@@ -258,8 +265,8 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
                 if (agentSocket.cluster !== cluster) {
                     agentSocket.cluster = cluster;
 
-                    _.forEach(tokens, (token) => {
-                        this._browsersHnd.agentStats(token);
+                    _.forEach(accounts, (account) => {
+                        this._browsersHnd.agentStats(account);
                     });
                 }
                 else {
@@ -268,8 +275,8 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
                     if (changed) {
                         cluster.update(top);
 
-                        _.forEach(tokens, (token) => {
-                            this._browsersHnd.clusterChanged(token, cluster);
+                        _.forEach(accounts, (account) => {
+                            this._browsersHnd.clusterChanged(account, cluster);
                         });
                     }
                 }
@@ -282,16 +289,17 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
 
                 agentSocket.cluster = null;
 
-                _.forEach(tokens, (token) => {
-                    this._browsersHnd.agentStats(token);
+                _.forEach(accounts, (account) => {
+                    this._browsersHnd.agentStats(account);
                 });
             });
 
-            _.forEach(tokens, (token) => {
-                this._agentSockets.add(token, agentSocket);
+            return agentSocket;
+        }
 
-                this._browsersHnd.agentStats(token);
-            });
+        getAccounts(tokens) {
+            return mongo.Account.find({token: {$in: tokens}}, '_id token').lean().exec()
+                .then((accounts) => ({accounts, activeTokens: _.uniq(_.map(accounts, 'token'))}));
         }
 
         /**
@@ -301,17 +309,27 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
         attach(srv, browsersHnd) {
             this._browsersHnd = browsersHnd;
 
-            if (this.io)
-                throw 'Agent server already started!';
-
             this._collectSupportedAgents()
                 .then((supportedAgents) => {
                     this.currentAgent = _.get(supportedAgents, 'current');
 
+                    if (this.io)
+                        throw 'Agent server already started!';
+
                     this.io = socketio(srv, {path: '/agents'});
 
                     this.io.on('connection', (sock) => {
+                        const sockId = sock.id;
+
+                        console.log('Connected agent with socketId: ', sockId);
+
+                        sock.on('disconnect', (reason) => {
+                            console.log(`Agent disconnected with [socketId=${sockId}, reason=${reason}]`);
+                        });
+
                         sock.on('agent:auth', ({ver, bt, tokens, disableDemo} = {}, cb) => {
+                            console.log(`Received authentication request [socketId=${sockId}, tokens=${tokens}, ver=${ver}].`);
+
                             if (_.isEmpty(tokens))
                                 return cb('Tokens not set. Please reload agent archive or check settings');
 
@@ -322,32 +340,33 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
                                     return cb('You are using an older version of the agent. Please reload agent');
                             }
 
-                            return mongo.Account.find({token: {$in: tokens}}, '_id token').lean().exec()
-                                .then((accounts) => {
-                                    const activeTokens = _.uniq(_.map(accounts, 'token'));
-
+                            return this.getAccounts(tokens)
+                                .then(({accounts, activeTokens}) => {
                                     if (_.isEmpty(activeTokens))
                                         return cb(`Failed to authenticate with token(s): ${tokens.join(',')}. Please reload agent archive or check settings`);
 
                                     cb(null, activeTokens);
 
-                                    return this.onConnect(sock, activeTokens, disableDemo);
+                                    return this.onConnect(sock, accounts, activeTokens, disableDemo);
                                 })
                                 // TODO IGNITE-1379 send error to web master.
                                 .catch(() => cb(`Invalid token(s): ${tokens.join(',')}. Please reload agent archive or check settings`));
                         });
                     });
+                })
+                .catch(() => {
+                    console.log('Failed to collect supported agents');
                 });
         }
 
-        agent(token, demo, clusterId) {
+        agent(account, demo, clusterId) {
             if (!this.io)
                 return Promise.reject(new Error('Agent server not started yet!'));
 
-            const socks = this._agentSockets.get(token);
+            const socks = this._agentSockets.get(account);
 
             if (_.isEmpty(socks))
-                return Promise.reject(new Error('Failed to find connected agent for this token'));
+                return Promise.reject(new Error('Failed to find connected agent for this account'));
 
             if (demo || _.isNil(clusterId))
                 return Promise.resolve(_.head(socks));
@@ -360,11 +379,11 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
             return Promise.resolve(sock);
         }
 
-        agents(token) {
+        agents(account) {
             if (!this.io)
                 return Promise.reject(new Error('Agent server not started yet!'));
 
-            const socks = this._agentSockets.get(token);
+            const socks = this._agentSockets.get(account);
 
             if (_.isEmpty(socks))
                 return Promise.reject(new Error('Failed to find connected agent for this token'));
@@ -372,61 +391,18 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
             return Promise.resolve(socks);
         }
 
-        tryStopDemo(browserSocket) {
-            const agentSock = this._agentSockets.find(browserSocket);
-        }
-
-        /**
-         * @param {ObjectId} token
-         * @param {Socket} browserSock
-         * @returns {int} Connected agent count.
-         */
-        onBrowserConnect(token, browserSock) {
-            this.emitAgentsCount(token);
-
-            // If connect from browser with enabled demo.
-            const demo = browserSock.request._query.IgniteDemoMode === 'true';
-
-            // Agents where possible to run demo.
-            const agentSockets = _.filter(this._agentSockets[token], 'demo.enabled');
-
-            if (demo && _.size(agentSockets)) {
-                const agentSocket = _.find(agentSockets, (agent) => _.includes(agent.demo.tokens, token));
-
-                if (agentSocket)
-                    agentSocket.attachToDemoCluster(browserSock);
-                else
-                    _.head(agentSockets).runDemoCluster(token, [browserSock]);
-            }
-        }
-
-        /**
-         * @param {Socket} browserSock.
-         */
-        onBrowserDisconnect(browserSock) {
-            const token = browserSock.client.request.user.token;
-
-            this._browserSockets.pull(token, browserSock);
-
-            // If connect from browser with enabled demo.
-            if (browserSock.request._query.IgniteDemoMode === 'true')
-                this._agentSockets.find(token, (agent) => _.includes(agent.demo.browserSockets, browserSock));
-
-            // TODO If latest browser with demo need stop demo cluster on agent.
-        }
-
         /**
          * Try stop agent for token if not used by others.
          *
-         * @param {String} token
+         * @param {mongo.Account} account
          */
-        onTokenReset(token) {
+        onTokenReset(account) {
             if (_.isNil(this.io))
                 return;
 
-            const sockets = this._agentSockets[token];
+            const agentSockets = this._agentSockets.get(account);
 
-            _.forEach(sockets, (socket) => socket._sendToAgent('agent:reset:token', token));
+            _.forEach(agentSockets, (sock) => sock.resetToken(account.token));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/browsersHandler.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/browsersHandler.js b/modules/web-console/backend/app/browsersHandler.js
index d0cd112..a18b467 100644
--- a/modules/web-console/backend/app/browsersHandler.js
+++ b/modules/web-console/backend/app/browsersHandler.js
@@ -38,14 +38,14 @@ module.exports = {
              * @param {Socket} sock
              */
             add(sock) {
-                const token = sock.request.user.token;
+                const token = sock.request.user._id.toString();
 
                 if (this.sockets.has(token))
                     this.sockets.get(token).push(sock);
                 else
                     this.sockets.set(token, [sock]);
 
-                return this.sockets.get(token);
+                return this.sockets.get(sock.request.user);
             }
 
             /**
@@ -61,11 +61,13 @@ module.exports = {
                 return sockets;
             }
 
-            get(token) {
-                if (this.sockets.has(token))
-                    return this.sockets.get(token);
+            get(account) {
+                let sockets = this.sockets.get(account._id.toString());
 
-                return [];
+                if (_.isEmpty(sockets))
+                    this.sockets.set(account._id.toString(), sockets = []);
+
+                return sockets;
             }
 
             demo(token) {
@@ -103,11 +105,11 @@ module.exports = {
             }
 
             /**
-             * @param {String} token
+             * @param {String} account
              * @param {Array.<Socket>} [socks]
              */
-            agentStats(token, socks = this._browserSockets.get(token)) {
-                return this._agentHnd.agents(token)
+            agentStats(account, socks = this._browserSockets.get(account)) {
+                return this._agentHnd.agents(account)
                     .then((agentSocks) => {
                         const stat = _.reduce(agentSocks, (acc, agentSock) => {
                             acc.count += 1;
@@ -127,8 +129,8 @@ module.exports = {
                     .then((stat) => _.forEach(socks, (sock) => sock.emit('agents:stat', stat)));
             }
 
-            clusterChanged(token, cluster) {
-                const socks = this._browserSockets.get(token);
+            clusterChanged(account, cluster) {
+                const socks = this._browserSockets.get(account);
 
                 _.forEach(socks, (sock) => sock.emit('cluster:changed', cluster));
             }
@@ -153,10 +155,10 @@ module.exports = {
                 }
             }
 
-            executeOnAgent(token, demo, event, ...args) {
+            executeOnAgent(account, demo, event, ...args) {
                 const cb = _.last(args);
 
-                return this._agentHnd.agent(token, demo)
+                return this._agentHnd.agent(account, demo)
                     .then((agentSock) => agentSock.emitEvent(event, ..._.dropRight(args)))
                     .then((res) => cb(null, res))
                     .catch((err) => cb(this.errorTransformer(err)));
@@ -164,21 +166,21 @@ module.exports = {
 
             agentListeners(sock) {
                 const demo = sock.request._query.IgniteDemoMode === 'true';
-                const token = () => sock.request.user.token;
+                const account = () => sock.request.user;
 
                 // Return available drivers to browser.
                 sock.on('schemaImport:drivers', (...args) => {
-                    this.executeOnAgent(token(), demo, 'schemaImport:drivers', ...args);
+                    this.executeOnAgent(account(), demo, 'schemaImport:drivers', ...args);
                 });
 
                 // Return schemas from database to browser.
                 sock.on('schemaImport:schemas', (...args) => {
-                    this.executeOnAgent(token(), demo, 'schemaImport:schemas', ...args);
+                    this.executeOnAgent(account(), demo, 'schemaImport:schemas', ...args);
                 });
 
                 // Return tables from database to browser.
                 sock.on('schemaImport:metadata', (...args) => {
-                    this.executeOnAgent(token(), demo, 'schemaImport:metadata', ...args);
+                    this.executeOnAgent(account(), demo, 'schemaImport:metadata', ...args);
                 });
             }
 
@@ -218,9 +220,7 @@ module.exports = {
                         return cb('Invalid format of message: "node:rest"');
                     }
 
-                    const token = sock.request.user.token;
-
-                    const agent = this._agentHnd.agent(token, demo, clusterId);
+                    const agent = this._agentHnd.agent(sock.request.user, demo, clusterId);
 
                     this.executeOnNode(agent, demo, credentials, params)
                         .then((data) => cb(null, data))
@@ -259,8 +259,6 @@ module.exports = {
                         return cb('Invalid format of message: "node:visor"');
                     }
 
-                    const token = sock.request.user.token;
-
                     const {taskId, nids, args = []} = params;
 
                     const desc = this._visorTasks.get(taskId);
@@ -277,7 +275,7 @@ module.exports = {
 
                     _.forEach(_.concat(desc.argCls, args), (param, idx) => { exeParams[`p${idx + 3}`] = param; });
 
-                    const agent = this._agentHnd.agent(token, demo, clusterId);
+                    const agent = this._agentHnd.agent(sock.request.user, demo, clusterId);
 
                     this.executeOnNode(agent, demo, credentials, exeParams)
                         .then((data) => {
@@ -317,18 +315,13 @@ module.exports = {
                             // Handle browser disconnect event.
                             sock.on('disconnect', () => {
                                 this._browserSockets.remove(sock);
-
-                                const demo = sock.request._query.IgniteDemoMode === 'true';
-
-                                // Stop demo if latest demo tab for this token.
-                                demo && agentHnd.tryStopDemo(sock);
                             });
 
                             this.agentListeners(sock);
                             this.nodeListeners(sock);
 
                             this.pushInitialData(sock);
-                            this.agentStats(sock.request.user.token, [sock]);
+                            this.agentStats(sock.request.user, [sock]);
                             this.emitNotification(sock);
                         });
                     });

http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/configure.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/configure.js b/modules/web-console/backend/app/configure.js
index b7bdb49..a0e5190 100644
--- a/modules/web-console/backend/app/configure.js
+++ b/modules/web-console/backend/app/configure.js
@@ -47,8 +47,6 @@ module.exports.factory = function(settings, mongo, apis) {
 
             _.forEach(apis, (api) => app.use(api));
 
-            app.use(cookieParser(settings.sessionSecret));
-
             app.use(bodyParser.json({limit: '50mb'}));
             app.use(bodyParser.urlencoded({limit: '50mb', extended: true}));
 
@@ -67,18 +65,26 @@ module.exports.factory = function(settings, mongo, apis) {
             app.use(passport.initialize());
             app.use(passport.session());
 
-            passport.serializeUser(mongo.Account.serializeUser());
-            passport.deserializeUser(mongo.Account.deserializeUser());
+            passport.serializeUser((user, done) => done(null, user._id));
+
+            passport.deserializeUser((id, done) => {
+                if (mongo.ObjectId.isValid(id))
+                    return mongo.Account.findById(id, done);
+
+                // Invalidates the existing login session.
+                done(null, false);
+            });
 
             passport.use(mongo.Account.createStrategy());
         },
         socketio: (io) => {
-            const _onAuthorizeSuccess = (data, accept) => {
-                accept(null, true);
-            };
+            const _onAuthorizeSuccess = (data, accept) => accept();
 
             const _onAuthorizeFail = (data, message, error, accept) => {
-                accept(null, false);
+                if (error)
+                    accept(new Error(message));
+
+                return accept(new Error(message));
             };
 
             io.use(passportSocketIo.authorize({

http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/settings.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/settings.js b/modules/web-console/backend/app/settings.js
index d206107..104b66d 100644
--- a/modules/web-console/backend/app/settings.js
+++ b/modules/web-console/backend/app/settings.js
@@ -61,7 +61,8 @@ module.exports = {
             server: {
                 host: nconf.get('server:host') || dfltHost,
                 port: _normalizePort(nconf.get('server:port') || dfltPort),
-                SSLOptions: nconf.get('server:ssl') && {
+                // eslint-disable-next-line eqeqeq
+                SSLOptions: nconf.get('server:ssl') == 'true' && {
                     enable301Redirects: true,
                     trustXFPHeader: true,
                     key: fs.readFileSync(nconf.get('server:key')),