You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/28 11:02:52 UTC
incubator-ignite git commit: #ignite-1161: add schedule remover for
query cursor.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1161 [created] a1601d791
#ignite-1161: add schedule remover for query cursor.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a1601d79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a1601d79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a1601d79
Branch: refs/heads/ignite-1161
Commit: a1601d79181755b985dab80e02c08bdf05788722
Parents: a127756
Author: ivasilinets <iv...@gridgain.com>
Authored: Tue Jul 28 12:02:27 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Tue Jul 28 12:02:27 2015 +0300
----------------------------------------------------------------------
.../rest/AbstractRestProcessorSelfTest.java | 2 +
.../JettyRestProcessorAbstractSelfTest.java | 34 ++++++++
.../configuration/ConnectorConfiguration.java | 23 +++++
.../handlers/query/QueryCommandHandler.java | 90 ++++++++++++--------
4 files changed, 113 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1601d79/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
index 8310b0f..b5b430c 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
@@ -73,6 +73,8 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest {
clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml");
+ clientCfg.setQueryRemoveDelay(5);
+
cfg.setConnectorConfiguration(clientCfg);
TcpDiscoverySpi disco = new TcpDiscoverySpi();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1601d79/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 8ce070f..91dfa66 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.cache.query.annotations.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.processors.rest.handlers.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.testframework.*;
import java.io.*;
@@ -1194,6 +1195,39 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
assertFalse(queryCursorFound());
}
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueryDelay() throws Exception {
+ String qry = "salary > ? and salary <= ?";
+
+ Map<String, String> params = new HashMap<>();
+ params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key());
+ params.put("type", "Person");
+ params.put("psz", "1");
+ params.put("cacheName", "person");
+ params.put("qry", URLEncoder.encode(qry));
+ params.put("arg1", "1000");
+ params.put("arg2", "2000");
+
+ String ret = content(params);
+
+ assertNotNull(ret);
+ assertTrue(!ret.isEmpty());
+
+ JSONObject json = JSONObject.fromObject(ret);
+
+ List items = (List)((Map)json.get("response")).get("items");
+
+ assertEquals(1, items.size());
+
+ assertTrue(queryCursorFound());
+
+ U.sleep(12000);
+
+ assertFalse(queryCursorFound());
+ }
+
protected abstract String signature() throws Exception;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1601d79/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
index 98753e2..237f4b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
@@ -61,6 +61,9 @@ public class ConnectorConfiguration {
/** Default socket send and receive buffer size. */
public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
+ /** Default delay for storing query cursor (10 minutes). */
+ private static final int DFLT_QRY_RMV_DELAY = 10 * 60;
+
/** Jetty XML configuration path. */
private String jettyPath;
@@ -85,6 +88,9 @@ public class ConnectorConfiguration {
/** REST TCP receive buffer size. */
private int rcvBufSize = DFLT_SOCK_BUF_SIZE;
+ /** REST delay for storing query cursor. */
+ private int qryRmvDelay = DFLT_QRY_RMV_DELAY;
+
/** REST TCP send queue limit. */
private int sndQueueLimit;
@@ -148,6 +154,7 @@ public class ConnectorConfiguration {
sslClientAuth = cfg.isSslClientAuth();
sslCtxFactory = cfg.getSslContextFactory();
sslEnabled = cfg.isSslEnabled();
+ qryRmvDelay = cfg.getQueryRemoveDelay();
}
/**
@@ -547,4 +554,20 @@ public class ConnectorConfiguration {
public void setMessageInterceptor(ConnectorMessageInterceptor interceptor) {
msgInterceptor = interceptor;
}
+
+ /**
+ * Sets delay for removing query cursors that are not used.
+ *
+ * @param qryRmvDelay Query remove delay in seconds.
+ */
+ public void setQueryRemoveDelay(int qryRmvDelay) {
+ this.qryRmvDelay = qryRmvDelay;
+ }
+
+ /**
+ * Gets delay for removing query cursors that are not used.
+ */
+ public int getQueryRemoveDelay() {
+ return qryRmvDelay;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1601d79/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index 59f95c9..18a2ae7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -26,8 +26,9 @@ import org.apache.ignite.internal.processors.rest.*;
import org.apache.ignite.internal.processors.rest.handlers.*;
import org.apache.ignite.internal.processors.rest.request.*;
import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
import java.util.*;
import java.util.concurrent.*;
@@ -49,13 +50,22 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
private static final AtomicLong qryIdGen = new AtomicLong();
/** Current queries cursors. */
- private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs = new ConcurrentHashMap<>();
+ private final static ConcurrentHashMap<Long, GridTuple3<QueryCursor, Iterator, Boolean>> qryCurs =
+ new ConcurrentHashMap<>();
+
+ /** Remove delay. */
+ private static int rmvDelay = 0;
+
+ /** Scheduler. */
+ private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1);
/**
* @param ctx Context.
*/
public QueryCommandHandler(GridKernalContext ctx) {
super(ctx);
+
+ rmvDelay = ctx.config().getConnectorConfiguration().getQueryRemoveDelay();
}
/** {@inheritDoc} */
@@ -74,17 +84,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
case EXECUTE_SQL_QUERY:
case EXECUTE_SQL_FIELDS_QUERY: {
return ctx.closure().callLocalSafe(
- new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, qryCurs), false);
+ new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req), false);
}
case FETCH_SQL_QUERY: {
return ctx.closure().callLocalSafe(
- new FetchQueryCallable((RestSqlQueryRequest)req, qryCurs), false);
+ new FetchQueryCallable((RestSqlQueryRequest)req), false);
}
case CLOSE_SQL_QUERY: {
return ctx.closure().callLocalSafe(
- new CloseQueryCallable((RestSqlQueryRequest)req, qryCurs), false);
+ new CloseQueryCallable((RestSqlQueryRequest)req), false);
}
}
@@ -101,24 +111,18 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Execute query request. */
private RestSqlQueryRequest req;
- /** Queries cursors. */
- private ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs;
-
/**
* @param ctx Kernal context.
* @param req Execute query request.
- * @param qryCurs Queries cursors.
*/
- public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req,
- ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) {
+ public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req) {
this.ctx = ctx;
this.req = req;
- this.qryCurs = qryCurs;
}
/** {@inheritDoc} */
@Override public GridRestResponse call() throws Exception {
- long qryId = qryIdGen.getAndIncrement();
+ final long qryId = qryIdGen.getAndIncrement();
try {
Query qry;
@@ -140,13 +144,15 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
return new GridRestResponse(GridRestResponse.STATUS_FAILED,
"No cache with name [cacheName=" + req.cacheName() + "]");
- QueryCursor qryCur = cache.query(qry);
+ final QueryCursor qryCur = cache.query(qry);
Iterator cur = qryCur.iterator();
- qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur));
+ qryCurs.put(qryId, new GridTuple3<>(qryCur, cur, true));
- CacheQueryResult res = createQueryResult(qryCurs, cur, req, qryId);
+ scheduleRemove(qryId);
+
+ CacheQueryResult res = createQueryResult(cur, req, qryId);
List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta();
@@ -184,17 +190,11 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Execute query request. */
private RestSqlQueryRequest req;
- /** Queries cursors. */
- private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs;
-
/**
* @param req Execute query request.
- * @param qryCurs Queries cursors.
*/
- public CloseQueryCallable(RestSqlQueryRequest req,
- ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) {
+ public CloseQueryCallable(RestSqlQueryRequest req) {
this.req = req;
- this.qryCurs = qryCurs;
}
/** {@inheritDoc} */
@@ -227,29 +227,27 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Execute query request. */
private RestSqlQueryRequest req;
- /** Queries cursors. */
- private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs;
-
/**
* @param req Execute query request.
- * @param qryCurs Queries cursors.
*/
- public FetchQueryCallable(RestSqlQueryRequest req,
- ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) {
+ public FetchQueryCallable(RestSqlQueryRequest req) {
this.req = req;
- this.qryCurs = qryCurs;
}
/** {@inheritDoc} */
@Override public GridRestResponse call() throws Exception {
try {
- Iterator cur = qryCurs.get(req.queryId()).get2();
+ GridTuple3<QueryCursor, Iterator, Boolean> t = qryCurs.get(req.queryId());
+
+ t.set3(true);
+
+ Iterator cur = t.get2();
if (cur == null)
return new GridRestResponse(GridRestResponse.STATUS_FAILED,
"Cannot find query [qryId=" + req.queryId() + "]");
- CacheQueryResult res = createQueryResult(qryCurs, cur, req, req.queryId());
+ CacheQueryResult res = createQueryResult(cur, req, req.queryId());
return new GridRestResponse(res);
}
@@ -262,15 +260,12 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
}
/**
- * @param qryCurs Query cursors.
* @param cur Current cursor.
* @param req Sql request.
* @param qryId Query id.
* @return Query result with items.
*/
- private static CacheQueryResult createQueryResult(
- ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs,
- Iterator cur, RestSqlQueryRequest req, Long qryId) {
+ private static CacheQueryResult createQueryResult(Iterator cur, RestSqlQueryRequest req, Long qryId) {
CacheQueryResult res = new CacheQueryResult();
List<Object> items = new ArrayList<>();
@@ -289,4 +284,27 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
return res;
}
+
+ /**
+ * Schedule remove for query cursor.
+ *
+ * @param id Query id.
+ */
+ private static void scheduleRemove(final long id) {
+ SCHEDULER.schedule(new CAX() {
+ @Override public void applyx() throws IgniteCheckedException {
+ GridTuple3<QueryCursor, Iterator, Boolean> t = qryCurs.get(id);
+
+ if (t != null) {
+ if (t.get3()) {
+ t.set3(false);
+
+ scheduleRemove(id);
+ }
+ else
+ qryCurs.remove(id);
+ }
+ }
+ }, rmvDelay, TimeUnit.SECONDS);
+ }
}