You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2015/07/28 16:54:34 UTC
incubator-ignite git commit: #ignite-1161: replace scheduler with
ignite scheduler.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1161 a1601d791 -> a9eb9da03
#ignite-1161: replace scheduler with ignite scheduler.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a9eb9da0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a9eb9da0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a9eb9da0
Branch: refs/heads/ignite-1161
Commit: a9eb9da039cf6fa4c703c8a337145b8bf091723f
Parents: a1601d7
Author: ivasilinets <iv...@gridgain.com>
Authored: Tue Jul 28 17:54:23 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Tue Jul 28 17:54:23 2015 +0300
----------------------------------------------------------------------
.../rest/AbstractRestProcessorSelfTest.java | 3 +-
.../JettyRestProcessorAbstractSelfTest.java | 7 +-
.../configuration/ConnectorConfiguration.java | 60 +++++++++---
.../handlers/query/QueryCommandHandler.java | 99 ++++++++++----------
4 files changed, 104 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9eb9da0/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
index b5b430c..9b26bd8 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
@@ -73,7 +73,8 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest {
clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml");
- clientCfg.setQueryRemoveDelay(5);
+ clientCfg.setIdleQueryCursorTimeout(5000);
+ clientCfg.setQueryCheckFrequency(5000);
cfg.setConnectorConfiguration(clientCfg);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9eb9da0/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 91dfa66..29ca521 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -1210,7 +1210,10 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
params.put("arg1", "1000");
params.put("arg2", "2000");
- String ret = content(params);
+ String ret = null;
+
+ for (int i = 0; i < 10; ++i)
+ ret = content(params);
assertNotNull(ret);
assertTrue(!ret.isEmpty());
@@ -1223,7 +1226,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
assertTrue(queryCursorFound());
- U.sleep(12000);
+ U.sleep(10000);
assertFalse(queryCursorFound());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9eb9da0/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
index 237f4b1..bd849a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
@@ -61,8 +61,11 @@ public class ConnectorConfiguration {
/** Default socket send and receive buffer size. */
public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
- /** Default delay for storing query cursor (10 minutes). */
- private static final int DFLT_QRY_RMV_DELAY = 10 * 60;
+ /** Default REST idle timeout for query cursor. */
+ private static final long DFLT_IDLE_QRY_CUR_TIMEOUT = 10 * 60 * 1000;
+
+ /** Default REST check frequency for query cursor. */
+ private static final long DFLT_QRY_CHECK_FRQ_TIMEOUT = 60 * 1000;
/** Jetty XML configuration path. */
private String jettyPath;
@@ -88,8 +91,11 @@ public class ConnectorConfiguration {
/** REST TCP receive buffer size. */
private int rcvBufSize = DFLT_SOCK_BUF_SIZE;
- /** REST delay for storing query cursor. */
- private int qryRmvDelay = DFLT_QRY_RMV_DELAY;
+ /** REST idle timeout for query cursor. */
+ private long idleQryCurTimeout = DFLT_IDLE_QRY_CUR_TIMEOUT;
+
+ /** REST check frequency for query cursor. */
+ private long qryCheckFrq = DFLT_QRY_CHECK_FRQ_TIMEOUT;
/** REST TCP send queue limit. */
private int sndQueueLimit;
@@ -154,7 +160,8 @@ public class ConnectorConfiguration {
sslClientAuth = cfg.isSslClientAuth();
sslCtxFactory = cfg.getSslContextFactory();
sslEnabled = cfg.isSslEnabled();
- qryRmvDelay = cfg.getQueryRemoveDelay();
+ idleQryCurTimeout = cfg.getIdleQueryCursorTimeout();
+ qryCheckFrq = cfg.getQueryCheckFrequency();
}
/**
@@ -556,18 +563,47 @@ public class ConnectorConfiguration {
}
/**
- * Sets delay for removing query cursors that are not used.
+ * Sets idle query cursors timeout.
+ *
+ * @param idleQryCurTimeout Idle query cursors timeout in milliseconds.
+ * @see #getIdleQueryCursorTimeout()
+ */
+ public void setIdleQueryCursorTimeout(long idleQryCurTimeout) {
+ this.idleQryCurTimeout = idleQryCurTimeout;
+ }
+
+ /**
+ * Gets idle query cursors timeout in milliseconds.
+ * <p>
+ * This setting is used to close open query cursors that are not used. If no fetch query request
+ * comes within the idle timeout, the cursor will be removed on the next check for old query cursors
+ * (see {@link #getQueryCheckFrequency()}).
*
- * @param qryRmvDelay Query remove delay in seconds.
+ * @return Idle query cursors timeout in milliseconds
*/
- public void setQueryRemoveDelay(int qryRmvDelay) {
- this.qryRmvDelay = qryRmvDelay;
+ public long getIdleQueryCursorTimeout() {
+ return idleQryCurTimeout;
}
/**
- * Gets delay for removing query cursors that are not used.
+ * Sets query check frequency.
+ *
+ * @param qryCheckFrq Idle query check frequency in milliseconds.
+ * @see #getQueryCheckFrequency()
+ */
+ public void setQueryCheckFrequency(long qryCheckFrq) {
+ this.qryCheckFrq = qryCheckFrq;
+ }
+
+ /**
+ * Gets query cursors check frequency.
+ * This setting is used to close open query cursors that are not used.
+ * <p>
+ * The scheduler tries, with the specified period, to close query cursors whose idle timeout has expired.
+ *
+ * @return Query check frequency in milliseconds.
*/
- public int getQueryRemoveDelay() {
- return qryRmvDelay;
+ public long getQueryCheckFrequency() {
+ return qryCheckFrq;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9eb9da0/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index 18a2ae7..ffac32c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -27,7 +27,6 @@ import org.apache.ignite.internal.processors.rest.handlers.*;
import org.apache.ignite.internal.processors.rest.request.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import java.util.*;
@@ -50,22 +49,36 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
private static final AtomicLong qryIdGen = new AtomicLong();
/** Current queries cursors. */
- private final static ConcurrentHashMap<Long, GridTuple3<QueryCursor, Iterator, Boolean>> qryCurs =
+ private final static ConcurrentHashMap<Long, GridTuple3<QueryCursor, Iterator, Long>> qryCurs =
new ConcurrentHashMap<>();
- /** Remove delay. */
- private static int rmvDelay = 0;
-
- /** Scheduler. */
- private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1);
-
/**
* @param ctx Context.
*/
public QueryCommandHandler(GridKernalContext ctx) {
super(ctx);
- rmvDelay = ctx.config().getConnectorConfiguration().getQueryRemoveDelay();
+ final long idleQryCurTimeout = ctx.config().getConnectorConfiguration().getIdleQueryCursorTimeout();
+
+ long qryCheckFrq = ctx.config().getConnectorConfiguration().getQueryCheckFrequency();
+
+ ctx.timeout().schedule(new Runnable() {
+ @Override public void run() {
+ long time = System.currentTimeMillis();
+
+ for (Map.Entry<Long, GridTuple3<QueryCursor, Iterator, Long>> e : qryCurs.entrySet()) {
+ synchronized (e.getValue()) {
+ long createTime = e.getValue().get3();
+
+ if (createTime + idleQryCurTimeout > time) {
+ e.getValue().get1().close();
+
+ qryCurs.remove(e.getKey());
+ }
+ }
+ }
+ }
+ }, qryCheckFrq, qryCheckFrq);
}
/** {@inheritDoc} */
@@ -148,17 +161,20 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
Iterator cur = qryCur.iterator();
- qryCurs.put(qryId, new GridTuple3<>(qryCur, cur, true));
+ GridTuple3<QueryCursor, Iterator, Long> val =
+ new GridTuple3<>(qryCur, cur, System.currentTimeMillis());
- scheduleRemove(qryId);
+ synchronized (val) {
+ qryCurs.put(qryId, val);
- CacheQueryResult res = createQueryResult(cur, req, qryId);
+ CacheQueryResult res = createQueryResult(cur, req, qryId);
- List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta();
+ List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta();
- res.setFieldsMetadata(convertMetadata(fieldsMeta));
+ res.setFieldsMetadata(convertMetadata(fieldsMeta));
- return new GridRestResponse(res);
+ return new GridRestResponse(res);
+ }
}
catch (Exception e) {
qryCurs.remove(qryId);
@@ -200,15 +216,19 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** {@inheritDoc} */
@Override public GridRestResponse call() throws Exception {
try {
- QueryCursor cur = qryCurs.get(req.queryId()).get1();
+ GridTuple3<QueryCursor, Iterator, Long> val = qryCurs.get(req.queryId());
- if (cur == null)
+ if (val == null)
return new GridRestResponse(GridRestResponse.STATUS_FAILED,
"Cannot find query [qryId=" + req.queryId() + "]");
- cur.close();
+ synchronized (val) {
+ QueryCursor cur = val.get1();
- qryCurs.remove(req.queryId());
+ cur.close();
+
+ qryCurs.remove(req.queryId());
+ }
return new GridRestResponse(true);
}
@@ -237,19 +257,21 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** {@inheritDoc} */
@Override public GridRestResponse call() throws Exception {
try {
- GridTuple3<QueryCursor, Iterator, Boolean> t = qryCurs.get(req.queryId());
+ GridTuple3<QueryCursor, Iterator, Long> t = qryCurs.get(req.queryId());
- t.set3(true);
-
- Iterator cur = t.get2();
-
- if (cur == null)
+ if (t == null)
return new GridRestResponse(GridRestResponse.STATUS_FAILED,
"Cannot find query [qryId=" + req.queryId() + "]");
- CacheQueryResult res = createQueryResult(cur, req, req.queryId());
+ synchronized (t) {
+ t.set3(System.currentTimeMillis());
+
+ Iterator cur = t.get2();
+
+ CacheQueryResult res = createQueryResult(cur, req, req.queryId());
- return new GridRestResponse(res);
+ return new GridRestResponse(res);
+ }
}
catch (Exception e) {
qryCurs.remove(req.queryId());
@@ -284,27 +306,4 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
return res;
}
-
- /**
- * Schedule remove for query cursor.
- *
- * @param id Query id.
- */
- private static void scheduleRemove(final long id) {
- SCHEDULER.schedule(new CAX() {
- @Override public void applyx() throws IgniteCheckedException {
- GridTuple3<QueryCursor, Iterator, Boolean> t = qryCurs.get(id);
-
- if (t != null) {
- if (t.get3()) {
- t.set3(false);
-
- scheduleRemove(id);
- }
- else
- qryCurs.remove(id);
- }
- }
- }, rmvDelay, TimeUnit.SECONDS);
- }
}