You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/31 14:46:41 UTC
incubator-ignite git commit: #ignite-1161: add new class for query cursor.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1161 0f7816def -> 6de1f082f
#ignite-1161: add new class for query cursor.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6de1f082
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6de1f082
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6de1f082
Branch: refs/heads/ignite-1161
Commit: 6de1f082f1df4dd9c9cccc30ea3f9c46b253cf28
Parents: 0f7816d
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Jul 31 15:46:31 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Jul 31 15:46:31 2015 +0300
----------------------------------------------------------------------
.../JettyRestProcessorAbstractSelfTest.java | 2 +-
.../handlers/query/QueryCommandHandler.java | 203 +++++++++++++++----
2 files changed, 162 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6de1f082/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 0468c9b..9e82f13 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -1204,7 +1204,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
Map<String, String> params = new HashMap<>();
params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key());
params.put("type", "Person");
- params.put("psz", "1");
+ params.put("pageSize", "1");
params.put("cacheName", "person");
params.put("qry", URLEncoder.encode(qry));
params.put("arg1", "1000");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6de1f082/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index bb19f2a..d992f23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -26,12 +26,12 @@ import org.apache.ignite.internal.processors.rest.*;
import org.apache.ignite.internal.processors.rest.handlers.*;
import org.apache.ignite.internal.processors.rest.request.*;
import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.*;
@@ -49,8 +49,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
private static final AtomicLong qryIdGen = new AtomicLong();
/** Current queries cursors. */
- private final static ConcurrentHashMap<Long, GridTuple3<QueryCursor, Iterator, Long>> qryCurs =
- new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs = new ConcurrentHashMap<>();
/**
* @param ctx Context.
@@ -66,13 +65,24 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
@Override public void run() {
long time = U.currentTimeMillis();
- for (Map.Entry<Long, GridTuple3<QueryCursor, Iterator, Long>> e : qryCurs.entrySet()) {
- synchronized (e.getValue()) {
- long createTime = e.getValue().get3();
+ for (Map.Entry<Long, QueryCursorIterator> e : qryCurs.entrySet()) {
+ QueryCursorIterator val = e.getValue();
- if (createTime + idleQryCurTimeout > time && qryCurs.remove(e.getKey(), e.getValue()))
- e.getValue().get1().close();
- }
+ long createTime = val.lastUsage();
+
+ if (createTime + idleQryCurTimeout > time)
+ if (val.lock().tryLock()) {
+ try {
+ val.lastUsage(-1);
+
+ qryCurs.remove(e.getKey(), val);
+
+ val.close();
+ }
+ finally {
+ val.lock().unlock();
+ }
+ }
}
}
}, qryCheckFrq, qryCheckFrq);
@@ -94,17 +104,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
case EXECUTE_SQL_QUERY:
case EXECUTE_SQL_FIELDS_QUERY: {
return ctx.closure().callLocalSafe(
- new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req), false);
+ new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, qryCurs), false);
}
case FETCH_SQL_QUERY: {
return ctx.closure().callLocalSafe(
- new FetchQueryCallable((RestSqlQueryRequest)req), false);
+ new FetchQueryCallable((RestSqlQueryRequest)req, qryCurs), false);
}
case CLOSE_SQL_QUERY: {
return ctx.closure().callLocalSafe(
- new CloseQueryCallable((RestSqlQueryRequest)req), false);
+ new CloseQueryCallable((RestSqlQueryRequest)req, qryCurs), false);
}
}
@@ -121,13 +131,19 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Execute query request. */
private RestSqlQueryRequest req;
+ /** Current queries cursors. */
+ private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
+
/**
* @param ctx Kernal context.
* @param req Execute query request.
+ * @param qryCurs Query cursors.
*/
- public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req) {
+ public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req,
+ ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
this.ctx = ctx;
this.req = req;
+ this.qryCurs = qryCurs;
}
/** {@inheritDoc} */
@@ -158,12 +174,14 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
Iterator cur = qryCur.iterator();
- GridTuple3<QueryCursor, Iterator, Long> val = new GridTuple3<>(qryCur, cur, U.currentTimeMillis());
+ QueryCursorIterator val = new QueryCursorIterator(qryCur, cur);
- synchronized (val) {
+ val.lock().lock();
+
+ try {
qryCurs.put(qryId, val);
- CacheQueryResult res = createQueryResult(cur, req, qryId);
+ CacheQueryResult res = createQueryResult(cur, req, qryId, qryCurs);
List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta();
@@ -171,9 +189,12 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
return new GridRestResponse(res);
}
+ finally {
+ val.lock().unlock();
+ }
}
catch (Exception e) {
- removeQueryCursor(qryId);
+ removeQueryCursor(qryId, qryCurs);
return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
}
@@ -202,34 +223,44 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Execute query request. */
private RestSqlQueryRequest req;
+ /** Current queries cursors. */
+ private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
+
/**
* @param req Execute query request.
+ * @param qryCurs Query cursors.
*/
- public CloseQueryCallable(RestSqlQueryRequest req) {
+ public CloseQueryCallable(RestSqlQueryRequest req, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
this.req = req;
+ this.qryCurs = qryCurs;
}
/** {@inheritDoc} */
@Override public GridRestResponse call() throws Exception {
try {
- GridTuple3<QueryCursor, Iterator, Long> val = qryCurs.get(req.queryId());
+ QueryCursorIterator val = qryCurs.get(req.queryId());
if (val == null)
- return new GridRestResponse(GridRestResponse.STATUS_FAILED,
- "Failed to find query with ID: " + req.queryId());
+ return new GridRestResponse(true);
+
+ val.lock().lock();
- synchronized (val) {
- QueryCursor cur = val.get1();
+ try{
+ if (val.lastUsage() == -1)
+ return new GridRestResponse(true);
- cur.close();
+ val.close();
qryCurs.remove(req.queryId());
}
+ finally {
+ val.lock().unlock();
+ }
return new GridRestResponse(true);
}
catch (Exception e) {
- removeQueryCursor(req.queryId());
+ removeQueryCursor(req.queryId(), qryCurs);
return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
}
@@ -243,34 +274,48 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Execute query request. */
private RestSqlQueryRequest req;
+ /** Current queries cursors. */
+ private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
+
/**
* @param req Execute query request.
+ * @param qryCurs Query cursors.
*/
- public FetchQueryCallable(RestSqlQueryRequest req) {
+ public FetchQueryCallable(RestSqlQueryRequest req, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
this.req = req;
+ this.qryCurs = qryCurs;
}
/** {@inheritDoc} */
@Override public GridRestResponse call() throws Exception {
try {
- GridTuple3<QueryCursor, Iterator, Long> t = qryCurs.get(req.queryId());
+ QueryCursorIterator val = qryCurs.get(req.queryId());
- if (t == null)
+ if (val == null)
return new GridRestResponse(GridRestResponse.STATUS_FAILED,
"Failed to find query with ID: " + req.queryId());
- synchronized (t) {
- t.set3(System.currentTimeMillis());
+ val.lock().lock();
- Iterator cur = t.get2();
+ try {
+ if (val.lastUsage() == -1)
+ return new GridRestResponse(GridRestResponse.STATUS_FAILED,
+ "Query is closed by timeout. Restart query with ID: " + req.queryId());
- CacheQueryResult res = createQueryResult(cur, req, req.queryId());
+ val.lastUsage(U.currentTimeMillis());
+
+ Iterator cur = val.iterator();
+
+ CacheQueryResult res = createQueryResult(cur, req, req.queryId(), qryCurs);
return new GridRestResponse(res);
}
+ finally {
+ val.lock().unlock();
+ }
}
catch (Exception e) {
- removeQueryCursor(req.queryId());
+ removeQueryCursor(req.queryId(), qryCurs);
return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
}
@@ -281,9 +326,11 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
* @param cur Current cursor.
* @param req Sql request.
* @param qryId Query id.
+ * @param qryCurs Query cursors.
* @return Query result with items.
*/
- private static CacheQueryResult createQueryResult(Iterator cur, RestSqlQueryRequest req, Long qryId) {
+ private static CacheQueryResult createQueryResult(Iterator cur, RestSqlQueryRequest req, Long qryId,
+ ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
CacheQueryResult res = new CacheQueryResult();
List<Object> items = new ArrayList<>();
@@ -298,7 +345,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
res.setQueryId(qryId);
if (!cur.hasNext())
- removeQueryCursor(qryId);
+ removeQueryCursor(qryId, qryCurs);
return res;
}
@@ -307,16 +354,88 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
* Removes query cursor.
*
* @param qryId Query id.
+ * @param qryCurs Query cursors.
*/
- private static void removeQueryCursor(Long qryId) {
- GridTuple3<QueryCursor, Iterator, Long> t = qryCurs.get(qryId);
+ private static void removeQueryCursor(Long qryId, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
+ QueryCursorIterator t = qryCurs.get(qryId);
- if (t != null) {
- synchronized (t) {
- t.get1().close();
+ if (t == null)
+ return;
- qryCurs.remove(qryId);
- }
+ t.lock().lock();
+
+ try{
+ if (t.lastUsage() == -1)
+ return;
+
+ t.close();
+
+ qryCurs.remove(qryId);
+ }
+ finally {
+ t.lock().unlock();
+ }
+ }
+
+ /**
+ * Query cursor iterator.
+ */
+ private static class QueryCursorIterator {
+ /** Query cursor. */
+ private QueryCursor cur;
+
+ /** Query iterator. */
+ private Iterator iter;
+
+ /** Last timestamp. */
+ private volatile long lastUsage;
+
+ /** Reentrant lock. */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * @param cur Query cursor.
+ * @param iter Query iterator.
+ */
+ public QueryCursorIterator(QueryCursor cur, Iterator iter) {
+ this.cur = cur;
+ this.iter = iter;
+ lastUsage = U.currentTimeMillis();
+ }
+
+ /**
+ * @return Lock.
+ */
+ public ReentrantLock lock() {
+ return lock;
+ }
+
+ /**
+ * @return Query iterator.
+ */
+ public Iterator iterator() {
+ return iter;
+ }
+
+ /**
+ * @return Last usage
+ */
+ public long lastUsage() {
+ return lastUsage;
+ }
+
+ /**
+ * @param time Current time or -1 if cursor is closed.
+ */
+ public void lastUsage(long time) {
+ lastUsage = time;
+ }
+
+ /**
+ * Close query cursor.
+ */
+ public void close() {
+ cur.close();
}
}
}