You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/18 09:00:51 UTC
[39/50] [abbrv] ignite git commit: IGNITE-4995 Multi-cluster support
for Web Console.
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentConfiguration.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentConfiguration.java
index 8f70100..6f2b60e 100644
--- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentConfiguration.java
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentConfiguration.java
@@ -23,17 +23,18 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import org.apache.ignite.internal.util.typedef.F;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Agent configuration.
*/
public class AgentConfiguration {
- /** Default Ignite node HTTP port. */
- public static final int DFLT_NODE_PORT = 8080;
-
/** Default path to agent property file. */
public static final String DFLT_CFG_PATH = "default.properties";
@@ -51,8 +52,8 @@ public class AgentConfiguration {
/** */
@Parameter(names = {"-s", "--server-uri"},
description = "URI for connect to Ignite Console via web-socket protocol" +
- " " +
- " Default value: " + DFLT_SERVER_URI)
+ " " +
+ " Default value: " + DFLT_SERVER_URI)
private String srvUri;
/** */
@@ -77,7 +78,13 @@ public class AgentConfiguration {
private String driversFolder;
/** */
- @Parameter(names = { "-h", "--help" }, help = true, description = "Print this help message")
+ @Parameter(names = {"-dd", "--disable-demo"}, description = "Disable demo mode on this agent " +
+ " " +
+ " Default value: false")
+ private Boolean disableDemo;
+
+ /** */
+ @Parameter(names = {"-h", "--help"}, help = true, description = "Print this help message")
private Boolean help;
/**
@@ -158,6 +165,20 @@ public class AgentConfiguration {
}
/**
+ * @return Disable demo mode.
+ */
+ public Boolean disableDemo() {
+ return disableDemo != null ? disableDemo : Boolean.FALSE;
+ }
+
+ /**
+ * @param disableDemo Disable demo mode.
+ */
+ public void disableDemo(Boolean disableDemo) {
+ this.disableDemo = disableDemo;
+ }
+
+ /**
* @return {@code true} If agent options usage should be printed.
*/
public Boolean help() {
@@ -170,14 +191,14 @@ public class AgentConfiguration {
public void load(URL cfgUrl) throws IOException {
Properties props = new Properties();
- try (Reader reader = new InputStreamReader(cfgUrl.openStream())) {
+ try (Reader reader = new InputStreamReader(cfgUrl.openStream(), UTF_8)) {
props.load(reader);
}
String val = (String)props.remove("tokens");
if (val != null)
- tokens(Arrays.asList(val.split(",")));
+ tokens(new ArrayList<>(Arrays.asList(val.split(","))));
val = (String)props.remove("server-uri");
@@ -216,13 +237,16 @@ public class AgentConfiguration {
if (driversFolder == null)
driversFolder(cmd.driversFolder());
+
+ if (disableDemo == null)
+ disableDemo(cmd.disableDemo());
}
/** {@inheritDoc} */
@Override public String toString() {
StringBuilder sb = new StringBuilder();
- if (tokens != null && tokens.size() > 0) {
+ if (!F.isEmpty(tokens)) {
sb.append("User's security tokens : ");
boolean first = true;
@@ -231,7 +255,7 @@ public class AgentConfiguration {
if (first)
first = false;
else
- sb.append(",");
+ sb.append(',');
if (tok.length() > 4) {
sb.append(new String(new char[tok.length() - 4]).replace('\0', '*'));
@@ -258,7 +282,8 @@ public class AgentConfiguration {
drvFld = new File(agentHome, "jdbc-drivers").getPath();
}
- sb.append("Path to JDBC drivers folder : ").append(drvFld);
+ sb.append("Path to JDBC drivers folder : ").append(drvFld).append('\n');
+ sb.append("Demo mode : ").append(disableDemo() ? "disabled" : "enabled");
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentLauncher.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentLauncher.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentLauncher.java
index a3d609f..65b8192 100644
--- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentLauncher.java
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentLauncher.java
@@ -34,6 +34,8 @@ import java.net.URL;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Scanner;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.jar.Attributes;
import java.util.jar.Manifest;
@@ -41,29 +43,48 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+import org.apache.ignite.console.agent.handlers.ClusterListener;
+import org.apache.ignite.console.agent.handlers.DemoListener;
+import org.apache.ignite.console.agent.rest.RestExecutor;
import org.apache.ignite.console.agent.handlers.DatabaseListener;
import org.apache.ignite.console.agent.handlers.RestListener;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
-import org.apache.log4j.Logger;
+import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
import static io.socket.client.Socket.EVENT_CONNECT;
-import static io.socket.client.Socket.EVENT_CONNECTING;
import static io.socket.client.Socket.EVENT_CONNECT_ERROR;
import static io.socket.client.Socket.EVENT_DISCONNECT;
import static io.socket.client.Socket.EVENT_ERROR;
-import static io.socket.client.Socket.EVENT_RECONNECTING;
+import static org.apache.ignite.console.agent.AgentUtils.fromJSON;
+import static org.apache.ignite.console.agent.AgentUtils.toJSON;
/**
* Control Center Agent launcher.
*/
public class AgentLauncher {
/** */
- private static final Logger log = Logger.getLogger(AgentLauncher.class.getName());
+ private static final Logger log = LoggerFactory.getLogger(AgentLauncher.class);
/** */
- private static final String EVENT_NODE_REST = "node:rest";
+ private static final String EVENT_CLUSTER_BROADCAST_START = "cluster:broadcast:start";
+
+ /** */
+ private static final String EVENT_CLUSTER_BROADCAST_STOP = "cluster:broadcast:stop";
+
+ /** */
+ private static final String EVENT_CLUSTER_DISCONNECTED = "cluster:disconnected";
+
+ /** */
+ private static final String EVENT_DEMO_BROADCAST_START = "demo:broadcast:start";
+
+ /** */
+ private static final String EVENT_DEMO_BROADCAST_STOP = "demo:broadcast:stop";
/** */
private static final String EVENT_SCHEMA_IMPORT_DRIVERS = "schemaImport:drivers";
@@ -75,10 +96,24 @@ public class AgentLauncher {
private static final String EVENT_SCHEMA_IMPORT_METADATA = "schemaImport:metadata";
/** */
- private static final String EVENT_AGENT_WARNING = "agent:warning";
+ private static final String EVENT_NODE_VISOR_TASK = "node:visorTask";
+
+ /** */
+ private static final String EVENT_NODE_REST = "node:rest";
+
+ /** */
+ private static final String EVENT_RESET_TOKENS = "agent:reset:token";
/** */
- private static final String EVENT_AGENT_CLOSE = "agent:close";
+ private static final String EVENT_LOG_WARNING = "log:warn";
+
+ static {
+ // Optionally remove existing handlers attached to j.u.l root logger.
+ SLF4JBridgeHandler.removeHandlersForRootLogger();
+
+ // Add SLF4JBridgeHandler to j.u.l's root logger.
+ SLF4JBridgeHandler.install();
+ }
/**
* Create a trust manager that trusts all certificates It is not using a particular keyStore
@@ -86,15 +121,18 @@ public class AgentLauncher {
private static TrustManager[] getTrustManagers() {
return new TrustManager[] {
new X509TrustManager() {
- public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+ /** {@inheritDoc} */
+ @Override public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return null;
}
- public void checkClientTrusted(
+ /** {@inheritDoc} */
+ @Override public void checkClientTrusted(
java.security.cert.X509Certificate[] certs, String authType) {
}
- public void checkServerTrusted(
+ /** {@inheritDoc} */
+ @Override public void checkServerTrusted(
java.security.cert.X509Certificate[] certs, String authType) {
}
}};
@@ -104,14 +142,13 @@ public class AgentLauncher {
* On error listener.
*/
private static final Emitter.Listener onError = new Emitter.Listener() {
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void call(Object... args) {
Throwable e = (Throwable)args[0];
ConnectException ce = X.cause(e, ConnectException.class);
if (ce != null)
- log.error("Failed to receive response from server (connection refused).");
+ log.error("Failed to establish connection to server (connection refused).");
else {
Exception ignore = X.cause(e, SSLHandshakeException.class);
@@ -172,7 +209,25 @@ public class AgentLauncher {
*/
private static final Emitter.Listener onDisconnect = new Emitter.Listener() {
@Override public void call(Object... args) {
- log.error(String.format("Connection closed: %s.", args));
+ log.error("Connection closed: {}", args);
+ }
+ };
+
+ /**
+ * On log warning listener: writes the first event argument to the agent log at WARN level.
+ */
+ private static final Emitter.Listener onLogWarning = new Emitter.Listener() {
+ @Override public void call(Object... args) {
+ log.warn(String.valueOf(args[0]));
+ }
+ };
+
+ /**
+ * On demo start request: logs the first event argument at WARN level and does nothing else.
+ * NOTE(review): no registration of this listener is visible in this change - confirm it is still needed.
+ */
+ private static final Emitter.Listener onDemoStart = new Emitter.Listener() {
+ @Override public void call(Object... args) {
+ log.warn(String.valueOf(args[0]));
+ }
+ };
@@ -205,7 +260,6 @@ public class AgentLauncher {
/**
* @param args Args.
*/
- @SuppressWarnings("BusyWait")
public static void main(String[] args) throws Exception {
log.info("Starting Apache Ignite Web Console Agent...");
@@ -236,13 +290,13 @@ public class AgentLauncher {
File f = AgentUtils.resolvePath(prop);
if (f == null)
- log.warn("Failed to find agent property file: " + prop);
+ log.warn("Failed to find agent property file: {}", prop);
else
propCfg.load(f.toURI().toURL());
}
- catch (IOException ignore) {
+ catch (IOException e) {
if (!AgentConfiguration.DFLT_CFG_PATH.equals(prop))
- log.warn("Failed to load agent property file: " + prop, ignore);
+ log.warn("Failed to load agent property file: " + prop, e);
}
cfg.merge(propCfg);
@@ -314,17 +368,11 @@ public class AgentLauncher {
}
final Socket client = IO.socket(uri, opts);
-
- final RestListener restHnd = new RestListener(cfg);
-
- final DatabaseListener dbHnd = new DatabaseListener(cfg);
+ final RestExecutor restExecutor = new RestExecutor(cfg.nodeUri());
try {
- Emitter.Listener onConnecting = new Emitter.Listener() {
- @Override public void call(Object... args) {
- log.info("Connecting to: " + cfg.serverUri());
- }
- };
+ final ClusterListener clusterLsnr = new ClusterListener(client, restExecutor);
+ final DemoListener demoHnd = new DemoListener(client, restExecutor);
Emitter.Listener onConnect = new Emitter.Listener() {
@Override public void call(Object... args) {
@@ -333,7 +381,8 @@ public class AgentLauncher {
JSONObject authMsg = new JSONObject();
try {
- authMsg.put("tokens", cfg.tokens());
+ authMsg.put("tokens", toJSON(cfg.tokens()));
+ authMsg.put("disableDemo", cfg.disableDemo());
String clsName = AgentLauncher.class.getSimpleName() + ".class";
@@ -353,14 +402,49 @@ public class AgentLauncher {
client.emit("agent:auth", authMsg, new Ack() {
@Override public void call(Object... args) {
- // Authentication failed if response contains args.
- if (args != null && args.length > 0) {
- onDisconnect.call(args);
+ if (args != null) {
+ if (args[0] instanceof String) {
+ log.error((String)args[0]);
+
+ System.exit(1);
+ }
+
+ if (args[0] == null && args[1] instanceof JSONArray) {
+ try {
+ List<String> activeTokens = fromJSON(args[1], List.class);
+
+ if (!F.isEmpty(activeTokens)) {
+ Collection<String> missedTokens = cfg.tokens();
+
+ cfg.tokens(activeTokens);
+
+ missedTokens.removeAll(activeTokens);
+
+ if (!F.isEmpty(missedTokens)) {
+ String tokens = F.concat(missedTokens, ", ");
- System.exit(1);
+ log.warn("Failed to authenticate with token(s): {}. " +
+ "Please reload agent archive or check settings", tokens);
+ }
+
+ log.info("Authentication success.");
+
+ clusterLsnr.watch();
+
+ return;
+ }
+ }
+ catch (Exception e) {
+ log.error("Failed to authenticate agent. Please check agent\'s tokens", e);
+
+ System.exit(1);
+ }
+ }
}
- log.info("Authentication success.");
+ log.error("Failed to authenticate agent. Please check agent\'s tokens");
+
+ System.exit(1);
}
});
}
@@ -372,44 +456,52 @@ public class AgentLauncher {
}
};
+ DatabaseListener dbHnd = new DatabaseListener(cfg);
+ RestListener restHnd = new RestListener(restExecutor);
+
final CountDownLatch latch = new CountDownLatch(1);
+ log.info("Connecting to: {}", cfg.serverUri());
+
client
- .on(EVENT_CONNECTING, onConnecting)
.on(EVENT_CONNECT, onConnect)
.on(EVENT_CONNECT_ERROR, onError)
- .on(EVENT_RECONNECTING, onConnecting)
- .on(EVENT_NODE_REST, restHnd)
- .on(EVENT_SCHEMA_IMPORT_DRIVERS, dbHnd.availableDriversListener())
- .on(EVENT_SCHEMA_IMPORT_SCHEMAS, dbHnd.schemasListener())
- .on(EVENT_SCHEMA_IMPORT_METADATA, dbHnd.metadataListener())
.on(EVENT_ERROR, onError)
.on(EVENT_DISCONNECT, onDisconnect)
- .on(EVENT_AGENT_WARNING, new Emitter.Listener() {
- @Override public void call(Object... args) {
- log.warn(args[0]);
- }
- })
- .on(EVENT_AGENT_CLOSE, new Emitter.Listener() {
+ .on(EVENT_LOG_WARNING, onLogWarning)
+ .on(EVENT_CLUSTER_BROADCAST_START, clusterLsnr.start())
+ .on(EVENT_CLUSTER_BROADCAST_STOP, clusterLsnr.stop())
+ .on(EVENT_DEMO_BROADCAST_START, demoHnd.start())
+ .on(EVENT_DEMO_BROADCAST_STOP, demoHnd.stop())
+ .on(EVENT_RESET_TOKENS, new Emitter.Listener() {
@Override public void call(Object... args) {
- onDisconnect.call(args);
+ String tok = String.valueOf(args[0]);
+
+ log.warn("Security token has been reset: {}", tok);
- client.off();
+ cfg.tokens().remove(tok);
- latch.countDown();
+ if (cfg.tokens().isEmpty()) {
+ client.off();
+
+ latch.countDown();
+ }
}
- });
+ })
+ .on(EVENT_SCHEMA_IMPORT_DRIVERS, dbHnd.availableDriversListener())
+ .on(EVENT_SCHEMA_IMPORT_SCHEMAS, dbHnd.schemasListener())
+ .on(EVENT_SCHEMA_IMPORT_METADATA, dbHnd.metadataListener())
+ .on(EVENT_NODE_VISOR_TASK, restHnd)
+ .on(EVENT_NODE_REST, restHnd);
client.connect();
latch.await();
}
finally {
- client.close();
+ restExecutor.stop();
- restHnd.stop();
-
- dbHnd.stop();
+ client.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentUtils.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentUtils.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentUtils.java
index cb22651..1999afc 100644
--- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentUtils.java
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentUtils.java
@@ -59,7 +59,7 @@ public class AgentUtils {
mapper.registerModule(module);
}
-
+
/**
* Default constructor.
*/
@@ -153,7 +153,7 @@ public class AgentUtils {
/**
* Remove callback from handler arguments.
- *
+ *
* @param args Arguments.
* @return Arguments without callback.
*/
@@ -165,7 +165,7 @@ public class AgentUtils {
/**
* Map java object to JSON object.
- *
+ *
* @param obj Java object.
* @return {@link JSONObject} or {@link JSONArray}.
* @throws IllegalArgumentException If conversion fails due to incompatible type.
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java
new file mode 100644
index 0000000..23b8fc7
--- /dev/null
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.console.agent.handlers;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.socket.client.Socket;
+import io.socket.emitter.Emitter;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.console.agent.rest.RestExecutor;
+import org.apache.ignite.console.agent.rest.RestResult;
+import org.apache.ignite.internal.processors.rest.client.message.GridClientNodeBean;
+import org.apache.ignite.internal.processors.rest.protocols.http.jetty.GridJettyObjectMapper;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ignite.console.agent.AgentUtils.toJSON;
+import static org.apache.ignite.internal.processors.rest.GridRestResponse.STATUS_SUCCESS;
+
+/**
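+ * Listens for cluster broadcast start/stop events and periodically reports cluster topology, obtained via REST, through the socket client.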
+ */
+public class ClusterListener {
+ /** */
+ private static final Logger log = LoggerFactory.getLogger(ClusterListener.class);
+
+ /** */
+ private static final String EVENT_CLUSTER_CONNECTED = "cluster:connected";
+
+ /** */
+ private static final String EVENT_CLUSTER_TOPOLOGY = "cluster:topology";
+
+ /** */
+ private static final String EVENT_CLUSTER_DISCONNECTED = "cluster:disconnected";
+
+ /** Default timeout. */
+ private static final long DFLT_TIMEOUT = 3000L;
+
+ /** JSON object mapper. */
+ private static final ObjectMapper mapper = new GridJettyObjectMapper();
+
+ /** Nids. */
+ private Collection<UUID> latestNids = Collections.emptyList();
+
+ /** */
+ private final WatchTask watchTask = new WatchTask();
+
+ /** */
+ private final BroadcastTask broadcastTask = new BroadcastTask();
+
+ /** */
+ private static final IgniteClosure<GridClientNodeBean, UUID> NODE2ID = new IgniteClosure<GridClientNodeBean, UUID>() {
+ @Override public UUID apply(GridClientNodeBean n) {
+ return n.getNodeId();
+ }
+
+ @Override public String toString() {
+ return "Node bean to node ID transformer closure.";
+ }
+ };
+
+ /** */
+ private static final IgniteClosure<UUID, String> ID2ID8 = new IgniteClosure<UUID, String>() {
+ @Override public String apply(UUID nid) {
+ return U.id8(nid).toUpperCase();
+ }
+
+ @Override public String toString() {
+ return "Node ID to ID8 transformer closure.";
+ }
+ };
+
+ /** */
+ private static final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
+
+ /** */
+ private ScheduledFuture<?> refreshTask;
+
+ /** */
+ private Socket client;
+
+ /** */
+ private RestExecutor restExecutor;
+
+ /**
+ * @param client Socket client used to emit cluster events.
+ * @param restExecutor REST executor used to request cluster topology.
+ */
+ public ClusterListener(Socket client, RestExecutor restExecutor) {
+ this.client = client;
+ this.restExecutor = restExecutor;
+ }
+
+ /**
+ * Callback on cluster connect.
+ *
+ * @param nids Cluster nodes IDs.
+ */
+ private void clusterConnect(Collection<UUID> nids) {
+ log.info("Connection successfully established to cluster with nodes: {}", F.viewReadOnly(nids, ID2ID8));
+
+ client.emit(EVENT_CLUSTER_CONNECTED, toJSON(nids));
+ }
+
+ /**
+ * Callback on disconnect from cluster. Does nothing when no node IDs are currently known; otherwise
+ * clears the known node IDs, logs the loss and emits the "cluster:disconnected" event.
+ *
+ * NOTE(review): {@code latestNids} is reset before the emit, so the event payload is always the empty
+ * list, and it is not converted with {@code toJSON} as in {@code clusterConnect} - confirm this is intended.
+ */
+ private void clusterDisconnect() {
+ if (latestNids.isEmpty())
+ return;
+
+ latestNids = Collections.emptyList();
+
+ log.info("Connection to cluster was lost");
+
+ client.emit(EVENT_CLUSTER_DISCONNECTED, latestNids);
+ }
+
+ /**
+ * Stop refresh task.
+ */
+ private void safeStopRefresh() {
+ if (refreshTask != null)
+ refreshTask.cancel(true);
+ }
+
+ /**
+ * Start watch cluster.
+ */
+ public void watch() {
+ safeStopRefresh();
+
+ refreshTask = pool.scheduleWithFixedDelay(watchTask, 0L, DFLT_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Start broadcast topology to server-side.
+ */
+ public Emitter.Listener start() {
+ return new Emitter.Listener() {
+ @Override public void call(Object... args) {
+ safeStopRefresh();
+
+ final long timeout = args.length > 1 && args[1] instanceof Long ? (long)args[1] : DFLT_TIMEOUT;
+
+ refreshTask = pool.scheduleWithFixedDelay(broadcastTask, 0L, timeout, TimeUnit.MILLISECONDS);
+ }
+ };
+ }
+
+ /**
+ * Stop broadcast topology to server-side.
+ *
+ * @return Listener that stops the current refresh task and switches back to watching the cluster.
+ */
+ public Emitter.Listener stop() {
+ return new Emitter.Listener() {
+ @Override public void call(Object... args) {
+ // watch() cancels the current refresh task through the null-safe safeStopRefresh(),
+ // so a direct refreshTask.cancel() is not needed (it failed with NPE when nothing was scheduled yet).
+ watch();
+ }
+ };
+ }
+
+ /** */
+ private class WatchTask implements Runnable {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ RestResult top = restExecutor.topology(false, false);
+
+ switch (top.getStatus()) {
+ case STATUS_SUCCESS:
+ List<GridClientNodeBean> nodes = mapper.readValue(top.getData(),
+ new TypeReference<List<GridClientNodeBean>>() {});
+
+ Collection<UUID> nids = F.viewReadOnly(nodes, NODE2ID);
+
+ if (Collections.disjoint(latestNids, nids))
+ log.info("Connection successfully established to cluster with nodes: {}", F.viewReadOnly(nids, ID2ID8));
+
+ client.emit(EVENT_CLUSTER_TOPOLOGY, nids);
+
+ latestNids = nids;
+
+ break;
+
+ default:
+ log.warn(top.getError());
+
+ clusterDisconnect();
+ }
+ }
+ catch (IOException ignore) {
+ clusterDisconnect();
+ }
+ }
+ }
+
+ /** */
+ private class BroadcastTask implements Runnable {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ RestResult top = restExecutor.topology(false, true);
+
+ switch (top.getStatus()) {
+ case STATUS_SUCCESS:
+ List<GridClientNodeBean> nodes = mapper.readValue(top.getData(),
+ new TypeReference<List<GridClientNodeBean>>() {});
+
+ Collection<UUID> nids = F.viewReadOnly(nodes, NODE2ID);
+
+ if (Collections.disjoint(latestNids, nids)) {
+ clusterConnect(nids);
+
+ clusterDisconnect();
+
+ watch();
+ }
+
+ latestNids = nids;
+
+ client.emit(EVENT_CLUSTER_TOPOLOGY, top.getData());
+
+ break;
+
+ default:
+ log.warn(top.getError());
+
+ clusterDisconnect();
+ }
+ }
+ catch (IOException ignore) {
+ clusterDisconnect();
+
+ watch();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DatabaseListener.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DatabaseListener.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DatabaseListener.java
index a0e9f8f..745c1f2 100644
--- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DatabaseListener.java
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DatabaseListener.java
@@ -38,6 +38,7 @@ import org.apache.ignite.console.agent.db.DbMetadataReader;
import org.apache.ignite.console.agent.db.DbTable;
import org.apache.log4j.Logger;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.ignite.console.agent.AgentUtils.resolvePath;
/**
@@ -84,7 +85,6 @@ public class DatabaseListener {
/** */
private final AbstractListener metadataLsnr = new AbstractListener() {
- @SuppressWarnings("unchecked")
@Override public Object execute(Map<String, Object> args) throws Exception {
String driverPath = null;
@@ -155,7 +155,7 @@ public class DatabaseListener {
URL url = new URL("jar", null,
"file:" + (win ? "/" : "") + file.getPath() + "!/META-INF/services/java.sql.Driver");
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream()))) {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), UTF_8))) {
String jdbcDriverCls = reader.readLine();
res.add(new JdbcDriver(file.getName(), jdbcDriverCls));
@@ -209,7 +209,7 @@ public class DatabaseListener {
* @param jdbcUrl JDBC URL.
* @param jdbcInfo Properties to connect to database.
* @return Collection of schema names.
- * @throws SQLException If failed to load schemas.
+ * @throws SQLException If failed to collect schemas.
*/
protected Collection<String> schemas(String jdbcDriverJarPath, String jdbcDriverCls, String jdbcUrl,
Properties jdbcInfo) throws SQLException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DemoListener.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DemoListener.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DemoListener.java
new file mode 100644
index 0000000..c496817
--- /dev/null
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DemoListener.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.console.agent.handlers;
+
+import io.socket.client.Ack;
+import io.socket.client.Socket;
+import io.socket.emitter.Emitter;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.console.agent.rest.RestExecutor;
+import org.apache.ignite.console.agent.rest.RestResult;
+import org.apache.ignite.console.demo.AgentClusterDemo;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ignite.console.agent.AgentUtils.safeCallback;
+import static org.apache.ignite.console.agent.AgentUtils.toJSON;
+
+/**
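+ * Listens for demo broadcast start/stop events: starts or stops the demo cluster and periodically emits its topology through the socket client.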
+ */
+public class DemoListener {
+ /** */
+ private static final String EVENT_DEMO_TOPOLOGY = "demo:topology";
+
+ /** Default timeout. */
+ private static final long DFLT_TIMEOUT = 3000L;
+
+ /** */
+ private static final Logger log = LoggerFactory.getLogger(DemoListener.class);
+
+ /** */
+ private static final ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
+
+ /** */
+ private ScheduledFuture<?> refreshTask;
+
+ /** */
+ private Socket client;
+
+ /** */
+ private RestExecutor restExecutor;
+
+ /**
+ * @param client Socket client used to emit demo topology events.
+ * @param restExecutor REST executor used to request topology.
+ */
+ public DemoListener(Socket client, RestExecutor restExecutor) {
+ this.client = client;
+ this.restExecutor = restExecutor;
+ }
+
+ /**
+ * Start broadcast topology to server-side.
+ *
+ * @return Listener that starts the demo cluster and, once it is up, invokes the event callback and
+ * schedules a periodic topology broadcast. The delay (in milliseconds) is taken from the second event
+ * argument, or {@code DFLT_TIMEOUT} when that argument is absent, not a number or not positive.
+ */
+ public Emitter.Listener start() {
+ return new Emitter.Listener() {
+ @Override public void call(final Object... args) {
+ final Ack demoStartCb = safeCallback(args);
+
+ // A JSON number may arrive as Integer as well as Long, so accept any Number.
+ long reqTimeout = args.length > 1 && args[1] instanceof Number ? ((Number)args[1]).longValue() : 0L;
+
+ // scheduleWithFixedDelay() rejects non-positive delays, hence the fallback to the default.
+ final long timeout = reqTimeout > 0L ? reqTimeout : DFLT_TIMEOUT;
+
+ if (refreshTask != null)
+ refreshTask.cancel(true);
+
+ final CountDownLatch latch = AgentClusterDemo.tryStart();
+
+ // Wait for the demo start on a pool thread so the socket listener thread is not blocked.
+ pool.schedule(new Runnable() {
+ @Override public void run() {
+ try {
+ U.await(latch);
+
+ demoStartCb.call();
+
+ refreshTask = pool.scheduleWithFixedDelay(new Runnable() {
+ @Override public void run() {
+ try {
+ RestResult top = restExecutor.topology(true, true);
+
+ client.emit(EVENT_DEMO_TOPOLOGY, toJSON(top));
+ }
+ catch (IOException e) {
+ log.info("Lost connection to the demo cluster", e);
+
+ // Demo cluster is unreachable: cancel this refresh task and stop the demo.
+ stop().call();
+ }
+ }
+ }, 0L, timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e) {
+ demoStartCb.call(e);
+ }
+ }
+ }, 0, TimeUnit.MILLISECONDS);
+ }
+ };
+ }
+
+ /**
+ * Stop broadcast topology to server-side.
+ *
+ * @return Listener that cancels the topology refresh task, if one was scheduled, and calls
+ * {@code AgentClusterDemo.stop()}.
+ */
+ public Emitter.Listener stop() {
+ return new Emitter.Listener() {
+ @Override public void call(Object... args) {
+ // The refresh task is assigned only after the demo has started, so it may still be null here.
+ if (refreshTask != null)
+ refreshTask.cancel(true);
+
+ AgentClusterDemo.stop();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java
index fcacc88..2588e8e 100644
--- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java
@@ -17,93 +17,32 @@
package org.apache.ignite.console.agent.handlers;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.commons.codec.Charsets;
-import org.apache.http.Header;
-import org.apache.http.NameValuePair;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-import org.apache.ignite.console.agent.AgentConfiguration;
-import org.apache.ignite.console.demo.AgentClusterDemo;
-import org.apache.log4j.Logger;
-
-import static org.apache.ignite.console.agent.AgentConfiguration.DFLT_NODE_PORT;
+import org.apache.ignite.console.agent.rest.RestExecutor;
/**
* API to translate REST requests to Ignite cluster.
*/
public class RestListener extends AbstractListener {
/** */
- private static final Logger log = Logger.getLogger(RestListener.class.getName());
-
- /** */
- private final AgentConfiguration cfg;
-
- /** */
- private CloseableHttpClient httpClient;
+ private final RestExecutor restExecutor;
/**
- * @param cfg Config.
+ * @param restExecutor REST executor.
*/
- public RestListener(AgentConfiguration cfg) {
- super();
-
- this.cfg = cfg;
-
- // Create a connection manager with custom configuration.
- PoolingHttpClientConnectionManager connMgr = new PoolingHttpClientConnectionManager();
-
- connMgr.setDefaultMaxPerRoute(Integer.MAX_VALUE);
- connMgr.setMaxTotal(Integer.MAX_VALUE);
-
- httpClient = HttpClientBuilder.create().setConnectionManager(connMgr).build();
- }
-
- /** {@inheritDoc} */
- @Override public void stop() {
- super.stop();
-
- if (httpClient != null) {
- try {
- httpClient.close();
- }
- catch (IOException ignore) {
- // No-op.
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override protected ExecutorService newThreadPool() {
- return Executors.newCachedThreadPool();
+ public RestListener(RestExecutor restExecutor) {
+ this.restExecutor = restExecutor;
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public Object execute(Map<String, Object> args) throws Exception {
if (log.isDebugEnabled())
log.debug("Start parse REST command args: " + args);
- String uri = null;
+ String path = null;
if (args.containsKey("uri"))
- uri = args.get("uri").toString();
+ path = args.get("uri").toString();
Map<String, Object> params = null;
@@ -130,158 +69,6 @@ public class RestListener extends AbstractListener {
if (args.containsKey("body"))
body = args.get("body").toString();
- return executeRest(uri, params, demo, mtd, headers, body);
- }
-
- /**
- * @param uri Url.
- * @param params Params.
- * @param demo Use demo node.
- * @param mtd Method.
- * @param headers Headers.
- * @param body Body.
- */
- protected RestResult executeRest(String uri, Map<String, Object> params, boolean demo,
- String mtd, Map<String, Object> headers, String body) throws IOException, URISyntaxException {
- if (log.isDebugEnabled())
- log.debug("Start execute REST command [method=" + mtd + ", uri=/" + (uri == null ? "" : uri) +
- ", parameters=" + params + "]");
-
- final URIBuilder builder;
-
- if (demo) {
- // try start demo if needed.
- AgentClusterDemo.testDrive(cfg);
-
- // null if demo node not started yet.
- if (cfg.demoNodeUri() == null)
- return RestResult.fail("Demo node is not started yet.", 404);
-
- builder = new URIBuilder(cfg.demoNodeUri());
- }
- else
- builder = new URIBuilder(cfg.nodeUri());
-
- if (builder.getPort() == -1)
- builder.setPort(DFLT_NODE_PORT);
-
- if (uri != null) {
- if (!uri.startsWith("/") && !cfg.nodeUri().endsWith("/"))
- uri = '/' + uri;
-
- builder.setPath(uri);
- }
-
- if (params != null) {
- for (Map.Entry<String, Object> entry : params.entrySet()) {
- if (entry.getValue() != null)
- builder.addParameter(entry.getKey(), entry.getValue().toString());
- }
- }
-
- HttpRequestBase httpReq = null;
-
- try {
- if ("GET".equalsIgnoreCase(mtd))
- httpReq = new HttpGet(builder.build());
- else if ("POST".equalsIgnoreCase(mtd)) {
- HttpPost post;
-
- if (body == null) {
- List<NameValuePair> nvps = builder.getQueryParams();
-
- builder.clearParameters();
-
- post = new HttpPost(builder.build());
-
- if (!nvps.isEmpty())
- post.setEntity(new UrlEncodedFormEntity(nvps));
- }
- else {
- post = new HttpPost(builder.build());
-
- post.setEntity(new StringEntity(body));
- }
-
- httpReq = post;
- }
- else
- throw new IOException("Unknown HTTP-method: " + mtd);
-
- if (headers != null) {
- for (Map.Entry<String, Object> entry : headers.entrySet())
- httpReq.addHeader(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
- }
-
- try (CloseableHttpResponse resp = httpClient.execute(httpReq)) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
-
- resp.getEntity().writeTo(out);
-
- Charset charset = Charsets.UTF_8;
-
- Header encodingHdr = resp.getEntity().getContentEncoding();
-
- if (encodingHdr != null) {
- String encoding = encodingHdr.getValue();
-
- charset = Charsets.toCharset(encoding);
- }
-
- return RestResult.success(resp.getStatusLine().getStatusCode(), new String(out.toByteArray(), charset));
- }
- catch (ConnectException e) {
- log.info("Failed connect to node and execute REST command [uri=" + builder.build() + "]");
-
- return RestResult.fail("Failed connect to node and execute REST command.", 404);
- }
- }
- finally {
- if (httpReq != null)
- httpReq.reset();
- }
- }
-
- /**
- * Request result.
- */
- public static class RestResult {
- /** The field contains description of error if server could not handle the request. */
- public final String error;
-
- /** REST http code. */
- public final int code;
-
- /** The field contains result of command. */
- public final String data;
-
- /**
- * @param error The field contains description of error if server could not handle the request.
- * @param resCode REST http code.
- * @param res The field contains result of command.
- */
- private RestResult(String error, int resCode, String res) {
- this.error = error;
- this.code = resCode;
- this.data = res;
- }
-
- /**
- * @param error The field contains description of error if server could not handle the request.
- * @param restCode REST http code.
- * @return Request result.
- */
- public static RestResult fail(String error, int restCode) {
- return new RestResult(error, restCode, null);
- }
-
- /**
- * @param code REST http code.
- * @param data The field contains result of command.
- * @return Request result.
- */
- public static RestResult success(int code, String data) {
- return new RestResult(null, code, data);
- }
+ return restExecutor.execute(demo, path, params, mtd, headers, body);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java
new file mode 100644
index 0000000..d651aca
--- /dev/null
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.console.agent.rest;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.Map;
+import okhttp3.FormBody;
+import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.apache.ignite.console.demo.*;
+import org.apache.ignite.internal.processors.rest.protocols.http.jetty.GridJettyObjectMapper;
+import org.apache.log4j.Logger;
+
+import static org.apache.ignite.internal.processors.rest.GridRestResponse.STATUS_AUTH_FAILED;
+import static org.apache.ignite.internal.processors.rest.GridRestResponse.STATUS_FAILED;
+import static org.apache.ignite.internal.processors.rest.GridRestResponse.STATUS_SUCCESS;
+
+/**
+ * Executes REST commands over HTTP against the configured node URL or the embedded demo node.
+ */
+public class RestExecutor {
+ /** */
+ private static final Logger log = Logger.getLogger(RestExecutor.class);
+
+ /** JSON object mapper. */
+ private static final ObjectMapper mapper = new GridJettyObjectMapper();
+
+ /** */
+ private final OkHttpClient httpClient;
+
+ /** Node URL. */
+ private String nodeUrl;
+
+    /**
+     * Creates an executor for the given node and builds the HTTP client used to send requests.
+     *
+     * @param nodeUrl Node URL.
+     */
+    public RestExecutor(String nodeUrl) {
+        httpClient = new OkHttpClient.Builder().build();
+
+        this.nodeUrl = nodeUrl;
+    }
+
+    /**
+     * Stop HTTP client: shuts down the dispatcher's executor service and evicts pooled connections.
+     */
+    public void stop() {
+        // httpClient is final and always assigned in the constructor, so no null check is needed.
+        httpClient.dispatcher().executorService().shutdown();
+
+        // Idle connections are kept in the pool after dispatcher shutdown unless evicted explicitly.
+        httpClient.connectionPool().evictAll();
+    }
+
+    /**
+     * Builds an HTTP request for a REST command, sends it and converts the reply into a {@link RestResult}.
+     *
+     * @param demo If {@code true}, the request goes to the demo node URL (waiting for the demo node to start
+     *      when that URL is not available yet); otherwise to the node URL given to the constructor.
+     * @param path Path segment appended to the base URL, ignored if {@code null}.
+     * @param params Parameters: sent as query parameters for GET and as form fields for POST without body.
+     * @param mtd HTTP method: {@code GET} or {@code POST}, case-insensitive.
+     * @param headers Request headers, entries with {@code null} value are skipped.
+     * @param body Body of POST request, sent as {@code text/plain}; if {@code null}, a form body is sent.
+     * @return Request result.
+     * @throws IOException If the request could not be executed.
+     * @throws IllegalArgumentException If the HTTP method is unknown or the target URL is missing or malformed.
+     * @throws IllegalStateException If the thread was interrupted while waiting for the demo node to start.
+     */
+    private RestResult sendRequest(boolean demo, String path, Map<String, Object> params,
+        String mtd, Map<String, Object> headers, String body) throws IOException {
+        if (demo && AgentClusterDemo.getDemoUrl() == null) {
+            try {
+                AgentClusterDemo.tryStart().await();
+            }
+            catch (InterruptedException e) {
+                // Restore the interrupt flag so that callers up the stack still observe the interruption.
+                Thread.currentThread().interrupt();
+
+                throw new IllegalStateException("Failed to execute request because of embedded node for demo mode is not started yet.", e);
+            }
+        }
+
+        String url = demo ? AgentClusterDemo.getDemoUrl() : nodeUrl;
+
+        // HttpUrl.parse() returns null for a malformed URL, fail with a clear message instead of NPE.
+        HttpUrl baseUrl = url == null ? null : HttpUrl.parse(url);
+
+        if (baseUrl == null)
+            throw new IllegalArgumentException("Invalid node URL: " + url);
+
+        HttpUrl.Builder urlBuilder = baseUrl.newBuilder();
+
+        if (path != null)
+            urlBuilder.addPathSegment(path);
+
+        final Request.Builder reqBuilder = new Request.Builder();
+
+        if (headers != null) {
+            for (Map.Entry<String, Object> entry : headers.entrySet())
+                if (entry.getValue() != null)
+                    reqBuilder.addHeader(entry.getKey(), entry.getValue().toString());
+        }
+
+        if ("GET".equalsIgnoreCase(mtd)) {
+            if (params != null) {
+                for (Map.Entry<String, Object> entry : params.entrySet()) {
+                    if (entry.getValue() != null)
+                        urlBuilder.addQueryParameter(entry.getKey(), entry.getValue().toString());
+                }
+            }
+        }
+        else if ("POST".equalsIgnoreCase(mtd)) {
+            if (body != null) {
+                MediaType contentType = MediaType.parse("text/plain");
+
+                reqBuilder.post(RequestBody.create(contentType, body));
+            }
+            else {
+                FormBody.Builder formBody = new FormBody.Builder();
+
+                if (params != null) {
+                    for (Map.Entry<String, Object> entry : params.entrySet()) {
+                        if (entry.getValue() != null)
+                            formBody.add(entry.getKey(), entry.getValue().toString());
+                    }
+                }
+
+                reqBuilder.post(formBody.build());
+            }
+        }
+        else
+            throw new IllegalArgumentException("Unknown HTTP-method: " + mtd);
+
+        reqBuilder.url(urlBuilder.build());
+
+        try (Response resp = httpClient.newCall(reqBuilder.build()).execute()) {
+            String content = resp.body().string();
+
+            if (resp.isSuccessful()) {
+                JsonNode node = mapper.readTree(content);
+
+                JsonNode statusNode = node == null ? null : node.get("successStatus");
+
+                // An empty or unexpected reply would otherwise surface as a NullPointerException.
+                if (statusNode == null)
+                    return RestResult.fail(STATUS_FAILED, "Failed to parse REST response: 'successStatus' field is missing.");
+
+                int status = statusNode.asInt();
+
+                if (status == STATUS_SUCCESS) {
+                    JsonNode res = node.get("response");
+
+                    return RestResult.success(res == null ? null : res.toString());
+                }
+
+                JsonNode err = node.get("error");
+
+                return RestResult.fail(status, err == null ? null : err.asText());
+            }
+
+            if (resp.code() == 401)
+                return RestResult.fail(STATUS_AUTH_FAILED, "Failed to authenticate in grid. Please check agent\'s login and password or node port.");
+
+            return RestResult.fail(STATUS_FAILED, "Failed connect to node and execute REST command.");
+        }
+        catch (ConnectException e) {
+            ConnectException ex =
+                new ConnectException("Failed connect to node and execute REST command [url=" + urlBuilder + "]");
+
+            // ConnectException has no cause-accepting constructor, so attach the original failure explicitly.
+            ex.initCause(e);
+
+            throw ex;
+        }
+    }
+
+    /**
+     * Executes REST command, converting any failure into a failed {@link RestResult} instead of throwing.
+     *
+     * @param demo Is demo node request.
+     * @param path Path segment.
+     * @param params Params.
+     * @param mtd Method.
+     * @param headers Headers.
+     * @param body Body.
+     * @return Result of the command, or a failed result with status {@code 404} and the exception message
+     *      if the request could not be executed.
+     */
+    public RestResult execute(boolean demo, String path, Map<String, Object> params,
+        String mtd, Map<String, Object> headers, String body) {
+        // Guard the call so the message is not concatenated when debug logging is off.
+        if (log.isDebugEnabled())
+            log.debug("Start execute REST command [method=" + mtd + ", uri=/" + (path == null ? "" : path) +
+                ", parameters=" + params + "]");
+
+        try {
+            return sendRequest(demo, path, params, mtd, headers, body);
+        }
+        catch (Exception e) {
+            log.info("Failed to execute REST command [method=" + mtd + ", uri=/" + (path == null ? "" : path) +
+                ", parameters=" + params + "]", e);
+
+            return RestResult.fail(404, e.getMessage());
+        }
+    }
+
+    /**
+     * Requests topology by sending the {@code top} command as a GET request to the {@code ignite} path.
+     *
+     * @param demo Is demo node request.
+     * @param full Value passed for both the {@code attr} and {@code mtr} parameters of the command.
+     * @return Result of the topology command.
+     * @throws IOException If the request failed.
+     */
+    public RestResult topology(boolean demo, boolean full) throws IOException {
+        final Map<String, Object> topParams = new HashMap<>(3);
+        final Boolean detailed = full;
+
+        topParams.put("cmd", "top");
+        topParams.put("attr", detailed);
+        topParams.put("mtr", detailed);
+
+        return sendRequest(demo, "ignite", topParams, "GET", null, null);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestResult.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestResult.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestResult.java
new file mode 100644
index 0000000..5beeee7
--- /dev/null
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestResult.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.console.agent.rest;
+
+/**
+ * Request result.
+ */
+public class RestResult {
+ /** REST http code. */
+ private final int status;
+
+ /** The field contains description of error if server could not handle the request. */
+ private final String error;
+
+ /** The field contains result of command. */
+ private final String data;
+
+ /**
+ * @param status REST http code.
+ * @param error The field contains description of error if server could not handle the request.
+ * @param data The field contains result of command.
+ */
+ private RestResult(int status, String error, String data) {
+ this.status = status;
+ this.error = error;
+ this.data = data;
+ }
+
+ /**
+ * @param status REST http code.
+ * @param error The field contains description of error if server could not handle the request.
+ * @return Request result.
+ */
+ public static RestResult fail(int status, String error) {
+ return new RestResult(status, error, null);
+ }
+
+ /**
+ * @param data The field contains result of command.
+ * @return Request result.
+ */
+ public static RestResult success(String data) {
+ return new RestResult(0, null, data);
+ }
+
+ /**
+ * @return REST http code.
+ */
+ public int getStatus() {
+ return status;
+ }
+
+ /**
+ * @return The field contains description of error if server could not handle the request.
+ */
+ public String getError() {
+ return error;
+ }
+
+ /**
+ * @return The field contains result of command.
+ */
+ public String getData() {
+ return data;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentClusterDemo.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentClusterDemo.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentClusterDemo.java
index 3bd0b5a..776e407 100644
--- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentClusterDemo.java
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentClusterDemo.java
@@ -19,36 +19,38 @@ package org.apache.ignite.console.demo;
import java.util.Collection;
import java.util.Collections;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.console.agent.AgentConfiguration;
import org.apache.ignite.console.demo.service.DemoCachesLoadService;
import org.apache.ignite.console.demo.service.DemoRandomCacheLoadService;
-import org.apache.ignite.console.demo.service.DemoServiceMultipleInstances;
import org.apache.ignite.console.demo.service.DemoServiceClusterSingleton;
import org.apache.ignite.console.demo.service.DemoServiceKeyAffinity;
+import org.apache.ignite.console.demo.service.DemoServiceMultipleInstances;
import org.apache.ignite.console.demo.service.DemoServiceNodeSingleton;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.logger.log4j.Log4JLogger;
+import org.apache.ignite.logger.slf4j.Slf4jLogger;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_JETTY_PORT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_ASCII;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_QUIET;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER;
+import static org.apache.ignite.console.demo.AgentDemoUtils.newScheduledThreadPool;
import static org.apache.ignite.events.EventType.EVTS_DISCOVERY;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_JETTY_ADDRS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_JETTY_PORT;
@@ -60,21 +62,28 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_JETTY_PO
*/
public class AgentClusterDemo {
/** */
- private static final Logger log = Logger.getLogger(AgentClusterDemo.class.getName());
+ private static final Logger log = LoggerFactory.getLogger(AgentClusterDemo.class);
+
+ /** Guard that lets only one caller start the embedded demo nodes. */
+ private static final AtomicBoolean initGuard = new AtomicBoolean();
+
+ /** Latch that is counted down once the first demo node has started and the demo URL is set. */
+ private static CountDownLatch initLatch = new CountDownLatch(1);
/** */
- private static final AtomicBoolean initLatch = new AtomicBoolean();
+ private static volatile String demoUrl;
/** */
private static final int NODE_CNT = 3;
/**
* Configure node.
+ * @param basePort Base port.
* @param gridIdx Ignite instance name index.
* @param client If {@code true} then start client node.
* @return IgniteConfiguration
*/
- private static IgniteConfiguration igniteConfiguration(int gridIdx, boolean client) {
+ private static IgniteConfiguration igniteConfiguration(int basePort, int gridIdx, boolean client) {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setIgniteInstanceName((client ? "demo-client-" : "demo-server-" ) + gridIdx);
@@ -82,14 +91,20 @@ public class AgentClusterDemo {
cfg.setEventStorageSpi(new MemoryEventStorageSpi());
cfg.setIncludeEventTypes(EVTS_DISCOVERY);
+ cfg.getConnectorConfiguration().setPort(basePort);
+
+ System.setProperty(IGNITE_JETTY_PORT, String.valueOf(basePort + 10));
+
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
- ipFinder.setAddresses(Collections.singletonList("127.0.0.1:60900.." + (60900 + NODE_CNT - 1)));
+ int discoPort = basePort + 20;
+
+ ipFinder.setAddresses(Collections.singletonList("127.0.0.1:" + discoPort + ".." + (discoPort + NODE_CNT - 1)));
// Configure discovery SPI.
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
- discoSpi.setLocalPort(60900);
+ discoSpi.setLocalPort(discoPort);
discoSpi.setIpFinder(ipFinder);
cfg.setDiscoverySpi(discoSpi);
@@ -97,12 +112,15 @@ public class AgentClusterDemo {
TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
commSpi.setSharedMemoryPort(-1);
- commSpi.setLocalPort(60800);
+ commSpi.setMessageQueueLimit(10);
+
+ int commPort = basePort + 30;
+
+ commSpi.setLocalPort(commPort);
cfg.setCommunicationSpi(commSpi);
- cfg.setGridLogger(new Log4JLogger(log));
+ cfg.setGridLogger(new Slf4jLogger(log));
cfg.setMetricsLogFrequency(0);
- cfg.getConnectorConfiguration().setPort(60700);
if (client)
cfg.setClientMode(true);
@@ -113,88 +131,107 @@ public class AgentClusterDemo {
/**
* Starts read and write from cache in background.
*
- * @param ignite Ignite.
- * @param cnt - maximum count read/write key
+ * @param services Distributed services on the grid.
*/
- private static void startLoad(final Ignite ignite, final int cnt) {
- ignite.services().deployClusterSingleton("Demo caches load service", new DemoCachesLoadService(cnt));
- ignite.services().deployNodeSingleton("RandomCache load service", new DemoRandomCacheLoadService(cnt));
+ private static void deployServices(IgniteServices services) {
+ services.deployMultiple("Demo service: Multiple instances", new DemoServiceMultipleInstances(), 7, 3);
+ services.deployNodeSingleton("Demo service: Node singleton", new DemoServiceNodeSingleton());
+ services.deployClusterSingleton("Demo service: Cluster singleton", new DemoServiceClusterSingleton());
+ services.deployKeyAffinitySingleton("Demo service: Key affinity singleton",
+ new DemoServiceKeyAffinity(), DemoCachesLoadService.CAR_CACHE_NAME, "id");
+
+ services.deployClusterSingleton("Demo caches load service", new DemoCachesLoadService(20));
+ services.deployNodeSingleton("RandomCache load service", new DemoRandomCacheLoadService(20));
+ }
+
+ /** @return Demo node URL, or {@code null} if it is not available. */
+ public static String getDemoUrl() {
+ return demoUrl;
}
/**
* Start ignite node with cacheEmployee and populate it with data.
*/
- public static boolean testDrive(AgentConfiguration acfg) {
- if (initLatch.compareAndSet(false, true)) {
+ public static CountDownLatch tryStart() {
+ if (initGuard.compareAndSet(false, true)) {
log.info("DEMO: Starting embedded nodes for demo...");
+ System.setProperty(IGNITE_NO_ASCII, "true");
+ System.setProperty(IGNITE_QUIET, "false");
+ System.setProperty(IGNITE_UPDATE_NOTIFIER, "false");
+
System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "1");
System.setProperty(IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED, "true");
- System.setProperty(IGNITE_UPDATE_NOTIFIER, "false");
- System.setProperty(IGNITE_JETTY_PORT, "60800");
- System.setProperty(IGNITE_NO_ASCII, "true");
+ final AtomicInteger basePort = new AtomicInteger(60700);
+ final AtomicInteger cnt = new AtomicInteger(-1);
- try {
- IgniteEx ignite = (IgniteEx)Ignition.start(igniteConfiguration(0, false));
+ final ScheduledExecutorService execSrv = newScheduledThreadPool(1, "demo-nodes-start");
- final AtomicInteger cnt = new AtomicInteger(0);
+ execSrv.scheduleAtFixedRate(new Runnable() {
+ @Override public void run() {
+ int idx = cnt.incrementAndGet();
+ int port = basePort.get();
- final ScheduledExecutorService execSrv = Executors.newSingleThreadScheduledExecutor();
+ try {
+ IgniteEx ignite = (IgniteEx)Ignition.start(igniteConfiguration(port, idx, idx == NODE_CNT));
- execSrv.scheduleAtFixedRate(new Runnable() {
- @Override public void run() {
- int idx = cnt.incrementAndGet();
+ if (idx == 0) {
+ Collection<String> jettyAddrs = ignite.localNode().attribute(ATTR_REST_JETTY_ADDRS);
- try {
- Ignition.start(igniteConfiguration(idx, idx == NODE_CNT));
- }
- catch (Throwable e) {
- log.error("DEMO: Failed to start embedded node: " + e.getMessage());
- }
- finally {
- if (idx == NODE_CNT)
- execSrv.shutdown();
- }
- }
- }, 10, 10, TimeUnit.SECONDS);
+ if (jettyAddrs == null) {
+ ignite.cluster().stopNodes();
+
+ throw new IgniteException("DEMO: Failed to start Jetty REST server on embedded node");
+ }
- IgniteServices services = ignite.services();
+ String jettyHost = jettyAddrs.iterator().next();
- services.deployMultiple("Demo service: Multiple instances", new DemoServiceMultipleInstances(), 7, 3);
- services.deployNodeSingleton("Demo service: Node singleton", new DemoServiceNodeSingleton());
- services.deployClusterSingleton("Demo service: Cluster singleton", new DemoServiceClusterSingleton());
- services.deployKeyAffinitySingleton("Demo service: Key affinity singleton",
- new DemoServiceKeyAffinity(), DemoCachesLoadService.CAR_CACHE_NAME, "id");
+ Integer jettyPort = ignite.localNode().attribute(ATTR_REST_JETTY_PORT);
- if (log.isDebugEnabled())
- log.debug("DEMO: Started embedded nodes with indexed enabled caches...");
+ if (F.isEmpty(jettyHost) || jettyPort == null)
+ throw new IgniteException("DEMO: Failed to start Jetty REST handler on embedded node");
- Collection<String> jettyAddrs = ignite.localNode().attribute(ATTR_REST_JETTY_ADDRS);
+ log.info("DEMO: Started embedded node for demo purpose [TCP binary port={}, Jetty REST port={}]", port, jettyPort);
- String host = jettyAddrs == null ? null : jettyAddrs.iterator().next();
+ demoUrl = String.format("http://%s:%d", jettyHost, jettyPort);
- Integer port = ignite.localNode().attribute(ATTR_REST_JETTY_PORT);
+ initLatch.countDown();
- if (F.isEmpty(host) || port == null) {
- log.error("DEMO: Failed to start embedded node with rest!");
+ deployServices(ignite.services());
+ }
+ }
+ catch (Throwable e) {
+ if (idx == 0) {
+ basePort.getAndAdd(50);
- return false;
+ log.warn("DEMO: Failed to start embedded node.", e);
+ }
+ else
+ log.error("DEMO: Failed to start embedded node.", e);
+ }
+ finally {
+ if (idx == NODE_CNT) {
+ log.info("DEMO: All embedded nodes for demo successfully started");
+
+ execSrv.shutdown();
+ }
+ }
}
+ }, 1, 10, TimeUnit.SECONDS);
+ }
- acfg.demoNodeUri(String.format("http://%s:%d", host, port));
+ return initLatch;
+ }
- log.info("DEMO: Embedded nodes for sql and monitoring demo successfully started");
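+ /** Clears the demo URL, stops nodes via {@link Ignition#stopAll(boolean)} and resets the start guard and latch. */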
+ public static void stop() {
+ demoUrl = null;
- startLoad(ignite, 20);
- }
- catch (Exception e) {
- log.error("DEMO: Failed to start embedded node for sql and monitoring demo!", e);
+ Ignition.stopAll(true);
- return false;
- }
- }
+ initLatch = new CountDownLatch(1);
- return true;
+ initGuard.compareAndSet(true, false);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentDemoUtils.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentDemoUtils.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentDemoUtils.java
index fb34de7..fb5edb2 100644
--- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentDemoUtils.java
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentDemoUtils.java
@@ -24,7 +24,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * Utilites for Agent demo mode.
+ * Utilities for Agent demo mode.
*/
public class AgentDemoUtils {
/** Counter for threads in pool. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/service/DemoCachesLoadService.java
----------------------------------------------------------------------
diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/service/DemoCachesLoadService.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/service/DemoCachesLoadService.java
index 5f7823b..b21716c 100644
--- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/service/DemoCachesLoadService.java
+++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/service/DemoCachesLoadService.java
@@ -120,11 +120,9 @@ public class DemoCachesLoadService implements Service {
/** {@inheritDoc} */
@Override public void init(ServiceContext ctx) throws Exception {
- ignite.createCache(cacheCountry());
- ignite.createCache(cacheDepartment());
- ignite.createCache(cacheEmployee());
- ignite.createCache(cacheCar());
- ignite.createCache(cacheParking());
+ ignite.getOrCreateCaches(Arrays.asList(
+ cacheCountry(), cacheDepartment(), cacheEmployee(), cacheCar(), cacheParking()
+ ));
populateCacheEmployee();
populateCacheCar();
@@ -203,8 +201,8 @@ public class DemoCachesLoadService implements Service {
* @param name cache name.
* @return Cache configuration with basic properties set.
*/
- private static <K, V> CacheConfiguration<K, V> cacheConfiguration(String name) {
- CacheConfiguration<K, V> ccfg = new CacheConfiguration<>(name);
+ private static CacheConfiguration cacheConfiguration(String name) {
+ CacheConfiguration ccfg = new CacheConfiguration<>(name);
ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
ccfg.setQueryDetailMetricsSize(10);
@@ -218,8 +216,8 @@ public class DemoCachesLoadService implements Service {
/**
* Configure cacheCountry.
*/
- private static <K, V> CacheConfiguration<K, V> cacheCountry() {
- CacheConfiguration<K, V> ccfg = cacheConfiguration(COUNTRY_CACHE_NAME);
+ private static CacheConfiguration cacheCountry() {
+ CacheConfiguration ccfg = cacheConfiguration(COUNTRY_CACHE_NAME);
// Configure cacheCountry types.
Collection<QueryEntity> qryEntities = new ArrayList<>();
@@ -249,8 +247,8 @@ public class DemoCachesLoadService implements Service {
/**
* Configure cacheEmployee.
*/
- private static <K, V> CacheConfiguration<K, V> cacheDepartment() {
- CacheConfiguration<K, V> ccfg = cacheConfiguration(DEPARTMENT_CACHE_NAME);
+ private static CacheConfiguration cacheDepartment() {
+ CacheConfiguration ccfg = cacheConfiguration(DEPARTMENT_CACHE_NAME);
// Configure cacheDepartment types.
Collection<QueryEntity> qryEntities = new ArrayList<>();
@@ -280,8 +278,8 @@ public class DemoCachesLoadService implements Service {
/**
* Configure cacheEmployee.
*/
- private static <K, V> CacheConfiguration<K, V> cacheEmployee() {
- CacheConfiguration<K, V> ccfg = cacheConfiguration(EMPLOYEE_CACHE_NAME);
+ private static CacheConfiguration cacheEmployee() {
+ CacheConfiguration ccfg = cacheConfiguration(EMPLOYEE_CACHE_NAME);
ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
ccfg.setBackups(1);
@@ -314,7 +312,6 @@ public class DemoCachesLoadService implements Service {
type.setFields(qryFlds);
// Indexes for EMPLOYEE.
- Collection<QueryIndex> indexes = new ArrayList<>();
QueryIndex idx = new QueryIndex();
@@ -327,6 +324,8 @@ public class DemoCachesLoadService implements Service {
idx.setFields(indFlds);
+ Collection<QueryIndex> indexes = new ArrayList<>();
+
indexes.add(idx);
indexes.add(new QueryIndex("salary", QueryIndexType.SORTED, false, "EMP_SALARY"));
@@ -340,8 +339,8 @@ public class DemoCachesLoadService implements Service {
/**
* Configure cacheEmployee.
*/
- private static <K, V> CacheConfiguration<K, V> cacheParking() {
- CacheConfiguration<K, V> ccfg = cacheConfiguration(PARKING_CACHE_NAME);
+ private static CacheConfiguration cacheParking() {
+ CacheConfiguration ccfg = cacheConfiguration(PARKING_CACHE_NAME);
// Configure cacheParking types.
Collection<QueryEntity> qryEntities = new ArrayList<>();
@@ -371,8 +370,8 @@ public class DemoCachesLoadService implements Service {
/**
* Configure cacheEmployee.
*/
- private static <K, V> CacheConfiguration<K, V> cacheCar() {
- CacheConfiguration<K, V> ccfg = cacheConfiguration(CAR_CACHE_NAME);
+ private static CacheConfiguration cacheCar() {
+ CacheConfiguration ccfg = cacheConfiguration(CAR_CACHE_NAME);
// Configure cacheCar types.
Collection<QueryEntity> qryEntities = new ArrayList<>();