You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/31 14:46:41 UTC

incubator-ignite git commit: #ignite-1161: add new class for query cursor.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1161 0f7816def -> 6de1f082f


#ignite-1161: add new class for query cursor.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6de1f082
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6de1f082
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6de1f082

Branch: refs/heads/ignite-1161
Commit: 6de1f082f1df4dd9c9cccc30ea3f9c46b253cf28
Parents: 0f7816d
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri Jul 31 15:46:31 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri Jul 31 15:46:31 2015 +0300

----------------------------------------------------------------------
 .../JettyRestProcessorAbstractSelfTest.java     |   2 +-
 .../handlers/query/QueryCommandHandler.java     | 203 +++++++++++++++----
 2 files changed, 162 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6de1f082/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 0468c9b..9e82f13 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -1204,7 +1204,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         Map<String, String> params = new HashMap<>();
         params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key());
         params.put("type", "Person");
-        params.put("psz", "1");
+        params.put("pageSize", "1");
         params.put("cacheName", "person");
         params.put("qry", URLEncoder.encode(qry));
         params.put("arg1", "1000");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6de1f082/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index bb19f2a..d992f23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -26,12 +26,12 @@ import org.apache.ignite.internal.processors.rest.*;
 import org.apache.ignite.internal.processors.rest.handlers.*;
 import org.apache.ignite.internal.processors.rest.request.*;
 import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
 
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.*;
 
@@ -49,8 +49,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
     private static final AtomicLong qryIdGen = new AtomicLong();
 
     /** Current queries cursors. */
-    private final static ConcurrentHashMap<Long, GridTuple3<QueryCursor, Iterator, Long>> qryCurs =
-        new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs = new ConcurrentHashMap<>();
 
     /**
      * @param ctx Context.
@@ -66,13 +65,24 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
             @Override public void run() {
                 long time = U.currentTimeMillis();
 
-                for (Map.Entry<Long, GridTuple3<QueryCursor, Iterator, Long>> e : qryCurs.entrySet()) {
-                    synchronized (e.getValue()) {
-                        long createTime = e.getValue().get3();
+                for (Map.Entry<Long, QueryCursorIterator> e : qryCurs.entrySet()) {
+                    QueryCursorIterator val = e.getValue();
 
-                        if (createTime + idleQryCurTimeout > time && qryCurs.remove(e.getKey(), e.getValue()))
-                            e.getValue().get1().close();
-                    }
+                    long createTime = val.lastUsage();
+
+                    if (createTime + idleQryCurTimeout > time)
+                        if (val.lock().tryLock()) {
+                            try {
+                                val.lastUsage(-1);
+
+                                qryCurs.remove(e.getKey(), val);
+
+                                val.close();
+                            }
+                            finally {
+                                val.lock().unlock();
+                            }
+                        }
                 }
             }
         }, qryCheckFrq, qryCheckFrq);
@@ -94,17 +104,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
             case EXECUTE_SQL_QUERY:
             case EXECUTE_SQL_FIELDS_QUERY: {
                 return ctx.closure().callLocalSafe(
-                    new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req), false);
+                    new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, qryCurs), false);
             }
 
             case FETCH_SQL_QUERY: {
                 return ctx.closure().callLocalSafe(
-                    new FetchQueryCallable((RestSqlQueryRequest)req), false);
+                    new FetchQueryCallable((RestSqlQueryRequest)req, qryCurs), false);
             }
 
             case CLOSE_SQL_QUERY: {
                 return ctx.closure().callLocalSafe(
-                    new CloseQueryCallable((RestSqlQueryRequest)req), false);
+                    new CloseQueryCallable((RestSqlQueryRequest)req, qryCurs), false);
             }
         }
 
@@ -121,13 +131,19 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
         /** Execute query request. */
         private RestSqlQueryRequest req;
 
