You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/11/10 10:48:37 UTC
[39/50] [abbrv] ignite git commit: IGNITE-1161 Close rest sql cursor
after delay. - Fixes #197.
IGNITE-1161 Close rest sql cursor after delay. - Fixes #197.
Signed-off-by: Andrey <an...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/621ecac3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/621ecac3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/621ecac3
Branch: refs/heads/ignite-801
Commit: 621ecac317f55ba467bdb16321bfe550f5d3319b
Parents: 7dfaa3b
Author: Andrey <an...@gridgain.com>
Authored: Mon Nov 9 10:30:56 2015 +0700
Committer: Andrey <an...@gridgain.com>
Committed: Mon Nov 9 10:30:56 2015 +0700
----------------------------------------------------------------------
.../rest/AbstractRestProcessorSelfTest.java | 5 +-
.../JettyRestProcessorAbstractSelfTest.java | 37 +++
.../configuration/ConnectorConfiguration.java | 61 ++++-
.../handlers/query/QueryCommandHandler.java | 258 +++++++++++++++----
4 files changed, 302 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/621ecac3/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
index bde9180..9a030a7 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
@@ -75,6 +75,9 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest {
clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml");
+ clientCfg.setIdleQueryCursorTimeout(5000);
+ clientCfg.setIdleQueryCursorCheckFrequency(5000);
+
cfg.setConnectorConfiguration(clientCfg);
TcpDiscoverySpi disco = new TcpDiscoverySpi();
@@ -99,4 +102,4 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest {
@Override protected <K, V> IgniteCache<K, V> jcache() {
return grid(0).cache(null);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/621ecac3/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index c413bbd..c9c4ced 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata;
import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
@@ -1406,6 +1407,42 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
assertFalse(queryCursorFound());
}
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueryDelay() throws Exception {
+ String qry = "salary > ? and salary <= ?";
+
+ Map<String, String> params = new HashMap<>();
+ params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key());
+ params.put("type", "Person");
+ params.put("pageSize", "1");
+ params.put("cacheName", "person");
+ params.put("qry", URLEncoder.encode(qry));
+ params.put("arg1", "1000");
+ params.put("arg2", "2000");
+
+ String ret = null;
+
+ for (int i = 0; i < 10; ++i)
+ ret = content(params);
+
+ assertNotNull(ret);
+ assertTrue(!ret.isEmpty());
+
+ JSONObject json = JSONObject.fromObject(ret);
+
+ List items = (List)((Map)json.get("response")).get("items");
+
+ assertEquals(1, items.size());
+
+ assertTrue(queryCursorFound());
+
+ U.sleep(10000);
+
+ assertFalse(queryCursorFound());
+ }
+
protected abstract String signature() throws Exception;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/621ecac3/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
index 88d015c..1bfcbe4 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
@@ -59,6 +59,12 @@ public class ConnectorConfiguration {
/** Default socket send and receive buffer size. */
public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
+ /** Default REST idle timeout for query cursor. */
+ private static final long DFLT_IDLE_QRY_CUR_TIMEOUT = 10 * 60 * 1000;
+
+ /** Default REST check frequency for idle query cursor. */
+ private static final long DFLT_IDLE_QRY_CUR_CHECK_FRQ = 60 * 1000;
+
/** Jetty XML configuration path. */
private String jettyPath;
@@ -83,6 +89,12 @@ public class ConnectorConfiguration {
/** REST TCP receive buffer size. */
private int rcvBufSize = DFLT_SOCK_BUF_SIZE;
+ /** REST idle timeout for query cursor. */
+ private long idleQryCurTimeout = DFLT_IDLE_QRY_CUR_TIMEOUT;
+
+ /** REST idle check frequency for query cursor. */
+ private long idleQryCurCheckFreq = DFLT_IDLE_QRY_CUR_CHECK_FRQ;
+
/** REST TCP send queue limit. */
private int sndQueueLimit;
@@ -146,6 +158,8 @@ public class ConnectorConfiguration {
sslClientAuth = cfg.isSslClientAuth();
sslCtxFactory = cfg.getSslContextFactory();
sslEnabled = cfg.isSslEnabled();
+ idleQryCurTimeout = cfg.getIdleQueryCursorTimeout();
+ idleQryCurCheckFreq = cfg.getIdleQueryCursorCheckFrequency();
}
/**
@@ -545,4 +559,49 @@ public class ConnectorConfiguration {
public void setMessageInterceptor(ConnectorMessageInterceptor interceptor) {
msgInterceptor = interceptor;
}
-}
\ No newline at end of file
+
+ /**
+ * Sets idle query cursors timeout.
+ *
+ * @param idleQryCurTimeout Idle query cursors timeout in milliseconds.
+ * @see #getIdleQueryCursorTimeout()
+ */
+ public void setIdleQueryCursorTimeout(long idleQryCurTimeout) {
+ this.idleQryCurTimeout = idleQryCurTimeout;
+ }
+
+ /**
+ * Gets idle query cursors timeout in milliseconds.
+ * <p>
+ * This setting is used to reject open query cursors that is not used. If no fetch query request
+ * come within idle timeout, it will be removed on next check for old query cursors
+ * (see {@link #getIdleQueryCursorCheckFrequency()}).
+ *
+ * @return Idle query cursors timeout in milliseconds
+ */
+ public long getIdleQueryCursorTimeout() {
+ return idleQryCurTimeout;
+ }
+
+ /**
+ * Sets idle query cursor check frequency.
+ *
+ * @param idleQryCurCheckFreq Idle query check frequency in milliseconds.
+ * @see #getIdleQueryCursorCheckFrequency()
+ */
+ public void setIdleQueryCursorCheckFrequency(long idleQryCurCheckFreq) {
+ this.idleQryCurCheckFreq = idleQryCurCheckFreq;
+ }
+
+ /**
+ * Gets idle query cursors check frequency.
+ * This setting is used to reject open query cursors that is not used.
+ * <p>
+ * Scheduler tries with specified period to close queries' cursors that are overtime.
+ *
+ * @return Idle query cursor check frequency in milliseconds.
+ */
+ public long getIdleQueryCursorCheckFrequency() {
+ return idleQryCurCheckFreq;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/621ecac3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index f4ddd59..54cdd29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -23,9 +23,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.Query;
@@ -45,7 +47,6 @@ import org.apache.ignite.internal.processors.rest.request.RestQueryRequest;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgniteBiTuple;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CLOSE_SQL_QUERY;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXECUTE_SCAN_QUERY;
@@ -68,25 +69,53 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
private static final AtomicLong qryIdGen = new AtomicLong();
/** Current queries cursors. */
- private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs = new ConcurrentHashMap<>();
/**
* @param ctx Context.
*/
public QueryCommandHandler(GridKernalContext ctx) {
super(ctx);
+
+ final long idleQryCurTimeout = ctx.config().getConnectorConfiguration().getIdleQueryCursorTimeout();
+
+ long idleQryCurCheckFreq = ctx.config().getConnectorConfiguration().getIdleQueryCursorCheckFrequency();
+
+ ctx.timeout().schedule(new Runnable() {
+ @Override public void run() {
+ long time = U.currentTimeMillis();
+
+ for (Map.Entry<Long, QueryCursorIterator> e : qryCurs.entrySet()) {
+ QueryCursorIterator qryCurIt = e.getValue();
+
+ long createTime = qryCurIt.timestamp();
+
+ if (createTime + idleQryCurTimeout > time && qryCurIt.tryLock()) {
+ try {
+ qryCurIt.timestamp(-1);
+
+ qryCurs.remove(e.getKey(), qryCurIt);
+
+ qryCurIt.close();
+ }
+ finally {
+ qryCurIt.unlock();
+ }
+ }
+ }
+ }
+ }, idleQryCurCheckFreq, idleQryCurCheckFreq);
}
/**
- * @param qryCurs Query cursors.
* @param cur Current cursor.
* @param req Sql request.
* @param qryId Query id.
+ * @param qryCurs Query cursors.
* @return Query result with items.
*/
private static CacheQueryResult createQueryResult(
- ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs,
- Iterator cur, RestQueryRequest req, Long qryId) {
+ Iterator cur, RestQueryRequest req, Long qryId, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
CacheQueryResult res = new CacheQueryResult();
List<Object> items = new ArrayList<>();
@@ -101,12 +130,39 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
res.setQueryId(qryId);
if (!cur.hasNext())
- qryCurs.remove(qryId);
+ removeQueryCursor(qryId, qryCurs);
return res;
}
/**
+ * Removes query cursor.
+ *
+ * @param qryId Query id.
+ * @param qryCurs Query cursors.
+ */
+ private static void removeQueryCursor(Long qryId, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
+ QueryCursorIterator qryCurIt = qryCurs.get(qryId);
+
+ if (qryCurIt == null)
+ return;
+
+ qryCurIt.lock();
+
+ try {
+ if (qryCurIt.timestamp() == -1)
+ return;
+
+ qryCurIt.close();
+
+ qryCurs.remove(qryId);
+ }
+ finally {
+ qryCurIt.unlock();
+ }
+ }
+
+ /**
* Creates class instance.
*
* @param cls Target class.
@@ -169,7 +225,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
case FETCH_SQL_QUERY: {
return ctx.closure().callLocalSafe(
- new FetchQueryCallable(ctx, (RestQueryRequest)req, qryCurs), false);
+ new FetchQueryCallable((RestQueryRequest)req, qryCurs), false);
}
case CLOSE_SQL_QUERY: {
@@ -191,16 +247,16 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** Execute query request. */
private RestQueryRequest req;
- /** Queries cursors. */
- private ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs;
+ /** Current queries cursors. */
+ private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
/**
* @param ctx Kernal context.
* @param req Execute query request.
- * @param qryCurs Queries cursors.
+ * @param qryCurs Query cursors.
*/
public ExecuteQueryCallable(GridKernalContext ctx, RestQueryRequest req,
- ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) {
+ ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
this.ctx = ctx;
this.req = req;
this.qryCurs = qryCurs;
@@ -208,7 +264,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** {@inheritDoc} */
@Override public GridRestResponse call() throws Exception {
- long qryId = qryIdGen.getAndIncrement();
+ final long qryId = qryIdGen.getAndIncrement();
try {
Query qry;
@@ -248,38 +304,51 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
return new GridRestResponse(GridRestResponse.STATUS_FAILED,
"Failed to find cache with name: " + req.cacheName());
- QueryCursor qryCur = cache.query(qry);
+ final QueryCursor qryCur = cache.query(qry);
Iterator cur = qryCur.iterator();
- qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur));
+ QueryCursorIterator qryCurIt = new QueryCursorIterator(qryCur, cur);
- CacheQueryResult res = createQueryResult(qryCurs, cur, req, qryId);
+ qryCurIt.lock();
- switch (req.queryType()) {
- case SQL:
- case SQL_FIELDS:
- List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta();
+ try {
+ qryCurs.put(qryId, qryCurIt);
- res.setFieldsMetadata(convertMetadata(fieldsMeta));
+ CacheQueryResult res = createQueryResult(cur, req, qryId, qryCurs);
- break;
- case SCAN:
- CacheQueryFieldsMetaResult keyField = new CacheQueryFieldsMetaResult();
- keyField.setFieldName("key");
+ switch (req.queryType()) {
+ case SQL:
+ case SQL_FIELDS:
+ List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl)qryCur).fieldsMeta();
- CacheQueryFieldsMetaResult valField = new CacheQueryFieldsMetaResult();
- valField.setFieldName("value");
+ res.setFieldsMetadata(convertMetadata(fieldsMeta));
- res.setFieldsMetadata(U.sealList(keyField, valField));
+ break;
+ case SCAN:
+ CacheQueryFieldsMetaResult keyField = new CacheQueryFieldsMetaResult();
+ keyField.setFieldName("key");
- break;
- }
+ CacheQueryFieldsMetaResult valField = new CacheQueryFieldsMetaResult();
+ valField.setFieldName("value");
+
+ res.setFieldsMetadata(U.sealList(keyField, valField));
- return new GridRestResponse(res);
+ break;
+ }
+
+ List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>)qryCur).fieldsMeta();
+
+ res.setFieldsMetadata(convertMetadata(fieldsMeta));
+
+ return new GridRestResponse(res);
+ }
+ finally {
+ qryCurIt.unlock();
+ }
}
catch (Exception e) {
- qryCurs.remove(qryId);
+ removeQueryCursor(qryId, qryCurs);
return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
}
@@ -305,17 +374,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
* Close query callable.
*/
private static class CloseQueryCallable implements Callable<GridRestResponse> {
- /** Queries cursors. */
- private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs;
+ /** Current queries cursors. */
+ private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
+
/** Execute query request. */
private RestQueryRequest req;
/**
* @param req Execute query request.
- * @param qryCurs Queries cursors.
+ * @param qryCurs Query cursors.
*/
- public CloseQueryCallable(RestQueryRequest req,
- ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) {
+ public CloseQueryCallable(RestQueryRequest req, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
this.req = req;
this.qryCurs = qryCurs;
}
@@ -323,20 +392,29 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** {@inheritDoc} */
@Override public GridRestResponse call() throws Exception {
try {
- QueryCursor cur = qryCurs.get(req.queryId()).get1();
+ QueryCursorIterator qryCurIt = qryCurs.get(req.queryId());
- if (cur == null)
- return new GridRestResponse(GridRestResponse.STATUS_FAILED,
- "Failed to find query with ID: " + req.queryId());
+ if (qryCurIt == null)
+ return new GridRestResponse(true);
- cur.close();
+ qryCurIt.lock();
- qryCurs.remove(req.queryId());
+ try {
+ if (qryCurIt.timestamp() == -1)
+ return new GridRestResponse(true);
+
+ qryCurIt.close();
+
+ qryCurs.remove(req.queryId());
+ }
+ finally {
+ qryCurIt.unlock();
+ }
return new GridRestResponse(true);
}
catch (Exception e) {
- qryCurs.remove(req.queryId());
+ removeQueryCursor(req.queryId(), qryCurs);
return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
}
@@ -347,21 +425,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
* Fetch query callable.
*/
private static class FetchQueryCallable implements Callable<GridRestResponse> {
- /** Queries cursors. */
- private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs;
- /** Grid kernal context. */
- private final GridKernalContext ctx;
+ /** Current queries cursors. */
+ private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
+
/** Execute query request. */
private RestQueryRequest req;
/**
- * @param ctx Grid kernal context.
* @param req Execute query request.
- * @param qryCurs Queries cursors.
+ * @param qryCurs Query cursors.
*/
- public FetchQueryCallable(GridKernalContext ctx, RestQueryRequest req,
- ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) {
- this.ctx = ctx;
+ public FetchQueryCallable(RestQueryRequest req, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
this.req = req;
this.qryCurs = qryCurs;
}
@@ -369,21 +443,91 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter {
/** {@inheritDoc} */
@Override public GridRestResponse call() throws Exception {
try {
- Iterator cur = qryCurs.get(req.queryId()).get2();
+ QueryCursorIterator qryCurIt = qryCurs.get(req.queryId());
- if (cur == null)
+ if (qryCurIt == null)
return new GridRestResponse(GridRestResponse.STATUS_FAILED,
"Failed to find query with ID: " + req.queryId());
- CacheQueryResult res = createQueryResult(qryCurs, cur, req, req.queryId());
+ qryCurIt.lock();
+
+ try {
+ if (qryCurIt.timestamp() == -1)
+ return new GridRestResponse(GridRestResponse.STATUS_FAILED,
+ "Query is closed by timeout. Restart query with ID: " + req.queryId());
+
+ qryCurIt.timestamp(U.currentTimeMillis());
- return new GridRestResponse(res);
+ Iterator cur = qryCurIt.iterator();
+
+ CacheQueryResult res = createQueryResult(cur, req, req.queryId(), qryCurs);
+
+ return new GridRestResponse(res);
+ }
+ finally {
+ qryCurIt.unlock();
+ }
}
catch (Exception e) {
- qryCurs.remove(req.queryId());
+ removeQueryCursor(req.queryId(), qryCurs);
return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
}
}
}
+
+ /**
+ * Query cursor iterator.
+ */
+ private static class QueryCursorIterator extends ReentrantLock {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Query cursor. */
+ private QueryCursor cur;
+
+ /** Query iterator. */
+ private Iterator it;
+
+ /** Last timestamp. */
+ private volatile long ts;
+
+ /**
+ * @param cur Query cursor.
+ * @param it Query iterator.
+ */
+ public QueryCursorIterator(QueryCursor cur, Iterator it) {
+ this.cur = cur;
+ this.it = it;
+ ts = U.currentTimeMillis();
+ }
+
+ /**
+ * @return Query iterator.
+ */
+ public Iterator iterator() {
+ return it;
+ }
+
+ /**
+ * @return Timestamp.
+ */
+ public long timestamp() {
+ return ts;
+ }
+
+ /**
+ * @param time Current time or -1 if cursor is closed.
+ */
+ public void timestamp(long time) {
+ ts = time;
+ }
+
+ /**
+ * Close query cursor.
+ */
+ public void close() {
+ cur.close();
+ }
+ }
}