You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/04/18 03:37:38 UTC
[4/4] ignite git commit: IGNITE-4995 Multi-cluster support for Web
Console.
IGNITE-4995 Multi-cluster support for Web Console.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/323e3870
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/323e3870
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/323e3870
Branch: refs/heads/master
Commit: 323e38709d99c08142c54916e7a718895f867be8
Parents: 8c03220
Author: anovikov <an...@gridgain.com>
Authored: Tue Apr 18 10:38:19 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Apr 18 10:38:19 2017 +0700
----------------------------------------------------------------------
.../JettyRestProcessorAbstractSelfTest.java | 3 +-
.../internal/visor/query/VisorQueryArg.java | 16 +-
.../internal/visor/query/VisorQueryTask.java | 21 +-
modules/web-console/backend/app/agent.js | 926 -------------------
modules/web-console/backend/app/agentSocket.js | 249 +++++
.../web-console/backend/app/agentsHandler.js | 400 ++++++++
modules/web-console/backend/app/apiServer.js | 68 ++
modules/web-console/backend/app/app.js | 63 --
modules/web-console/backend/app/browser.js | 539 -----------
.../web-console/backend/app/browsersHandler.js | 279 ++++++
modules/web-console/backend/app/routes.js | 6 +-
modules/web-console/backend/index.js | 44 +-
modules/web-console/backend/package.json | 52 +-
modules/web-console/backend/routes/agent.js | 57 --
modules/web-console/backend/routes/demo.js | 14 +-
modules/web-console/backend/routes/downloads.js | 57 ++
modules/web-console/backend/services/agents.js | 83 --
.../web-console/backend/services/downloads.js | 80 ++
modules/web-console/backend/services/users.js | 10 +-
.../web-console/backend/test/app/httpAgent.js | 6 +-
.../web-console/backend/test/routes/clusters.js | 4 +-
modules/web-console/frontend/app/app.config.js | 1 +
modules/web-console/frontend/app/app.js | 12 +-
.../components/activities-user-dialog/index.js | 1 -
.../cluster-select/cluster-select.controller.js | 55 ++
.../cluster-select/cluster-select.pug | 40 +
.../app/components/cluster-select/index.js | 28 +
.../input-dialog/input-dialog.service.js | 1 -
.../list-of-registered-users.column-defs.js | 2 +-
.../app/helpers/jade/form/form-field-text.pug | 19 +-
.../frontend/app/helpers/jade/mixins.pug | 6 +
.../app/modules/agent/AgentManager.service.js | 529 +++++++++++
.../app/modules/agent/AgentModal.service.js | 89 ++
.../frontend/app/modules/agent/agent.module.js | 347 +------
.../frontend/app/modules/cluster/Cache.js | 51 +
.../app/modules/cluster/CacheMetrics.js | 51 +
.../frontend/app/modules/cluster/Node.js | 54 ++
.../frontend/app/modules/cluster/NodeMetrics.js | 19 +
.../frontend/app/modules/demo/Demo.module.js | 5 +-
.../app/modules/dialog/dialog.factory.js | 1 -
.../getting-started/GettingStarted.provider.js | 2 +-
.../app/modules/navbar/userbar.directive.js | 4 +-
.../frontend/app/modules/nodes/Nodes.service.js | 1 -
.../app/modules/sql/notebook.controller.js | 2 +-
.../frontend/app/modules/sql/sql.controller.js | 49 +-
.../frontend/app/modules/states/admin.state.js | 4 +-
.../app/modules/states/configuration.state.js | 17 +-
.../app/modules/states/profile.state.js | 2 +-
.../frontend/app/modules/user/Auth.service.js | 6 +-
.../frontend/app/services/Confirm.service.js | 2 +-
.../app/services/ConfirmBatch.service.js | 1 -
.../frontend/controllers/caches-controller.js | 2 +-
.../frontend/controllers/domains-controller.js | 20 +-
modules/web-console/frontend/package.json | 62 +-
.../stylesheets/_bootstrap-variables.scss | 2 +-
.../stylesheets/_font-awesome-custom.scss | 5 +
.../frontend/public/stylesheets/style.scss | 57 +-
.../frontend/views/includes/header.pug | 23 +-
.../views/templates/agent-download.tpl.pug | 39 +-
.../frontend/views/templates/demo-info.tpl.pug | 2 +-
modules/web-console/web-agent/pom.xml | 16 +-
.../console/agent/AgentConfiguration.java | 47 +-
.../ignite/console/agent/AgentLauncher.java | 196 ++--
.../apache/ignite/console/agent/AgentUtils.java | 6 +-
.../console/agent/handlers/ClusterListener.java | 266 ++++++
.../agent/handlers/DatabaseListener.java | 6 +-
.../console/agent/handlers/DemoListener.java | 131 +++
.../console/agent/handlers/RestListener.java | 229 +----
.../ignite/console/agent/rest/RestExecutor.java | 197 ++++
.../ignite/console/agent/rest/RestResult.java | 81 ++
.../ignite/console/demo/AgentClusterDemo.java | 175 ++--
.../ignite/console/demo/AgentDemoUtils.java | 2 +-
.../demo/service/DemoCachesLoadService.java | 35 +-
73 files changed, 3346 insertions(+), 2631 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index cd3011c..c383de0 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -1408,7 +1408,8 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
ret = content(new VisorGatewayArgument(VisorQueryTask.class)
.forNode(locNode)
- .argument(VisorQueryArg.class, "person", URLEncoder.encode("select * from Person", CHARSET), false, false, false, 1));
+ .argument(VisorQueryArg.class, "person", URLEncoder.encode("select * from Person", CHARSET),
+ false, false, false, false, 1));
info("VisorQueryTask result: " + ret);
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
index 1cb1f0d..d4eb65a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
@@ -43,6 +43,9 @@ public class VisorQueryArg extends VisorDataTransferObject {
/** Enforce join order flag. */
private boolean enforceJoinOrder;
+ /** Query contains only replicated tables flag.*/
+ private boolean replicatedOnly;
+
/** Flag whether to execute query locally. */
private boolean loc;
@@ -61,15 +64,17 @@ public class VisorQueryArg extends VisorDataTransferObject {
* @param qryTxt Query text.
* @param distributedJoins If {@code true} then distributed joins enabled.
* @param enforceJoinOrder If {@code true} then enforce join order.
+ * @param replicatedOnly {@code true} then query contains only replicated tables.
* @param loc Flag whether to execute query locally.
* @param pageSize Result batch size.
*/
public VisorQueryArg(String cacheName, String qryTxt,
- boolean distributedJoins, boolean enforceJoinOrder, boolean loc, int pageSize) {
+ boolean distributedJoins, boolean enforceJoinOrder, boolean replicatedOnly, boolean loc, int pageSize) {
this.cacheName = cacheName;
this.qryTxt = qryTxt;
this.distributedJoins = distributedJoins;
this.enforceJoinOrder = enforceJoinOrder;
+ this.replicatedOnly = replicatedOnly;
this.loc = loc;
this.pageSize = pageSize;
}
@@ -103,7 +108,14 @@ public class VisorQueryArg extends VisorDataTransferObject {
}
/**
- * @return {@code true} if query should be executed locally.
+ * @return {@code true} If the query contains only replicated tables.
+ */
+ public boolean isReplicatedOnly() {
+ return replicatedOnly;
+ }
+
+ /**
+ * @return {@code true} If query should be executed locally.
*/
public boolean isLocal() {
return loc;
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
index 303e6b6..815cf6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
@@ -22,10 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
-import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.task.GridInternal;
@@ -35,7 +32,6 @@ import org.apache.ignite.internal.visor.VisorEither;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorOneNodeTask;
import org.apache.ignite.internal.visor.util.VisorExceptionWrapper;
-import org.apache.ignite.lang.IgniteBiPredicate;
import static org.apache.ignite.internal.visor.query.VisorQueryUtils.SQL_QRY_NAME;
import static org.apache.ignite.internal.visor.query.VisorQueryUtils.fetchSqlQueryRows;
@@ -71,22 +67,6 @@ public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg, VisorEither<
super(arg, debug);
}
- /**
- * Execute scan query.
- *
- * @param c Cache to scan.
- * @param arg Job argument with query parameters.
- * @return Query cursor.
- */
- private QueryCursor<Cache.Entry<Object, Object>> scan(IgniteCache<Object, Object> c, VisorQueryArg arg,
- IgniteBiPredicate<Object, Object> filter) {
- ScanQuery<Object, Object> qry = new ScanQuery<>(filter);
- qry.setPageSize(arg.getPageSize());
- qry.setLocal(arg.isLocal());
-
- return c.withKeepBinary().query(qry);
- }
-
/** {@inheritDoc} */
@Override protected VisorEither<VisorQueryResult> run(final VisorQueryArg arg) {
try {
@@ -98,6 +78,7 @@ public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg, VisorEither<
qry.setLocal(arg.isLocal());
qry.setDistributedJoins(arg.isDistributedJoins());
qry.setEnforceJoinOrder(arg.isEnforceJoinOrder());
+ qry.setReplicatedOnly(arg.isReplicatedOnly());
long start = U.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/backend/app/agent.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/agent.js b/modules/web-console/backend/app/agent.js
deleted file mode 100644
index 758d31b..0000000
--- a/modules/web-console/backend/app/agent.js
+++ /dev/null
@@ -1,926 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-'use strict';
-
-// Fire me up!
-
-/**
- * Module interaction with agents.
- */
-module.exports = {
- implements: 'agent-manager',
- inject: ['require(lodash)', 'require(fs)', 'require(path)', 'require(jszip)', 'require(socket.io)', 'settings', 'mongo', 'services/activities']
-};
-
-/**
- * @param _
- * @param fs
- * @param path
- * @param JSZip
- * @param socketio
- * @param settings
- * @param mongo
- * @param {ActivitiesService} activitiesService
- * @returns {AgentManager}
- */
-module.exports.factory = function(_, fs, path, JSZip, socketio, settings, mongo, activitiesService) {
- /**
- *
- */
- class Command {
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} name Command name.
- */
- constructor(demo, name) {
- this._demo = demo;
-
- /**
- * Command name.
- * @type {String}
- */
- this._name = name;
-
- /**
- * Command parameters.
- * @type {Array.<String>}
- */
- this._params = [];
- }
-
- /**
- * Add parameter to command.
- * @param {string} key Parameter key.
- * @param {Object} value Parameter value.
- * @returns {Command}
- */
- addParam(key, value) {
- this._params.push({key, value});
-
- return this;
- }
- }
-
- /**
- * Connected agent descriptor.
- */
- class Agent {
- /**
- * @param {socketIo.Socket} socket - Agent socket for interaction.
- */
- constructor(socket) {
- /**
- * Agent socket for interaction.
- *
- * @type {socketIo.Socket}
- * @private
- */
- this._socket = socket;
- }
-
- /**
- * Send message to agent.
- *
- * @this {Agent}
- * @param {String} event Command name.
- * @param {Object} data Command params.
- * @param {Function} [callback] on finish
- */
- _emit(event, data, callback) {
- if (!this._socket.connected) {
- if (callback)
- callback('org.apache.ignite.agent.AgentException: Connection is closed');
-
- return;
- }
-
- this._socket.emit(event, data, callback);
- }
-
- /**
- * Send message to agent.
- *
- * @param {String} event - Event name.
- * @param {Object?} data - Transmitted data.
- * @returns {Promise}
- */
- executeAgent(event, data) {
- return new Promise((resolve, reject) =>
- this._emit(event, data, (error, res) => {
- if (error)
- return reject(error);
-
- resolve(res);
- })
- );
- }
-
- /**
- * Execute rest request on node.
- *
- * @param {Command} cmd - REST command.
- * @return {Promise}
- */
- executeRest(cmd) {
- const params = {cmd: cmd._name};
-
- for (const param of cmd._params)
- params[param.key] = param.value;
-
- return new Promise((resolve, reject) => {
- this._emit('node:rest', {uri: 'ignite', params, demo: cmd._demo, method: 'GET'}, (error, res) => {
- if (error)
- return reject(new Error(error));
-
- error = res.error;
-
- const code = res.code;
-
- if (code === 401)
- return reject(new Error('Agent failed to authenticate in grid. Please check agent\'s login and password or node port.'));
-
- if (code !== 200)
- return reject(new Error(error || 'Failed connect to node and execute REST command.'));
-
- try {
- const msg = JSON.parse(res.data);
-
- if (msg.successStatus === 0)
- return resolve(msg.response);
-
- if (msg.successStatus === 2)
- return reject(new Error('Agent failed to authenticate in grid. Please check agent\'s login and password or node port.'));
-
- reject(new Error(msg.error));
- }
- catch (e) {
- return reject(e);
- }
- });
- });
- }
-
- /**
- * @param {String} driverPath
- * @param {String} driverClass
- * @param {String} url
- * @param {Object} info
- * @returns {Promise} Promise on list of tables (see org.apache.ignite.schema.parser.DbTable java class)
- */
- metadataSchemas(driverPath, driverClass, url, info) {
- return this.executeAgent('schemaImport:schemas', {driverPath, driverClass, url, info});
- }
-
- /**
- * @param {String} driverPath
- * @param {String} driverClass
- * @param {String} url
- * @param {Object} info
- * @param {Array} schemas
- * @param {Boolean} tablesOnly
- * @returns {Promise} Promise on list of tables (see org.apache.ignite.schema.parser.DbTable java class)
- */
- metadataTables(driverPath, driverClass, url, info, schemas, tablesOnly) {
- return this.executeAgent('schemaImport:metadata', {driverPath, driverClass, url, info, schemas, tablesOnly});
- }
-
- /**
- * @returns {Promise} Promise on list of jars from driver folder.
- */
- availableDrivers() {
- return this.executeAgent('schemaImport:drivers');
- }
-
- /**
- *
- * @param {Boolean} demo Is need run command on demo node.
- * @param {Boolean} attr Get attributes, if this parameter has value true. Default value: true.
- * @param {Boolean} mtr Get metrics, if this parameter has value true. Default value: false.
- * @returns {Promise}
- */
- topology(demo, attr, mtr) {
- const cmd = new Command(demo, 'top')
- .addParam('attr', attr !== false)
- .addParam('mtr', !!mtr);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Node id.
- * @param {String} cacheName Cache name.
- * @param {String} query Query.
- * @param {Boolean} nonCollocatedJoins Flag whether to execute non collocated joins.
- * @param {Boolean} enforceJoinOrder Flag whether enforce join order is enabled.
- * @param {Boolean} local Flag whether to execute query locally.
- * @param {int} pageSize Page size.
- * @returns {Promise}
- */
- fieldsQuery(demo, nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, local, pageSize) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nid)
- .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorQueryTask')
- .addParam('p3', 'org.apache.ignite.internal.visor.query.VisorQueryArg')
- .addParam('p4', cacheName)
- .addParam('p5', query)
- .addParam('p6', nonCollocatedJoins)
- .addParam('p7', enforceJoinOrder)
- .addParam('p8', local)
- .addParam('p9', pageSize);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Node id.
- * @param {String} cacheName Cache name.
- * @param {String} filter Filter text.
- * @param {Boolean} regEx Flag whether filter by regexp.
- * @param {Boolean} caseSensitive Case sensitive filtration.
- * @param {Boolean} near Scan near cache.
- * @param {Boolean} local Flag whether to execute query locally.
- * @param {int} pageSize Page size.
- * @returns {Promise}
- */
- queryScan(demo, nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nid)
- .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorScanQueryTask')
- .addParam('p3', 'org.apache.ignite.internal.visor.query.VisorScanQueryArg')
- .addParam('p4', cacheName)
- .addParam('p5', filter)
- .addParam('p6', regEx)
- .addParam('p7', caseSensitive)
- .addParam('p8', near)
- .addParam('p9', local)
- .addParam('p10', pageSize);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Node id.
- * @param {int} queryId Query Id.
- * @param {int} pageSize Page size.
- * @returns {Promise}
- */
- queryFetch(demo, nid, queryId, pageSize) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nid)
- .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorQueryNextPageTask')
- .addParam('p3', 'org.apache.ignite.lang.VisorQueryNextPageTaskArg')
- .addParam('p4', 'java.lang.String')
- .addParam('p5', 'java.lang.Integer')
- .addParam('p6', queryId)
- .addParam('p7', pageSize);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Node id.
- * @param {int} queryId Query Id.
- * @returns {Promise}
- */
- queryClose(demo, nid, queryId) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', '')
- .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorQueryCleanupTask')
- .addParam('p3', 'java.util.Map')
- .addParam('p4', 'java.util.UUID')
- .addParam('p5', 'java.util.Set')
- .addParam('p6', `${nid}=${queryId}`);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {Array.<String>} nids Node ids.
- * @param {Number} since Metrics since.
- * @returns {Promise}
- */
- queryDetailMetrics(demo, nids, since) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nids)
- .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorQueryDetailMetricsCollectorTask')
- .addParam('p3', 'java.lang.Long')
- .addParam('p4', since);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {Array.<String>} nids Node ids.
- * @returns {Promise}
- */
- queryResetDetailMetrics(demo, nids) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nids)
- .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorQueryResetDetailMetricsTask')
- .addParam('p3', 'java.lang.Void');
-
- return this.executeRest(cmd);
- }
-
- /**
- * Collect running queries
- * @param {Boolean} demo Is need run command on demo node.
- * @param {Number} duration minimum duration time of running queries.
- * @returns {Promise}
- */
- queryCollectRunning(demo, duration) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', '')
- .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorRunningQueriesCollectorTask')
- .addParam('p3', 'java.lang.Long')
- .addParam('p4', duration);
-
- return this.executeRest(cmd);
- }
-
- /**
- * Cancel running query.
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Node id.
- * @param {Number} queryId query id to cancel.
- * @returns {Promise}
- */
- queryCancel(demo, nid, queryId) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nid)
- .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorQueryCancelTask')
- .addParam('p3', 'java.lang.Long')
- .addParam('p4', queryId);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} cacheName Cache name.
- * @returns {Promise}
- */
- metadata(demo, cacheName) {
- const cmd = new Command(demo, 'metadata')
- .addParam('cacheName', cacheName);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} evtOrderKey Event order key, unique for tab instance.
- * @param {String} evtThrottleCntrKey Event throttle counter key, unique for tab instance.
- * @returns {Promise}
- */
- collect(demo, evtOrderKey, evtThrottleCntrKey) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', '')
- .addParam('p2', 'org.apache.ignite.internal.visor.node.VisorNodeDataCollectorTask')
- .addParam('p3', 'org.apache.ignite.internal.visor.node.VisorNodeDataCollectorTaskArg')
- .addParam('p4', true)
- .addParam('p5', 'CONSOLE_' + evtOrderKey)
- .addParam('p6', evtThrottleCntrKey)
- .addParam('p7', 10)
- .addParam('p8', false);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Node id.
- * @returns {Promise}
- */
- collectNodeConfiguration(demo, nid) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nid)
- .addParam('p2', 'org.apache.ignite.internal.visor.node.VisorNodeConfigurationCollectorTask')
- .addParam('p3', 'java.lang.Void')
- .addParam('p4', null);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Node id.
- * @param {Array.<String>} caches Caches deployment IDs to collect configuration.
- * @returns {Promise}
- */
- collectCacheConfigurations(demo, nid, caches) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nid)
- .addParam('p2', 'org.apache.ignite.internal.visor.cache.VisorCacheConfigurationCollectorTask')
- .addParam('p3', 'java.util.Collection')
- .addParam('p4', 'org.apache.ignite.lang.IgniteUuid')
- .addParam('p5', caches);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Node id.
- * @param {String} cacheName Cache name.
- * @returns {Promise}
- */
- cacheClear(demo, nid, cacheName) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nid)
- .addParam('p2', 'org.apache.ignite.internal.visor.cache.VisorCacheClearTask')
- .addParam('p3', 'java.lang.String')
- .addParam('p4', cacheName);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {Array.<String>} nids Node ids.
- * @param {Boolean} near true if near cache should be started.
- * @param {String} cacheName Name for near cache.
- * @param {String} cfg Cache XML configuration.
- * @returns {Promise}
- */
- cacheStart(demo, nids, near, cacheName, cfg) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nids)
- .addParam('p2', 'org.apache.ignite.internal.visor.cache.VisorCacheStartTask')
- .addParam('p3', 'org.apache.ignite.internal.visor.cache.VisorCacheStartArg')
- .addParam('p4', near)
- .addParam('p5', cacheName)
- .addParam('p6', cfg);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Node id.
- * @param {String} cacheName Cache name.
- * @returns {Promise}
- */
- cacheStop(demo, nid, cacheName) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nid)
- .addParam('p2', 'org.apache.ignite.internal.visor.cache.VisorCacheStopTask')
- .addParam('p3', 'java.lang.String')
- .addParam('p4', cacheName);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Node id.
- * @param {String} cacheName Cache name.
- * @returns {Promise}
- */
- cacheResetMetrics(demo, nid, cacheName) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nid)
- .addParam('p2', 'org.apache.ignite.internal.visor.cache.VisorCacheResetMetricsTask')
- .addParam('p3', 'java.lang.String')
- .addParam('p4', cacheName);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nids Node ids.
- * @returns {Promise}
- */
- gc(demo, nids) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nids)
- .addParam('p2', 'org.apache.ignite.internal.visor.node.VisorNodeGcTask')
- .addParam('p3', 'java.lang.Void');
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} taskNid node that is not node we want to ping.
- * @param {String} nid Id of the node to ping.
- * @returns {Promise}
- */
- ping(demo, taskNid, nid) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', taskNid)
- .addParam('p2', 'org.apache.ignite.internal.visor.node.VisorNodePingTask')
- .addParam('p3', 'java.util.UUID')
- .addParam('p4', nid);
-
- return this.executeRest(cmd);
- }
-
- /**
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Id of the node to get thread dump.
- * @returns {Promise}
- */
- threadDump(demo, nid) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nid)
- .addParam('p2', 'org.apache.ignite.internal.visor.debug.VisorThreadDumpTask')
- .addParam('p3', 'java.lang.Void');
-
- return this.executeRest(cmd);
- }
-
- /**
- * Collect cache partitions.
- * @param {Boolean} demo Is need run command on demo node.
- * @param {Array.<String>} nids Cache node IDs.
- * @param {String} cacheName Cache name.
- * @returns {Promise}
- */
- partitions(demo, nids, cacheName) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nids)
- .addParam('p2', 'org.apache.ignite.internal.visor.cache.VisorCachePartitionsTask')
- .addParam('p3', 'java.lang.String')
- .addParam('p4', cacheName);
-
- return this.executeRest(cmd);
- }
-
- /**
- * Stops given node IDs.
- * @param {Boolean} demo Is need run command on demo node.
- * @param {Array.<String>} nids Nodes IDs.
- * @returns {Promise}
- */
- stopNodes(demo, nids) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nids)
- .addParam('p2', 'org.apache.ignite.internal.visor.node.VisorNodeStopTask')
- .addParam('p3', 'java.lang.Void');
-
- return this.executeRest(cmd);
- }
-
- /**
- * Restarts given node IDs.
- * @param {Boolean} demo Is need run command on demo node.
- * @param {Array.<String>} nids Nodes IDs.
- * @returns {Promise}
- */
- restartNodes(demo, nids) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nids)
- .addParam('p2', 'org.apache.ignite.internal.visor.node.VisorNodeRestartTask')
- .addParam('p3', 'java.lang.Void');
-
- return this.executeRest(cmd);
- }
-
- /**
- * Collect service information.
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Node ID.
- * @returns {Promise}
- */
- services(demo, nid) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nid)
- .addParam('p2', 'org.apache.ignite.internal.visor.service.VisorServiceTask')
- .addParam('p3', 'java.lang.Void');
-
- return this.executeRest(cmd);
- }
-
- /**
- * Cancel service with specified name.
- * @param {Boolean} demo Is need run command on demo node.
- * @param {String} nid Node ID.
- * @param {String} name Name of service to cancel.
- * @returns {Promise}
- */
- serviceCancel(demo, nid, name) {
- const cmd = new Command(demo, 'exe')
- .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
- .addParam('p1', nid)
- .addParam('p2', 'org.apache.ignite.internal.visor.service.VisorCancelServiceTask')
- .addParam('p3', 'java.lang.String')
- .addParam('p4', name);
-
- return this.executeRest(cmd);
- }
- }
-
- /**
- * Connected agents manager.
- */
- class AgentManager {
- /**
- * @constructor
- */
- constructor() {
- /**
- * Connected agents by user id.
- * @type {Object.<ObjectId, Array.<Agent>>}
- */
- this._agents = {};
-
- /**
- * Connected browsers by user id.
- * @type {Object.<ObjectId, Array.<Socket>>}
- */
- this._browsers = {};
-
- const agentArchives = fs.readdirSync(settings.agent.dists)
- .filter((file) => path.extname(file) === '.zip');
-
- /**
- * Supported agents distribution.
- * @type {Object.<String, String>}
- */
- this.supportedAgents = {};
-
- const jarFilter = (file) => path.extname(file) === '.jar';
-
- const agentsPromises = _.map(agentArchives, (fileName) => {
- const filePath = path.join(settings.agent.dists, fileName);
-
- return JSZip.loadAsync(fs.readFileSync(filePath))
- .then((zip) => {
- const jarPath = _.find(_.keys(zip.files), jarFilter);
-
- return JSZip.loadAsync(zip.files[jarPath].async('nodebuffer'))
- .then((jar) => jar.files['META-INF/MANIFEST.MF'].async('string'))
- .then((lines) => lines.trim()
- .split(/\s*\n+\s*/)
- .map((line, r) => {
- r = line.split(/\s*:\s*/);
-
- this[r[0]] = r[1];
-
- return this;
- }, {})[0])
- .then((manifest) => {
- const ver = manifest['Implementation-Version'];
- const buildTime = manifest['Build-Time'];
-
- if (ver && buildTime)
- return { fileName, filePath, ver, buildTime };
- });
- });
- });
-
- Promise.all(agentsPromises)
- .then((agents) => {
- this.supportedAgents = _.keyBy(_.remove(agents, null), 'ver');
-
- const latest = _.head(Object.keys(this.supportedAgents).sort((a, b) => {
- const aParts = a.split('.');
- const bParts = b.split('.');
-
- for (let i = 0; i < aParts.length; ++i) {
- if (aParts[i] !== bParts[i])
- return aParts[i] < bParts[i] ? 1 : -1;
- }
-
- if (aParts.length === bParts.length)
- return 0;
-
- return aParts.length < bParts.length ? 1 : -1;
- }));
-
- // Latest version of agent distribution.
- if (latest)
- this.supportedAgents.latest = this.supportedAgents[latest];
- });
- }
-
- attachLegacy(srv) {
- /**
- * @type {socketIo.Server}
- */
- const io = socketio(srv);
-
- io.on('connection', (socket) => {
- socket.on('agent:auth', (data, cb) => {
- return cb('You are using an older version of the agent. Please reload agent archive');
- });
- });
- }
-
- /**
- * @param {http.Server|https.Server} srv Server instance that we want to attach agent handler.
- */
- attach(srv) {
- if (this._server)
- throw 'Agent server already started!';
-
- this._server = srv;
-
- /**
- * @type {socketIo.Server}
- */
- this._socket = socketio(this._server, {path: '/agents'});
-
- this._socket.on('connection', (socket) => {
- socket.on('agent:auth', (data, cb) => {
- if (!_.isEmpty(this.supportedAgents)) {
- const ver = data.ver;
- const bt = data.bt;
-
- if (_.isEmpty(ver) || _.isEmpty(bt) || _.isEmpty(this.supportedAgents[ver]) ||
- this.supportedAgents[ver].buildTime > bt)
- return cb('You are using an older version of the agent. Please reload agent archive');
- }
-
- const tokens = data.tokens;
-
- mongo.Account.find({token: {$in: tokens}}, '_id token').lean().exec()
- .then((accounts) => {
- if (!accounts.length)
- return cb('Agent is failed to authenticate. Please check agent\'s token(s)');
-
- const agent = new Agent(socket);
-
- const accountIds = _.map(accounts, (account) => account._id);
-
- socket.on('disconnect', () => this._agentDisconnected(accountIds, agent));
-
- this._agentConnected(accountIds, agent);
-
- const missedTokens = _.difference(tokens, _.map(accounts, (account) => account.token));
-
- if (missedTokens.length) {
- agent._emit('agent:warning',
- `Failed to authenticate with token(s): ${missedTokens.join(', ')}.`);
- }
-
- cb();
- })
- // TODO IGNITE-1379 send error to web master.
- .catch(() => cb('Agent is failed to authenticate. Please check agent\'s tokens'));
- });
- });
- }
-
- /**
- * @param {ObjectId} accountId
- * @param {Socket} socket
- * @returns {int} Connected agent count.
- */
- addAgentListener(accountId, socket) {
- let sockets = this._browsers[accountId];
-
- if (!sockets)
- this._browsers[accountId] = sockets = [];
-
- sockets.push(socket);
-
- const agents = this._agents[accountId];
-
- return agents ? agents.length : 0;
- }
-
- /**
- * @param {ObjectId} accountId.
- * @param {Socket} socket.
- * @returns {int} connected agent count.
- */
- removeAgentListener(accountId, socket) {
- const sockets = this._browsers[accountId];
-
- _.pull(sockets, socket);
- }
-
- /**
- * @param {ObjectId} accountId
- * @returns {Promise.<Agent>}
- */
- findAgent(accountId) {
- if (!this._server)
- return Promise.reject(new Error('Agent server not started yet!'));
-
- const agents = this._agents[accountId];
-
- if (!agents || agents.length === 0)
- return Promise.reject(new Error('Failed to connect to agent'));
-
- return Promise.resolve(agents[0]);
- }
-
- /**
- * Close connections for all user agents.
- * @param {ObjectId} accountId
- * @param {String} oldToken
- */
- close(accountId, oldToken) {
- if (!this._server)
- return;
-
- const agentsForClose = this._agents[accountId];
-
- const agentsForWarning = _.clone(agentsForClose);
-
- this._agents[accountId] = [];
-
- _.forEach(this._agents, (sockets) => _.pullAll(agentsForClose, sockets));
-
- _.pullAll(agentsForWarning, agentsForClose);
-
- const msg = `Security token has been reset: ${oldToken}`;
-
- _.forEach(agentsForWarning, (socket) => socket._emit('agent:warning', msg));
-
- _.forEach(agentsForClose, (socket) => socket._emit('agent:close', msg));
-
- _.forEach(this._browsers[accountId], (socket) => socket.emit('agent:count', {count: 0}));
- }
-
- /**
- * @param {ObjectId} accountIds
- * @param {Agent} agent
- */
- _agentConnected(accountIds, agent) {
- _.forEach(accountIds, (accountId) => {
- let agents = this._agents[accountId];
-
- if (!agents)
- this._agents[accountId] = agents = [];
-
- agents.push(agent);
-
- const sockets = this._browsers[accountId];
-
- _.forEach(sockets, (socket) => socket.emit('agent:count', {count: agents.length}));
-
- activitiesService.merge(accountId, {
- group: 'agent',
- action: '/agent/start'
- });
- });
- }
-
- /**
- * @param {ObjectId} accountIds
- * @param {Agent} agent
- */
- _agentDisconnected(accountIds, agent) {
- _.forEach(accountIds, (accountId) => {
- const agents = this._agents[accountId];
-
- if (agents && agents.length)
- _.pull(agents, agent);
-
- const sockets = this._browsers[accountId];
-
- _.forEach(sockets, (socket) => socket.emit('agent:count', {count: agents.length}));
- });
- }
- }
-
- return new AgentManager();
-};
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/backend/app/agentSocket.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/agentSocket.js b/modules/web-console/backend/app/agentSocket.js
new file mode 100644
index 0000000..db1deaa
--- /dev/null
+++ b/modules/web-console/backend/app/agentSocket.js
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+// Fire me up!
+
+/**
+ * Module interaction with agents.
+ */
+module.exports = {
+ implements: 'agent-socket',
+ inject: ['require(lodash)']
+};
+
+/**
+ * Helper class to contract REST command.
+ */
+class Command {
+ /**
+ * @param {Boolean} demo Is need run command on demo node.
+ * @param {String} name Command name.
+ */
+ constructor(demo, name) {
+ this.demo = demo;
+
+ /**
+ * Command name.
+ * @type {String}
+ */
+ this._name = name;
+
+ /**
+ * Command parameters.
+ * @type {Array.<Object.<String, String>>}
+ */
+ this._params = [];
+
+ this._paramsLastIdx = 1;
+ }
+
+ /**
+ * Add parameter to command.
+ * @param {Object} value Parameter value.
+ * @returns {Command}
+ */
+ addParam(value) {
+ this._params.push({key: `p${this._paramsLastIdx++}`, value});
+
+ return this;
+ }
+
+ /**
+ * Add parameter to command.
+ * @param {String} key Parameter key.
+ * @param {Object} value Parameter value.
+ * @returns {Command}
+ */
+ addNamedParam(key, value) {
+ this._params.push({key, value});
+
+ return this;
+ }
+}
+
+/**
+ * @param _
+ * @returns {AgentSocket}
+ */
+module.exports.factory = function(_) {
+ /**
+ * Connected agent descriptor.
+ */
+ class AgentSocket {
+ /**
+ * @param {Socket} socket Socket for interaction.
+ * @param {String} tokens Active tokens.
+ * @param {String} demoEnabled Demo enabled.
+ */
+ constructor(socket, tokens, demoEnabled) {
+ Object.assign(this, {
+ socket,
+ tokens,
+ cluster: null,
+ demo: {
+ enabled: demoEnabled,
+ browserSockets: []
+ }
+ });
+ }
+
+ /**
+ * Send event to agent.
+ *
+ * @this {AgentSocket}
+ * @param {String} event Command name.
+ * @param {Array.<Object>} args - Transmitted arguments.
+ * @param {Function} [callback] on finish
+ */
+ _emit(event, args, callback) {
+ if (!this.socket.connected) {
+ if (callback)
+ callback('org.apache.ignite.agent.AgentException: Connection is closed');
+
+ return;
+ }
+
+ this.socket.emit(event, ...args, callback);
+ }
+
+ /**
+ * Send event to agent.
+ *
+ * @param {String} event - Event name.
+ * @param {Array.<Object>?} args - Transmitted arguments.
+ * @returns {Promise}
+ */
+ emitEvent(event, ...args) {
+ return new Promise((resolve, reject) =>
+ this._emit(event, args, (error, res) => {
+ if (error)
+ return reject(error);
+
+ resolve(res);
+ })
+ );
+ }
+
+ restResultParse(res) {
+ if (res.status === 0)
+ return JSON.parse(res.data);
+
+ if (res.status === 2)
+ throw new Error('AgentSocket failed to authenticate in grid. Please check agent\'s login and password or node port.');
+
+ throw new Error(res.error);
+ }
+
+ /**
+ * @param {String} token
+ * @param {Array.<Socket>} browserSockets
+ */
+ runDemoCluster(token, browserSockets) {
+ this.emitEvent('demo:broadcast:start')
+ .then(() => {
+ this.demo.tokens.push(token);
+ this.demo.browserSockets.push(...browserSockets);
+
+ this.socket.on('demo:topology', (res) => {
+ try {
+ const top = this.restResultParse(res);
+
+ _.forEach(this.demo.browserSockets, (sock) => sock.emit('topology', top));
+ } catch (err) {
+ _.forEach(this.demo.browserSockets, (sock) => sock.emit('topology:err', err));
+ }
+ });
+ });
+ }
+
+ /**
+ * @param {Socket} browserSocket
+ */
+ attachToDemoCluster(browserSocket) {
+ this.demo.browserSockets.push(...browserSocket);
+ }
+
+ startCollectTopology(timeout) {
+ return this.emitEvent('start:collect:topology', timeout);
+ }
+
+ stopCollectTopology(demo) {
+ return this.emitEvent('stop:collect:topology', demo);
+ }
+
+ /**
+ * Execute REST request on node.
+ *
+ * @param {Boolean} demo Is need run command on demo node.
+ * @param {String} cmd REST command.
+ * @param {Array.<String>} args - REST command arguments.
+ * @return {Promise}
+ */
+ restCommand(demo, cmd, ...args) {
+ const params = {cmd};
+
+ _.forEach(args, (arg, idx) => {
+ params[`p${idx + 1}`] = args[idx];
+ });
+
+ return this.emitEvent('node:rest', {uri: 'ignite', demo, params, method: 'GET'})
+ .then(this.restResultParse);
+ }
+
+ gatewayCommand(demo, nids, taskCls, argCls, ...args) {
+ const cmd = new Command(demo, 'exe')
+ .addNamedParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask')
+ .addParam(nids)
+ .addParam(taskCls)
+ .addParam(argCls);
+
+ _.forEach(args, (arg) => cmd.addParam(arg));
+
+ return this.restCommand(cmd);
+ }
+
+ /**
+ * @param {Boolean} demo Is need run command on demo node.
+ * @param {Boolean} attr Get attributes, if this parameter has value true. Default value: true.
+ * @param {Boolean} mtr Get metrics, if this parameter has value true. Default value: false.
+ * @returns {Promise}
+ */
+ topology(demo, attr, mtr) {
+ const cmd = new Command(demo, 'top')
+ .addNamedParam('attr', attr !== false)
+ .addNamedParam('mtr', !!mtr);
+
+ return this.restCommand(cmd);
+ }
+
+ /**
+ * @param {Boolean} demo Is need run command on demo node.
+ * @param {String} cacheName Cache name.
+ * @returns {Promise}
+ */
+ metadata(demo, cacheName) {
+ const cmd = new Command(demo, 'metadata')
+ .addNamedParam('cacheName', cacheName);
+
+ return this.restCommand(cmd);
+ }
+ }
+
+ return AgentSocket;
+};
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/backend/app/agentsHandler.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/agentsHandler.js b/modules/web-console/backend/app/agentsHandler.js
new file mode 100644
index 0000000..d24cef8
--- /dev/null
+++ b/modules/web-console/backend/app/agentsHandler.js
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+// Fire me up!
+
+/**
+ * Module interaction with agents.
+ */
+module.exports = {
+ implements: 'agents-handler',
+ inject: ['require(lodash)', 'require(fs)', 'require(path)', 'require(jszip)', 'require(socket.io)', 'settings', 'mongo', 'agent-socket']
+};
+
+/**
+ * @param _
+ * @param fs
+ * @param path
+ * @param JSZip
+ * @param socketio
+ * @param settings
+ * @param mongo
+ * @param {AgentSocket} AgentSocket
+ * @returns {AgentsHandler}
+ */
+module.exports.factory = function(_, fs, path, JSZip, socketio, settings, mongo, AgentSocket) {
+ class AgentSockets {
+ constructor() {
+ /**
+ * @type {Map.<String, Array.<String>>}
+ */
+ this.sockets = new Map();
+ }
+
+ get(token) {
+ let sockets = this.sockets.get(token);
+
+ if (_.isEmpty(sockets))
+ this.sockets.set(token, sockets = []);
+
+ return sockets;
+ }
+
+ /**
+ * @param {AgentSocket} sock
+ * @param {String} token
+ * @return {Array.<AgentSocket>}
+ */
+ add(token, sock) {
+ const sockets = this.get(token);
+
+ sockets.push(sock);
+ }
+
+ /**
+ * @param {Socket} browserSocket
+ * @return {AgentSocket}
+ */
+ find(browserSocket) {
+ const token = browserSocket.request.user.token;
+
+ const sockets = this.sockets.get(token);
+
+ return _.find(sockets, (sock) => _.includes(sock.demo.browserSockets, browserSocket));
+ }
+ }
+
+ class Cluster {
+ constructor(nids) {
+ let d = new Date().getTime();
+
+ this.id = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
+ const r = (d + Math.random() * 16) % 16 | 0;
+
+ d = Math.floor(d / 16);
+
+ return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16);
+ });
+
+ this.nids = nids;
+ }
+
+ same(nids) {
+ return _.intersection(this.nids, nids).length > 0;
+ }
+
+ updateTopology(nids) {
+ this.nids = nids;
+ }
+ }
+
+ /**
+ * Connected agents manager.
+ */
+ class AgentsHandler {
+ /**
+ * @constructor
+ */
+ constructor() {
+ /**
+ * Connected agents.
+ * @type {AgentSockets}
+ */
+ this._agentSockets = new AgentSockets();
+
+ this.clusters = [];
+ }
+
+ /**
+ * Collect supported agents list.
+ * @private
+ */
+ _collectSupportedAgents() {
+ const jarFilter = (file) => path.extname(file) === '.jar';
+
+ const agentArchives = fs.readdirSync(settings.agent.dists)
+ .filter((file) => path.extname(file) === '.zip');
+
+ const agentsPromises = _.map(agentArchives, (fileName) => {
+ const filePath = path.join(settings.agent.dists, fileName);
+
+ return JSZip.loadAsync(fs.readFileSync(filePath))
+ .then((zip) => {
+ const jarPath = _.find(_.keys(zip.files), jarFilter);
+
+ return JSZip.loadAsync(zip.files[jarPath].async('nodebuffer'))
+ .then((jar) => jar.files['META-INF/MANIFEST.MF'].async('string'))
+ .then((lines) =>
+ _.reduce(lines.split(/\s*\n+\s*/), (acc, line) => {
+ if (!_.isEmpty(line)) {
+ const arr = line.split(/\s*:\s*/);
+
+ acc[arr[0]] = arr[1];
+ }
+
+ return acc;
+ }, {}))
+ .then((manifest) => {
+ const ver = manifest['Implementation-Version'];
+ const buildTime = manifest['Build-Time'];
+
+ if (ver && buildTime)
+ return { fileName, filePath, ver, buildTime };
+ });
+ });
+ });
+
+ return Promise.all(agentsPromises)
+ .then((descs) => {
+ const agentDescs = _.keyBy(_.remove(descs, null), 'ver');
+
+ const latestVer = _.head(Object.keys(agentDescs).sort((a, b) => {
+ const aParts = a.split('.');
+ const bParts = b.split('.');
+
+ for (let i = 0; i < aParts.length; ++i) {
+ if (aParts[i] !== bParts[i])
+ return aParts[i] < bParts[i] ? 1 : -1;
+ }
+
+ if (aParts.length === bParts.length)
+ return 0;
+
+ return aParts.length < bParts.length ? 1 : -1;
+ }));
+
+ // Latest version of agent distribution.
+ if (latestVer)
+ agentDescs.current = agentDescs[latestVer];
+
+ return agentDescs;
+ });
+ }
+
+ getOrCreateCluster(nids) {
+ const cluster = _.find(this.clusters, (c) => c.same(nids));
+
+ if (_.isNil(cluster))
+ this.clusters.push(new Cluster(nids));
+
+ return cluster;
+ }
+
+ /**
+ * Link agent with browsers by account.
+ *
+ * @param {Socket} sock
+ * @param {Array.<String>} tokens
+ * @param {boolean} demoEnabled
+ *
+ * @private
+ */
+ onConnect(sock, tokens, demoEnabled) {
+ const agentSocket = new AgentSocket(sock, tokens, demoEnabled);
+
+ sock.on('disconnect', () => {
+ _.forEach(tokens, (token) => {
+ _.pull(this._agentSockets.get(token), agentSocket);
+
+ this._browsersHnd.agentStats(token);
+ });
+ });
+
+ sock.on('cluster:topology', (nids) => {
+ const cluster = this.getOrCreateCluster(nids);
+
+ if (_.isNil(agentSocket.cluster)) {
+ agentSocket.cluster = cluster;
+
+ _.forEach(tokens, (token) => {
+ this._browsersHnd.agentStats(token);
+ });
+ }
+ else
+ cluster.updateTopology(nids);
+ });
+
+ sock.on('cluster:collector', (top) => {
+
+ });
+
+ sock.on('cluster:disconnected', () => {
+ agentSocket.cluster = null;
+
+ _.forEach(tokens, (token) => {
+ this._browsersHnd.agentStats(token);
+ });
+ });
+
+ _.forEach(tokens, (token) => {
+ this._agentSockets.add(token, agentSocket);
+
+ // TODO start demo if needed.
+ // const browserSockets = _.filter(this._browserSockets[token], 'request._query.IgniteDemoMode');
+ //
+ // // First agent join after user start demo.
+ // if (_.size(browserSockets))
+ // agentSocket.runDemoCluster(token, browserSockets);
+
+ this._browsersHnd.agentStats(token);
+ });
+
+ // ioSocket.on('cluster:topology', (top) => {
+ //
+ // });
+ }
+
+ /**
+ * @param {http.Server|https.Server} srv Server instance that we want to attach agent handler.
+ * @param {BrowsersHandler} browsersHnd
+ */
+ attach(srv, browsersHnd) {
+ this._browsersHnd = browsersHnd;
+
+ if (this.io)
+ throw 'Agent server already started!';
+
+ this._collectSupportedAgents()
+ .then((supportedAgents) => {
+ this.currentAgent = _.get(supportedAgents, 'current');
+
+ this.io = socketio(srv, {path: '/agents'});
+
+ this.io.on('connection', (sock) => {
+ sock.on('agent:auth', ({ver, bt, tokens, disableDemo}, cb) => {
+ if (_.isEmpty(tokens))
+ return cb('Tokens not set. Please reload agent archive or check settings');
+
+ if (ver && bt && !_.isEmpty(supportedAgents)) {
+ const btDistr = _.get(supportedAgents, [ver, 'buildTime']);
+
+ if (_.isEmpty(btDistr) || btDistr < bt)
+ return cb('You are using an older version of the agent. Please reload agent');
+ }
+
+ return mongo.Account.find({token: {$in: tokens}}, '_id token').lean().exec()
+ .then((accounts) => {
+ const activeTokens = _.uniq(_.map(accounts, 'token'));
+
+ if (_.isEmpty(activeTokens))
+ return cb(`Failed to authenticate with token(s): ${tokens.join(',')}. Please reload agent archive or check settings`);
+
+ cb(null, activeTokens);
+
+ return this.onConnect(sock, activeTokens, disableDemo);
+ })
+ // TODO IGNITE-1379 send error to web master.
+ .catch(() => cb(`Invalid token(s): ${tokens.join(',')}. Please reload agent archive or check settings`));
+ });
+ });
+ });
+ }
+
+ agent(token, demo, clusterId) {
+ if (!this.io)
+ return Promise.reject(new Error('Agent server not started yet!'));
+
+ const socks = this._agentSockets.get(token);
+
+ if (_.isEmpty(socks))
+ return Promise.reject(new Error('Failed to find connected agent for this token'));
+
+ if (demo || _.isNil(clusterId))
+ return Promise.resolve(_.head(socks));
+
+ const sock = _.find(socks, (agentSock) => _.get(agentSock, 'cluster.id') === clusterId);
+
+ if (_.isEmpty(sock))
+ return Promise.reject(new Error('Failed to find agent connected to cluster'));
+
+ return Promise.resolve(sock);
+ }
+
+ agents(token) {
+ if (!this.io)
+ return Promise.reject(new Error('Agent server not started yet!'));
+
+ const socks = this._agentSockets.get(token);
+
+ if (_.isEmpty(socks))
+ return Promise.reject(new Error('Failed to find connected agent for this token'));
+
+ return Promise.resolve(socks);
+ }
+
+ tryStopDemo(browserSocket) {
+ const agentSock = this._agentSockets.find(browserSocket);
+ }
+
+ /**
+ * @param {ObjectId} token
+ * @param {Socket} browserSock
+ * @returns {int} Connected agent count.
+ */
+ onBrowserConnect(token, browserSock) {
+ this.emitAgentsCount(token);
+
+ // If connect from browser with enabled demo.
+ const demo = browserSock.request._query.IgniteDemoMode === 'true';
+
+ // Agents where possible to run demo.
+ const agentSockets = _.filter(this._agentSockets[token], 'demo.enabled');
+
+ if (demo && _.size(agentSockets)) {
+ const agentSocket = _.find(agentSockets, (agent) => _.includes(agent.demo.tokens, token));
+
+ if (agentSocket)
+ agentSocket.attachToDemoCluster(browserSock);
+ else
+ _.head(agentSockets).runDemoCluster(token, [browserSock]);
+ }
+ }
+
+ /**
+ * @param {Socket} browserSock.
+ */
+ onBrowserDisconnect(browserSock) {
+ const token = browserSock.client.request.user.token;
+
+ this._browserSockets.pull(token, browserSock);
+
+ // If connect from browser with enabled demo.
+ if (browserSock.request._query.IgniteDemoMode === 'true')
+ this._agentSockets.find(token, (agent) => _.includes(agent.demo.browserSockets, browserSock));
+
+ // TODO If latest browser with demo need stop demo cluster on agent.
+ }
+
+ /**
+ * Try stop agent for token if not used by others.
+ *
+ * @param {String} token
+ */
+ onTokenReset(token) {
+ if (_.isNil(this.io))
+ return;
+
+ const sockets = this._agentSockets[token];
+
+ _.forEach(sockets, (socket) => socket._emit('agent:reset:token', token));
+ }
+ }
+
+ return new AgentsHandler();
+};
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/backend/app/apiServer.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/apiServer.js b/modules/web-console/backend/app/apiServer.js
new file mode 100644
index 0000000..affb9c9
--- /dev/null
+++ b/modules/web-console/backend/app/apiServer.js
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+// Fire me up!
+
+module.exports = {
+ implements: 'api-server',
+ inject: ['require(express)', 'configure', 'routes']
+};
+
+module.exports.factory = function(Express, configure, routes) {
+ /**
+ * Connected agents manager.
+ */
+ class ApiServer {
+ /**
+ * @param {Server} srv
+ */
+ attach(srv) {
+ const app = new Express();
+
+ configure.express(app);
+
+ routes.register(app);
+
+ // Catch 404 and forward to error handler.
+ app.use((req, res, next) => {
+ const err = new Error('Not Found: ' + req.originalUrl);
+
+ err.status = 404;
+
+ next(err);
+ });
+
+ // Production error handler: no stacktraces leaked to user.
+ app.use((err, req, res) => {
+ res.status(err.status || 500);
+
+ res.render('error', {
+ message: err.message,
+ error: {}
+ });
+ });
+
+ srv.addListener('request', app);
+
+ return app;
+ }
+ }
+
+ return new ApiServer();
+};
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/backend/app/app.js
----------------------------------------------------------------------
diff --git a/modules/web-console/backend/app/app.js b/modules/web-console/backend/app/app.js
deleted file mode 100644
index eb236e7..0000000
--- a/modules/web-console/backend/app/app.js
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-'use strict';
-
-// Fire me up!
-
-module.exports = {
- implements: 'app',
- inject: ['require(express)', 'configure', 'routes']
-};
-
-module.exports.factory = function(Express, configure, routes) {
- return {
- /**
- * @param {Server} srv
- */
- listen: (srv) => {
- const app = new Express();
-
- configure.express(app);
-
- routes.register(app);
-
- // Catch 404 and forward to error handler.
- app.use((req, res, next) => {
- const err = new Error('Not Found: ' + req.originalUrl);
-
- err.status = 404;
-
- next(err);
- });
-
- // Production error handler: no stacktraces leaked to user.
- app.use((err, req, res) => {
- res.status(err.status || 500);
-
- res.render('error', {
- message: err.message,
- error: {}
- });
- });
-
- srv.addListener('request', app);
-
- return app;
- }
- };
-};