You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/28 11:02:52 UTC

incubator-ignite git commit: #ignite-1161: add schedule remover for query cursor.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1161 [created] a1601d791


#ignite-1161: add schedule remover for query cursor.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a1601d79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a1601d79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a1601d79

Branch: refs/heads/ignite-1161
Commit: a1601d79181755b985dab80e02c08bdf05788722
Parents: a127756
Author: ivasilinets <iv...@gridgain.com>
Authored: Tue Jul 28 12:02:27 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Tue Jul 28 12:02:27 2015 +0300

----------------------------------------------------------------------
 .../rest/AbstractRestProcessorSelfTest.java     |  2 +
 .../JettyRestProcessorAbstractSelfTest.java     | 34 ++++++++
 .../configuration/ConnectorConfiguration.java   | 23 +++++
 .../handlers/query/QueryCommandHandler.java     | 90 ++++++++++++--------
 4 files changed, 113 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1601d79/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
index 8310b0f..b5b430c 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
@@ -73,6 +73,8 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest {
 
         clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml");
 
+        clientCfg.setQueryRemoveDelay(5);
+
         cfg.setConnectorConfiguration(clientCfg);
 
         TcpDiscoverySpi disco = new TcpDiscoverySpi();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1601d79/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 8ce070f..91dfa66 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.cache.query.annotations.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.processors.rest.handlers.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.testframework.*;
 
 import java.io.*;
@@ -1194,6 +1195,39 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         assertFalse(queryCursorFound());
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryDelay() throws Exception {
+        String qry = "salary > ? and salary <= ?";
+
+        Map<String, String> params = new HashMap<>();
+        params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key());
+        params.put("type", "Person");
+        params.put("psz", "1");
+        params.put("cacheName", "person");
+        params.put("qry", URLEncoder.encode(qry, "UTF-8")); // Explicit charset, not the platform default.
+        params.put("arg1", "1000");
+        params.put("arg2", "2000");
+
+        String ret = content(params);
+
+        assertNotNull(ret);
+        assertFalse(ret.isEmpty());
+
+        JSONObject json = JSONObject.fromObject(ret);
+
+        List items = (List)((Map)json.get("response")).get("items");
+
+        assertEquals(1, items.size());
+
+        assertTrue(queryCursorFound()); // Cursor must still be registered right after execution.
+
+        U.sleep(12000); // Remove delay is 5 s: 1st check clears the 'used' flag, 2nd (at ~10 s) drops the cursor.
+
+        assertFalse(queryCursorFound());
+    }
+
     protected abstract String signature() throws Exception;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1601d79/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
index 98753e2..237f4b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
@@ -61,6 +61,9 @@ public class ConnectorConfiguration {
     /** Default socket send and receive buffer size. */
     public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
 
+    /** Default delay, in seconds, before an unused query cursor is removed (10 minutes). */
+    private static final int DFLT_QRY_RMV_DELAY = 10 * 60;
+
     /** Jetty XML configuration path. */
     private String jettyPath;
 
@@ -85,6 +88,9 @@ public class ConnectorConfiguration {
     /** REST TCP receive buffer size. */
     private int rcvBufSize = DFLT_SOCK_BUF_SIZE;
 
+    /** Delay, in seconds, before an unused REST query cursor is removed. */
+    private int qryRmvDelay = DFLT_QRY_RMV_DELAY;
+
     /** REST TCP send queue limit. */
     private int sndQueueLimit;
 
@@ -148,6 +154,7 @@ public class ConnectorConfiguration {
         sslClientAuth = cfg.isSslClientAuth();
         sslCtxFactory = cfg.getSslContextFactory();
         sslEnabled = cfg.isSslEnabled();
+        qryRmvDelay = cfg.getQueryRemoveDelay();
     }
 
     /**
@@ -547,4 +554,20 @@ public class ConnectorConfiguration {
     public void setMessageInterceptor(ConnectorMessageInterceptor interceptor) {
         msgInterceptor = interceptor;
     }
+
+    /**
+     * Sets delay for removing query cursors that are not used.
+     *
+     * @param qryRmvDelay Query remove delay in seconds (the field defaults to 10 minutes).
+     */
+    public void setQueryRemoveDelay(int qryRmvDelay) {
+        this.qryRmvDelay = qryRmvDelay;
+    }
+
+    /**
+     * @return Delay, in seconds, after which query cursors that are not used are removed.
+     */
+    public int getQueryRemoveDelay() {
+        return qryRmvDelay;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1601d79/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index 59f95c9..18a2ae7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -26,8 +26,9 @@ import org.apache.ignite.internal.processors.rest.*;
 import org.apache.ignite.internal.processors.rest.handlers.*;
 import org.apache.ignite.internal.processors.rest.request.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
 
 import java.util.*;
 import java.util.concurrent.*;
@@ -49,13 +50,22 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
     private static final AtomicLong qryIdGen = new AtomicLong();
 
     /** Current queries cursors. */
-    private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs = new ConcurrentHashMap<>();
+    private final static ConcurrentHashMap<Long, GridTuple3<QueryCursor, Iterator, Boolean>> qryCurs =
+        new ConcurrentHashMap<>();
+
+    /** Remove delay. */
+    private static int rmvDelay = 0;
+
+    /** Scheduler. */
+    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1);
 
     /**
      * @param ctx Context.
      */
     public QueryCommandHandler(GridKernalContext ctx) {
         super(ctx);
+
+        rmvDelay = ctx.config().getConnectorConfiguration().getQueryRemoveDelay();
     }
 
     /** {@inheritDoc} */
@@ -74,17 +84,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
             case EXECUTE_SQL_QUERY:
             case EXECUTE_SQL_FIELDS_QUERY: {
                 return ctx.closure().callLocalSafe(
-                    new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, qryCurs), false);
+                    new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req), false);
             }
 
             case FETCH_SQL_QUERY: {
                 return ctx.closure().callLocalSafe(
-                    new FetchQueryCallable((RestSqlQueryRequest)req, qryCurs), false);
+                    new FetchQueryCallable((RestSqlQueryRequest)req), false);
             }
 
             case CLOSE_SQL_QUERY: {
                 return ctx.closure().callLocalSafe(
-                    new CloseQueryCallable((RestSqlQueryRequest)req, qryCurs), false);
+                    new CloseQueryCallable((RestSqlQueryRequest)req), false);
             }
         }
 