+        /** Current queries cursors. */
+        private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
+
         /**
          * @param ctx Kernal context.
          * @param req Execute query request.
+         * @param qryCurs Query cursors.
          */
-        public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req) {
+        public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req,
+            ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
             this.ctx = ctx;
             this.req = req;
+            this.qryCurs = qryCurs;
         }
 
         /** {@inheritDoc} */
@@ -158,12 +174,14 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
 
                 Iterator cur = qryCur.iterator();
 
-                GridTuple3<QueryCursor, Iterator, Long> val = new GridTuple3<>(qryCur, cur, U.currentTimeMillis());
+                QueryCursorIterator val = new QueryCursorIterator(qryCur, cur);
 
-                synchronized (val) {
+                val.lock().lock();
+
+                try {
                     qryCurs.put(qryId, val);
 
-                    CacheQueryResult res = createQueryResult(cur, req, qryId);
+                    CacheQueryResult res = createQueryResult(cur, req, qryId, qryCurs);
 
                     List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta();
 
@@ -171,9 +189,12 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
 
                     return new GridRestResponse(res);
                 }
+                finally {
+                    val.lock().unlock();
+                }
             }
             catch (Exception e) {
-                removeQueryCursor(qryId);
+                removeQueryCursor(qryId, qryCurs);
 
                 return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
             }
@@ -202,34 +223,44 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
         /** Execute query request. */
         private RestSqlQueryRequest req;
 
+        /** Current queries cursors. */
+        private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
+
         /**
          * @param req Execute query request.
+         * @param qryCurs Query cursors.
          */
