You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2018/10/08 03:00:45 UTC
ignite git commit: IGNITE-9808 Web Console: Refactored sockets
caching.
Repository: ignite
Updated Branches:
refs/heads/master 08fff5e71 -> 76daf05fe
IGNITE-9808 Web Console: Refactored sockets caching.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/76daf05f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/76daf05f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/76daf05f
Branch: refs/heads/master
Commit: 76daf05fef1392bb77c3be77c799df28dbaea7a1
Parents: 08fff5e
Author: Andrey Novikov <an...@apache.org>
Authored: Mon Oct 8 10:00:43 2018 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Oct 8 10:00:43 2018 +0700
----------------------------------------------------------------------
modules/web-console/backend/app/agentSocket.js | 20 ++-
.../web-console/backend/app/agentsHandler.js | 142 ++++++++-----------
.../web-console/backend/app/browsersHandler.js | 51 +++----
modules/web-console/backend/app/configure.js | 22 +--
modules/web-console/backend/app/settings.js | 3 +-
5 files changed, 113 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/agentSocket.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/agentSocket.js b/modules/web-console/backend/app/agentSocket.js
index aff62c4..0c7e6b2 100644
--- a/modules/web-console/backend/app/agentSocket.js
+++ b/modules/web-console/backend/app/agentSocket.js
@@ -38,21 +38,33 @@ module.exports.factory = function() {
class AgentSocket {
/**
* @param {Socket} socket Socket for interaction.
+ * @param {Object} accounts Active accounts.
* @param {Array.<String>} tokens Agent tokens.
* @param {String} demoEnabled Demo enabled.
*/
- constructor(socket, tokens, demoEnabled) {
+ constructor(socket, accounts, tokens, demoEnabled) {
Object.assign(this, {
- socket,
- tokens,
+ accounts,
cluster: null,
demo: {
enabled: demoEnabled,
browserSockets: []
- }
+ },
+ socket,
+ tokens
});
}
+ resetToken(oldToken) {
+ _.pull(this.tokens, oldToken);
+
+ this.emitEvent('agent:reset:token', oldToken)
+ .then(() => {
+ if (_.isEmpty(this.tokens) && this.socket.connected)
+ this.socket.close();
+ });
+ }
+
/**
* Send event to agent.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/agentsHandler.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/agentsHandler.js b/modules/web-console/backend/app/agentsHandler.js
index 8d72ded..fd55ac3 100644
--- a/modules/web-console/backend/app/agentsHandler.js
+++ b/modules/web-console/backend/app/agentsHandler.js
@@ -50,22 +50,22 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
this.sockets = new Map();
}
- get(token) {
- let sockets = this.sockets.get(token);
+ get(account) {
+ let sockets = this.sockets.get(account._id.toString());
if (_.isEmpty(sockets))
- this.sockets.set(token, sockets = []);
+ this.sockets.set(account._id.toString(), sockets = []);
return sockets;
}
/**
* @param {AgentSocket} sock
- * @param {String} token
+ * @param {String} account
* @return {Array.<AgentSocket>}
*/
- add(token, sock) {
- const sockets = this.get(token);
+ add(account, sock) {
+ const sockets = this.get(account);
sockets.push(sock);
}
@@ -75,9 +75,9 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
* @return {AgentSocket}
*/
find(browserSocket) {
- const token = browserSocket.request.user.token;
+ const {_id} = browserSocket.request.user;
- const sockets = this.sockets.get(token);
+ const sockets = this.sockets.get(_id);
return _.find(sockets, (sock) => _.includes(sock.demo.browserSockets, browserSocket));
}
@@ -228,19 +228,26 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
* Link agent with browsers by account.
*
* @param {Socket} sock
+ * @param {Array.<mongo.Account>} accounts
* @param {Array.<String>} tokens
* @param {boolean} demoEnabled
*
* @private
*/
- onConnect(sock, tokens, demoEnabled) {
- const agentSocket = new AgentSocket(sock, tokens, demoEnabled);
+ onConnect(sock, accounts, tokens, demoEnabled) {
+ const agentSocket = new AgentSocket(sock, accounts, tokens, demoEnabled);
+
+ _.forEach(accounts, (account) => {
+ this._agentSockets.add(account, agentSocket);
+
+ this._browsersHnd.agentStats(account);
+ });
sock.on('disconnect', () => {
- _.forEach(tokens, (token) => {
- _.pull(this._agentSockets.get(token), agentSocket);
+ _.forEach(accounts, (account) => {
+ _.pull(this._agentSockets.get(account), agentSocket);
- this._browsersHnd.agentStats(token);
+ this._browsersHnd.agentStats(account);
});
});
@@ -258,8 +265,8 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
if (agentSocket.cluster !== cluster) {
agentSocket.cluster = cluster;
- _.forEach(tokens, (token) => {
- this._browsersHnd.agentStats(token);
+ _.forEach(accounts, (account) => {
+ this._browsersHnd.agentStats(account);
});
}
else {
@@ -268,8 +275,8 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
if (changed) {
cluster.update(top);
- _.forEach(tokens, (token) => {
- this._browsersHnd.clusterChanged(token, cluster);
+ _.forEach(accounts, (account) => {
+ this._browsersHnd.clusterChanged(account, cluster);
});
}
}
@@ -282,16 +289,17 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
agentSocket.cluster = null;
- _.forEach(tokens, (token) => {
- this._browsersHnd.agentStats(token);
+ _.forEach(accounts, (account) => {
+ this._browsersHnd.agentStats(account);
});
});
- _.forEach(tokens, (token) => {
- this._agentSockets.add(token, agentSocket);
+ return agentSocket;
+ }
- this._browsersHnd.agentStats(token);
- });
+ getAccounts(tokens) {
+ return mongo.Account.find({token: {$in: tokens}}, '_id token').lean().exec()
+ .then((accounts) => ({accounts, activeTokens: _.uniq(_.map(accounts, 'token'))}));
}
/**
@@ -301,17 +309,27 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
attach(srv, browsersHnd) {
this._browsersHnd = browsersHnd;
- if (this.io)
- throw 'Agent server already started!';
-
this._collectSupportedAgents()
.then((supportedAgents) => {
this.currentAgent = _.get(supportedAgents, 'current');
+ if (this.io)
+ throw 'Agent server already started!';
+
this.io = socketio(srv, {path: '/agents'});
this.io.on('connection', (sock) => {
+ const sockId = sock.id;
+
+ console.log('Connected agent with socketId: ', sockId);
+
+ sock.on('disconnect', (reason) => {
+ console.log(`Agent disconnected with [socketId=${sockId}, reason=${reason}]`);
+ });
+
sock.on('agent:auth', ({ver, bt, tokens, disableDemo} = {}, cb) => {
+ console.log(`Received authentication request [socketId=${sockId}, tokens=${tokens}, ver=${ver}].`);
+
if (_.isEmpty(tokens))
return cb('Tokens not set. Please reload agent archive or check settings');
@@ -322,32 +340,33 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
return cb('You are using an older version of the agent. Please reload agent');
}
- return mongo.Account.find({token: {$in: tokens}}, '_id token').lean().exec()
- .then((accounts) => {
- const activeTokens = _.uniq(_.map(accounts, 'token'));
-
+ return this.getAccounts(tokens)
+ .then(({accounts, activeTokens}) => {
if (_.isEmpty(activeTokens))
return cb(`Failed to authenticate with token(s): ${tokens.join(',')}. Please reload agent archive or check settings`);
cb(null, activeTokens);
- return this.onConnect(sock, activeTokens, disableDemo);
+ return this.onConnect(sock, accounts, activeTokens, disableDemo);
})
// TODO IGNITE-1379 send error to web master.
.catch(() => cb(`Invalid token(s): ${tokens.join(',')}. Please reload agent archive or check settings`));
});
});
+ })
+ .catch(() => {
+ console.log('Failed to collect supported agents');
});
}
- agent(token, demo, clusterId) {
+ agent(account, demo, clusterId) {
if (!this.io)
return Promise.reject(new Error('Agent server not started yet!'));
- const socks = this._agentSockets.get(token);
+ const socks = this._agentSockets.get(account);
if (_.isEmpty(socks))
- return Promise.reject(new Error('Failed to find connected agent for this token'));
+ return Promise.reject(new Error('Failed to find connected agent for this account'));
if (demo || _.isNil(clusterId))
return Promise.resolve(_.head(socks));
@@ -360,11 +379,11 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
return Promise.resolve(sock);
}
- agents(token) {
+ agents(account) {
if (!this.io)
return Promise.reject(new Error('Agent server not started yet!'));
- const socks = this._agentSockets.get(token);
+ const socks = this._agentSockets.get(account);
if (_.isEmpty(socks))
return Promise.reject(new Error('Failed to find connected agent for this token'));
@@ -372,61 +391,18 @@ module.exports.factory = function(settings, mongo, AgentSocket) {
return Promise.resolve(socks);
}
- tryStopDemo(browserSocket) {
- const agentSock = this._agentSockets.find(browserSocket);
- }
-
- /**
- * @param {ObjectId} token
- * @param {Socket} browserSock
- * @returns {int} Connected agent count.
- */
- onBrowserConnect(token, browserSock) {
- this.emitAgentsCount(token);
-
- // If connect from browser with enabled demo.
- const demo = browserSock.request._query.IgniteDemoMode === 'true';
-
- // Agents where possible to run demo.
- const agentSockets = _.filter(this._agentSockets[token], 'demo.enabled');
-
- if (demo && _.size(agentSockets)) {
- const agentSocket = _.find(agentSockets, (agent) => _.includes(agent.demo.tokens, token));
-
- if (agentSocket)
- agentSocket.attachToDemoCluster(browserSock);
- else
- _.head(agentSockets).runDemoCluster(token, [browserSock]);
- }
- }
-
- /**
- * @param {Socket} browserSock.
- */
- onBrowserDisconnect(browserSock) {
- const token = browserSock.client.request.user.token;
-
- this._browserSockets.pull(token, browserSock);
-
- // If connect from browser with enabled demo.
- if (browserSock.request._query.IgniteDemoMode === 'true')
- this._agentSockets.find(token, (agent) => _.includes(agent.demo.browserSockets, browserSock));
-
- // TODO If latest browser with demo need stop demo cluster on agent.
- }
-
/**
* Try stop agent for token if not used by others.
*
- * @param {String} token
+ * @param {mongo.Account} account
*/
- onTokenReset(token) {
+ onTokenReset(account) {
if (_.isNil(this.io))
return;
- const sockets = this._agentSockets[token];
+ const agentSockets = this._agentSockets.get(account);
- _.forEach(sockets, (socket) => socket._sendToAgent('agent:reset:token', token));
+ _.forEach(agentSockets, (sock) => sock.resetToken(account.token));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/browsersHandler.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/browsersHandler.js b/modules/web-console/backend/app/browsersHandler.js
index d0cd112..a18b467 100644
--- a/modules/web-console/backend/app/browsersHandler.js
+++ b/modules/web-console/backend/app/browsersHandler.js
@@ -38,14 +38,14 @@ module.exports = {
* @param {Socket} sock
*/
add(sock) {
- const token = sock.request.user.token;
+ const token = sock.request.user._id.toString();
if (this.sockets.has(token))
this.sockets.get(token).push(sock);
else
this.sockets.set(token, [sock]);
- return this.sockets.get(token);
+ return this.sockets.get(sock.request.user);
}
/**
@@ -61,11 +61,13 @@ module.exports = {
return sockets;
}
- get(token) {
- if (this.sockets.has(token))
- return this.sockets.get(token);
+ get(account) {
+ let sockets = this.sockets.get(account._id.toString());
- return [];
+ if (_.isEmpty(sockets))
+ this.sockets.set(account._id.toString(), sockets = []);
+
+ return sockets;
}
demo(token) {
@@ -103,11 +105,11 @@ module.exports = {
}
/**
- * @param {String} token
+ * @param {String} account
* @param {Array.<Socket>} [socks]
*/
- agentStats(token, socks = this._browserSockets.get(token)) {
- return this._agentHnd.agents(token)
+ agentStats(account, socks = this._browserSockets.get(account)) {
+ return this._agentHnd.agents(account)
.then((agentSocks) => {
const stat = _.reduce(agentSocks, (acc, agentSock) => {
acc.count += 1;
@@ -127,8 +129,8 @@ module.exports = {
.then((stat) => _.forEach(socks, (sock) => sock.emit('agents:stat', stat)));
}
- clusterChanged(token, cluster) {
- const socks = this._browserSockets.get(token);
+ clusterChanged(account, cluster) {
+ const socks = this._browserSockets.get(account);
_.forEach(socks, (sock) => sock.emit('cluster:changed', cluster));
}
@@ -153,10 +155,10 @@ module.exports = {
}
}
- executeOnAgent(token, demo, event, ...args) {
+ executeOnAgent(account, demo, event, ...args) {
const cb = _.last(args);
- return this._agentHnd.agent(token, demo)
+ return this._agentHnd.agent(account, demo)
.then((agentSock) => agentSock.emitEvent(event, ..._.dropRight(args)))
.then((res) => cb(null, res))
.catch((err) => cb(this.errorTransformer(err)));
@@ -164,21 +166,21 @@ module.exports = {
agentListeners(sock) {
const demo = sock.request._query.IgniteDemoMode === 'true';
- const token = () => sock.request.user.token;
+ const account = () => sock.request.user;
// Return available drivers to browser.
sock.on('schemaImport:drivers', (...args) => {
- this.executeOnAgent(token(), demo, 'schemaImport:drivers', ...args);
+ this.executeOnAgent(account(), demo, 'schemaImport:drivers', ...args);
});
// Return schemas from database to browser.
sock.on('schemaImport:schemas', (...args) => {
- this.executeOnAgent(token(), demo, 'schemaImport:schemas', ...args);
+ this.executeOnAgent(account(), demo, 'schemaImport:schemas', ...args);
});
// Return tables from database to browser.
sock.on('schemaImport:metadata', (...args) => {
- this.executeOnAgent(token(), demo, 'schemaImport:metadata', ...args);
+ this.executeOnAgent(account(), demo, 'schemaImport:metadata', ...args);
});
}
@@ -218,9 +220,7 @@ module.exports = {
return cb('Invalid format of message: "node:rest"');
}
- const token = sock.request.user.token;
-
- const agent = this._agentHnd.agent(token, demo, clusterId);
+ const agent = this._agentHnd.agent(sock.request.user, demo, clusterId);
this.executeOnNode(agent, demo, credentials, params)
.then((data) => cb(null, data))
@@ -259,8 +259,6 @@ module.exports = {
return cb('Invalid format of message: "node:visor"');
}
- const token = sock.request.user.token;
-
const {taskId, nids, args = []} = params;
const desc = this._visorTasks.get(taskId);
@@ -277,7 +275,7 @@ module.exports = {
_.forEach(_.concat(desc.argCls, args), (param, idx) => { exeParams[`p${idx + 3}`] = param; });
- const agent = this._agentHnd.agent(token, demo, clusterId);
+ const agent = this._agentHnd.agent(sock.request.user, demo, clusterId);
this.executeOnNode(agent, demo, credentials, exeParams)
.then((data) => {
@@ -317,18 +315,13 @@ module.exports = {
// Handle browser disconnect event.
sock.on('disconnect', () => {
this._browserSockets.remove(sock);
-
- const demo = sock.request._query.IgniteDemoMode === 'true';
-
- // Stop demo if latest demo tab for this token.
- demo && agentHnd.tryStopDemo(sock);
});
this.agentListeners(sock);
this.nodeListeners(sock);
this.pushInitialData(sock);
- this.agentStats(sock.request.user.token, [sock]);
+ this.agentStats(sock.request.user, [sock]);
this.emitNotification(sock);
});
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/configure.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/configure.js b/modules/web-console/backend/app/configure.js
index b7bdb49..a0e5190 100644
--- a/modules/web-console/backend/app/configure.js
+++ b/modules/web-console/backend/app/configure.js
@@ -47,8 +47,6 @@ module.exports.factory = function(settings, mongo, apis) {
_.forEach(apis, (api) => app.use(api));
- app.use(cookieParser(settings.sessionSecret));
-
app.use(bodyParser.json({limit: '50mb'}));
app.use(bodyParser.urlencoded({limit: '50mb', extended: true}));
@@ -67,18 +65,26 @@ module.exports.factory = function(settings, mongo, apis) {
app.use(passport.initialize());
app.use(passport.session());
- passport.serializeUser(mongo.Account.serializeUser());
- passport.deserializeUser(mongo.Account.deserializeUser());
+ passport.serializeUser((user, done) => done(null, user._id));
+
+ passport.deserializeUser((id, done) => {
+ if (mongo.ObjectId.isValid(id))
+ return mongo.Account.findById(id, done);
+
+ // Invalidates the existing login session.
+ done(null, false);
+ });
passport.use(mongo.Account.createStrategy());
},
socketio: (io) => {
- const _onAuthorizeSuccess = (data, accept) => {
- accept(null, true);
- };
+ const _onAuthorizeSuccess = (data, accept) => accept();
const _onAuthorizeFail = (data, message, error, accept) => {
- accept(null, false);
+ if (error)
+ accept(new Error(message));
+
+ return accept(new Error(message));
};
io.use(passportSocketIo.authorize({
http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/settings.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/settings.js b/modules/web-console/backend/app/settings.js
index d206107..104b66d 100644
--- a/modules/web-console/backend/app/settings.js
+++ b/modules/web-console/backend/app/settings.js
@@ -61,7 +61,8 @@ module.exports = {
server: {
host: nconf.get('server:host') || dfltHost,
port: _normalizePort(nconf.get('server:port') || dfltPort),
- SSLOptions: nconf.get('server:ssl') && {
+ // eslint-disable-next-line eqeqeq
+ SSLOptions: nconf.get('server:ssl') == 'true' && {
enable301Redirects: true,
trustXFPHeader: true,
key: fs.readFileSync(nconf.get('server:key')),