@@ -101,24 +111,18 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
         /** Execute query request. */
         private RestSqlQueryRequest req;
 
-        /** Queries cursors. */
-        private ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs;
-
         /**
          * @param ctx Kernal context.
          * @param req Execute query request.
-         * @param qryCurs Queries cursors.
          */
-        public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req,
-            ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) {
+        public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req) {
             this.ctx = ctx;
             this.req = req;
-            this.qryCurs = qryCurs;
         }
 
         /** {@inheritDoc} */
         @Override public GridRestResponse call() throws Exception {
-            long qryId = qryIdGen.getAndIncrement();
+            final long qryId = qryIdGen.getAndIncrement();
 
             try {
                 Query qry;
@@ -140,13 +144,15 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
                     return new GridRestResponse(GridRestResponse.STATUS_FAILED,
                         "No cache with name [cacheName=" + req.cacheName() + "]");
 
-                QueryCursor qryCur = cache.query(qry);
+                final QueryCursor qryCur = cache.query(qry);
 
                 Iterator cur = qryCur.iterator();
 
-                qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur));
+                qryCurs.put(qryId, new GridTuple3<>(qryCur, cur, true));
 
-                CacheQueryResult res = createQueryResult(qryCurs, cur, req, qryId);
+                scheduleRemove(qryId);
+
+                CacheQueryResult res = createQueryResult(cur, req, qryId);
 
                 List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta();
 
@@ -184,17 +190,11 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
         /** Execute query request. */
         private RestSqlQueryRequest req;
 
-        /** Queries cursors. */
-        private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs;
-
         /**
          * @param req Execute query request.
-         * @param qryCurs Queries cursors.
          */
-        public CloseQueryCallable(RestSqlQueryRequest req,
-            ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) {
+        public CloseQueryCallable(RestSqlQueryRequest req) {
             this.req = req;
-            this.qryCurs = qryCurs;
         }
 
         /** {@inheritDoc} */
@@ -227,29 +227,27 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
         /** Execute query request. */
         private RestSqlQueryRequest req;
 
-        /** Queries cursors. */
-        private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs;
-
         /**
          * @param req Execute query request.
-         * @param qryCurs Queries cursors.
          */
-        public FetchQueryCallable(RestSqlQueryRequest req,
-            ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) {
+        public FetchQueryCallable(RestSqlQueryRequest req) {
             this.req = req;
-            this.qryCurs = qryCurs;
         }
 
         /** {@inheritDoc} */
         @Override public GridRestResponse call() throws Exception {
             try {
-                Iterator cur = qryCurs.get(req.queryId()).get2();
+                GridTuple3<QueryCursor, Iterator, Boolean> t = qryCurs.get(req.queryId());
+
+                t.set3(true);
+
+                Iterator cur = t.get2();
 
                 if (cur == null)
                     return new GridRestResponse(GridRestResponse.STATUS_FAILED,
                         "Cannot find query [qryId=" + req.queryId() + "]");
 
-                CacheQueryResult res = createQueryResult(qryCurs, cur, req, req.queryId());
+                CacheQueryResult res = createQueryResult(cur, req, req.queryId());
 
                 return new GridRestResponse(res);
             }
@@ -262,15 +260,12 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
     }
 
     /**
-     * @param qryCurs Query cursors.
      * @param cur Current cursor.
      * @param req Sql request.
      * @param qryId Query id.
      * @return Query result with items.
      */
-    private static CacheQueryResult createQueryResult(
-        ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs,
-        Iterator cur, RestSqlQueryRequest req, Long qryId) {
+    private static CacheQueryResult createQueryResult(Iterator cur, RestSqlQueryRequest req, Long qryId) {
         CacheQueryResult res = new CacheQueryResult();
 
         List<Object> items = new ArrayList<>();
@@ -289,4 +284,27 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
 
         return res;
     }
+
+    /**
+     * Schedules an idle check for a query cursor after {@code rmvDelay} seconds: if the tuple's flag
+     * is set, it is cleared and the check is rescheduled; otherwise the cursor is removed and closed.
+     * @param id Query id.
+     */
+    private static void scheduleRemove(final long id) {
+        SCHEDULER.schedule(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                GridTuple3<QueryCursor, Iterator, Boolean> t = qryCurs.get(id);
+
+                if (t != null) {
+                    if (t.get3()) {
+                        t.set3(false);
+
+                        scheduleRemove(id);
+                    }
+                    else if (qryCurs.remove(id) != null)
+                        t.get1().close(); // Release the cursor's resources, not just the map entry.
+                }
+            }
+        }, rmvDelay, TimeUnit.SECONDS);
+    }
 }