-        public CloseQueryCallable(RestSqlQueryRequest req) {
+        public CloseQueryCallable(RestSqlQueryRequest req, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
             this.req = req;
+            this.qryCurs = qryCurs;
         }
 
         /** {@inheritDoc} */
         @Override public GridRestResponse call() throws Exception {
             try {
-                GridTuple3<QueryCursor, Iterator, Long> val = qryCurs.get(req.queryId());
+                QueryCursorIterator val = qryCurs.get(req.queryId());
 
                 if (val == null)
-                    return new GridRestResponse(GridRestResponse.STATUS_FAILED,
-                        "Failed to find query with ID: " + req.queryId());
+                    return new GridRestResponse(true);
+
+                val.lock().lock();
 
-                synchronized (val) {
-                    QueryCursor cur = val.get1();
+                try{
+                    if (val.lastUsage() == -1)
+                        return new GridRestResponse(true);
 
-                    cur.close();
+                    val.close();
 
                     qryCurs.remove(req.queryId());
                 }
+                finally {
+                    val.lock().unlock();
+                }
 
                 return new GridRestResponse(true);
             }
             catch (Exception e) {
-                removeQueryCursor(req.queryId());
+                removeQueryCursor(req.queryId(), qryCurs);
 
                 return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
             }
@@ -243,34 +274,48 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
         /** Execute query request. */
         private RestSqlQueryRequest req;
 
+        /** Current queries cursors. */
+        private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
+
         /**
          * @param req Execute query request.
+         * @param qryCurs Query cursors.
          */
-        public FetchQueryCallable(RestSqlQueryRequest req) {
+        public FetchQueryCallable(RestSqlQueryRequest req, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
             this.req = req;
+            this.qryCurs = qryCurs;
         }
 
         /** {@inheritDoc} */
         @Override public GridRestResponse call() throws Exception {
             try {
-                GridTuple3<QueryCursor, Iterator, Long> t = qryCurs.get(req.queryId());
+                QueryCursorIterator val = qryCurs.get(req.queryId());
 
-                if (t == null)
+                if (val == null)
                     return new GridRestResponse(GridRestResponse.STATUS_FAILED,
                         "Failed to find query with ID: " + req.queryId());
 
-                synchronized (t) {
-                    t.set3(System.currentTimeMillis());
+                val.lock().lock();
 
-                    Iterator cur = t.get2();
+                try {
+                    if (val.lastUsage() == -1)
+                        return new GridRestResponse(GridRestResponse.STATUS_FAILED,
+                            "Query is closed by timeout. Restart query with ID: " + req.queryId());
 
-                    CacheQueryResult res = createQueryResult(cur, req, req.queryId());
+                    val.lastUsage(U.currentTimeMillis());
+
+                    Iterator cur = val.iterator();
+
+                    CacheQueryResult res = createQueryResult(cur, req, req.queryId(), qryCurs);
 
                     return new GridRestResponse(res);
                 }
+                finally {
+                    val.lock().unlock();
+                }
             }
             catch (Exception e) {
-                removeQueryCursor(req.queryId());
+                removeQueryCursor(req.queryId(), qryCurs);
 
                 return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
             }
@@ -281,9 +326,11 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
      * @param cur Current cursor.
      * @param req Sql request.
      * @param qryId Query id.
+     * @param qryCurs Query cursors.
      * @return Query result with items.
      */
-    private static CacheQueryResult createQueryResult(Iterator cur, RestSqlQueryRequest req, Long qryId) {
+    private static CacheQueryResult createQueryResult(Iterator cur, RestSqlQueryRequest req, Long qryId,
+        ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
         CacheQueryResult res = new CacheQueryResult();
 
         List<Object> items = new ArrayList<>();
@@ -298,7 +345,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
         res.setQueryId(qryId);
 
         if (!cur.hasNext())
-            removeQueryCursor(qryId);
+            removeQueryCursor(qryId, qryCurs);
 
         return res;
     }
@@ -307,16 +354,88 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
      * Removes query cursor.
      *
      * @param qryId Query id.
+     * @param qryCurs Query cursors.
      */
-    private static void removeQueryCursor(Long qryId) {
-        GridTuple3<QueryCursor, Iterator, Long> t = qryCurs.get(qryId);
+    private static void removeQueryCursor(Long qryId, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
+        QueryCursorIterator t = qryCurs.get(qryId);
 
-        if (t != null) {
-            synchronized (t) {
-                t.get1().close();
+        if (t == null)
+            return;
 
-                qryCurs.remove(qryId);
-            }
+        t.lock().lock();
+
+        try{
+            if (t.lastUsage() == -1)
+                return;
+
+            t.close();
+
+            qryCurs.remove(qryId);
+        }
+        finally {
+            t.lock().unlock();
+        }
+    }
+
+    /**
+     * Holder for a query cursor, its iterator, its last usage timestamp and a lock.
+     */
+    private static class QueryCursorIterator {
+        /** Query cursor. */
+        private QueryCursor cur;
+
+        /** Query iterator. */
+        private Iterator iter;
+
+        /** Last usage timestamp, or -1 if the cursor is closed. */
+        private volatile long lastUsage;
+
+        /** Lock held while the cursor is read or closed. */
+        private final ReentrantLock lock = new ReentrantLock();
+
+        /**
+         * @param cur Query cursor.
+         * @param iter Query iterator.
+         */
+        public QueryCursorIterator(QueryCursor cur, Iterator iter) {
+            this.cur = cur;
+            this.iter = iter;
+            lastUsage = U.currentTimeMillis();
+        }
+
+        /**
+         * @return Lock.
+         */
+        public ReentrantLock lock() {
+            return lock;
+        }
+
+        /**
+         * @return Query iterator.
+         */
+        public Iterator iterator() {
+            return iter;
+        }
+
+        /**
+         * @return Last usage timestamp, or -1 if the cursor is closed.
+         */
+        public long lastUsage() {
+            return lastUsage;
+        }
+
+        /**
+         * @param time Current time or -1 if cursor is closed.
+         */
+        public void lastUsage(long time) {
+            lastUsage = time;
+        }
+
+        /**
+         * Close query cursor.
+         */
+        public void close() {
+            cur.close();
         }
     }
 }