You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/26 18:47:02 UTC

[7/7] incubator-ignite git commit: #ignite-964: simple query works with pages.

#ignite-964: simple query works with pages.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ed7dd30e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ed7dd30e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ed7dd30e

Branch: refs/heads/ignite-964
Commit: ed7dd30ed613657e0d2576260aeebdf5c181e7a2
Parents: 1d03ca2
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Jun 26 19:46:37 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Jun 26 19:46:37 2015 +0300

----------------------------------------------------------------------
 .../rest/handlers/query/CacheQueryResult.java   |  32 ++---
 .../handlers/query/QueryCommandHandler.java     | 117 +++++++++++++++++--
 .../IgniteScriptingCommandHandler.java          |   6 +
 .../rest/request/RestSqlQueryRequest.java       |  17 +++
 modules/nodejs/src/main/js/cache.js             |  18 ++-
 modules/nodejs/src/main/js/sql-query.js         |  11 +-
 modules/nodejs/src/test/js/test-query.js        |  22 ++--
 .../http/jetty/GridJettyRestHandler.java        |  12 ++
 8 files changed, 192 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java
index ede8a45..3e49576 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java
@@ -38,70 +38,57 @@ public class CacheQueryResult implements Externalizable {
     /** Last flag. */
     private boolean last;
 
-    /** Node ID. */
-    private UUID nodeId;
-
     /**
      * @return Query ID.
      */
-    public long queryId() {
+    public long getQueryId() {
         return qryId;
     }
 
     /**
      * @param qryId Query ID.
      */
-    public void queryId(long qryId) {
+    public void setQueryId(long qryId) {
         this.qryId = qryId;
     }
 
     /**
      * @return Items.
      */
-    public Collection<?> items() {
+    public Collection<?> getItems() {
         return items;
     }
 
     /**
      * @param items Items.
      */
-    public void items(Collection<?> items) {
+    public void setItems(Collection<?> items) {
         this.items = items;
     }
 
     /**
      * @return Last flag.
      */
-    public boolean last() {
+    public boolean getLast() {
         return last;
     }
 
     /**
      * @param last Last flag.
      */
-    public void last(boolean last) {
+    public void setLast(boolean last) {
         this.last = last;
     }
 
-    /**
-     * @return Node ID.
-     */
-    public UUID nodeId() {
-        return nodeId;
-    }
-
-    /**
-     * @param nodeId Node ID.
-     */
-    public void nodeId(UUID nodeId) {
-        this.nodeId = nodeId;
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheQueryResult.class, this);
     }
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeBoolean(last);
         out.writeLong(qryId);
-        U.writeUuid(out, nodeId);
         U.writeCollection(out, items);
     }
 
@@ -109,7 +96,6 @@ public class CacheQueryResult implements Externalizable {
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         last = in.readBoolean();
         qryId = in.readLong();
-        nodeId = U.readUuid(in);
         items = U.readCollection(in);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index f31e246..82f3726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.rest.handlers.query;
 
-import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.rest.*;
@@ -29,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import javax.cache.*;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.*;
 
@@ -37,7 +37,15 @@ import static org.apache.ignite.internal.processors.rest.GridRestCommand.*;
  */
 public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
     /** Supported commands. */
-    private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_SQL_QUERY);
+    private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_SQL_QUERY,
+        FETCH_SQL_QUERY);
+
+    /** Query ID sequence. */
+    private static final AtomicLong qryIdGen = new AtomicLong();
+
+    /** Current queries. */
+    private final ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs =
+        new ConcurrentHashMap<>();
 
     /**
      * @param ctx Context.
@@ -61,7 +69,15 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
             case EXECUTE_SQL_QUERY: {
                 assert req instanceof RestSqlQueryRequest : "Invalid type of query request.";
 
-                return ctx.closure().callLocalSafe(new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req),false);
+                return ctx.closure().callLocalSafe(
+                    new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, curs), false);
+            }
+
+            case FETCH_SQL_QUERY: {
+                assert req instanceof RestSqlQueryRequest : "Invalid type of query request.";
+
+                return ctx.closure().callLocalSafe(
+                    new FetchQueryCallable(ctx, (RestSqlQueryRequest)req, curs), false);
             }
         }
 
@@ -72,19 +88,27 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
      * Execute query callable.
      */
     private static class ExecuteQueryCallable implements Callable<GridRestResponse> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
         /** Kernal context. */
         private GridKernalContext ctx;
 
         /** Execute query request. */
         private RestSqlQueryRequest req;
 
+        /** Queries cursors. */
+        private ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs;
+
         /**
          * @param ctx Kernal context.
          * @param req Execute query request.
          */
-        public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req) {
+        public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req,
+            ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs) {
             this.ctx = ctx;
             this.req = req;
+            this.curs = curs;
         }
 
         /** {@inheritDoc} */
@@ -92,11 +116,90 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
             try {
                 SqlQuery<String, String> qry = new SqlQuery(String.class, req.sqlQuery());
 
-                IgniteCache<Object, Object> cache = ctx.grid().cache(req.cacheName());
+                Iterator<Cache.Entry<String, String>> cur =
+                    ctx.grid().cache(req.cacheName()).query(qry).iterator();
+
+                long qryId = qryIdGen.getAndIncrement();
+
+                curs.put(qryId, cur);
+
+                List<Cache.Entry<String, String>> res = new ArrayList<>();
+
+                CacheQueryResult response = new CacheQueryResult();
+
+                for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i)
+                    res.add(cur.next());
+
+                response.setItems(res);
+
+                response.setLast(!cur.hasNext());
+
+                response.setQueryId(qryId);
+
+                if (!cur.hasNext())
+                    curs.remove(qryId);
+
+                return new GridRestResponse(response);
+            }
+            catch (Exception e) {
+                return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Fetch query callable.
+     */
+    private static class FetchQueryCallable implements Callable<GridRestResponse> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Kernal context. */
+        private GridKernalContext ctx;
+
+        /** Fetch query request. */
+        private RestSqlQueryRequest req;
+
+        /** Queries cursors. */
+        private ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs;
+
+        /**
+         * @param ctx Kernal context.
+         * @param req Fetch query request.
+         */
+        public FetchQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req,
+            ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs) {
+            this.ctx = ctx;
+            this.req = req;
+            this.curs = curs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridRestResponse call() throws Exception {
+            try {
+                // Look the cursor up by key once; null means the query is unknown or already finished.
+                Iterator<Cache.Entry<String, String>> cur = curs.get(req.queryId());
+                if (cur == null)
+                    return new GridRestResponse(GridRestResponse.STATUS_FAILED,
+                        "Cannot find query [qryId=" + req.queryId() + "]");
+
+                List<Cache.Entry<String, String>> res = new ArrayList<>();
+
+                CacheQueryResult response = new CacheQueryResult();
+
+                for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i)
+                    res.add(cur.next());
+
+                response.setItems(res);
+
+                response.setLast(!cur.hasNext());
+
+                response.setQueryId(req.queryId());
 
-                List<Cache.Entry<String, String>> res = cache.query(qry).getAll();
+                if (!cur.hasNext())
+                    curs.remove(req.queryId());
 
-                return new GridRestResponse(res);
+                return new GridRestResponse(response);
             }
             catch (Exception e) {
                 return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java
index 2d65016..d7525a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java
@@ -260,6 +260,9 @@ public class IgniteScriptingCommandHandler extends GridRestCommandHandlerAdapter
      * Run script callable.
      */
     private static class RunScriptCallable implements Callable<GridRestResponse> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
         /** Kernal context. */
         private GridKernalContext ctx;
 
@@ -291,6 +294,9 @@ public class IgniteScriptingCommandHandler extends GridRestCommandHandlerAdapter
      * Map reduce callable.
      */
     private static class MapReduceCallable implements Callable<GridRestResponse> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
         /** Kernal context. */
         private GridKernalContext ctx;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java
index 5731a03..3a0005c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java
@@ -33,6 +33,9 @@ public class RestSqlQueryRequest extends GridRestRequest {
     /** Cache name. */
     private String cacheName;
 
+    /** Query id. */
+    private Long qryId;
+
     /**
      * @param sqlQry Sql query.
      */
@@ -88,4 +91,18 @@ public class RestSqlQueryRequest extends GridRestRequest {
     public String cacheName() {
         return cacheName;
     }
+
+    /**
+     * @param id Query id.
+     */
+    public void queryId(Long id) {
+        this.qryId = id;
+    }
+
+    /**
+     * @return Query id.
+     */
+    public Long queryId() {
+        return qryId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/nodejs/src/main/js/cache.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/cache.js b/modules/nodejs/src/main/js/cache.js
index cdbd6d4..56a2def 100644
--- a/modules/nodejs/src/main/js/cache.js
+++ b/modules/nodejs/src/main/js/cache.js
@@ -134,21 +134,33 @@ Cache.prototype.getAll = function(keys, callback) {
  * @param {SqlQuery} qry Query
  */
 Cache.prototype.query = function(qry) {
-    function onQuery(qry, error, res) {
+    function onQueryExecute(qry, error, res) {
         if (error) {
             qry.error(error);
 
             return;
         }
 
-        qry.end(res);
+        qry.page(res["items"]);
+
+        if (res["last"]) {
+            qry.end();
+        }
+        else {
+            // Bind only qry, as the initial call does: the callback's remaining arguments are (error, res).
+            this._server.runCommand("qryfetch", [
+                Server.pair("cacheName", this._cacheName), Server.pair("qryId", res.queryId),
+                Server.pair("psz", qry.pageSize())],
+                onQueryExecute.bind(this, qry));
+        }
     }
 
     this._server.runCommand("qryexecute", [
         Server.pair("cacheName", this._cacheName),
         Server.pair("qry", qry.query()),
         Server.pair("arg", qry.arguments()),
-        Server.pair("psz", qry.pageSize())], onQuery.bind(null, qry));
+        Server.pair("psz", qry.pageSize())],
+        onQueryExecute.bind(this, qry));
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/nodejs/src/main/js/sql-query.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/sql-query.js b/modules/nodejs/src/main/js/sql-query.js
index ea3b23b..ba554a3 100644
--- a/modules/nodejs/src/main/js/sql-query.js
+++ b/modules/nodejs/src/main/js/sql-query.js
@@ -22,8 +22,9 @@
 function SqlQuery(sql) {
     this._sql = sql;
     this._arg = [];
-    this._pageSz = 0;
+    this._pageSz = 1;
     this._endFunc = function(res) {console.log("Empty end function is called [res=" + res + "]")};
+    this._pageFunc = function(res) {console.log("Empty page function is called [res=" + res + "]")}
     this._errFunc = function(err) {console.log("Empty error function is called [err=" + err + "]")}
 }
 
@@ -33,6 +34,10 @@ SqlQuery.prototype.on = function(code, f) {
             this._endFunc = f;
 
             break;
+        case "page":
+            this._pageFunc = f;
+
+            break;
         case "error" :
             this._errFunc = f;
 
@@ -50,6 +55,10 @@ SqlQuery.prototype.error = function(err) {
     return this._errFunc(err);
 }
 
+SqlQuery.prototype.page = function(res) {
+    return this._pageFunc(res);
+}
+
 SqlQuery.prototype.query = function() {
     return this._sql;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/nodejs/src/test/js/test-query.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/test/js/test-query.js b/modules/nodejs/src/test/js/test-query.js
index a09c5da..bc0fdc2 100644
--- a/modules/nodejs/src/test/js/test-query.js
+++ b/modules/nodejs/src/test/js/test-query.js
@@ -28,21 +28,25 @@ testSqlQuery = function() {
 
         var qry = new SqlQuery("select * from String");
 
-        qry.on("error", function(err) {
-                console.log("!!!!!!!!!!!!Error: " + err);
+        var fullRes = [];
 
+        qry.on("error", function(err) {
                 TestUtils.testFails();
             });
 
-        qry.on("end", function(res) {
-                assert(res.length, 1, "Result length is not correct" +
-                    "[expected=1, val = " + res.length + "]");
+        qry.on("page", function(res) {
+            fullRes = fullRes.concat(res);
+        });
+
+        qry.on("end", function() {
+                assert(fullRes.length === 1, "Result length is not correct" +
+                    "[expected=1, val = " + fullRes.length + "]");
 
-                assert(res[0]["key"] === "key0", "Result value for key is not correct "+
-                    "[expected=key0, real=" + res[0]["key"] + "]");
+                assert(fullRes[0]["key"] === "key0", "Result value for key is not correct "+
+                    "[expected=key0, real=" + fullRes[0]["key"] + "]");
 
-                assert(res[0]["value"] === "val0", "Result value for key is not correct "+
-                    "[expected=val0, real=" + res[0]["value"] + "]");
+                assert(fullRes[0]["value"] === "val0", "Result value for key is not correct "+
+                    "[expected=val0, real=" + fullRes[0]["value"] + "]");
 
                 TestUtils.testDone();
             });

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
----------------------------------------------------------------------
diff --git a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
index c188439..361b713 100644
--- a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
+++ b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
@@ -484,6 +484,18 @@ public class GridJettyRestHandler extends AbstractHandler {
                 break;
             }
 
+            case FETCH_SQL_QUERY: {
+                RestSqlQueryRequest restReq0 = new RestSqlQueryRequest();
+
+                restReq0.queryId(Long.parseLong((String)params.get("qryId")));
+                restReq0.pageSize(Integer.parseInt((String)params.get("psz")));
+                restReq0.cacheName((String)params.get("cacheName"));
+
+                restReq = restReq0;
+
+                break;
+            }
+
             default:
                 throw new IgniteCheckedException("Invalid command: " + cmd);
         }