You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/10/22 11:12:30 UTC
[07/51] [abbrv] ignite git commit: ignite-1168 Added support for
metadata, scan commands in rest.
ignite-1168 Added support for metadata, scan commands in rest.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/12235254
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/12235254
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/12235254
Branch: refs/heads/ignite-1282
Commit: 1223525478a80f527abcce7327a4d8b92c8b085a
Parents: a1e5cc5
Author: Andrey <an...@gridgain.com>
Authored: Mon Oct 12 15:51:48 2015 +0700
Committer: Andrey <an...@gridgain.com>
Committed: Mon Oct 12 15:51:48 2015 +0700
----------------------------------------------------------------------
.../JettyRestProcessorAbstractSelfTest.java | 225 ++++++++
.../discovery/GridDiscoveryManager.java | 515 ++++++++++++++++++-
.../processors/cache/GridCacheProcessor.java | 48 +-
.../cache/query/GridCacheQueryManager.java | 74 ++-
.../cache/query/GridCacheSqlIndexMetadata.java | 7 +-
.../cache/query/GridCacheSqlMetadata.java | 22 +-
.../processors/rest/GridRestCommand.java | 8 +-
.../processors/rest/GridRestProcessor.java | 63 ++-
.../handlers/cache/GridCacheCommandHandler.java | 362 +++++++------
.../handlers/query/QueryCommandHandler.java | 195 +++++--
.../top/GridTopologyCommandHandler.java | 160 +++++-
.../rest/request/RestQueryRequest.java | 175 +++++++
.../rest/request/RestSqlQueryRequest.java | 125 -----
.../http/jetty/GridJettyJsonConfig.java | 158 +++++-
.../http/jetty/GridJettyRestHandler.java | 186 ++++---
15 files changed, 1849 insertions(+), 474 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index ac0edff..bb6e67e 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLEncoder;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -31,14 +32,24 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
+import net.sf.json.JSONNull;
import net.sf.json.JSONObject;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.query.GridCacheSqlIndexMetadata;
+import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata;
import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_JETTY_PORT;
@@ -907,6 +918,106 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
}
/**
+ * @param meta Metadata for Ignite cache.
+ * @throws Exception If failed.
+ */
+ private void testMetadata(GridCacheSqlMetadata meta) throws Exception {
+ Map<String, String> params = F.asMap("cmd", GridRestCommand.CACHE_METADATA.key());
+
+ if (meta.cacheName() != null)
+ params.put("cacheName", meta.cacheName());
+
+ String ret = content(params);
+
+ assertNotNull(ret);
+ assertTrue(!ret.isEmpty());
+
+ info("Cache metadata result: " + ret);
+
+ jsonEquals(ret, pattern("\\{.+\\}", true));
+
+ Map res = (Map)JSONObject.fromObject(ret).get("response");
+
+ Collection types = (Collection)res.get("types");
+
+ assertNotNull(types);
+ assertEqualsCollections(meta.types(), types);
+
+ Map keyClasses = (Map)res.get("keyClasses");
+
+ assertNotNull(keyClasses);
+ assertTrue(meta.keyClasses().equals(keyClasses));
+
+ Map valClasses = (Map)res.get("valClasses");
+
+ assertNotNull(valClasses);
+ assertTrue(meta.valClasses().equals(valClasses));
+
+ Map fields = (Map)res.get("fields");
+
+ assertNotNull(fields);
+ assertTrue(meta.fields().equals(fields));
+
+ Map indexesByType = (Map)res.get("indexes");
+
+ assertNotNull(indexesByType);
+ assertEquals(meta.indexes().size(), indexesByType.size());
+
+ for (Map.Entry<String, Collection<GridCacheSqlIndexMetadata>> metaIndexes : meta.indexes().entrySet()) {
+ Collection<Map> indexes = (Collection<Map>)indexesByType.get(metaIndexes.getKey());
+
+ assertNotNull(indexes);
+ assertEquals(metaIndexes.getValue().size(), indexes.size());
+
+ for (final GridCacheSqlIndexMetadata metaIdx : metaIndexes.getValue()) {
+ Map idx = F.find(indexes, null, new IgnitePredicate<Map>() {
+ @Override public boolean apply(Map map) {
+ return metaIdx.name().equals(map.get("name"));
+ }
+ });
+
+ assertNotNull(idx);
+
+ assertEqualsCollections(metaIdx.fields(), (Collection)idx.get("fields"));
+ assertEqualsCollections(metaIdx.descendings(), (Collection)idx.get("descendings"));
+ assertEquals(metaIdx.unique(), idx.get("unique"));
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMetadataLocal() throws Exception {
+ GridCacheProcessor cacheProc = grid(0).context().cache();
+
+ for (IgniteInternalCache<?, ?> cache : cacheProc.caches()) {
+ if (CU.isSystemCache(cache.name()))
+ continue;
+
+ GridCacheSqlMetadata meta = F.first(cache.context().queries().sqlMetadata());
+
+ testMetadata(meta);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMetadataRemote() throws Exception {
+ CacheConfiguration<Integer, String> partialCacheCfg = new CacheConfiguration<>("partial");
+
+ partialCacheCfg.setIndexedTypes(Integer.class, String.class);
+ partialCacheCfg.setNodeFilter(new NodeIdFilter(grid(1).localNode().id()));
+
+ IgniteCacheProxy<Integer, String> c = (IgniteCacheProxy<Integer, String>)grid(1).createCache(partialCacheCfg);
+
+ GridCacheSqlMetadata meta = F.first(c.context().queries().sqlMetadata());
+
+ testMetadata(meta);
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testTopology() throws Exception {
@@ -918,6 +1029,23 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
info("Topology command result: " + ret);
jsonEquals(ret, pattern("\\[\\{.+\\}\\]", true));
+
+ JSONObject json = JSONObject.fromObject(ret);
+
+ Collection<Map> nodes = (Collection)json.get("response");
+
+ assertEquals(GRID_CNT, nodes.size());
+
+ for (Map node : nodes) {
+ assertEquals(JSONNull.getInstance(), node.get("attributes"));
+ assertEquals(JSONNull.getInstance(), node.get("metrics"));
+
+ assertEquals("PARTITIONED", node.get("defaultCacheMode"));
+
+ Map caches = (Map)node.get("caches");
+
+ assertEquals(F.asMap("person", "PARTITIONED"), caches);
+ }
}
/**
@@ -1056,6 +1184,75 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
/**
* @throws Exception If failed.
*/
+ public void testQueryScan() throws Exception {
+ Map<String, String> params = new HashMap<>();
+ params.put("cmd", GridRestCommand.EXECUTE_SCAN_QUERY.key());
+ params.put("pageSize", "10");
+ params.put("cacheName", "person");
+
+ String ret = content(params);
+
+ assertNotNull(ret);
+ assertTrue(!ret.isEmpty());
+
+ JSONObject json = JSONObject.fromObject(ret);
+
+ List items = (List)((Map)json.get("response")).get("items");
+
+ assertEquals(4, items.size());
+
+ assertFalse(queryCursorFound());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFilterQueryScan() throws Exception {
+ Map<String, String> params = new HashMap<>();
+ params.put("cmd", GridRestCommand.EXECUTE_SCAN_QUERY.key());
+ params.put("pageSize", "10");
+ params.put("cacheName", "person");
+ params.put("classname", ScanFilter.class.getName());
+
+ String ret = content(params);
+
+ assertNotNull(ret);
+ assertTrue(!ret.isEmpty());
+
+ JSONObject json = JSONObject.fromObject(ret);
+
+ List items = (List)((Map)json.get("response")).get("items");
+
+ assertEquals(2, items.size());
+
+ assertFalse(queryCursorFound());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testIncorrectFilterQueryScan() throws Exception {
+ Map<String, String> params = new HashMap<>();
+ params.put("cmd", GridRestCommand.EXECUTE_SCAN_QUERY.key());
+ params.put("pageSize", "10");
+ params.put("cacheName", "person");
+ params.put("classname", ScanFilter.class.getName() + 1);
+
+ String ret = content(params);
+
+ assertNotNull(ret);
+ assertTrue(!ret.isEmpty());
+
+ JSONObject json = JSONObject.fromObject(ret);
+
+ String err = (String)json.get("error");
+
+ assertTrue(err.contains("Failed to find target class"));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testQuery() throws Exception {
grid(0).cache(null).put("1", "1");
grid(0).cache(null).put("2", "2");
@@ -1323,4 +1520,32 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
return id;
}
}
+
+ /**
+ * Test filter for scan query.
+ */
+ public static class ScanFilter implements IgniteBiPredicate<Integer, Person> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(Integer integer, Person person) {
+ return person.salary > 1000;
+ }
+ }
+
+ /** Filter by node ID. */
+ private static class NodeIdFilter implements IgnitePredicate<ClusterNode> {
+ /** */
+ private final UUID nid;
+
+ /**
+ * @param nid Node ID where cache should be started.
+ */
+ NodeIdFilter(UUID nid) {
+ this.nid = nid;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode n) {
+ return n.id().equals(nid);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 2ed4520..9e54f6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -56,6 +56,7 @@ import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
@@ -299,16 +300,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param cacheName Cache name.
* @param filter Cache filter.
* @param nearEnabled Near enabled flag.
- * @param loc {@code True} if cache is local.
+ * @param cacheMode Cache mode.
*/
public void setCacheFilter(
String cacheName,
IgnitePredicate<ClusterNode> filter,
boolean nearEnabled,
- boolean loc
+ CacheMode cacheMode
) {
if (!registeredCaches.containsKey(cacheName))
- registeredCaches.put(cacheName, new CachePredicate(filter, nearEnabled, loc));
+ registeredCaches.put(cacheName, new CachePredicate(filter, nearEnabled, cacheMode));
}
/**
@@ -1592,6 +1593,25 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * @param node Node to check.
+ * @return Map of cache name to cache mode for every registered cache whose predicate accepts the given node.
+ */
+ public Map<String, CacheMode> nodeCaches(ClusterNode node) {
+ Map<String, CacheMode> caches = U.newHashMap(registeredCaches.size());
+
+ for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
+ String cacheName = entry.getKey();
+
+ CachePredicate pred = entry.getValue();
+
+ if (pred != null && pred.cacheNode(node))
+ caches.put(cacheName, pred.cacheMode);
+ }
+
+ return caches;
+ }
+
+ /**
* Checks if cache with given name has at least one node with near cache enabled.
*
* @param cacheName Cache name.
@@ -2822,7 +2842,484 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
- /**
+ /** Cache for discovery collections. */
+ private class DiscoCache {
+ /** Remote nodes. */
+ private final List<ClusterNode> rmtNodes;
+
+ /** All nodes. */
+ private final List<ClusterNode> allNodes;
+
+ /** All nodes with at least one cache configured. */
+ @GridToStringInclude
+ private final Collection<ClusterNode> allNodesWithCaches;
+
+ /** All remote nodes with at least one cache configured. */
+ @GridToStringInclude
+ private final Collection<ClusterNode> rmtNodesWithCaches;
+
+ /** Cache nodes by cache name. */
+ @GridToStringInclude
+ private final Map<String, Collection<ClusterNode>> allCacheNodes;
+
+ /** Remote cache nodes by cache name. */
+ @GridToStringInclude
+ private final Map<String, Collection<ClusterNode>> rmtCacheNodes;
+
+ /** Affinity (data) cache nodes by cache name. */
+ @GridToStringInclude
+ private final Map<String, Collection<ClusterNode>> affCacheNodes;
+
+ /** Caches where at least one node has near cache enabled. */
+ @GridToStringInclude
+ private final Set<String> nearEnabledCaches;
+
+ /** Nodes grouped by version. */
+ private final NavigableMap<IgniteProductVersion, Collection<ClusterNode>> nodesByVer;
+
+ /** Daemon nodes. */
+ private final List<ClusterNode> daemonNodes;
+
+ /** Node map. */
+ private final Map<UUID, ClusterNode> nodeMap;
+
+ /** Local node. */
+ private final ClusterNode loc;
+
+ /** Highest node order. */
+ private final long maxOrder;
+
+ /**
+ * Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link
+ * #maskNull(String)} before passing raw cache names to it.
+ */
+ private final ConcurrentMap<String, Collection<ClusterNode>> aliveCacheNodes;
+
+ /**
+ * Cached alive remote nodes list. As long as this collection doesn't accept {@code null}s use {@link
+ * #maskNull(String)} before passing raw cache names to it.
+ */
+ private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes;
+
+ /**
+ * Cached alive nodes with caches.
+ */
+ private final Collection<ClusterNode> aliveNodesWithCaches;
+
+ /**
+ * Cached alive server nodes with caches.
+ */
+ private final Collection<ClusterNode> aliveSrvNodesWithCaches;
+
+ /**
+ * Cached alive remote server nodes with caches.
+ */
+ private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches;
+
+ /**
+ * @param loc Local node.
+ * @param rmts Remote nodes.
+ */
+ private DiscoCache(ClusterNode loc, Collection<ClusterNode> rmts) {
+ this.loc = loc;
+
+ rmtNodes = Collections.unmodifiableList(new ArrayList<>(F.view(rmts, daemonFilter)));
+
+ assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" +
+ " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']';
+
+ List<ClusterNode> all = new ArrayList<>(rmtNodes.size() + 1);
+
+ if (!loc.isDaemon())
+ all.add(loc);
+
+ all.addAll(rmtNodes);
+
+ Collections.sort(all, GridNodeOrderComparator.INSTANCE);
+
+ allNodes = Collections.unmodifiableList(all);
+
+ Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f);
+ Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f);
+ Map<String, Collection<ClusterNode>> dhtNodesMap = new HashMap<>(allNodes.size(), 1.0f);
+ Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size());
+ Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size());
+
+ aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
+ aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
+ aliveNodesWithCaches = new ConcurrentSkipListSet<>();
+ aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>();
+ aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>();
+ nodesByVer = new TreeMap<>();
+
+ long maxOrder0 = 0;
+
+ Set<String> nearEnabledSet = new HashSet<>();
+
+ for (ClusterNode node : allNodes) {
+ assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
+
+ if (node.order() > maxOrder0)
+ maxOrder0 = node.order();
+
+ boolean hasCaches = false;
+
+ for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
+ String cacheName = entry.getKey();
+
+ CachePredicate filter = entry.getValue();
+
+ if (filter.cacheNode(node)) {
+ nodesWithCaches.add(node);
+
+ if (!loc.id().equals(node.id()))
+ rmtNodesWithCaches.add(node);
+
+ addToMap(cacheMap, cacheName, node);
+
+ if (alive(node.id()))
+ addToMap(aliveCacheNodes, maskNull(cacheName), node);
+
+ if (filter.dataNode(node))
+ addToMap(dhtNodesMap, cacheName, node);
+
+ if (filter.nearNode(node))
+ nearEnabledSet.add(cacheName);
+
+ if (!loc.id().equals(node.id())) {
+ addToMap(rmtCacheMap, cacheName, node);
+
+ if (alive(node.id()))
+ addToMap(aliveRmtCacheNodes, maskNull(cacheName), node);
+ }
+
+ hasCaches = true;
+ }
+ }
+
+ if (hasCaches) {
+ if (alive(node.id())) {
+ aliveNodesWithCaches.add(node);
+
+ if (!CU.clientNode(node)) {
+ aliveSrvNodesWithCaches.add(node);
+
+ if (!loc.id().equals(node.id()))
+ aliveRmtSrvNodesWithCaches.add(node);
+ }
+ }
+ }
+
+ IgniteProductVersion nodeVer = U.productVersion(node);
+
+ // Create collection for this version if it does not exist.
+ Collection<ClusterNode> nodes = nodesByVer.get(nodeVer);
+
+ if (nodes == null) {
+ nodes = new ArrayList<>(allNodes.size());
+
+ nodesByVer.put(nodeVer, nodes);
+ }
+
+ nodes.add(node);
+ }
+
+ // Need second iteration to add this node to all previous node versions.
+ for (ClusterNode node : allNodes) {
+ IgniteProductVersion nodeVer = U.productVersion(node);
+
+ // Get all versions strictly lower than node's version (head map is exclusive).
+ NavigableMap<IgniteProductVersion, Collection<ClusterNode>> updateView =
+ nodesByVer.headMap(nodeVer, false);
+
+ for (Collection<ClusterNode> prevVersions : updateView.values())
+ prevVersions.add(node);
+ }
+
+ maxOrder = maxOrder0;
+
+ allCacheNodes = Collections.unmodifiableMap(cacheMap);
+ rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap);
+ affCacheNodes = Collections.unmodifiableMap(dhtNodesMap);
+ allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches);
+ this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches);
+ nearEnabledCaches = Collections.unmodifiableSet(nearEnabledSet);
+
+ daemonNodes = Collections.unmodifiableList(new ArrayList<>(
+ F.view(F.concat(false, loc, rmts), F0.not(daemonFilter))));
+
+ Map<UUID, ClusterNode> nodeMap = new HashMap<>(allNodes().size() + daemonNodes.size(), 1.0f);
+
+ for (ClusterNode n : F.concat(false, allNodes(), daemonNodes()))
+ nodeMap.put(n.id(), n);
+
+ this.nodeMap = nodeMap;
+ }
+
+ /**
+ * Adds node to map.
+ *
+ * @param cacheMap Map to add to.
+ * @param cacheName Cache name.
+ * @param rich Node to add
+ */
+ private void addToMap(Map<String, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
+ Collection<ClusterNode> cacheNodes = cacheMap.get(cacheName);
+
+ if (cacheNodes == null) {
+ cacheNodes = new ArrayList<>(allNodes.size());
+
+ cacheMap.put(cacheName, cacheNodes);
+ }
+
+ cacheNodes.add(rich);
+ }
+
+ /** @return Local node. */
+ ClusterNode localNode() {
+ return loc;
+ }
+
+ /** @return Remote nodes. */
+ Collection<ClusterNode> remoteNodes() {
+ return rmtNodes;
+ }
+
+ /** @return All nodes. */
+ Collection<ClusterNode> allNodes() {
+ return allNodes;
+ }
+
+ /**
+ * Gets collection of nodes which have version equal or greater than {@code ver}.
+ *
+ * @param ver Version to check.
+ * @return Collection of nodes with version equal or greater than {@code ver}.
+ */
+ Collection<ClusterNode> elderNodes(IgniteProductVersion ver) {
+ Map.Entry<IgniteProductVersion, Collection<ClusterNode>> entry = nodesByVer.ceilingEntry(ver);
+
+ if (entry == null)
+ return Collections.emptyList();
+
+ return entry.getValue();
+ }
+
+ /**
+ * @return Versions map.
+ */
+ NavigableMap<IgniteProductVersion, Collection<ClusterNode>> versionsMap() {
+ return nodesByVer;
+ }
+
+ /**
+ * Gets collection of nodes with at least one cache configured.
+ *
+ * @param topVer Topology version (maximum allowed node order).
+ * @return Collection of nodes.
+ */
+ Collection<ClusterNode> allNodesWithCaches(final long topVer) {
+ return filter(topVer, allNodesWithCaches);
+ }
+
+ /**
+ * Gets all nodes that have cache with given name.
+ *
+ * @param cacheName Cache name.
+ * @param topVer Topology version.
+ * @return Collection of nodes.
+ */
+ Collection<ClusterNode> cacheNodes(@Nullable String cacheName, final long topVer) {
+ return filter(topVer, allCacheNodes.get(cacheName));
+ }
+
+ /**
+ * Gets all remote nodes that have cache with given name.
+ *
+ * @param cacheName Cache name.
+ * @param topVer Topology version.
+ * @return Collection of nodes.
+ */
+ Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) {
+ return filter(topVer, rmtCacheNodes.get(cacheName));
+ }
+
+ /**
+ * Gets all remote nodes that have at least one cache configured.
+ *
+ * @param topVer Topology version.
+ * @return Collection of nodes.
+ */
+ Collection<ClusterNode> remoteCacheNodes(final long topVer) {
+ return filter(topVer, rmtNodesWithCaches);
+ }
+
+ /**
+ * Gets all nodes that have cache with given name and should participate in affinity calculation. With
+ * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
+ *
+ * @param cacheName Cache name.
+ * @param topVer Topology version.
+ * @return Collection of nodes.
+ */
+ Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, final long topVer) {
+ return filter(topVer, affCacheNodes.get(cacheName));
+ }
+
+ /**
+ * Gets all alive nodes that have cache with given name.
+ *
+ * @param cacheName Cache name.
+ * @param topVer Topology version.
+ * @return Collection of nodes.
+ */
+ Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, final long topVer) {
+ return filter(topVer, aliveCacheNodes.get(maskNull(cacheName)));
+ }
+
+ /**
+ * Gets all alive remote nodes that have cache with given name.
+ *
+ * @param cacheName Cache name.
+ * @param topVer Topology version.
+ * @return Collection of nodes.
+ */
+ Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, final long topVer) {
+ return filter(topVer, aliveRmtCacheNodes.get(maskNull(cacheName)));
+ }
+
+ /**
+ * Gets all alive remote server nodes with at least one cache configured.
+ *
+ * @param topVer Topology version.
+ * @return Collection of nodes.
+ */
+ Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) {
+ return filter(topVer, aliveRmtSrvNodesWithCaches);
+ }
+
+ /**
+ * Gets all alive server nodes with at least one cache configured.
+ *
+ * @param topVer Topology version.
+ * @return Collection of nodes.
+ */
+ Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) {
+ return filter(topVer, aliveSrvNodesWithCaches);
+ }
+
+ /**
+ * Gets all alive nodes with at least one cache configured.
+ *
+ * @param topVer Topology version.
+ * @return Collection of nodes.
+ */
+ Collection<ClusterNode> aliveNodesWithCaches(final long topVer) {
+ return filter(topVer, aliveNodesWithCaches);
+ }
+
+ /**
+ * Checks if cache with given name has at least one node with near cache enabled.
+ *
+ * @param cacheName Cache name.
+ * @return {@code True} if cache with given name has at least one node with near cache enabled.
+ */
+ boolean hasNearCache(@Nullable String cacheName) {
+ return nearEnabledCaches.contains(cacheName);
+ }
+
+ /**
+ * Removes left node from cached alives lists.
+ *
+ * @param leftNode Left node.
+ */
+ void updateAlives(ClusterNode leftNode) {
+ if (leftNode.order() > maxOrder)
+ return;
+
+ filterNodeMap(aliveCacheNodes, leftNode);
+
+ filterNodeMap(aliveRmtCacheNodes, leftNode);
+
+ aliveNodesWithCaches.remove(leftNode);
+ aliveSrvNodesWithCaches.remove(leftNode);
+ aliveRmtSrvNodesWithCaches.remove(leftNode);
+ }
+
+ /**
+ * Replaces each registered cache's node collection in the given map with a copy that excludes the given node.
+ *
+ * @param map Map to update in place.
+ * @param exclNode Node to exclude.
+ */
+ private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) {
+ for (String cacheName : registeredCaches.keySet()) {
+ String maskedName = maskNull(cacheName);
+
+ while (true) {
+ Collection<ClusterNode> oldNodes = map.get(maskedName);
+
+ if (oldNodes == null || oldNodes.isEmpty())
+ break;
+
+ Collection<ClusterNode> newNodes = new ArrayList<>(oldNodes);
+
+ if (!newNodes.remove(exclNode))
+ break;
+
+ if (map.replace(maskedName, oldNodes, newNodes))
+ break;
+ }
+ }
+ }
+
+ /**
+ * Replaces {@code null} with {@code NULL_CACHE_NAME}.
+ *
+ * @param cacheName Cache name.
+ * @return Masked name.
+ */
+ private String maskNull(@Nullable String cacheName) {
+ return cacheName == null ? NULL_CACHE_NAME : cacheName;
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @param nodes Nodes.
+ * @return Filtered collection (potentially empty, but never {@code null}).
+ */
+ private Collection<ClusterNode> filter(final long topVer, @Nullable Collection<ClusterNode> nodes) {
+ if (nodes == null)
+ return Collections.emptyList();
+
+ // If no filtering needed, return original collection.
+ return nodes.isEmpty() || topVer < 0 || topVer >= maxOrder ?
+ nodes :
+ F.view(nodes, new P1<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return node.order() <= topVer;
+ }
+ });
+ }
+
+ /** @return Daemon nodes. */
+ Collection<ClusterNode> daemonNodes() {
+ return daemonNodes;
+ }
+
+ /**
+ * @param id Node ID.
+ * @return Node.
+ */
+ @Nullable ClusterNode node(UUID id) {
+ return nodeMap.get(id);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes));
+ }
+ } /**
* Cache predicate.
*/
private static class CachePredicate {
@@ -2832,8 +3329,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** If near cache is enabled on data nodes. */
private final boolean nearEnabled;
- /** Flag indicating if cache is local. */
- private final boolean loc;
+ /** Cache mode. */
+ private final CacheMode cacheMode;
/** Collection of client near nodes. */
private final ConcurrentHashMap<UUID, Boolean> clientNodes;
@@ -2841,14 +3338,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* @param cacheFilter Cache filter.
* @param nearEnabled Near enabled flag.
- * @param loc {@code True} if cache is local.
+ * @param cacheMode Cache mode.
*/
- private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, boolean loc) {
+ private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, CacheMode cacheMode) {
assert cacheFilter != null;
this.cacheFilter = cacheFilter;
this.nearEnabled = nearEnabled;
- this.loc = loc;
+ this.cacheMode = cacheMode;
clientNodes = new ConcurrentHashMap<>();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index daa4475..736e630 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -689,7 +689,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cfg.getName(),
cfg.getNodeFilter(),
cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
- cfg.getCacheMode() == LOCAL);
+ cfg.getCacheMode());
ctx.discovery().addClientNode(cfg.getName(),
ctx.localNodeId(),
@@ -1941,7 +1941,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.cacheName(),
ccfg.getNodeFilter(),
ccfg.getNearConfiguration() != null,
- ccfg.getCacheMode() == LOCAL);
+ ccfg.getCacheMode());
}
}
else {
@@ -1968,7 +1968,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.cacheName(),
ccfg.getNodeFilter(),
ccfg.getNearConfiguration() != null,
- ccfg.getCacheMode() == LOCAL);
+ ccfg.getCacheMode());
}
}
}
@@ -2468,7 +2468,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ccfg.getName(),
ccfg.getNodeFilter(),
ccfg.getNearConfiguration() != null,
- ccfg.getCacheMode() == LOCAL);
+ ccfg.getCacheMode());
ctx.discovery().addClientNode(req.cacheName(),
req.initiatingNodeId(),
@@ -3482,6 +3482,46 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
*
*/
+ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
+ private class TemplateConfigurationFuture extends GridFutureAdapter<Object> {
+ /** Start ID. */
+ @GridToStringInclude
+ private IgniteUuid deploymentId;
+
+ /** Cache name. */
+ private String cacheName;
+
+ /**
+ * @param cacheName Cache name.
+ * @param deploymentId Deployment ID.
+ */
+ private TemplateConfigurationFuture(String cacheName, IgniteUuid deploymentId) {
+ this.deploymentId = deploymentId;
+ this.cacheName = cacheName;
+ }
+
+ /**
+ * @return Start ID.
+ */
+ public IgniteUuid deploymentId() {
+ return deploymentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+ // Make sure to remove future before completion.
+ pendingTemplateFuts.remove(maskNull(cacheName), this);
+
+ return super.onDone(res, err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TemplateConfigurationFuture.class, this);
+ }
+ } /**
+ *
+ */
private static class LocalAffinityFunction implements AffinityFunction {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 25ace1b..698b035 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -2229,6 +2229,26 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/** {@inheritDoc} */
+ @Override public Map<String, String> keyClasses() {
+ return keyClasses;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, String> valClasses() {
+ return valClasses;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Map<String, String>> fields() {
+ return fields;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Collection<GridCacheSqlIndexMetadata>> indexes() {
+ return indexes;
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<GridCacheSqlIndexMetadata> indexes(String type) {
return indexes.get(type);
}
@@ -2319,6 +2339,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/** {@inheritDoc} */
+ @Override public Collection<String> descendings() {
+ return descendings;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean unique() {
return unique;
}
@@ -2687,15 +2712,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* Cached result.
*/
private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> {
+ /** Absolute position of each recipient. */
+ private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
/** */
private CircularQueue<R> queue;
-
/** */
private int pruned;
- /** Absolute position of each recipient. */
- private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
-
/**
* @param rcpt ID of the recipient.
*/
@@ -3059,6 +3082,47 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
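+ * Closure that wraps off-heap key/value pointers into a lazy entry, applies the filter and returns the entry as a tuple ({@code null} if filtered out).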
+ */
+ private class OffheapIteratorClosure
+ extends CX2<T2<Long, Integer>, T2<Long, Integer>, IgniteBiTuple<K, V>> {
+ /** */
+ private static final long serialVersionUID = 7410163202728985912L;
+
+ /** */
+ private IgniteBiPredicate<K, V> filter;
+
+ /** */
+ private boolean keepPortable;
+
+ /**
+ * @param filter Filter.
+ * @param keepPortable Keep portable flag.
+ */
+ private OffheapIteratorClosure(
+ @Nullable IgniteBiPredicate<K, V> filter,
+ boolean keepPortable) {
+ assert filter != null;
+
+ this.filter = filter;
+ this.keepPortable = keepPortable;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteBiTuple<K, V> applyx(T2<Long, Integer> keyPtr,
+ T2<Long, Integer> valPtr)
+ throws IgniteCheckedException {
+ LazyOffheapEntry e = new LazyOffheapEntry(keyPtr, valPtr);
+
+ K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable);
+ V val = (V)cctx.unwrapPortableIfNeeded(e.value(), keepPortable);
+
+ if (!filter.apply(key, val))
+ return null;
+
+ return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value()));
+ }
+ } /**
* Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery}
* documentation.
*
@@ -3078,4 +3142,4 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
false,
keepPortable);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlIndexMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlIndexMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlIndexMetadata.java
index 6b3ed68..539a156 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlIndexMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlIndexMetadata.java
@@ -51,9 +51,14 @@ public interface GridCacheSqlIndexMetadata extends Externalizable {
public boolean descending(String field);
/**
+ * @return Descendings.
+ */
+ public Collection<String> descendings();
+
+ /**
* Gets whether this is a unique index.
*
* @return {@code True} if index is unique, {@code false} otherwise.
*/
public boolean unique();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlMetadata.java
index dae034c..724962e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlMetadata.java
@@ -78,6 +78,26 @@ public interface GridCacheSqlMetadata extends Externalizable {
@Nullable public Map<String, String> fields(String type);
/**
+ * @return Key classes.
+ */
+ public Map<String, String> keyClasses();
+
+ /**
+ * @return Value classes.
+ */
+ public Map<String, String> valClasses();
+
+ /**
+ * @return Fields.
+ */
+ public Map<String, Map<String, String>> fields();
+
+ /**
+ * @return Indexes.
+ */
+ public Map<String, Collection<GridCacheSqlIndexMetadata>> indexes();
+
+ /**
* Gets descriptors of indexes created for provided type.
* See {@link GridCacheSqlIndexMetadata} javadoc for more information.
*
@@ -86,4 +106,4 @@ public interface GridCacheSqlMetadata extends Externalizable {
* @see GridCacheSqlIndexMetadata
*/
public Collection<GridCacheSqlIndexMetadata> indexes(String type);
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java
index 4f9b3ae..8282d3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java
@@ -96,6 +96,9 @@ public enum GridRestCommand {
/** Cache size. */
CACHE_SIZE("size"),
+ /** Cache metadata. */
+ CACHE_METADATA("metadata"),
+
/** Increment. */
ATOMIC_INCREMENT("incr"),
@@ -141,6 +144,9 @@ public enum GridRestCommand {
/** Execute sql fields query. */
EXECUTE_SQL_FIELDS_QUERY("qryfldexe"),
+ /** Execute scan query. */
+ EXECUTE_SCAN_QUERY("qryscanexe"),
+
/** Fetch query results. */
FETCH_SQL_QUERY("qryfetch"),
@@ -193,4 +199,4 @@ public enum GridRestCommand {
public String key() {
return key;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index d54c8bb..df79232 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -51,7 +51,7 @@ import org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestProto
import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest;
import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
import org.apache.ignite.internal.processors.rest.request.GridRestTaskRequest;
-import org.apache.ignite.internal.processors.rest.request.RestSqlQueryRequest;
+import org.apache.ignite.internal.processors.rest.request.RestQueryRequest;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -565,6 +565,53 @@ public class GridRestProcessor extends GridProcessorAdapter {
}
/**
+ * Applies {@link ConnectorMessageInterceptor}
+ * from {@link ConnectorConfiguration#getMessageInterceptor()}
+ * to all user parameters in the request.
+ *
+ * @param req Client request.
+ */
+ private void interceptRequest(GridRestRequest req) {
+ ConnectorMessageInterceptor interceptor = config().getMessageInterceptor();
+
+ if (interceptor == null)
+ return;
+
+ if (req instanceof GridRestCacheRequest) {
+ GridRestCacheRequest req0 = (GridRestCacheRequest) req;
+
+ req0.key(interceptor.onReceive(req0.key()));
+ req0.value(interceptor.onReceive(req0.value()));
+ req0.value2(interceptor.onReceive(req0.value2()));
+
+ Map<Object, Object> oldVals = req0.values();
+
+ if (oldVals != null) {
+ Map<Object, Object> newVals = U.newHashMap(oldVals.size());
+
+ for (Map.Entry<Object, Object> e : oldVals.entrySet())
+ newVals.put(interceptor.onReceive(e.getKey()), interceptor.onReceive(e.getValue()));
+
+ req0.values(U.sealMap(newVals));
+ }
+ }
+ else if (req instanceof GridRestTaskRequest) {
+ GridRestTaskRequest req0 = (GridRestTaskRequest) req;
+
+ List<Object> oldParams = req0.params();
+
+ if (oldParams != null) {
+ Collection<Object> newParams = new ArrayList<>(oldParams.size());
+
+ for (Object o : oldParams)
+ newParams.add(interceptor.onReceive(o));
+
+ req0.params(U.sealList(newParams));
+ }
+ }
+ }
+
+ /**
* Applies {@link ConnectorMessageInterceptor} from
* {@link ConnectorConfiguration#getMessageInterceptor()}
* to all user objects in the response.
@@ -609,9 +656,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
break;
}
}
- }
-
- /**
+ } /**
* Applies interceptor to a response object.
* Specially handler {@link Map} and {@link Collection} responses.
*
@@ -715,10 +760,11 @@ public class GridRestProcessor extends GridProcessorAdapter {
case EXECUTE_SQL_QUERY:
case EXECUTE_SQL_FIELDS_QUERY:
+ case EXECUTE_SCAN_QUERY:
case CLOSE_SQL_QUERY:
case FETCH_SQL_QUERY:
perm = SecurityPermission.CACHE_READ;
- name = ((RestSqlQueryRequest)req).cacheName();
+ name = ((RestQueryRequest)req).cacheName();
break;
@@ -764,6 +810,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
case CACHE_METRICS:
case CACHE_SIZE:
+ case CACHE_METADATA:
case TOPOLOGY:
case NODE:
case VERSION:
@@ -884,15 +931,13 @@ public class GridRestProcessor extends GridProcessorAdapter {
/** Session token id. */
private final UUID sesId;
-
- /** Security context. */
- private volatile SecurityContext secCtx;
-
/**
* Time when session is used last time.
* If this time was set at TIMEDOUT_FLAG, then it should never be changed.
*/
private final AtomicLong lastTouchTime = new AtomicLong(U.currentTimeMillis());
+ /** Security context. */
+ private volatile SecurityContext secCtx;
/**
* @param clientId Client ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 9d32c17..1bbc754 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.rest.GridRestCommand;
import org.apache.ignite.internal.processors.rest.GridRestResponse;
@@ -73,6 +74,7 @@ import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_G
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_GET_AND_PUT_IF_ABSENT;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_GET_AND_REMOVE;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_GET_AND_REPLACE;
+import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_METADATA;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_METRICS;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_PREPEND;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_PUT;
@@ -119,7 +121,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
CACHE_APPEND,
CACHE_PREPEND,
CACHE_METRICS,
- CACHE_SIZE
+ CACHE_SIZE,
+ CACHE_METADATA
);
/** Requests with required parameter {@code key}. */
@@ -151,11 +154,6 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
super(ctx);
}
- /** {@inheritDoc} */
- @Override public Collection<GridRestCommand> supportedCommands() {
- return SUPPORTED_COMMANDS;
- }
-
/**
* Retrieves cache flags from corresponding bits.
*
@@ -172,6 +170,153 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
return false;
}
+ /**
+ * Handles append and prepend commands.
+ *
+ * @param ctx Kernal context.
+ * @param cache Cache.
+ * @param key Key.
+ * @param req Request.
+ * @param prepend Whether to prepend.
+ * @return Future of operation result.
+ * @throws IgniteCheckedException In case of any exception.
+ */
+ private static IgniteInternalFuture<?> appendOrPrepend(
+ final GridKernalContext ctx,
+ final IgniteInternalCache<Object, Object> cache,
+ final Object key, GridRestCacheRequest req, final boolean prepend) throws IgniteCheckedException {
+ assert cache != null;
+ assert key != null;
+ assert req != null;
+
+ final Object val = req.value();
+
+ if (val == null)
+ throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val"));
+
+ return ctx.closure().callLocalSafe(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+ Object curVal = cache.get(key);
+
+ if (curVal == null)
+ return false;
+
+ // Modify current value with appendix one.
+ Object newVal = appendOrPrepend(curVal, val, !prepend);
+
+ // Put new value within the transaction.
+ cache.put(key, newVal);
+
+ tx.commit();
+ }
+
+ return true;
+ }
+ }, false);
+ }
+
+ /**
+ * Append or prepend new value to the current one.
+ *
+ * @param origVal Original value.
+ * @param appendVal Appendix value to add to the original one.
+ * @param appendPlc Append or prepend policy flag.
+ * @return Resulting value.
+ * @throws IgniteCheckedException In case of grid exceptions.
+ */
+ private static Object appendOrPrepend(Object origVal, Object appendVal, boolean appendPlc) throws IgniteCheckedException {
+ // Strings.
+ if (appendVal instanceof String && origVal instanceof String)
+ return appendPlc ? origVal + (String)appendVal : (String)appendVal + origVal;
+
+ // Maps.
+ if (appendVal instanceof Map && origVal instanceof Map) {
+ Map<Object, Object> origMap = (Map<Object, Object>)origVal;
+ Map<Object, Object> appendMap = (Map<Object, Object>)appendVal;
+
+ Map<Object, Object> map = X.cloneObject(origMap, false, true);
+
+ if (appendPlc)
+ map.putAll(appendMap); // Append.
+ else {
+ map.clear();
+ map.putAll(appendMap); // Prepend.
+ map.putAll(origMap);
+ }
+
+ for (Map.Entry<Object, Object> e : appendMap.entrySet()) // Remove zero-valued entries.
+ if (e.getValue() == null && map.get(e.getKey()) == null)
+ map.remove(e.getKey());
+
+ return map;
+ }
+
+ // Generic collection.
+ if (appendVal instanceof Collection<?> && origVal instanceof Collection<?>) {
+ Collection<Object> origCol = (Collection<Object>)origVal;
+ Collection<Object> appendCol = (Collection<Object>)appendVal;
+
+ Collection<Object> col = X.cloneObject(origCol, false, true);
+
+ if (appendPlc)
+ col.addAll(appendCol); // Append.
+ else {
+ col.clear();
+ col.addAll(appendCol); // Prepend.
+ col.addAll(origCol);
+ }
+
+ return col;
+ }
+
+ throw new IgniteCheckedException("Incompatible types [appendVal=" + appendVal + ", old=" + origVal + ']');
+ }
+
+ /**
+ * Creates a transformation function from {@link CacheCommand}'s results into {@link GridRestResponse}.
+ *
+ * @param c Cache instance to obtain affinity data.
+ * @param key Affinity key for previous operation.
+ * @return Rest response.
+ */
+ private static IgniteClosure<IgniteInternalFuture<?>, GridRestResponse> resultWrapper(
+ final IgniteInternalCache<Object, Object> c, @Nullable final Object key) {
+ return new CX1<IgniteInternalFuture<?>, GridRestResponse>() {
+ @Override public GridRestResponse applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException {
+ GridCacheRestResponse resp = new GridCacheRestResponse();
+
+ resp.setResponse(f.get());
+
+ if (key != null)
+ resp.setAffinityNodeId(c.cache().affinity().mapKeyToNode(key).id().toString());
+
+ return resp;
+ }
+ };
+ }
+
+ /**
+ * @param ignite Grid instance.
+ * @param cacheName Name of the cache.
+ * @return Instance of the named cache.
+ * @throws IgniteCheckedException If cache not found.
+ */
+ private static IgniteInternalCache<Object, Object> cache(Ignite ignite, String cacheName) throws IgniteCheckedException {
+ IgniteInternalCache<Object, Object> cache = ((IgniteKernal)ignite).getCache(cacheName);
+
+ if (cache == null)
+ throw new IgniteCheckedException(
+ "Failed to find cache for given cache name (null for default cache): " + cacheName);
+
+ return cache;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<GridRestCommand> supportedCommands() {
+ return SUPPORTED_COMMANDS;
+ }
+
/** {@inheritDoc} */
@Override public IgniteInternalFuture<GridRestResponse> handleAsync(final GridRestRequest req) {
assert req instanceof GridRestCacheRequest : "Invalid command for topology handler: " + req;
@@ -224,6 +369,25 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
break;
}
+ case CACHE_METADATA: {
+ IgniteInternalCache<?, ?> cache = ctx.cache().cache(cacheName);
+
+ if (cache != null) {
+ GridCacheSqlMetadata res = F.first(cache.context().queries().sqlMetadata());
+
+ fut = new GridFinishedFuture<>(new GridRestResponse(res));
+ }
+ else {
+ ClusterGroup prj = ctx.grid().cluster().forDataNodes(cacheName);
+
+ ctx.task().setThreadContext(TC_NO_FAILOVER, true);
+
+ fut = ctx.closure().callAsync(BALANCE, new MetadataCommand(cacheName), prj.nodes());
+ }
+
+ break;
+ }
+
case CACHE_CONTAINS_KEYS: {
fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key,
new ContainsKeysCommand(getKeys(req0)));
@@ -545,138 +709,12 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
}
}
- /**
- * Handles append and prepend commands.
- *
- * @param ctx Kernal context.
- * @param cache Cache.
- * @param key Key.
- * @param req Request.
- * @param prepend Whether to prepend.
- * @return Future of operation result.
- * @throws IgniteCheckedException In case of any exception.
- */
- private static IgniteInternalFuture<?> appendOrPrepend(
- final GridKernalContext ctx,
- final IgniteInternalCache<Object, Object> cache,
- final Object key, GridRestCacheRequest req, final boolean prepend) throws IgniteCheckedException {
- assert cache != null;
- assert key != null;
- assert req != null;
-
- final Object val = req.value();
-
- if (val == null)
- throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val"));
-
- return ctx.closure().callLocalSafe(new Callable<Object>() {
- @Override public Object call() throws Exception {
- try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
- Object curVal = cache.get(key);
-
- if (curVal == null)
- return false;
-
- // Modify current value with appendix one.
- Object newVal = appendOrPrepend(curVal, val, !prepend);
-
- // Put new value asynchronously.
- cache.put(key, newVal);
-
- tx.commit();
- }
-
- return true;
- }
- }, false);
- }
-
- /**
- * Append or prepend new value to the current one.
- *
- * @param origVal Original value.
- * @param appendVal Appendix value to add to the original one.
- * @param appendPlc Append or prepend policy flag.
- * @return Resulting value.
- * @throws IgniteCheckedException In case of grid exceptions.
- */
- private static Object appendOrPrepend(Object origVal, Object appendVal, boolean appendPlc) throws IgniteCheckedException {
- // Strings.
- if (appendVal instanceof String && origVal instanceof String)
- return appendPlc ? origVal + (String)appendVal : (String)appendVal + origVal;
-
- // Maps.
- if (appendVal instanceof Map && origVal instanceof Map) {
- Map<Object, Object> origMap = (Map<Object, Object>)origVal;
- Map<Object, Object> appendMap = (Map<Object, Object>)appendVal;
-
- Map<Object, Object> map = X.cloneObject(origMap, false, true);
-
- if (appendPlc)
- map.putAll(appendMap); // Append.
- else {
- map.clear();
- map.putAll(appendMap); // Prepend.
- map.putAll(origMap);
- }
-
- for (Map.Entry<Object, Object> e : appendMap.entrySet()) // Remove zero-valued entries.
- if (e.getValue() == null && map.get(e.getKey()) == null)
- map.remove(e.getKey());
-
- return map;
- }
-
- // Generic collection.
- if (appendVal instanceof Collection<?> && origVal instanceof Collection<?>) {
- Collection<Object> origCol = (Collection<Object>)origVal;
- Collection<Object> appendCol = (Collection<Object>)appendVal;
-
- Collection<Object> col = X.cloneObject(origCol, false, true);
-
- if (appendPlc)
- col.addAll(appendCol); // Append.
- else {
- col.clear();
- col.addAll(appendCol); // Prepend.
- col.addAll(origCol);
- }
-
- return col;
- }
-
- throw new IgniteCheckedException("Incompatible types [appendVal=" + appendVal + ", old=" + origVal + ']');
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheCommandHandler.class, this);
}
/**
- * Creates a transformation function from {@link CacheCommand}'s results into {@link GridRestResponse}.
- *
- * @param c Cache instance to obtain affinity data.
- * @param key Affinity key for previous operation.
- * @return Rest response.
- */
- private static IgniteClosure<IgniteInternalFuture<?>, GridRestResponse> resultWrapper(
- final IgniteInternalCache<Object, Object> c, @Nullable final Object key) {
- return new CX1<IgniteInternalFuture<?>, GridRestResponse>() {
- @Override public GridRestResponse applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException {
- GridCacheRestResponse resp = new GridCacheRestResponse();
-
- resp.setResponse(f.get());
-
- if (key != null)
- resp.setAffinityNodeId(c.cache().affinity().mapKeyToNode(key).id().toString());
-
- return resp;
- }
- };
- }
-
- /**
* @param cacheName Cache name.
* @return If replicated cache with given name is locally available.
*/
@@ -702,22 +740,6 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
}
/**
- * @param ignite Grid instance.
- * @param cacheName Name of the cache.
- * @return Instance on the named cache.
- * @throws IgniteCheckedException If cache not found.
- */
- private static IgniteInternalCache<Object, Object> cache(Ignite ignite, String cacheName) throws IgniteCheckedException {
- IgniteInternalCache<Object, Object> cache = ((IgniteKernal)ignite).getCache(cacheName);
-
- if (cache == null)
- throw new IgniteCheckedException(
- "Failed to find cache for given cache name (null for default cache): " + cacheName);
-
- return cache;
- }
-
- /**
* Fixed result closure.
*/
private static final class FixedResult extends CX1<IgniteInternalFuture<?>, Object> {
@@ -771,22 +793,16 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
private static class FlaggedCacheOperationCallable implements Callable<GridRestResponse>, Serializable {
/** */
private static final long serialVersionUID = 0L;
-
- /** Client ID. */
- private UUID clientId;
-
/** */
private final String cacheName;
-
/** */
private final boolean skipStore;
-
/** */
private final CacheProjectionCommand op;
-
/** */
private final Object key;
-
+ /** Client ID. */
+ private UUID clientId;
/** */
@IgniteInstanceResource
private Ignite g;
@@ -829,19 +845,14 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
private static class CacheOperationCallable implements Callable<GridRestResponse>, Serializable {
/** */
private static final long serialVersionUID = 0L;
-
- /** Client ID. */
- private UUID clientId;
-
/** */
private final String cacheName;
-
/** */
private final CacheCommand op;
-
/** */
private final Object key;
-
+ /** Client ID. */
+ private UUID clientId;
/** */
@IgniteInstanceResource
private Ignite g;
@@ -892,6 +903,31 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
}
/** */
+ private static class MetadataCommand implements Callable<GridRestResponse>, Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final String cacheName;
+
+ /** */
+ @IgniteInstanceResource
+ private Ignite g;
+
+ /**
+ * @param cacheName Cache name.
+ */
+ private MetadataCommand(String cacheName) {
+ this.cacheName = cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridRestResponse call() throws Exception {
+ return new GridRestResponse(F.first(cache(g, cacheName).context().queries().sqlMetadata()));
+ }
+ }
+
+ /** */
private static class ContainsKeysCommand extends CacheProjectionCommand {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index 64c7673..f4ddd59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.rest.handlers.query;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -25,8 +27,10 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.internal.GridKernalContext;
@@ -37,12 +41,14 @@ import org.apache.ignite.internal.processors.rest.GridRestCommand;
import org.apache.ignite.internal.processors.rest.GridRestResponse;
import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandlerAdapter;
import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
-import org.apache.ignite.internal.processors.rest.request.RestSqlQueryRequest;
+import org.apache.ignite.internal.processors.rest.request.RestQueryRequest;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CLOSE_SQL_QUERY;
+import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXECUTE_SCAN_QUERY;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXECUTE_SQL_FIELDS_QUERY;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXECUTE_SQL_QUERY;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.FETCH_SQL_QUERY;
@@ -54,6 +60,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Supported commands. */
private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_SQL_QUERY,
EXECUTE_SQL_FIELDS_QUERY,
+ EXECUTE_SCAN_QUERY,
FETCH_SQL_QUERY,
CLOSE_SQL_QUERY);
@@ -70,6 +77,76 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
super(ctx);
}
+ /**
+ * @param qryCurs Query cursors.
+ * @param cur Current cursor.
+ * @param req Query request.
+ * @param qryId Query id.
+ * @return Query result with items.
+ */
+ private static CacheQueryResult createQueryResult(
+ ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs,
+ Iterator cur, RestQueryRequest req, Long qryId) {
+ CacheQueryResult res = new CacheQueryResult();
+
+ List<Object> items = new ArrayList<>();
+
+ for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i)
+ items.add(cur.next());
+
+ res.setItems(items);
+
+ res.setLast(!cur.hasNext());
+
+ res.setQueryId(qryId);
+
+ if (!cur.hasNext())
+ qryCurs.remove(qryId);
+
+ return res;
+ }
+
+ /**
+ * Creates class instance.
+ *
+ * @param cls Target class.
+ * @param clsName Implementing class name.
+ * @return Class instance.
+ * @throws IgniteException If failed.
+ */
+ private static <T> T instance(Class<? extends T> cls, String clsName) throws IgniteException {
+ try {
+ Class<?> implCls = Class.forName(clsName);
+
+ if (!cls.isAssignableFrom(implCls))
+ throw new IgniteException("Failed to create instance (target class does not extend or implement " +
+ "required class or interface) [cls=" + cls.getName() + ", clsName=" + clsName + ']');
+
+ Constructor<?> ctor = implCls.getConstructor();
+
+ return (T)ctor.newInstance();
+ }
+ catch (ClassNotFoundException e) {
+ throw new IgniteException("Failed to find target class: " + clsName, e);
+ }
+ catch (NoSuchMethodException e) {
+ throw new IgniteException("Failed to find constructor for provided arguments " +
+ "[clsName=" + clsName + ']', e);
+ }
+ catch (InstantiationException e) {
+ throw new IgniteException("Failed to instantiate target class " +
+ "[clsName=" + clsName + ']', e);
+ }
+ catch (IllegalAccessException e) {
+ throw new IgniteException("Failed to instantiate class (constructor is not available) " +
+ "[clsName=" + clsName + ']', e);
+ }
+ catch (InvocationTargetException e) {
+ throw new IgniteException("Failed to instantiate class (constructor threw an exception) " +
+ "[clsName=" + clsName + ']', e.getCause());
+ }
+ }
+
/** {@inheritDoc} */
@Override public Collection<GridRestCommand> supportedCommands() {
return SUPPORTED_COMMANDS;
@@ -80,23 +157,24 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
assert req != null;
assert SUPPORTED_COMMANDS.contains(req.command());
- assert req instanceof RestSqlQueryRequest : "Invalid type of query request.";
+ assert req instanceof RestQueryRequest : "Invalid type of query request.";
switch (req.command()) {
case EXECUTE_SQL_QUERY:
- case EXECUTE_SQL_FIELDS_QUERY: {
+ case EXECUTE_SQL_FIELDS_QUERY:
+ case EXECUTE_SCAN_QUERY: {
return ctx.closure().callLocalSafe(
- new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, qryCurs), false);
+ new ExecuteQueryCallable(ctx, (RestQueryRequest)req, qryCurs), false);
}
case FETCH_SQL_QUERY: {
return ctx.closure().callLocalSafe(
- new FetchQueryCallable((RestSqlQueryRequest)req, qryCurs), false);
+ new FetchQueryCallable(ctx, (RestQueryRequest)req, qryCurs), false);
}
case CLOSE_SQL_QUERY: {
return ctx.closure().callLocalSafe(
- new CloseQueryCallable((RestSqlQueryRequest)req, qryCurs), false);
+ new CloseQueryCallable((RestQueryRequest)req, qryCurs), false);
}
}
@@ -111,7 +189,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
private GridKernalContext ctx;
/** Execute query request. */
- private RestSqlQueryRequest req;
+ private RestQueryRequest req;
/** Queries cursors. */
private ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs;
@@ -121,7 +199,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
* @param req Execute query request.
* @param qryCurs Queries cursors.
*/
- public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req,
+ public ExecuteQueryCallable(GridKernalContext ctx, RestQueryRequest req,
ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) {
this.ctx = ctx;
this.req = req;
@@ -135,15 +213,33 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
try {
Query qry;
- if (req.typeName() != null) {
- qry = new SqlQuery(req.typeName(), req.sqlQuery());
+ switch (req.queryType()) {
+ case SQL:
+ qry = new SqlQuery(req.typeName(), req.sqlQuery());
- ((SqlQuery)qry).setArgs(req.arguments());
- }
- else {
- qry = new SqlFieldsQuery(req.sqlQuery());
+ ((SqlQuery)qry).setArgs(req.arguments());
+
+ break;
+
+ case SQL_FIELDS:
+ qry = new SqlFieldsQuery(req.sqlQuery());
+
+ ((SqlFieldsQuery)qry).setArgs(req.arguments());
- ((SqlFieldsQuery)qry).setArgs(req.arguments());
+ break;
+
+ case SCAN:
+ IgniteBiPredicate pred = null;
+
+ if (req.className() != null)
+ pred = instance(IgniteBiPredicate.class, req.className());
+
+ qry = new ScanQuery(pred);
+
+ break;
+
+ default:
+ throw new IgniteException("Incorrect query type [type=" + req.queryType() + "]");
}
IgniteCache<Object, Object> cache = ctx.grid().cache(req.cacheName());
@@ -160,9 +256,25 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
CacheQueryResult res = createQueryResult(qryCurs, cur, req, qryId);
- List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta();
+ switch (req.queryType()) {
+ case SQL:
+ case SQL_FIELDS:
+ List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta();
+
+ res.setFieldsMetadata(convertMetadata(fieldsMeta));
- res.setFieldsMetadata(convertMetadata(fieldsMeta));
+ break;
+ case SCAN:
+ CacheQueryFieldsMetaResult keyField = new CacheQueryFieldsMetaResult();
+ keyField.setFieldName("key");
+
+ CacheQueryFieldsMetaResult valField = new CacheQueryFieldsMetaResult();
+ valField.setFieldName("value");
+
+ res.setFieldsMetadata(U.sealList(keyField, valField));
+
+ break;
+ }
return new GridRestResponse(res);
}
@@ -193,17 +305,16 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
* Close query callable.
*/
private static class CloseQueryCallable implements Callable<GridRestResponse> {
- /** Execute query request. */
- private RestSqlQueryRequest req;
-
/** Queries cursors. */
private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs;
+ /** Execute query request. */
+ private RestQueryRequest req;
/**
* @param req Execute query request.
* @param qryCurs Queries cursors.
*/
- public CloseQueryCallable(RestSqlQueryRequest req,
+ public CloseQueryCallable(RestQueryRequest req,
ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) {
this.req = req;
this.qryCurs = qryCurs;
@@ -236,18 +347,21 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
* Fetch query callable.
*/
private static class FetchQueryCallable implements Callable<GridRestResponse> {
- /** Execute query request. */
- private RestSqlQueryRequest req;
-
/** Queries cursors. */
private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs;
+ /** Grid kernal context. */
+ private final GridKernalContext ctx;
+ /** Execute query request. */
+ private RestQueryRequest req;
/**
+ * @param ctx Grid kernal context.
* @param req Execute query request.
* @param qryCurs Queries cursors.
*/
- public FetchQueryCallable(RestSqlQueryRequest req,
+ public FetchQueryCallable(GridKernalContext ctx, RestQueryRequest req,
ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) {
+ this.ctx = ctx;
this.req = req;
this.qryCurs = qryCurs;
}
@@ -272,33 +386,4 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
}
}
}
-
- /**
- * @param qryCurs Query cursors.
- * @param cur Current cursor.
- * @param req Sql request.
- * @param qryId Query id.
- * @return Query result with items.
- */
- private static CacheQueryResult createQueryResult(
- ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs,
- Iterator cur, RestSqlQueryRequest req, Long qryId) {
- CacheQueryResult res = new CacheQueryResult();
-
- List<Object> items = new ArrayList<>();
-
- for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i)
- items.add(cur.next());
-
- res.setItems(items);
-
- res.setLast(!cur.hasNext());
-
- res.setQueryId(qryId);
-
- if (!cur.hasNext())
- qryCurs.remove(qryId);
-
- return res;
- }
-}
\ No newline at end of file
+}