You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/26 18:47:02 UTC
[7/7] incubator-ignite git commit: #ignite-964: simple query works
with pages.
#ignite-964: simple query works with pages.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ed7dd30e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ed7dd30e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ed7dd30e
Branch: refs/heads/ignite-964
Commit: ed7dd30ed613657e0d2576260aeebdf5c181e7a2
Parents: 1d03ca2
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Jun 26 19:46:37 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Jun 26 19:46:37 2015 +0300
----------------------------------------------------------------------
.../rest/handlers/query/CacheQueryResult.java | 32 ++---
.../handlers/query/QueryCommandHandler.java | 117 +++++++++++++++++--
.../IgniteScriptingCommandHandler.java | 6 +
.../rest/request/RestSqlQueryRequest.java | 17 +++
modules/nodejs/src/main/js/cache.js | 18 ++-
modules/nodejs/src/main/js/sql-query.js | 11 +-
modules/nodejs/src/test/js/test-query.js | 22 ++--
.../http/jetty/GridJettyRestHandler.java | 12 ++
8 files changed, 192 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java
index ede8a45..3e49576 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java
@@ -38,70 +38,57 @@ public class CacheQueryResult implements Externalizable {
/** Last flag. */
private boolean last;
- /** Node ID. */
- private UUID nodeId;
-
/**
* @return Query ID.
*/
- public long queryId() {
+ public long getQueryId() {
return qryId;
}
/**
* @param qryId Query ID.
*/
- public void queryId(long qryId) {
+ public void setQueryId(long qryId) {
this.qryId = qryId;
}
/**
* @return Items.
*/
- public Collection<?> items() {
+ public Collection<?> getItems() {
return items;
}
/**
* @param items Items.
*/
- public void items(Collection<?> items) {
+ public void setItems(Collection<?> items) {
this.items = items;
}
/**
* @return Last flag.
*/
- public boolean last() {
+ public boolean getLast() {
return last;
}
/**
* @param last Last flag.
*/
- public void last(boolean last) {
+ public void setLast(boolean last) {
this.last = last;
}
- /**
- * @return Node ID.
- */
- public UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @param nodeId Node ID.
- */
- public void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheQueryResult.class, this);
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeBoolean(last);
out.writeLong(qryId);
- U.writeUuid(out, nodeId);
U.writeCollection(out, items);
}
@@ -109,7 +96,6 @@ public class CacheQueryResult implements Externalizable {
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
last = in.readBoolean();
qryId = in.readLong();
- nodeId = U.readUuid(in);
items = U.readCollection(in);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index f31e246..82f3726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.rest.handlers.query;
-import org.apache.ignite.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.rest.*;
@@ -29,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import javax.cache.*;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.*;
@@ -37,7 +37,15 @@ import static org.apache.ignite.internal.processors.rest.GridRestCommand.*;
*/
public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Supported commands. */
- private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_SQL_QUERY);
+ private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_SQL_QUERY,
+ FETCH_SQL_QUERY);
+
+ /** Query ID sequence. */
+ private static final AtomicLong qryIdGen = new AtomicLong();
+
+ /** Current queries. */
+ private final ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs =
+ new ConcurrentHashMap<>();
/**
* @param ctx Context.
@@ -61,7 +69,15 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
case EXECUTE_SQL_QUERY: {
assert req instanceof RestSqlQueryRequest : "Invalid type of query request.";
- return ctx.closure().callLocalSafe(new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req),false);
+ return ctx.closure().callLocalSafe(
+ new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, curs), false);
+ }
+
+ case FETCH_SQL_QUERY: {
+ assert req instanceof RestSqlQueryRequest : "Invalid type of query request.";
+
+ return ctx.closure().callLocalSafe(
+ new FetchQueryCallable(ctx, (RestSqlQueryRequest)req, curs), false);
}
}
@@ -72,19 +88,27 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
* Execute query callable.
*/
private static class ExecuteQueryCallable implements Callable<GridRestResponse> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** Kernal context. */
private GridKernalContext ctx;
/** Execute query request. */
private RestSqlQueryRequest req;
+ /** Queries cursors. */
+ private ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs;
+
/**
* @param ctx Kernal context.
* @param req Execute query request.
*/
- public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req) {
+ public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req,
+ ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs) {
this.ctx = ctx;
this.req = req;
+ this.curs = curs;
}
/** {@inheritDoc} */
@@ -92,11 +116,90 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
try {
SqlQuery<String, String> qry = new SqlQuery(String.class, req.sqlQuery());
- IgniteCache<Object, Object> cache = ctx.grid().cache(req.cacheName());
+ Iterator<Cache.Entry<String, String>> cur =
+ ctx.grid().cache(req.cacheName()).query(qry).iterator();
+
+ long qryId = qryIdGen.getAndIncrement();
+
+ curs.put(qryId, cur);
+
+ List<Cache.Entry<String, String>> res = new ArrayList<>();
+
+ CacheQueryResult response = new CacheQueryResult();
+
+ for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i)
+ res.add(cur.next());
+
+ response.setItems(res);
+
+ response.setLast(!cur.hasNext());
+
+ response.setQueryId(qryId);
+
+ if (!cur.hasNext())
+ curs.remove(qryId);
+
+ return new GridRestResponse(response);
+ }
+ catch (Exception e) {
+ return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Fetch query callable.
+ */
+ private static class FetchQueryCallable implements Callable<GridRestResponse> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Kernal context. */
+ private GridKernalContext ctx;
+
+ /** Execute query request. */
+ private RestSqlQueryRequest req;
+
+ /** Queries cursors. */
+ private ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs;
+
+ /**
+ * @param ctx Kernal context.
+ * @param req Execute query request.
+ */
+ public FetchQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req,
+ ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs) {
+ this.ctx = ctx;
+ this.req = req;
+ this.curs = curs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridRestResponse call() throws Exception {
+ try {
+ if (curs.contains(req.queryId()))
+ return new GridRestResponse(GridRestResponse.STATUS_FAILED,
+ "Cannot find query [qryId=" + req.queryId() + "]");
+
+ Iterator<Cache.Entry<String, String>> cur = curs.get(req.queryId());
+
+ List<Cache.Entry<String, String>> res = new ArrayList<>();
+
+ CacheQueryResult response = new CacheQueryResult();
+
+ for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i)
+ res.add(cur.next());
+
+ response.setItems(res);
+
+ response.setLast(!cur.hasNext());
+
+ response.setQueryId(req.queryId());
- List<Cache.Entry<String, String>> res = cache.query(qry).getAll();
+ if (!cur.hasNext())
+ curs.remove(req.queryId());
- return new GridRestResponse(res);
+ return new GridRestResponse(response);
}
catch (Exception e) {
return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java
index 2d65016..d7525a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java
@@ -260,6 +260,9 @@ public class IgniteScriptingCommandHandler extends GridRestCommandHandlerAdapter
* Run script callable.
*/
private static class RunScriptCallable implements Callable<GridRestResponse> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** Kernal context. */
private GridKernalContext ctx;
@@ -291,6 +294,9 @@ public class IgniteScriptingCommandHandler extends GridRestCommandHandlerAdapter
* Map reduce callable.
*/
private static class MapReduceCallable implements Callable<GridRestResponse> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** Kernal context. */
private GridKernalContext ctx;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java
index 5731a03..3a0005c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java
@@ -33,6 +33,9 @@ public class RestSqlQueryRequest extends GridRestRequest {
/** Cache name. */
private String cacheName;
+ /** Query id. */
+ private Long qryId;
+
/**
* @param sqlQry Sql query.
*/
@@ -88,4 +91,18 @@ public class RestSqlQueryRequest extends GridRestRequest {
public String cacheName() {
return cacheName;
}
+
+ /**
+ * @param id Query id.
+ */
+ public void queryId(Long id) {
+ this.qryId = id;
+ }
+
+ /**
+ * @return Query id.
+ */
+ public Long queryId() {
+ return qryId;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/nodejs/src/main/js/cache.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/cache.js b/modules/nodejs/src/main/js/cache.js
index cdbd6d4..56a2def 100644
--- a/modules/nodejs/src/main/js/cache.js
+++ b/modules/nodejs/src/main/js/cache.js
@@ -134,21 +134,33 @@ Cache.prototype.getAll = function(keys, callback) {
* @param {SqlQuery} qry Query
*/
Cache.prototype.query = function(qry) {
- function onQuery(qry, error, res) {
+ function onQueryExecute(qry, error, res) {
if (error) {
qry.error(error);
return;
}
- qry.end(res);
+ qry.page(res["items"]);
+
+ if (res["last"]) {
+ qry.end();
+ }
+ else {
+ this._server.runCommand("qryfetch", [
+ Server.pair("cacheName", this._cacheName),
+ Server.pair("qryId", res.queryId),
+ Server.pair("psz", qry.pageSize())],
+ onQueryExecute.bind(this, qry, res["queryId"]));
+ }
}
this._server.runCommand("qryexecute", [
Server.pair("cacheName", this._cacheName),
Server.pair("qry", qry.query()),
Server.pair("arg", qry.arguments()),
- Server.pair("psz", qry.pageSize())], onQuery.bind(null, qry));
+ Server.pair("psz", qry.pageSize())],
+ onQueryExecute.bind(this, qry));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/nodejs/src/main/js/sql-query.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/sql-query.js b/modules/nodejs/src/main/js/sql-query.js
index ea3b23b..ba554a3 100644
--- a/modules/nodejs/src/main/js/sql-query.js
+++ b/modules/nodejs/src/main/js/sql-query.js
@@ -22,8 +22,9 @@
function SqlQuery(sql) {
this._sql = sql;
this._arg = [];
- this._pageSz = 0;
+ this._pageSz = 1;
this._endFunc = function(res) {console.log("Empty end function is called [res=" + res + "]")};
+ this._pageFunc = function(res) {console.log("Empty page function is called [res=" + res + "]")}
this._errFunc = function(err) {console.log("Empty error function is called [err=" + err + "]")}
}
@@ -33,6 +34,10 @@ SqlQuery.prototype.on = function(code, f) {
this._endFunc = f;
break;
+ case "page":
+ this._pageFunc = f;
+
+ break;
case "error" :
this._errFunc = f;
@@ -50,6 +55,10 @@ SqlQuery.prototype.error = function(err) {
return this._errFunc(err);
}
+SqlQuery.prototype.page = function(res) {
+ return this._pageFunc(res);
+}
+
SqlQuery.prototype.query = function() {
return this._sql;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/nodejs/src/test/js/test-query.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/test/js/test-query.js b/modules/nodejs/src/test/js/test-query.js
index a09c5da..bc0fdc2 100644
--- a/modules/nodejs/src/test/js/test-query.js
+++ b/modules/nodejs/src/test/js/test-query.js
@@ -28,21 +28,25 @@ testSqlQuery = function() {
var qry = new SqlQuery("select * from String");
- qry.on("error", function(err) {
- console.log("!!!!!!!!!!!!Error: " + err);
+ var fullRes = [];
+ qry.on("error", function(err) {
TestUtils.testFails();
});
- qry.on("end", function(res) {
- assert(res.length, 1, "Result length is not correct" +
- "[expected=1, val = " + res.length + "]");
+ qry.on("page", function(res) {
+ fullRes = fullRes.concat(res);
+ });
+
+ qry.on("end", function() {
+ assert(fullRes.length, 1, "Result length is not correct" +
+ "[expected=1, val = " + fullRes.length + "]");
- assert(res[0]["key"] === "key0", "Result value for key is not correct "+
- "[expected=key0, real=" + res[0]["key"] + "]");
+ assert(fullRes[0]["key"] === "key0", "Result value for key is not correct "+
+ "[expected=key0, real=" + fullRes[0]["key"] + "]");
- assert(res[0]["value"] === "val0", "Result value for key is not correct "+
- "[expected=val0, real=" + res[0]["value"] + "]");
+ assert(fullRes[0]["value"] === "val0", "Result value for key is not correct "+
+ "[expected=val0, real=" + fullRes[0]["value"] + "]");
TestUtils.testDone();
});
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
----------------------------------------------------------------------
diff --git a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
index c188439..361b713 100644
--- a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
+++ b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
@@ -484,6 +484,18 @@ public class GridJettyRestHandler extends AbstractHandler {
break;
}
+ case FETCH_SQL_QUERY: {
+ RestSqlQueryRequest restReq0 = new RestSqlQueryRequest();
+
+ restReq0.queryId(Long.parseLong((String)params.get("qryId")));
+ restReq0.pageSize(Integer.parseInt((String)params.get("psz")));
+ restReq0.cacheName((String)params.get("cacheName"));
+
+ restReq = restReq0;
+
+ break;
+ }
+
default:
throw new IgniteCheckedException("Invalid command: " + cmd);
}