You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/07/17 00:58:26 UTC
[26/50] [abbrv] incubator-ignite git commit: #ignite-961: add promises
#ignite-961: add promises
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8616eebb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8616eebb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8616eebb
Branch: refs/heads/ignite-1121
Commit: 8616eebbfd8b89ed260cc86af9a7748a9a1cd3fd
Parents: 4f8810c
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu Jul 16 12:25:37 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu Jul 16 12:25:37 2015 +0300
----------------------------------------------------------------------
examples/src/main/js/cache-api-example.js | 58 +-
examples/src/main/js/cache-put-get-example.js | 75 +-
examples/src/main/js/cache-query-example.js | 73 +-
.../main/js/cache-sql-fields-query-example.js | 64 +-
examples/src/main/js/compute-run-example.js | 38 +-
examples/src/main/js/map-reduce-example.js | 23 +-
.../processors/rest/GridRestCommand.java | 5 +-
.../handlers/query/QueryCommandHandler.java | 110 ++-
modules/nodejs/src/main/js/cache.js | 371 ++++++---
modules/nodejs/src/main/js/compute.js | 36 +-
modules/nodejs/src/main/js/ignite.js | 107 +--
modules/nodejs/src/main/js/ignition.js | 106 ++-
modules/nodejs/src/main/js/package.json | 5 +-
modules/nodejs/src/main/js/server.js | 2 +
.../ignite/internal/NodeJsIgnitionSelfTest.java | 12 +-
.../ignite/internal/NodeJsSqlQuerySelfTest.java | 18 +
modules/nodejs/src/test/js/test-cache-api.js | 799 ++++++++++--------
modules/nodejs/src/test/js/test-compute.js | 814 +++++++++----------
modules/nodejs/src/test/js/test-ignite.js | 104 +--
modules/nodejs/src/test/js/test-ignition.js | 78 +-
modules/nodejs/src/test/js/test-key.js | 56 +-
modules/nodejs/src/test/js/test-query.js | 169 ++--
modules/nodejs/src/test/js/test-utils.js | 11 +-
.../http/jetty/GridJettyRestHandler.java | 11 +
24 files changed, 1760 insertions(+), 1385 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/examples/src/main/js/cache-api-example.js
----------------------------------------------------------------------
diff --git a/examples/src/main/js/cache-api-example.js b/examples/src/main/js/cache-api-example.js
index f708160..1d4ddda 100644
--- a/examples/src/main/js/cache-api-example.js
+++ b/examples/src/main/js/cache-api-example.js
@@ -31,69 +31,57 @@ function main() {
var cacheName = "ApiExampleCache";
/** Connect to node that started with {@code examples/config/js/example-js-cache.xml} configuration. */
- Ignition.start(['127.0.0.1:8000..9000'], null, onConnect);
-
- function onConnect(err, ignite) {
- if (err !== null)
- throw "Start remote node with config examples/config/example-ignite.xml.";
-
+ Ignition.start(['127.0.0.1:8000..9000'], null).then(function(ignite) {
console.log(">>> Cache API example started.");
// Create cache on server with cacheName.
- ignite.getOrCreateCache(cacheName, function(err, cache) {
+ ignite.getOrCreateCache(cacheName).then(function(cache){
atomicMapOperations(ignite, cache);
});
- }
+ }).catch(function(err) {
+ if (err !== null)
+ console.log("Start remote node with config examples/config/example-ignite.xml.");
+ });
/**
* Demonstrates cache operations similar to {@link ConcurrentMap} API. Note that
* cache API is a lot richer than the JDK {@link ConcurrentMap}.
*/
- atomicMapOperations = function(ignite, cache) {
+ function atomicMapOperations (ignite, cache) {
console.log(">>> Cache atomic map operation examples.");
- cache.removeAllFromCache(function(err) {
+ cache.removeAllFromCache().then(function(){
// Put and return previous value.
- cache.getAndPut(1, "1", onGetAndPut)
- });
-
- function onGetAndPut(err, entry) {
+ return cache.getAndPut(1, "1");
+ }).then(function(entry){
console.log(">>> Get and put finished [result=" + entry + "]");
// Put and do not return previous value.
// Performs better when previous value is not needed.
- cache.put(2, "2", onPut);
- }
-
- onPut = function(err) {
+ return cache.put(2, "2")
+ }).then(function(){
console.log(">>> Put finished.");
// Put-if-absent.
- cache.putIfAbsent(4, "44", onPutIfAbsent);
- }
-
- onPutIfAbsent = function(err, res) {
+ return cache.putIfAbsent(4, "44");
+ }).then(function(res){
console.log(">>> Put if absent finished [result=" + res + "]");
// Replace.
- cache.replaceValue(4, "55", "44", onReplaceValue);
- }
-
- onReplaceValue = function(err, res) {
+ return cache.replaceValue(4, "55", "44");
+ }).then(function(res) {
console.log(">>> Replace value finished [result=" + res + "]");
// Replace not correct value.
- cache.replaceValue(4, "555", "44", onEnd);
- }
-
- onEnd = function(err) {
+ return cache.replaceValue(4, "555", "44");
+ }).then(function(res) {
console.log(">>> Replace finished.");
- // Destroying cache.
- ignite.destroyCache(cacheName, function(err) {
- console.log(">>> End of Cache API example.");
- });
- }
+ //Destroying cache.
+ return ignite.destroyCache(cacheName);
+ }).then(function(){
+ console.log(">>> End of Cache API example.");
+ })
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/examples/src/main/js/cache-put-get-example.js
----------------------------------------------------------------------
diff --git a/examples/src/main/js/cache-put-get-example.js b/examples/src/main/js/cache-put-get-example.js
index e12a54d..69478e9 100644
--- a/examples/src/main/js/cache-put-get-example.js
+++ b/examples/src/main/js/cache-put-get-example.js
@@ -32,48 +32,29 @@ function main() {
var cacheName = "PutGetExampleCache";
/** Connect to node that started with {@code examples/config/js/example-js-cache.xml} configuration. */
- Ignition.start(['127.0.0.1:8000..9000'], null, onConnect);
+ Ignition.start(['127.0.0.1:8000..9000'], null).then(function(ignite) {
+ console.log(">>> Cache put-get example started.");
- function onConnect(err, ignite) {
+ // Create cache on server with cacheName.
+ ignite.getOrCreateCache(cacheName).then(function(cache){
+ putGetExample(ignite, cache);
+ });
+ }).catch(function(err) {
if (err !== null)
- throw "Start remote node with config examples/config/example-ignite.xml.";
-
- ignite.getOrCreateCache(cacheName, function(err, cache) { putGetExample(ignite, cache); });
- }
-
- /** Execute individual puts and gets. */
- putGetExample = function(ignite, cache) {
- console.log(">>> Cache put-get example started.");
+ console.log("Start remote node with config examples/config/example-ignite.xml.");
+ });
+ /** Execute puts and gets. */
+ function putGetExample(ignite, cache) {
var key = 1;
- // Store key in cache.
- cache.put(key, "1", onPut);
-
- function onPut(err) {
- console.log(">>> Stored values in cache.");
-
- cache.get(key, onGet);
- }
-
- function onGet(err, res) {
- console.log("Get value=" + res);
-
- putAllGetAll(ignite, cache);
- }
- }
-
- /** Execute bulk {@code putAll(...)} and {@code getAll(...)} operations. */
- function putAllGetAll(ignite, cache) {
- console.log(">>> Starting putAll-getAll example.");
-
var keyCnt = 20;
// Create batch.
var batch = [];
var keys = [];
- for (var i = keyCnt; i < keyCnt + keyCnt; ++i) {
+ for (var i = 0; i < keyCnt; ++i) {
var key = i;
var val = "bulk-" + i;
@@ -81,24 +62,34 @@ function main() {
batch.push(new CacheEntry(key, val));
}
- // Bulk-store entries in cache.
- cache.putAll(batch, onPutAll);
-
- function onPutAll(err) {
+ // Store key in cache.
+ cache.put(key, "1").then(function(){
console.log(">>> Stored values in cache.");
- // Bulk-get values from cache.
- cache.getAll(keys, onGetAll);
- }
+ // Get value.
+ return cache.get(key);
+ }).then(function(entry){
+ console.log(">>> Get finished [result=" + entry + "]");
+
+ console.log(">>> Starting putAll-getAll example.");
- function onGetAll(err, entries) {
+ // Bulk-store entries in cache.
+ return cache.putAll(batch);
+ }).then(function(){
+ console.log(">>> Stored values in cache.");
+
+ // GetAll keys.
+ return cache.getAll(keys);
+ }).then(function(entries){
for (var e of entries) {
- console.log("Got entry [key=" + e.key + ", value=" + e.value + ']');
+ console.log(">>> Got entry [key=" + e.key + ", value=" + e.value + ']');
}
// Destroying cache.
- ignite.destroyCache(cacheName, function(err) { console.log(">>> End of cache put-get example."); });
- }
+ return ignite.destroyCache(cacheName);
+ }).then(function(){
+ console.log(">>> End of cache put-get example.")
+ })
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/examples/src/main/js/cache-query-example.js
----------------------------------------------------------------------
diff --git a/examples/src/main/js/cache-query-example.js b/examples/src/main/js/cache-query-example.js
index 1c6b980..8b455b5 100644
--- a/examples/src/main/js/cache-query-example.js
+++ b/examples/src/main/js/cache-query-example.js
@@ -36,26 +36,25 @@ main = function() {
var cacheName = "CacheQueryExample";
/** Connect to node that started with {@code examples/config/js/example-js-cache.xml} configuration. */
- Ignition.start(['127.0.0.1:8000..9000'], null, onConnect);
-
- function onConnect(err, ignite) {
- if (err !== null)
- throw "Start remote node with config examples/config/example-ignite.xml.";
-
+ Ignition.start(['127.0.0.1:8000..9000'], null).then(function(ignite) {
console.log(">>> Cache query example started.");
- var entries = initializeEntries();
-
- ignite.getOrCreateCache(cacheName, function(err, cache) {
- cacheQuery(ignite, cache, entries);
+ // Create cache on server with cacheName.
+ ignite.getOrCreateCache(cacheName).then(function(cache){
+ cacheQuery(ignite, cache);
});
- }
+ }).catch(function(err) {
+ if (err !== null)
+ console.log("Start remote node with config examples/config/example-ignite.xml.");
+ });
- function cacheQuery(ignite, cache, entries) {
- cache.putAll(entries, onCachePut);
+ // Run query example.
+ function cacheQuery(ignite, cache) {
+ var entries = initializeEntries();
- function onCachePut(err) {
- console.log(">>> Create cache for people.")
+ // Initialize cache.
+ cache.putAll(entries).then(function(){
+ console.log(">>> Create cache for people.");
//SQL clause which selects salaries based on range.
var qry = new SqlQuery("salary > ? and salary <= ?");
@@ -69,27 +68,37 @@ main = function() {
var fullRes = [];
- //This function is called when we get part of query result.
- qry.on("page", function(res) {
- console.log(">>> Get result on page: " + JSON.stringify(res));
+ // Get query cursor.
+ var cursor = ignite.cache(cacheName).query(qry);
- fullRes = fullRes.concat(res);
- });
+ function onQuery(cursor) {
+ var page = cursor.page();
- //This function is called when query is finished.
- qry.on("end", function(err) {
- console.log(">>> People with salaries between 0 and 2000 (queried with SQL query): " +
- JSON.stringify(fullRes));
+ console.log(">>> Get result on page: " + JSON.stringify(page));
- // Destroying cache.
- ignite.destroyCache(cacheName, function(err) {
- console.log(">>> End of query example.");
- });
- });
+ //Concat query page results.
+ fullRes.concat(page);
+
+ // isFinished() returns true if this is the last page.
+ if (cursor.isFinished()) {
+ console.log(">>> People with salaries between 0 and 2000 (queried with SQL query): " +
+ JSON.stringify(fullRes));
+
+ //Destroying cache on the end of the example.
+ return ignite.destroyCache(cacheName);
+ }
- //Run query.
- ignite.cache(cacheName).query(qry);
- }
+ //Get Promise for next page.
+ var nextPromise = cursor.nextPage();
+
+ return nextPromise.then(onQuery);
+ }
+
+ // Get query's page.
+ return cursor.nextPage().then(onQuery).then(function(){
+ console.log(">>> End of sql query example.");
+ });
+ })
}
// Initialize cache for people.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/examples/src/main/js/cache-sql-fields-query-example.js
----------------------------------------------------------------------
diff --git a/examples/src/main/js/cache-sql-fields-query-example.js b/examples/src/main/js/cache-sql-fields-query-example.js
index 58eb26b..7290581 100644
--- a/examples/src/main/js/cache-sql-fields-query-example.js
+++ b/examples/src/main/js/cache-sql-fields-query-example.js
@@ -22,6 +22,7 @@ var SqlQuery = apacheIgnite.SqlQuery;
var SqlFieldsQuery = apacheIgnite.SqlFieldsQuery;
var CacheEntry = apacheIgnite.CacheEntry;
+
/**
* Cache queries example. This example demonstrates SQL queries over cache.
* <p>
@@ -32,24 +33,71 @@ var CacheEntry = apacheIgnite.CacheEntry;
*/
main = function() {
/** Cache name. */
- var cacheName = "CacheSqlFieldsQueryExample";
+ var cacheName = "CacheQueryExample";
/** Connect to node that started with {@code examples/config/js/example-js-cache.xml} configuration. */
- Ignition.start(['127.0.0.1:8000..9000'], null, onConnect);
+ Ignition.start(['127.0.0.1:8000..9000'], null).then(function(ignite) {
+ console.log(">>> Cache sql fields query example started.");
- function onConnect(err, ignite) {
+ // Create cache on server with cacheName.
+ ignite.getOrCreateCache(cacheName).then(function(cache){
+ cacheSqlFieldsQuery(ignite, cache);
+ });
+ }).catch(function(err) {
if (err !== null)
- throw "Start remote node with config examples/config/example-ignite.xml.";
-
- console.log(">>> Cache sql fields query example started.");
+ console.log("Start remote node with config examples/config/example-ignite.xml.");
+ });
+ // Run query example.
+ function cacheSqlFieldsQuery(ignite, cache) {
var entries = initializeEntries();
- ignite.getOrCreateCache(cacheName, function(err, cache) {
- cacheSqlFieldsQuery(ignite, cache, entries);
+ // Initialize cache.
+ cache.putAll(entries).then(function(){
+ console.log(">>> Create cache for people.");
+
+ //Sql query to get names of all employees.
+ var qry = new SqlFieldsQuery("select concat(firstName, ' ', lastName) from Person");
+
+ // Set page size for query.
+ qry.setPageSize(2);
+
+ //Set salary range.
+ qry.setArguments([0, 2000]);
+
+ // Run query.
+ ignite.cache(cacheName).query(qry).getAll(function(fullRes){
+ console.log(">>> Names of all employees: " + JSON.stringify(fullRes));
+
+ // Destroying cache on the end of the example.
+ return ignite.destroyCache(cacheName);
+ }).then(function(){
+ console.log(">>> End of sql fields query example.");
+ });
});
}
+ // Initialize cache for people.
+ function initializeEntries() {
+ var key1 = "1";
+ var value1 = {"firstName" : "John", "lastName" : "Doe", "salary" : 2000};
+ var key2 = "2";
+ var value2 = {"firstName" : "Jane", "lastName" : "Doe", "salary" : 1000};
+ var key3 = "3";
+ var value3 = {"firstName" : "John", "lastName" : "Smith", "salary" : 1000};
+ var key4 = "4";
+ var value4 = {"firstName" : "Jane", "lastName" : "Smith", "salary" : 2000};
+ var key5 = "5";
+ var value5 = {"firstName" : "Ann", "lastName" : "Smith", "salary" : 3000};
+
+ return [new CacheEntry(key1, value1), new CacheEntry(key2, value2),
+ new CacheEntry(key3, value3), new CacheEntry(key4, value4)];
+ }
+}
+
+main();
+
+
function cacheSqlFieldsQuery(ignite, cache, entries) {
cache.putAll(entries, onCachePut.bind(null, ignite));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/examples/src/main/js/compute-run-example.js
----------------------------------------------------------------------
diff --git a/examples/src/main/js/compute-run-example.js b/examples/src/main/js/compute-run-example.js
index 189c63f..9d2a9ad 100644
--- a/examples/src/main/js/compute-run-example.js
+++ b/examples/src/main/js/compute-run-example.js
@@ -31,25 +31,24 @@ function main() {
var cacheName = "RunCacheScriptCache";
/** Connect to node that started with {@code examples/config/js/example-js-cache.xml} configuration. */
- Ignition.start(['127.0.0.1:8000..9000'], null, onConnect);
-
- function onConnect(err, ignite) {
- if (err !== null)
- throw "Start remote node with config examples/config/example-ignite.xml.";
-
+ Ignition.start(['127.0.0.1:8000..9000'], null).then(function(ignite) {
console.log(">>> Run cache script example started.");
- ignite.getOrCreateCache(cacheName, function(err, cache) { runCacheScript(ignite, cache); });
- }
+ // Create cache on server with cacheName.
+ ignite.getOrCreateCache(cacheName).then(function(cache){
+ runCacheScript(ignite, cache);
+ });
+ }).catch(function(err) {
+ if (err !== null)
+ console.log("Start remote node with config examples/config/example-ignite.xml.");
+ });
function runCacheScript(ignite, cache) {
var key = "John";
var person = {"firstName": "John", "lastName": "Doe", "salary" : 2000};
// Store person in the cache
- cache.put(key, person, onPut);
-
- function onPut(err) {
+ cache.put(key, person).then(function(){
var job = function (args) {
print(">>> Hello node: " + ignite.name());
@@ -65,16 +64,15 @@ function main() {
return val.salary;
}
- var onRun = function(err, salary) {
- console.log(">>> " + key + "'s salary is " + salary);
-
- // Destroying cache.
- ignite.destroyCache(cacheName, function(err) { console.log(">>> End of run cache script example."); });
- }
-
/** Run remote job on server ignite node with arguments [cacheName, key]. */
- ignite.compute().run(job, [cacheName, key], onRun);
- }
+ return ignite.compute().run(job, [cacheName, key]);
+ }).then(function(salary){
+ console.log(">>> " + key + "'s salary is " + salary);
+
+ return ignite.destroyCache(cacheName);
+ }).then(function() {
+ console.log(">>> End of run cache script example.");
+ });
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/examples/src/main/js/map-reduce-example.js
----------------------------------------------------------------------
diff --git a/examples/src/main/js/map-reduce-example.js b/examples/src/main/js/map-reduce-example.js
index db2da87..326fa3c 100644
--- a/examples/src/main/js/map-reduce-example.js
+++ b/examples/src/main/js/map-reduce-example.js
@@ -32,12 +32,7 @@ var Ignition = apacheIgnite.Ignition;
*/
function main() {
/** Connect to node that started with {@code examples/config/js/example-js-cache.xml} configuration. */
- Ignition.start(['127.0.0.1:8000..9000'], null, onConnect);
-
- function onConnect(err, ignite) {
- if (err !== null)
- throw "Start remote node with config examples/config/example-ignite.xml.";
-
+ Ignition.start(['127.0.0.1:8000..9000'], null).then(function(ignite) {
console.log(">>> Compute map reduce example started.");
/**
@@ -72,14 +67,14 @@ function main() {
return sum;
}
- // Called when map reduced finished.
- var onMapReduce = function(err, cnt) {
- console.log(">>> Total number of characters in the phrase is '" + cnt + "'.");
- console.log(">>> End of compute map reduce example.");
- }
-
- ignite.compute().mapReduce(map, reduce, "Hello Ignite Enabled World!", onMapReduce);
- }
+ return ignite.compute().mapReduce(map, reduce, "Hello Ignite World!");
+ }).then(function(cnt){
+ console.log(">>> Total number of characters in the phrase is '" + cnt + "'.");
+ console.log(">>> End of compute map reduce example.");
+ }).catch(function(err) {
+ if (err !== null)
+ console.log("Start remote node with config examples/config/example-ignite.xml. ");
+ });
}
main();
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java
index 45e86e0..f5c2546 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java
@@ -151,7 +151,10 @@ public enum GridRestCommand {
EXECUTE_SQL_FIELDS_QUERY("qryfieldsexecute"),
/** Fetch query results. */
- FETCH_SQL_QUERY("qryfetch");
+ FETCH_SQL_QUERY("qryfetch"),
+
+ /** Close query. */
+ CLOSE_SQL_QUERY("qryclose");
/** Enum values. */
private static final GridRestCommand[] VALS = values();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index ec9575c..e2118b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.rest.handlers.query;
+import org.apache.ignite.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.rest.*;
@@ -38,7 +39,8 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Supported commands. */
private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_SQL_QUERY,
EXECUTE_SQL_FIELDS_QUERY,
- FETCH_SQL_QUERY);
+ FETCH_SQL_QUERY,
+ CLOSE_SQL_QUERY);
/** Query ID sequence. */
private static final AtomicLong qryIdGen = new AtomicLong();
@@ -46,6 +48,9 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Current queries. */
private final ConcurrentHashMap<Long, Iterator> curs = new ConcurrentHashMap<>();
+ /** Current queries cursors. */
+ private final ConcurrentHashMap<Long, QueryCursor> qryCurs = new ConcurrentHashMap<>();
+
/**
* @param ctx Context.
*/
@@ -63,21 +68,23 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
assert req != null;
assert SUPPORTED_COMMANDS.contains(req.command());
+ assert req instanceof RestSqlQueryRequest : "Invalid type of query request.";
switch (req.command()) {
case EXECUTE_SQL_QUERY:
case EXECUTE_SQL_FIELDS_QUERY: {
- assert req instanceof RestSqlQueryRequest : "Invalid type of query request.";
-
return ctx.closure().callLocalSafe(
- new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, curs), false);
+ new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, curs, qryCurs), false);
}
case FETCH_SQL_QUERY: {
- assert req instanceof RestSqlQueryRequest : "Invalid type of query request.";
+ return ctx.closure().callLocalSafe(
+ new FetchQueryCallable((RestSqlQueryRequest)req, curs, qryCurs), false);
+ }
+ case CLOSE_SQL_QUERY: {
return ctx.closure().callLocalSafe(
- new FetchQueryCallable((RestSqlQueryRequest)req, curs), false);
+ new CloseQueryCallable((RestSqlQueryRequest)req, curs, qryCurs), false);
}
}
@@ -94,19 +101,23 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Execute query request. */
private RestSqlQueryRequest req;
- /** Queries cursors. */
+ /** Queries iterators. */
private ConcurrentHashMap<Long, Iterator> curs;
+ /** Queries cursors. */
+ private ConcurrentHashMap<Long, QueryCursor> qryCurs;
+
/**
* @param ctx Kernal context.
* @param req Execute query request.
* @param curs Queries cursors.
*/
public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req,
- ConcurrentHashMap<Long, Iterator> curs) {
+ ConcurrentHashMap<Long, Iterator> curs, ConcurrentHashMap<Long, QueryCursor> qryCurs) {
this.ctx = ctx;
this.req = req;
this.curs = curs;
+ this.qryCurs = qryCurs;
}
/** {@inheritDoc} */
@@ -125,13 +136,22 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
((SqlFieldsQuery)qry).setArgs(req.arguments());
}
- Iterator cur = ctx.grid().cache(req.cacheName()).query(qry).iterator();
+ IgniteCache<Object, Object> cache = ctx.grid().cache(req.cacheName());
+
+ if (cache == null)
+ return new GridRestResponse(GridRestResponse.STATUS_FAILED,
+ "No cache with name. [cacheName=" + req.cacheName() + "]");
+
+ QueryCursor qryCur = cache.query(qry);
+
+ Iterator cur = qryCur.iterator();
long qryId = qryIdGen.getAndIncrement();
+ qryCurs.put(qryId, qryCur);
curs.put(qryId, cur);
- CacheQueryResult res = createQueryResult(curs, cur, req, qryId);
+ CacheQueryResult res = createQueryResult(qryCurs, curs, cur, req, qryId);
return new GridRestResponse(res);
}
@@ -142,23 +162,74 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
}
/**
+ * Close query callable.
+ */
+ private static class CloseQueryCallable implements Callable<GridRestResponse> {
+ /** Close query request. */
+ private RestSqlQueryRequest req;
+
+ /** Queries iterators. */
+ private ConcurrentHashMap<Long, Iterator> curs;
+
+ /** Queries cursors. */
+ private ConcurrentHashMap<Long, QueryCursor> qryCurs;
+
+ /**
+ * @param req Close query request.
+ * @param curs Queries iterators.
+ */
+ public CloseQueryCallable(RestSqlQueryRequest req,
+ ConcurrentHashMap<Long, Iterator> curs,
+ ConcurrentHashMap<Long, QueryCursor> qryCurs) {
+ this.req = req;
+ this.curs = curs;
+ this.qryCurs = qryCurs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridRestResponse call() throws Exception {
+ try {
+ QueryCursor cur = qryCurs.get(req.queryId());
+
+ if (cur == null)
+ return new GridRestResponse(GridRestResponse.STATUS_FAILED,
+ "Cannot find query [qryId=" + req.queryId() + "]");
+
+ cur.close();
+
+ return new GridRestResponse(true);
+ }
+ catch (Exception e) {
+ qryCurs.remove(req.queryId());
+ curs.remove(req.queryId());
+
+ return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
+ }
+ }
+ }
+
+ /**
* Fetch query callable.
*/
private static class FetchQueryCallable implements Callable<GridRestResponse> {
/** Execute query request. */
private RestSqlQueryRequest req;
- /** Queries cursors. */
+ /** Queries iterators. */
private ConcurrentHashMap<Long, Iterator> curs;
+ /** Queries cursors. */
+ private ConcurrentHashMap<Long, QueryCursor> qryCurs;
+
/**
* @param req Execute query request.
* @param curs Queries cursors.
*/
- public FetchQueryCallable(RestSqlQueryRequest req,
- ConcurrentHashMap<Long, Iterator> curs) {
+ public FetchQueryCallable(RestSqlQueryRequest req, ConcurrentHashMap<Long, Iterator> curs,
+ ConcurrentHashMap<Long, QueryCursor> qryCurs) {
this.req = req;
this.curs = curs;
+ this.qryCurs = qryCurs;
}
/** {@inheritDoc} */
@@ -170,12 +241,13 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
return new GridRestResponse(GridRestResponse.STATUS_FAILED,
"Cannot find query [qryId=" + req.queryId() + "]");
- CacheQueryResult res = createQueryResult(curs, cur, req, req.queryId());
+ CacheQueryResult res = createQueryResult(qryCurs, curs, cur, req, req.queryId());
return new GridRestResponse(res);
}
catch (Exception e) {
curs.remove(req.queryId());
+ qryCurs.remove(req.queryId());
return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
}
@@ -183,13 +255,15 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
}
/**
- * @param curs Queries cursors.
+ * @param qryCurs Query cursors.
+ * @param curs Queries iterators.
* @param cur Current cursor.
* @param req Sql request.
* @param qryId Query id.
* @return Query result with items.
*/
- private static CacheQueryResult createQueryResult(ConcurrentHashMap<Long, Iterator> curs, Iterator cur,
+ private static CacheQueryResult createQueryResult(ConcurrentHashMap<Long, QueryCursor> qryCurs,
+ ConcurrentHashMap<Long, Iterator> curs, Iterator cur,
RestSqlQueryRequest req, Long qryId) {
CacheQueryResult res = new CacheQueryResult();
@@ -204,8 +278,10 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
res.setQueryId(qryId);
- if (!cur.hasNext())
+ if (!cur.hasNext()) {
+ qryCurs.remove(qryId);
curs.remove(qryId);
+ }
return res;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/main/js/cache.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/cache.js b/modules/nodejs/src/main/js/cache.js
index 893a945..6cae632 100644
--- a/modules/nodejs/src/main/js/cache.js
+++ b/modules/nodejs/src/main/js/cache.js
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+var Promise = require("bluebird");
var Server = require("./server").Server;
var Command = require("./server").Command;
var SqlFieldsQuery = require("./sql-fields-query").SqlFieldsQuery
@@ -48,12 +49,10 @@ Cache.prototype.name = function() {
*
* @this {Cache}
* @param {string} key Key
- * @param {onGet} callback Called on finish
*/
-Cache.prototype.get = function(key, callback) {
- this._server.runCommand(this._createCommand("get").
- setPostData(JSON.stringify({"key": key})),
- callback);
+Cache.prototype.get = function(key) {
+ return this.__createPromise(this._createCommand("get").
+ setPostData(JSON.stringify({"key": key})));
};
/**
@@ -62,12 +61,10 @@ Cache.prototype.get = function(key, callback) {
* @this {Cache}
* @param {string} key Key
* @param {string} value Value
- * @param {noValue} callback Called on finish
*/
-Cache.prototype.put = function(key, value, callback) {
- this._server.runCommand(this._createCommand("put").
- setPostData(JSON.stringify({"key": key, "val" : value})),
- callback);
+Cache.prototype.put = function(key, value) {
+ return this.__createPromise(this._createCommand("put").
+ setPostData(JSON.stringify({"key": key, "val" : value})));
}
/**
@@ -76,12 +73,10 @@ Cache.prototype.put = function(key, value, callback) {
* @this {Cache}
* @param {string} key Key
* @param {string} value Value
- * @param {onGet} callback Called on finish
*/
-Cache.prototype.putIfAbsent = function(key, value, callback) {
- this._server.runCommand(this._createCommand("putifabsent").
- setPostData(JSON.stringify({"key": key, "val" : value})),
- callback);
+Cache.prototype.putIfAbsent = function(key, value) {
+ return this.__createPromise(this._createCommand("putifabsent").
+ setPostData(JSON.stringify({"key": key, "val" : value})));
}
/**
@@ -89,12 +84,10 @@ Cache.prototype.putIfAbsent = function(key, value, callback) {
*
* @this {Cache}
* @param key Key
- * @param {noValue} callback Called on finish
*/
Cache.prototype.remove = function(key, callback) {
- this._server.runCommand(this._createCommand("rmv").
- setPostData(JSON.stringify({"key": key})),
- callback);
+ return this.__createPromise(this._createCommand("rmv").
+ setPostData(JSON.stringify({"key": key})));
}
/**
@@ -103,12 +96,10 @@ Cache.prototype.remove = function(key, callback) {
* @this {Cache}
* @param key Key
* @param value Value
- * @param {noValue} callback Called on finish
*/
Cache.prototype.removeValue = function(key, value, callback) {
- this._server.runCommand(this._createCommand("rmvvalue").
- setPostData(JSON.stringify({"key": key, "val" : value})),
- callback);
+ return this.__createPromise(this._createCommand("rmvvalue").
+ setPostData(JSON.stringify({"key": key, "val" : value})));
}
/**
@@ -116,12 +107,10 @@ Cache.prototype.removeValue = function(key, value, callback) {
*
* @this {Cache}
* @param {string} key Key
- * @param {onGet} callback Called on finish with previous value
*/
Cache.prototype.getAndRemove = function(key, callback) {
- this._server.runCommand(this._createCommand("getandrmv").
- setPostData(JSON.stringify({"key": key})),
- callback);
+ return this.__createPromise(this._createCommand("getandrmv").
+ setPostData(JSON.stringify({"key": key})));
}
/**
@@ -129,23 +118,19 @@ Cache.prototype.getAndRemove = function(key, callback) {
*
* @this {Cache}
* @param {string[]} keys Keys to remove
- * @param {noValue} callback Called on finish
*/
Cache.prototype.removeAll = function(keys, callback) {
- this._server.runCommand(this._createCommand("rmvall").
- setPostData(JSON.stringify({"keys" : keys})),
- callback);
+ return this.__createPromise(this._createCommand("rmvall").
+ setPostData(JSON.stringify({"keys" : keys})));
}
/**
* Remove all cache keys
*
* @this {Cache}
- * @param {noValue} callback Called on finish
*/
Cache.prototype.removeAllFromCache = function(callback) {
- this._server.runCommand(this._createCommand("rmvall"),
- callback);
+ return this.__createPromise(this._createCommand("rmvall"));
}
/**
@@ -153,11 +138,10 @@ Cache.prototype.removeAllFromCache = function(callback) {
*
* @this {Cache}
* @param {CacheEntry[]} List of entries to put in the cache
- * @param {noValue} callback Called on finish
*/
-Cache.prototype.putAll = function(entries, callback) {
- this._server.runCommand(this._createCommand("putall").setPostData(
- JSON.stringify({"entries" : entries})), callback);
+Cache.prototype.putAll = function(entries) {
+ return this.__createPromise(this._createCommand("putall").setPostData(
+ JSON.stringify({"entries" : entries})));
}
/**
@@ -165,28 +149,27 @@ Cache.prototype.putAll = function(entries, callback) {
*
* @this {Cache}
* @param {Object[]} keys Keys
- * @param {Cache~onGetAll} callback Called on finish
*/
Cache.prototype.getAll = function(keys, callback) {
- function onGetAll(callback, err, res) {
- if (err) {
- callback.call(null, err, null);
-
- return;
- }
-
- var result = [];
-
- for (var key of res) {
- result.push(new CacheEntry(key["key"], key["value"]));
- }
-
- callback.call(null, null, result);
- }
-
- this._server.runCommand(this._createCommand("getall").setPostData(
- JSON.stringify({"keys" : keys})),
- onGetAll.bind(null, callback));
+ var cmd = this._createCommand("getall").setPostData(JSON.stringify({"keys" : keys}));
+
+ var server = this._server;
+ return new Promise(function(resolve, reject) {
+ server.runCommand(cmd, function(err, res) {
+ if(err != null) {
+ reject(err);
+ }
+ else {
+ var result = [];
+
+ for (var key of res) {
+ result.push(new CacheEntry(key["key"], key["value"]));
+ }
+
+ resolve(result);
+ }
+ });
+ });
}
/**
@@ -194,11 +177,10 @@ Cache.prototype.getAll = function(keys, callback) {
*
* @this {Cache}
* @param {Object} key Key
- * @param {Cache~onGetAll} callback Called on finish with boolean result
*/
-Cache.prototype.containsKey = function(key, callback) {
- this._server.runCommand(this._createCommand("containskey").
- setPostData(JSON.stringify({"key" : key})), callback);
+Cache.prototype.containsKey = function(key) {
+ return this.__createPromise(this._createCommand("containskey").
+ setPostData(JSON.stringify({"key" : key})));
}
/**
@@ -206,11 +188,10 @@ Cache.prototype.containsKey = function(key, callback) {
*
* @this {Cache}
* @param {Object[]} keys Keys
- * @param {Cache~onGetAll} callback Called on finish with boolean result
*/
Cache.prototype.containsKeys = function(keys, callback) {
- this._server.runCommand(this._createCommand("containskeys").
- setPostData(JSON.stringify({"keys" : keys})), callback);
+ return this.__createPromise(this._createCommand("containskeys").
+ setPostData(JSON.stringify({"keys" : keys})));
}
/**
@@ -219,11 +200,10 @@ Cache.prototype.containsKeys = function(keys, callback) {
* @this {Cache}
* @param {string} key Key
* @param {string} value Value
- * @param {onGet} callback Called on finish
*/
-Cache.prototype.getAndPut = function(key, val, callback) {
- this._server.runCommand(this._createCommand("getandput").
- setPostData(JSON.stringify({"key" : key, "val" : val})), callback);
+Cache.prototype.getAndPut = function(key, val) {
+ return this.__createPromise(this._createCommand("getandput").
+ setPostData(JSON.stringify({"key" : key, "val" : val})));
}
/**
@@ -232,11 +212,10 @@ Cache.prototype.getAndPut = function(key, val, callback) {
* @this {Cache}
* @param key Key
* @param value Value
- * @param {onGet} callback Called on finish
*/
Cache.prototype.replace = function(key, val, callback) {
- this._server.runCommand(this._createCommand("rep").
- setPostData(JSON.stringify({"key" : key, "val" : val})), callback);
+ return this.__createPromise(this._createCommand("rep").
+ setPostData(JSON.stringify({"key" : key, "val" : val})));
}
/**
@@ -246,11 +225,10 @@ Cache.prototype.replace = function(key, val, callback) {
* @param key Key
* @param value Value
* @param oldVal Old value
- * @param {onGet} callback Called on finish
*/
-Cache.prototype.replaceValue = function(key, val, oldVal, callback) {
- this._server.runCommand(this._createCommand("repVal").
- setPostData(JSON.stringify({"key" : key, "val" : val, "oldVal" : oldVal})), callback);
+Cache.prototype.replaceValue = function(key, val, oldVal) {
+ return this.__createPromise(this._createCommand("repVal").
+ setPostData(JSON.stringify({"key" : key, "val" : val, "oldVal" : oldVal})));
}
/**
@@ -259,11 +237,10 @@ Cache.prototype.replaceValue = function(key, val, oldVal, callback) {
* @this {Cache}
* @param {string} key Key
* @param {string} value Value
- * @param {onGet} callback Called on finish
*/
-Cache.prototype.getAndReplace = function(key, val, callback) {
- this._server.runCommand(this._createCommand("getandreplace").
- setPostData(JSON.stringify({"key" : key, "val" : val})), callback);
+Cache.prototype.getAndReplace = function(key, val) {
+ return this.__createPromise(this._createCommand("getandreplace").
+ setPostData(JSON.stringify({"key" : key, "val" : val})));
}
/**
@@ -272,92 +249,224 @@ Cache.prototype.getAndReplace = function(key, val, callback) {
* @this {Cache}
* @param {string} key Key
* @param {string} value Value
- * @param {onGet} callback Called on finish
*/
-Cache.prototype.getAndPutIfAbsent = function(key, val, callback) {
- this._server.runCommand(this._createCommand("getandputifabsent").
- setPostData(JSON.stringify({"key" : key, "val" : val})), callback);
+Cache.prototype.getAndPutIfAbsent = function(key, val) {
+ return this.__createPromise(this._createCommand("getandputifabsent").
+ setPostData(JSON.stringify({"key" : key, "val" : val})));
}
/**
* @this {Cache}
- * @param {onGet} callback Called on finish
*/
Cache.prototype.size = function(callback) {
- this._server.runCommand(this._createCommand("cachesize"), callback);
+ return this.__createPromise(this._createCommand("cachesize"));
}
/**
* Execute sql query
*
* @param {SqlQuery|SqlFieldsQuery} qry Query
+ * @returns {QueryCursor} Cursor for current query.
*/
Cache.prototype.query = function(qry) {
- function onQueryExecute(qry, error, res) {
- if (error !== null) {
- qry.end(error);
+ return new QueryCursor(this, qry, true, null);
+}
- return;
- }
+Cache.prototype.__createPromise = function(cmd) {
+ var server = this._server;
+
+ return new Promise(function(resolve, reject) {
+ server.runCommand(cmd, function(err, res) {
+ if(err != null) {
+ reject(err);
+ }
+ else {
+ resolve(res);
+ }
+ });
+ });
+}
- qry.page(res["items"]);
+Cache.prototype._createCommand = function(name) {
+ var command = new Command(name);
- if (res["last"]) {
- qry.end(null);
- }
- else {
- var command = this._createCommand("qryfetch");
+ return command.addParam("cacheName", this._cacheName);
+}
- command.addParam("qryId", res.queryId).addParam("psz", qry.pageSize());
+/**
+ * Creates an instance of QueryCursor
+ *
+ * @constructor
+ * @this {QueryCursor}
+ * @param {Cache} cache Cache that runs query
+ * @param {SqlQuery|SqlFieldsQuery} qry Sql query
+ * @param {boolean} init True if query is not started
+ * @param {Object[]} res Current page result
+ */
+QueryCursor = function(cache, qry, init, res) {
+ this._qry = qry;
+ this._cache = cache;
+ this._init = init;
+ this._res = res;
+}
- this._server.runCommand(command, onQueryExecute.bind(this, qry));
- }
+/**
+ * Gets Promise with all query results.
+ * Use this method when you know in advance that query result is
+ * relatively small and will not cause memory utilization issues.
+ * <p>
+ * Since all the results will be fetched, all the resources will be closed
+ * automatically after this call, e.g. there is no need to call close() method in this case.
+ *
+ * @this{QueryCursor}
+ * @returns {Promise} Promise with query result
+ */
+QueryCursor.prototype.getAll = function() {
+ if (!this._init) {
+ return new Promise(function(resolve, reject){
+ reject("GetAll is called after nextPage.");
+ });
}
- if (qry.type() === "Sql") {
- this._sqlQuery(qry, onQueryExecute);
- }
- else {
- this._sqlFieldsQuery(qry, onQueryExecute);
- }
-}
+ var cmd = this._getQueryCommand();
+ var server = this._cache._server;
+ var cursor = this;
+
+ return new Promise(function(resolve, reject) {
+ var fullRes = [];
+
+ onResult = function (err, res){
+ if (err !== null) {
+ reject(err);
+ }
+ else {
+ cursor._res = res;
+
+ fullRes = fullRes.concat(res["items"]);
+
+ if (res["last"]) {
+ resolve(fullRes);
+ }
+ else {
+ server.runCommand(cursor._getQueryCommand(), onResult);
+ }
+ }
+ }
-Cache.prototype._sqlFieldsQuery = function(qry, onQueryExecute) {
- var command = this._createQueryCommand("qryfieldsexecute", qry);
+ server.runCommand(cmd, onResult);
+ });
+}
- command.setPostData(JSON.stringify({"arg" : qry.arguments()}));
+/**
+ * Gets Promise with Cursor on next page of the query results.
+ *
+ * @this{QueryCursor}
+ * @returns {Promise} Promise with Cursor on next page
+ */
+QueryCursor.prototype.nextPage = function() {
+ if (this._res !== null && this._res["last"]) {
+ throw "All pages are returned.";
+ }
- this._server.runCommand(command, onQueryExecute.bind(this, qry));
+ var cmd = this._getQueryCommand();
+ var server = this._cache._server;
+ var qry = this._qry;
+ var cache = this._cache;
+
+ return new Promise(function(resolve, reject) {
+ server.runCommand(cmd, function(err, res) {
+ if(err !== null) {
+ reject(err);
+ }
+ else {
+ resolve(new QueryCursor(cache, qry, false, res));
+ }
+ });
+ });
}
-Cache.prototype._sqlQuery = function(qry, onQueryExecute) {
- if (qry.returnType() == null) {
- qry.end("No type for sql query.");
+/**
+ * Gets collections of the query page results.
+ *
+ * @this{QueryCursor}
+ * @returns {Object[]} Query page result.
+ */
+QueryCursor.prototype.page = function() {
+ if (this._res === null)
+ return null;
- return;
- }
+ return this._res["items"];
+}
- var command = this._createQueryCommand("qryexecute", qry);
+/**
+ * Closes all resources related to this cursor.
+ *
+ * @this{QueryCursor}
+ * @returns {Promise} Promise on cursor close.
+ */
+QueryCursor.prototype.close = function() {
+ if (this._init) {
+ return new Promise(function(resolve, reject) {
+ return resolve(true);
+ });
+ }
- command.addParam("type", qry.returnType());
+ var server = this._cache._server;
+ var cmd = this._createQueryCommand("qryclose", this._qry).addParam("qryId", this._res.queryId);
+
+ return new Promise(function(resolve, reject) {
+ server.runCommand(cmd, function(err, res) {
+ if(err != null) {
+ reject(err);
+ }
+ else {
+ resolve(true);
+ }
+ });
+ });
+}
- command.setPostData(JSON.stringify({"arg" : qry.arguments()}));
+/**
+ * Returns True if the iteration has no more elements.
+ *
+ * @this{QueryCursor}
+ * @returns {boolean} True if it is the last page
+ */
+QueryCursor.prototype.isFinished = function() {
+ if (this._res === null)
+ return false;
- this._server.runCommand(command, onQueryExecute.bind(this, qry));
+ return this._res["last"];
}
-Cache.prototype._createCommand = function(name) {
- var command = new Command(name);
+QueryCursor.prototype._getQueryCommand = function() {
+ if (this._init) {
+ if (this._qry.type() === "Sql") {
+ return this._sqlQuery(this._qry);
+ }
- return command.addParam("cacheName", this._cacheName);
+ this._init = false;
+
+ return this._sqlFieldsQuery(this._qry);
+ }
+
+ return this._cache._createCommand("qryfetch").addParam("qryId", this._res.queryId).
+ addParam("psz", this._qry.pageSize());
}
-Cache.prototype._createQueryCommand = function(name, qry) {
- var command = this._createCommand(name);
+QueryCursor.prototype._sqlFieldsQuery = function(qry) {
+ return this._createQueryCommand("qryfieldsexecute", qry).
+ setPostData(JSON.stringify({"arg" : qry.arguments()}));
+}
- command.addParam("qry", qry.query());
+QueryCursor.prototype._sqlQuery = function(qry) {
+ return this._createQueryCommand("qryexecute", qry).addParam("type", qry.returnType()).
+ setPostData(JSON.stringify({"arg" : qry.arguments()}));
+}
- return command.addParam("psz", qry.pageSize());
+QueryCursor.prototype._createQueryCommand = function(name, qry) {
+ return new Command(name).addParam("cacheName", this._cache._cacheName).
+ addParam("qry", qry.query()).addParam("psz", qry.pageSize());
}
/**
@@ -370,13 +479,5 @@ function CacheEntry(key0, val0) {
this.value = val0;
}
-/**
- * Callback for cache get
- *
- * @callback Cache~onGetAll
- * @param {string} error Error
- * @param {string[]} results Result values
- */
-
exports.Cache = Cache
exports.CacheEntry = CacheEntry
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/main/js/compute.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/compute.js b/modules/nodejs/src/main/js/compute.js
index 5c28418..53cd11c 100644
--- a/modules/nodejs/src/main/js/compute.js
+++ b/modules/nodejs/src/main/js/compute.js
@@ -31,11 +31,10 @@ function Compute(server) {
* @this {Compute}
* @param job Function
* @param args Function arguments
- * @param {onGet} callback Callback
*/
-Compute.prototype.run = function(job, args, callback) {
- this._server.runCommand(new Command("runscript").addParam("func", job).
- setPostData(JSON.stringify({"arg" : args})), callback);
+Compute.prototype.run = function(job, args) {
+ return this._createPromise(new Command("runscript").addParam("func", job).
+ setPostData(JSON.stringify({"arg" : args})));
}
/**
@@ -46,11 +45,10 @@ Compute.prototype.run = function(job, args, callback) {
* @param {string|number|JSONObject} key Key.
* @param job Function
* @param args Function arguments
- * @param {onGet} callback Callback
*/
-Compute.prototype.affinityRun = function(cacheName, key, job, args, callback) {
- this._server.runCommand(new Command("affrun").addParam("func", job).addParam("cacheName", cacheName).
- setPostData(JSON.stringify({"arg" : args, "key" : key})), callback);
+Compute.prototype.affinityRun = function(cacheName, key, job, args) {
+ return this._createPromise(new Command("affrun").addParam("func", job).addParam("cacheName", cacheName).
+ setPostData(JSON.stringify({"arg" : args, "key" : key})));
}
/**
@@ -61,14 +59,26 @@ Compute.prototype.affinityRun = function(cacheName, key, job, args, callback) {
* @param {onGet} callback Callback
*/
Compute.prototype.mapReduce = function(map, reduce, arg, callback) {
- var command = new Command("excmapreduce");
-
- command.addParam("map", map).addParam("reduce", reduce);
- command.setPostData(JSON.stringify({"arg" : arg}));
+ var cmd = new Command("excmapreduce").addParam("map", map).addParam("reduce", reduce).
+ setPostData(JSON.stringify({"arg" : arg}));
- this._server.runCommand(command, callback);
+ return this._createPromise(cmd);
}
+
+Compute.prototype._createPromise = function(cmd) {
+ var server = this._server;
+ return new Promise(function(resolve, reject) {
+ server.runCommand(cmd, function(err, res) {
+ if (err != null) {
+ reject(err);
+ }
+ else {
+ resolve(res);
+ }
+ });
+ });
+}
/**
* @name EmitFunction
* @function
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/main/js/ignite.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/ignite.js b/modules/nodejs/src/main/js/ignite.js
index a4a1dd9..c3d90ac 100644
--- a/modules/nodejs/src/main/js/ignite.js
+++ b/modules/nodejs/src/main/js/ignite.js
@@ -15,10 +15,10 @@
* limitations under the License.
*/
+var Promise = require("bluebird");
var Cache = require("./cache").Cache;
var Compute = require("./compute").Compute;
var ClusterNode = require("./cluster-node").ClusterNode;
-var Server = require("./server").Server;
var Command = require("./server").Command;
/**
@@ -56,21 +56,20 @@ Ignite.prototype.cache = function(cacheName) {
*
* @this {Ignite}
* @param {string} Cache name
- * @param callback Callback with cache.
*/
-Ignite.prototype.getOrCreateCache = function(cacheName, callback) {
- var onCreateCallback = function(callback, err, res) {
- if (err !== null) {
- callback.call(null, err, null);
-
- return;
- }
-
- callback.call(null, null, new Cache(this._server, cacheName))
- }
-
- this._server.runCommand(new Command("getorcreatecache").addParam("cacheName", cacheName),
- onCreateCallback.bind(this, callback));
+Ignite.prototype.getOrCreateCache = function(cacheName) {
+ var server = this._server;
+ return new Promise(function(resolve, reject) {
+ server.runCommand(new Command("getorcreatecache").addParam("cacheName", cacheName),
+ function(err, res) {
+ if (err != null) {
+ reject(err);
+ }
+ else {
+ resolve(new Cache(server, cacheName));
+ }
+ });
+ });
}
/**
@@ -78,10 +77,9 @@ Ignite.prototype.getOrCreateCache = function(cacheName, callback) {
*
* @this {Ignite}
* @param {string} cacheName Cache name to stop
- * @param {noValue} callback Callback contains only error
*/
-Ignite.prototype.destroyCache = function(cacheName, callback) {
- this._server.runCommand(new Command("destroycache").addParam("cacheName", cacheName), callback);
+Ignite.prototype.destroyCache = function(cacheName) {
+ return this._createPromise(new Command("destroycache").addParam("cacheName", cacheName));
}
/**
@@ -98,51 +96,62 @@ Ignite.prototype.compute = function() {
* Ignite version
*
* @this {Ignite}
- * @param {onGet} callback Result in callback contains string with Ignite version.
*/
-Ignite.prototype.version = function(callback) {
- this._server.runCommand(new Command("version"), callback);
+Ignite.prototype.version = function() {
+ return this._createPromise(new Command("version"));
}
/**
* Connected ignite name
*
* @this {Ignite}
- * @param {onGet} callback Result in callback contains string with Ignite name.
*/
-Ignite.prototype.name = function(callback) {
- this._server.runCommand(new Command("name"), callback);
+Ignite.prototype.name = function() {
+ return this._createPromise(new Command("name"));
}
/**
* @this {Ignite}
- * @param {onGet} callback Result in callback contains list of ClusterNodes
*/
-Ignite.prototype.cluster = function(callback) {
- function onTop(callback, err, res) {
- if (err) {
- callback.call(null, err, null);
-
- return;
- }
-
- if (!res || res.length == 0) {
- callback.call(null, "Empty topology cluster.", null);
-
- return;
- }
-
- var nodes = [];
-
- for (var node of res) {
- nodes.push(new ClusterNode(node.nodeId, node.attributes));
- }
-
- callback.call(null, null, nodes);
- }
+Ignite.prototype.cluster = function() {
+ var cmd = new Command("top").addParam("attr", "true").addParam("mtr", "false");
+
+ var server = this._server;
+ return new Promise(function(resolve, reject) {
+ server.runCommand(cmd, function(err, res) {
+ if (err != null) {
+ reject(err);
+ }
+ else {
+ if (!res || res.length == 0) {
+ reject("Empty topology cluster.");
+ }
+ else {
+ var nodes = [];
+
+ for (var node of res) {
+ nodes.push(new ClusterNode(node.nodeId, node.attributes));
+ }
+
+ resolve(nodes);
+ }
+ }
+ });
+ });
+}
- this._server.runCommand(new Command("top").addParam("attr", "true").addParam("mtr", "false"),
- onTop.bind(null, callback));
+Ignite.prototype._createPromise = function(cmd) {
+ var server = this._server;
+ return new Promise(function(resolve, reject) {
+ server.runCommand(cmd, function(err, res) {
+ if (err != null) {
+ reject(err);
+ }
+ else {
+ resolve(res);
+ }
+ });
+ });
}
exports.Ignite = Ignite;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/main/js/ignition.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/ignition.js b/modules/nodejs/src/main/js/ignition.js
index 049eb4b..a7d4518 100644
--- a/modules/nodejs/src/main/js/ignition.js
+++ b/modules/nodejs/src/main/js/ignition.js
@@ -14,7 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+var Server = require("./server").Server;
+var Ignite = require("./ignite").Ignite
/**
* Creates an instance of Ignition
*
@@ -36,73 +37,70 @@ function Ignition() {
*
* @param {string[]} address List of nodes hosts with ports
* @param {string} secretKey Secret key.
- * @param {Ignition~onStart} callback Called on finish
*/
-Ignition.start = function(address, secretKey, callback) {
- var Server = require("./server").Server;
- var Ignite = require("./ignite").Ignite
-
- var numConn = 0;
-
- for (var addr of address) {
- var params = addr.split(":");
-
- var portsRange = params[1].split("..");
-
- var start;
- var end;
-
- if (portsRange.length === 1) {
- start = parseInt(portsRange[0], 10);
- end = start;
- }
- else if (portsRange.length === 2) {
- start = parseInt(portsRange[0], 10);
- end = parseInt(portsRange[1], 10);
+Ignition.start = function(address, secretKey) {
+ return new Promise(function(resolve, reject) {
+ var numConn = 0;
+
+ var needVal = true;
+
+ for (var addr of address) {
+ var params = addr.split(":");
+
+ var portsRange = params[1].split("..");
+
+ var start;
+ var end;
+
+ if (portsRange.length === 1) {
+ start = parseInt(portsRange[0], 10);
+ end = start;
+ }
+ else if (portsRange.length === 2) {
+ start = parseInt(portsRange[0], 10);
+ end = parseInt(portsRange[1], 10);
+ }
+
+ if (isNaN(start) || isNaN(end)) {
+ needVal = false;
+
+ reject("Incorrect address format.");
+ }
+ else {
+ for (var i = start; i <= end; i++) {
+ checkServer(params[0], i, secretKey);
+ }
+ }
}
- if (isNaN(start) || isNaN(end)) {
- incorrectAddress();
+ function checkServer(host, port, secretKey) {
+ numConn++;
- return;
- }
+ var server = new Server(host, port, secretKey);
- for (var i = start; i <= end; i++) {
- checkServer(params[0], i, secretKey);
+ server.checkConnection(onConnect.bind(this, server));
}
- }
-
- function checkServer(host, port, secretKey) {
- numConn++;
-
- var server = new Server(host, port, secretKey);
- server.checkConnection(onConnect.bind(null, server));
- }
+ function onConnect(server, error) {
+ if (!needVal) return;
- function incorrectAddress() {
- callback.call(null, "Incorrect address format.", null);
+ numConn--;
- callback = null;
- }
+ if (!error) {
+ resolve(new Ignite(server));
- function onConnect(server, error) {
- if (!callback) return;
+ needVal = false;
- numConn--;
+ return;
+ }
- if (!error) {
- callback.call(null, null, new Ignite(server));
-
- callback = null;
-
- return;
- }
+ if (!numConn) {
+ reject("Cannot connect to servers. " + error);
- if (!numConn) {
- callback.call(null, "Cannot connect to servers. " + error, null);
+ return;
+ }
}
- }
+ });
}
exports.Ignition = Ignition;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/main/js/package.json
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/package.json b/modules/nodejs/src/main/js/package.json
index ae4b911..47c627e 100644
--- a/modules/nodejs/src/main/js/package.json
+++ b/modules/nodejs/src/main/js/package.json
@@ -10,5 +10,8 @@
"license" : "Apache-2.0",
"keywords" : "grid",
"homepage" : "https://ignite.incubator.apache.org/",
- "engines" : { "node" : ">=0.12.4" }
+ "engines" : { "node" : ">=0.12.4" },
+ "dependencies" : {
+ "bluebird" : ">=2.0.0"
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/main/js/server.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/server.js b/modules/nodejs/src/main/js/server.js
index 5d7430a..f8a98ab 100644
--- a/modules/nodejs/src/main/js/server.js
+++ b/modules/nodejs/src/main/js/server.js
@@ -86,6 +86,8 @@ Server.prototype.runCommand = function(cmd, callback) {
});
response.on('end', function () {
+ console.log("Full response:" + fullResponseString);
+
if (response.statusCode !== 200) {
if (response.statusCode === 401) {
callback.call(null, "Authentication failed. Status code 401.");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsIgnitionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsIgnitionSelfTest.java b/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsIgnitionSelfTest.java
index 205e467..bdbebab 100644
--- a/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsIgnitionSelfTest.java
+++ b/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsIgnitionSelfTest.java
@@ -41,28 +41,28 @@ public class NodeJsIgnitionSelfTest extends NodeJsAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testIgnitionStart() throws Exception {
- runJsScript("ignitionStartSuccess");
+ public void testIgnitionStartSuccess() throws Exception {
+ runJsScript("testIgnitionStartSuccess");
}
/**
* @throws Exception If failed.
*/
- public void testIgnitionFailedStart() throws Exception {
+ public void testIgnitionFail() throws Exception {
runJsScript("testIgnitionFail");
}
/**
* @throws Exception If failed.
*/
- public void testIgnitionStartWithSeveralPorts() throws Exception {
- runJsScript("ignitionStartSuccessWithSeveralPorts");
+ public void testIgnitionStartSuccessWithSeveralPorts() throws Exception {
+ runJsScript("testIgnitionStartSuccessWithSeveralPorts");
}
/**
* @throws Exception If failed.
*/
public void testIgnitionNotStartWithSeveralPorts() throws Exception {
- runJsScript("ignitionNotStartWithSeveralPorts");
+ runJsScript("testIgnitionNotStartWithSeveralPorts");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsSqlQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsSqlQuerySelfTest.java b/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsSqlQuerySelfTest.java
index 9024b93..9a29f52 100644
--- a/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsSqlQuerySelfTest.java
+++ b/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsSqlQuerySelfTest.java
@@ -64,6 +64,24 @@ public class NodeJsSqlQuerySelfTest extends NodeJsAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testCloseQuery() throws Exception {
+ initCache();
+
+ runJsScript("testCloseQuery");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSqlFieldsGetAllQuery() throws Exception {
+ initCache();
+
+ runJsScript("testSqlFieldsGetAllQuery");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testSqlQueryWithParams() throws Exception {
initCache();