You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/28 16:54:34 UTC

incubator-ignite git commit: #ignite-1161: replace scheduler with ignite scheduler.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1161 a1601d791 -> a9eb9da03


#ignite-1161: replace scheduler with  ignite scheduler.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a9eb9da0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a9eb9da0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a9eb9da0

Branch: refs/heads/ignite-1161
Commit: a9eb9da039cf6fa4c703c8a337145b8bf091723f
Parents: a1601d7
Author: ivasilinets <iv...@gridgain.com>
Authored: Tue Jul 28 17:54:23 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Tue Jul 28 17:54:23 2015 +0300

----------------------------------------------------------------------
 .../rest/AbstractRestProcessorSelfTest.java     |  3 +-
 .../JettyRestProcessorAbstractSelfTest.java     |  7 +-
 .../configuration/ConnectorConfiguration.java   | 60 +++++++++---
 .../handlers/query/QueryCommandHandler.java     | 99 ++++++++++----------
 4 files changed, 104 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9eb9da0/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
index b5b430c..9b26bd8 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
@@ -73,7 +73,8 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest {
 
         clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml");
 
-        clientCfg.setQueryRemoveDelay(5);
+        clientCfg.setIdleQueryCursorTimeout(5000);
+        clientCfg.setQueryCheckFrequency(5000);
 
         cfg.setConnectorConfiguration(clientCfg);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9eb9da0/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 91dfa66..29ca521 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -1210,7 +1210,10 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         params.put("arg1", "1000");
         params.put("arg2", "2000");
 
-        String ret = content(params);
+        String ret = null;
+
+        for (int i = 0; i < 10; ++i)
+            ret = content(params);
 
         assertNotNull(ret);
         assertTrue(!ret.isEmpty());
@@ -1223,7 +1226,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
 
         assertTrue(queryCursorFound());
 
-        U.sleep(12000);
+        U.sleep(10000);
 
         assertFalse(queryCursorFound());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9eb9da0/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
index 237f4b1..bd849a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
@@ -61,8 +61,11 @@ public class ConnectorConfiguration {
     /** Default socket send and receive buffer size. */
     public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
 
-    /** Default delay for storing query cursor (10 minutes). */
-    private static final int DFLT_QRY_RMV_DELAY = 10 * 60;
+    /** Default REST idle timeout for query cursor. */
+    private static final long DFLT_IDLE_QRY_CUR_TIMEOUT = 10 * 60 * 1000;
+
+    /** Default REST check frequency for query cursor. */
+    private static final long DFLT_QRY_CHECK_FRQ_TIMEOUT = 60 * 1000;
 
     /** Jetty XML configuration path. */
     private String jettyPath;
@@ -88,8 +91,11 @@ public class ConnectorConfiguration {
     /** REST TCP receive buffer size. */
     private int rcvBufSize = DFLT_SOCK_BUF_SIZE;
 
-    /** REST delay for storing query cursor. */
-    private int qryRmvDelay = DFLT_QRY_RMV_DELAY;
+    /** REST idle timeout for query cursor. */
+    private long idleQryCurTimeout = DFLT_IDLE_QRY_CUR_TIMEOUT;
+
+    /** REST idle timeout for query cursor. */
+    private long qryCheckFrq = DFLT_QRY_CHECK_FRQ_TIMEOUT;
 
     /** REST TCP send queue limit. */
     private int sndQueueLimit;
@@ -154,7 +160,8 @@ public class ConnectorConfiguration {
         sslClientAuth = cfg.isSslClientAuth();
         sslCtxFactory = cfg.getSslContextFactory();
         sslEnabled = cfg.isSslEnabled();
-        qryRmvDelay = cfg.getQueryRemoveDelay();
+        idleQryCurTimeout = cfg.getIdleQueryCursorTimeout();
+        qryCheckFrq = cfg.getQueryCheckFrequency();
     }
 
     /**
@@ -556,18 +563,47 @@ public class ConnectorConfiguration {
     }
 
     /**
-     * Sets delay for removing query cursors that are not used.
+     * Sets idle query cursors timeout.
+     *
+     * @param idleQryCurTimeout Idle query cursors timeout in milliseconds.
+     * @see #getIdleQueryCursorTimeout()
+     */
+    public void setIdleQueryCursorTimeout(long idleQryCurTimeout) {
+        this.idleQryCurTimeout = idleQryCurTimeout;
+    }
+
+    /**
+     * Gets idle query cursors timeout in milliseconds.
+     * <p>
+     * This setting is used to reject open query cursors that is not used. If no fetch query request
+     * come within idle timeout, it will be removed on next check for old query cursors
+     * (see {@link #getQueryCheckFrequency()}).
      *
-     * @param qryRmvDelay Query remove delay in seconds.
+     * @return Idle query cursors timeout in milliseconds
      */
-    public void setQueryRemoveDelay(int qryRmvDelay) {
-        this.qryRmvDelay = qryRmvDelay;
+    public long getIdleQueryCursorTimeout() {
+        return idleQryCurTimeout;
     }
 
     /**
-     * Gets delay for removing query cursors that are not used.
+     * Sets query check frequency.
+     *
+     * @param qryCheckFrq Idle query check frequency in milliseconds.
+     * @see #getQueryCheckFrequency()
+     */
+    public void setQueryCheckFrequency(long qryCheckFrq) {
+        this.qryCheckFrq = qryCheckFrq;
+    }
+
+    /**
+     * Gets query cursors check frequency.
+     * This setting is used to reject open query cursors that is not used.
+     * <p>
+     * Scheduler tries with specified period to close queries' cursors that are overtime.
+     *
+     * @return Query check frequency in milliseconds.
      */
-    public int getQueryRemoveDelay() {
-        return qryRmvDelay;
+    public long getQueryCheckFrequency() {
+        return qryCheckFrq;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9eb9da0/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index 18a2ae7..ffac32c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -27,7 +27,6 @@ import org.apache.ignite.internal.processors.rest.handlers.*;
 import org.apache.ignite.internal.processors.rest.request.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.util.*;
@@ -50,22 +49,36 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
     private static final AtomicLong qryIdGen = new AtomicLong();
 
     /** Current queries cursors. */
-    private final static ConcurrentHashMap<Long, GridTuple3<QueryCursor, Iterator, Boolean>> qryCurs =
+    private final static ConcurrentHashMap<Long, GridTuple3<QueryCursor, Iterator, Long>> qryCurs =
         new ConcurrentHashMap<>();
 
-    /** Remove delay. */
-    private static int rmvDelay = 0;
-
-    /** Scheduler. */
-    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1);
-
     /**
      * @param ctx Context.
      */
     public QueryCommandHandler(GridKernalContext ctx) {
         super(ctx);
 
-        rmvDelay = ctx.config().getConnectorConfiguration().getQueryRemoveDelay();
+        final long idleQryCurTimeout = ctx.config().getConnectorConfiguration().getIdleQueryCursorTimeout();
+
+        long qryCheckFrq = ctx.config().getConnectorConfiguration().getQueryCheckFrequency();
+
+        ctx.timeout().schedule(new Runnable() {
+            @Override public void run() {
+                long time = System.currentTimeMillis();
+
+                for (Map.Entry<Long, GridTuple3<QueryCursor, Iterator, Long>> e : qryCurs.entrySet()) {
+                    synchronized (e.getValue()) {
+                        long createTime = e.getValue().get3();
+
+                        if (createTime + idleQryCurTimeout > time) {
+                            e.getValue().get1().close();
+
+                            qryCurs.remove(e.getKey());
+                        }
+                    }
+                }
+            }
+        }, qryCheckFrq, qryCheckFrq);
     }
 
     /** {@inheritDoc} */
@@ -148,17 +161,20 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
 
                 Iterator cur = qryCur.iterator();
 
-                qryCurs.put(qryId, new GridTuple3<>(qryCur, cur, true));
+                GridTuple3<QueryCursor, Iterator, Long> val =
+                    new GridTuple3<>(qryCur, cur, System.currentTimeMillis());
 
-                scheduleRemove(qryId);
+                synchronized (val) {
+                    qryCurs.put(qryId, val);
 
-                CacheQueryResult res = createQueryResult(cur, req, qryId);
+                    CacheQueryResult res = createQueryResult(cur, req, qryId);
 
-                List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta();
+                    List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta();
 
-                res.setFieldsMetadata(convertMetadata(fieldsMeta));
+                    res.setFieldsMetadata(convertMetadata(fieldsMeta));
 
-                return new GridRestResponse(res);
+                    return new GridRestResponse(res);
+                }
             }
             catch (Exception e) {
                 qryCurs.remove(qryId);
@@ -200,15 +216,19 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
         /** {@inheritDoc} */
         @Override public GridRestResponse call() throws Exception {
             try {
-                QueryCursor cur = qryCurs.get(req.queryId()).get1();
+                GridTuple3<QueryCursor, Iterator, Long> val = qryCurs.get(req.queryId());
 
-                if (cur == null)
+                if (val == null)
                     return new GridRestResponse(GridRestResponse.STATUS_FAILED,
                         "Cannot find query [qryId=" + req.queryId() + "]");
 
-                cur.close();
+                synchronized (val) {
+                    QueryCursor cur = val.get1();
 
-                qryCurs.remove(req.queryId());
+                    cur.close();
+
+                    qryCurs.remove(req.queryId());
+                }
 
                 return new GridRestResponse(true);
             }
@@ -237,19 +257,21 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
         /** {@inheritDoc} */
         @Override public GridRestResponse call() throws Exception {
             try {
-                GridTuple3<QueryCursor, Iterator, Boolean> t = qryCurs.get(req.queryId());
+                GridTuple3<QueryCursor, Iterator, Long> t = qryCurs.get(req.queryId());
 
-                t.set3(true);
-
-                Iterator cur = t.get2();
-
-                if (cur == null)
+                if (t == null)
                     return new GridRestResponse(GridRestResponse.STATUS_FAILED,
                         "Cannot find query [qryId=" + req.queryId() + "]");
 
-                CacheQueryResult res = createQueryResult(cur, req, req.queryId());
+                synchronized (t) {
+                    t.set3(System.currentTimeMillis());
+
+                    Iterator cur = t.get2();
+
+                    CacheQueryResult res = createQueryResult(cur, req, req.queryId());
 
-                return new GridRestResponse(res);
+                    return new GridRestResponse(res);
+                }
             }
             catch (Exception e) {
                 qryCurs.remove(req.queryId());
@@ -284,27 +306,4 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
 
         return res;
     }
-
-    /**
-     * Schedule remove for query cursor.
-     *
-     * @param id Query id.
-     */
-    private static void scheduleRemove(final long id) {
-        SCHEDULER.schedule(new CAX() {
-            @Override public void applyx() throws IgniteCheckedException {
-                GridTuple3<QueryCursor, Iterator, Boolean> t = qryCurs.get(id);
-
-                if (t != null) {
-                    if (t.get3()) {
-                        t.set3(false);
-
-                        scheduleRemove(id);
-                    }
-                    else
-                        qryCurs.remove(id);
-                }
-            }
-        }, rmvDelay, TimeUnit.SECONDS);
-    }
 }