You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/09/09 15:06:35 UTC
ignite git commit: ignite-1127 Query with result size more than one
page doesn't increase Query executions count metric - Fixes #23.
Repository: ignite
Updated Branches:
refs/heads/master 6771638a5 -> 2311de477
ignite-1127 Query with result size more than one page doesn't increase Query executions count metric - Fixes #23.
Signed-off-by: S.Vladykin <sv...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2311de47
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2311de47
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2311de47
Branch: refs/heads/master
Commit: 2311de4777bff3a6f97904e547cc028d6ea1e51f
Parents: 6771638
Author: agura <ag...@gridgain.com>
Authored: Wed Sep 9 16:05:56 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Wed Sep 9 16:05:56 2015 +0300
----------------------------------------------------------------------
.../processors/cache/IgniteCacheProxy.java | 27 +-
.../cache/query/GridCacheLocalQueryFuture.java | 5 +-
.../cache/query/GridCacheQueryAdapter.java | 43 +--
.../query/GridCacheQueryFutureAdapter.java | 9 +-
.../cache/query/GridCacheQueryManager.java | 11 +-
.../query/GridCacheQueryMetricsAdapter.java | 125 +++++----
.../processors/query/GridQueryProcessor.java | 50 ++--
.../CacheAbstractQueryMetricsSelfTest.java | 279 +++++++++----------
8 files changed, 284 insertions(+), 265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 7c88b98..ce0cdd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.GridEmptyIterator;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -439,7 +440,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
* @return Cursor.
*/
@SuppressWarnings("unchecked")
- private QueryCursor<Cache.Entry<K,V>> query(Query filter, @Nullable ClusterGroup grp) {
+ private QueryCursor<Cache.Entry<K,V>> query(final Query filter, @Nullable ClusterGroup grp)
+ throws IgniteCheckedException {
final CacheQuery<Map.Entry<K,V>> qry;
final CacheQueryFuture<Map.Entry<K,V>> fut;
@@ -454,7 +456,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
if (grp != null)
qry.projection(grp);
- fut = qry.execute();
+ fut = ctx.kernalContext().query().executeQuery(ctx,
+ new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
+ @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException {
+ return qry.execute();
+ }
+ }, false);
}
else if (filter instanceof TextQuery) {
TextQuery p = (TextQuery)filter;
@@ -464,7 +471,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
if (grp != null)
qry.projection(grp);
- fut = qry.execute();
+ fut = ctx.kernalContext().query().executeQuery(ctx,
+ new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
+ @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException {
+ return qry.execute();
+ }
+ }, false);
}
else if (filter instanceof SpiQuery) {
qry = ctx.queries().createSpiQuery(isKeepPortable);
@@ -472,7 +484,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
if (grp != null)
qry.projection(grp);
- fut = qry.execute(((SpiQuery)filter).getArgs());
+ fut = ctx.kernalContext().query().executeQuery(ctx,
+ new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
+ @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException {
+ return qry.execute(((SpiQuery)filter).getArgs());
+ }
+ }, false);
}
else {
if (filter instanceof SqlFieldsQuery)
@@ -619,7 +636,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
catch (Exception e) {
if (e instanceof CacheException)
- throw e;
+ throw (CacheException)e;
throw new CacheException(e);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
index 91fc194..46af18a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
@@ -46,7 +46,7 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
protected GridCacheLocalQueryFuture(GridCacheContext<K, V> ctx, GridCacheQueryBean qry) {
super(ctx, qry, true);
- run = new LocalQueryRunnable<>();
+ run = new LocalQueryRunnable();
}
/**
@@ -78,7 +78,7 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
}
/** */
- private class LocalQueryRunnable<K, V, R> implements GridPlainRunnable {
+ private class LocalQueryRunnable implements GridPlainRunnable {
/** {@inheritDoc} */
@Override public void run() {
try {
@@ -101,7 +101,6 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
* @return Query info.
* @throws IgniteCheckedException In case of error.
*/
- @SuppressWarnings({"unchecked"})
private GridCacheQueryInfo localQueryInfo() throws IgniteCheckedException {
GridCacheQueryBean qry = query();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index a016037..3ac5746 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -371,6 +371,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/**
* @return Key-value filter.
*/
+ @SuppressWarnings("unchecked")
@Nullable public <K, V> IgniteBiPredicate<K, V> scanFilter() {
return (IgniteBiPredicate<K, V>)filter;
}
@@ -396,8 +397,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param startTime Start time.
* @param duration Duration.
*/
- public void onExecuted(Object res, Throwable err, long startTime, long duration) {
- GridQueryProcessor.onExecuted(cctx, metrics, res, err, startTime, duration, log);
+ public void onCompleted(Object res, Throwable err, long startTime, long duration) {
+ GridQueryProcessor.onCompleted(cctx, res, err, startTime, duration, log);
}
/** {@inheritDoc} */
@@ -431,7 +432,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param args Arguments.
* @return Future.
*/
- @SuppressWarnings("IfMayBeConditional")
+ @SuppressWarnings({"IfMayBeConditional", "unchecked"})
private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer,
@Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) {
Collection<ClusterNode> nodes;
@@ -440,13 +441,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
nodes = nodes();
}
catch (IgniteCheckedException e) {
- return queryErrorFuture(cctx, e, log);
+ return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
}
cctx.checkSecurity(SecurityPermission.CACHE_READ);
if (nodes.isEmpty())
- return queryErrorFuture(cctx, new ClusterGroupEmptyCheckedException(), log);
+ return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new ClusterGroupEmptyCheckedException());
if (log.isDebugEnabled())
log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']');
@@ -457,7 +458,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
cctx.deploy().registerClasses(args);
}
catch (IgniteCheckedException e) {
- return queryErrorFuture(cctx, e, log);
+ return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
}
}
@@ -488,6 +489,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
private Collection<ClusterNode> nodes() throws IgniteCheckedException {
CacheMode cacheMode = cctx.config().getCacheMode();
+ Integer part = partition();
+
switch (cacheMode) {
case LOCAL:
if (prj != null)
@@ -495,21 +498,21 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
"(only local node will be queried): " + this);
if (type == SCAN && cctx.config().getCacheMode() == LOCAL &&
- partition() != null && partition() >= cctx.affinity().partitions())
- throw new IgniteCheckedException("Invalid partition number: " + partition());
+ part != null && part >= cctx.affinity().partitions())
+ throw new IgniteCheckedException("Invalid partition number: " + part);
return Collections.singletonList(cctx.localNode());
case REPLICATED:
- if (prj != null || partition() != null)
- return nodes(cctx, prj, partition());
+ if (prj != null || part != null)
+ return nodes(cctx, prj, part);
return cctx.affinityNode() ?
Collections.singletonList(cctx.localNode()) :
- Collections.singletonList(F.rand(nodes(cctx, null, partition())));
+ Collections.singletonList(F.rand(nodes(cctx, null, null)));
case PARTITIONED:
- return nodes(cctx, prj, partition());
+ return nodes(cctx, prj, part);
default:
throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode);
@@ -537,7 +540,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
return F.view(affNodes, new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
-
return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
(prj == null || prj.node(n.id()) != null) &&
(part == null || owners.contains(n));
@@ -545,21 +547,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
});
}
- /**
- * @param cctx Cache context.
- * @param e Exception.
- * @param log Logger.
- */
- private static <T> GridCacheQueryErrorFuture<T> queryErrorFuture(GridCacheContext<?, ?> cctx,
- Exception e, IgniteLogger log) {
-
- GridCacheQueryMetricsAdapter metrics = (GridCacheQueryMetricsAdapter)cctx.queries().metrics();
-
- GridQueryProcessor.onExecuted(cctx, metrics, null, e, 0, 0, log);
-
- return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheQueryAdapter.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index 9a83ce9..ad9ee39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -155,7 +155,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
@Override public boolean onDone(Collection<R> res, Throwable err) {
cctx.time().removeTimeoutObject(this);
- qry.query().onExecuted(res, err, startTime(), duration());
+ qry.query().onCompleted(res, err, startTime(), duration());
return super.onDone(res, err);
}
@@ -413,11 +413,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
}
}
}
- catch (Error e) {
- onPageError(nodeId, e);
-
- throw e;
- }
catch (Throwable e) {
onPageError(nodeId, e);
@@ -446,6 +441,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
* @param col Collection.
* @return Collection with masked {@code null} values.
*/
+ @SuppressWarnings("unchecked")
private Collection<Object> maskNulls(Collection<Object> col) {
assert col != null;
@@ -460,6 +456,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
* @param col Collection.
* @return Collection with unmasked {@code null} values.
*/
+ @SuppressWarnings("unchecked")
private Collection<Object> unmaskNulls(Collection<Object> col) {
assert col != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 2041464..1d934d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1875,11 +1875,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
+ * @param fail {@code true} if execution failed.
+ */
+ public void onExecuted(boolean fail) {
+ metrics.onQueryExecute(fail);
+ }
+
+ /**
* @param duration Execution duration.
* @param fail {@code true} if execution failed.
*/
- public void onMetricsUpdate(long duration, boolean fail) {
- metrics.onQueryExecute(duration, fail);
+ public void onCompleted(long duration, boolean fail) {
+ metrics.onQueryCompleted(duration, fail);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
index fe219a9..1928ea5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
@@ -22,7 +22,9 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.ignite.cache.query.QueryMetrics;
+import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jsr166.LongAdder8;
/**
* Adapter for {@link QueryMetrics}.
@@ -32,79 +34,97 @@ public class GridCacheQueryMetricsAdapter implements QueryMetrics, Externalizabl
private static final long serialVersionUID = 0L;
/** Minimum time of execution. */
- private volatile long minTime;
+ private final GridAtomicLong minTime = new GridAtomicLong();
/** Maximum time of execution. */
- private volatile long maxTime;
+ private final GridAtomicLong maxTime = new GridAtomicLong();
- /** Average time of execution. */
- private volatile double avgTime;
+ /** Sum of execution time for all completed queries. */
+ private final LongAdder8 sumTime = new LongAdder8();
- /** Number of hits. */
- private volatile int execs;
+ /** Average time of execution.
+ * If it is non-zero, this metrics set is a copy received from a remote node and is not updated locally.
+ */
+ private double avgTime;
- /** Number of fails. */
- private volatile int fails;
+ /** Number of executions. */
+ private final LongAdder8 execs = new LongAdder8();
- /** Whether query was executed at least once. */
- private boolean executed;
+ /** Number of completed executions. */
+ private final LongAdder8 completed = new LongAdder8();
- /** Mutex. */
- private final Object mux = new Object();
+ /** Number of fails. */
+ private final LongAdder8 fails = new LongAdder8();
/** {@inheritDoc} */
@Override public long minimumTime() {
- return minTime;
+ return minTime.get();
}
/** {@inheritDoc} */
@Override public long maximumTime() {
- return maxTime;
+ return maxTime.get();
}
/** {@inheritDoc} */
@Override public double averageTime() {
- return avgTime;
+ if (avgTime > 0)
+ return avgTime;
+ else {
+ long val = completed.sum();
+
+ return val > 0 ? sumTime.sum() / val : 0;
+ }
}
/** {@inheritDoc} */
@Override public int executions() {
- return execs;
+ return execs.intValue();
+ }
+
+ /**
+ * Gets total number of completed executions of query.
+ * This value is actual only for local node.
+ *
+ * @return Number of completed executions.
+ */
+ public int completedExecutions() {
+ return completed.intValue();
}
/** {@inheritDoc} */
@Override public int fails() {
- return fails;
+ return fails.intValue();
}
/**
* Callback for query execution.
*
- * @param duration Duration of queue execution.
* @param fail {@code True} query executed unsuccessfully {@code false} otherwise.
*/
- public void onQueryExecute(long duration, boolean fail) {
- synchronized (mux) {
- if (!executed) {
- minTime = duration;
- maxTime = duration;
-
- executed = true;
- }
- else {
- if (minTime > duration)
- minTime = duration;
+ public void onQueryExecute(boolean fail) {
+ execs.increment();
- if (maxTime < duration)
- maxTime = duration;
- }
+ if (fail)
+ fails.increment();
+ }
- execs++;
+ /**
+ * Callback for completion of query execution.
+ *
+ * @param duration Duration of query execution.
+ * @param fail {@code true} if the query failed, {@code false} otherwise.
+ */
+ public void onQueryCompleted(long duration, boolean fail) {
+ minTime.setIfLess(duration);
+ maxTime.setIfGreater(duration);
- if (fail)
- fails++;
+ if (fail)
+ fails.increment();
+ else {
+ completed.increment();
- avgTime = (avgTime * (execs - 1) + duration) / execs;
+ sumTime.add(duration);
}
}
@@ -116,33 +136,34 @@ public class GridCacheQueryMetricsAdapter implements QueryMetrics, Externalizabl
public GridCacheQueryMetricsAdapter copy() {
GridCacheQueryMetricsAdapter m = new GridCacheQueryMetricsAdapter();
- synchronized (mux) {
- m.fails = fails;
- m.minTime = minTime;
- m.maxTime = maxTime;
- m.execs = execs;
- m.avgTime = avgTime;
- }
+ // Not synchronized because accuracy isn't critical.
+ m.fails.add(fails.sum());
+ m.minTime.set(minTime.get());
+ m.maxTime.set(maxTime.get());
+ m.execs.add(execs.sum());
+ m.completed.add(completed.sum());
+ m.sumTime.add(sumTime.sum());
+ m.avgTime = avgTime;
return m;
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(minTime);
- out.writeLong(maxTime);
- out.writeDouble(avgTime);
- out.writeInt(execs);
- out.writeInt(fails);
+ out.writeLong(minTime.get());
+ out.writeLong(maxTime.get());
+ out.writeDouble(averageTime());
+ out.writeInt(execs.intValue());
+ out.writeInt(fails.intValue());
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- minTime = in.readLong();
- maxTime = in.readLong();
+ minTime.set(in.readLong());
+ maxTime.set(in.readLong());
avgTime = in.readDouble();
- execs = in.readInt();
- fails = in.readInt();
+ execs.add(in.readInt());
+ fails.add(in.readInt());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7370996..84db145 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -56,8 +56,8 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMetricsAdapter;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -581,7 +581,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
return idx.query(space, clause, params, type, filters);
}
- });
+ }, false);
}
finally {
busyLock.leaveBusy();
@@ -609,7 +609,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
qry,
cctx.keepPortable());
}
- });
+ }, false);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -635,7 +635,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
@Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
return idx.queryTwoStep(cctx, qry);
}
- });
+ }, true);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -661,7 +661,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
@Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
return idx.queryTwoStep(cctx, qry);
}
- });
+ }, false);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -731,7 +731,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
};
}
- });
+ }, false);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -803,7 +803,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
return cursor;
}
- });
+ }, true);
}
catch (IgniteCheckedException e) {
throw new CacheException(e);
@@ -818,7 +818,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param key Key.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- @SuppressWarnings("unchecked")
public void remove(String space, CacheObject key, CacheObject val) throws IgniteCheckedException {
assert key != null;
@@ -904,7 +903,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
type,
filters);
}
- });
+ }, false);
}
finally {
busyLock.leaveBusy();
@@ -933,7 +932,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
@Override public GridQueryFieldsResult applyx() throws IgniteCheckedException {
return idx.queryFields(space, clause, params, filters);
}
- });
+ }, false);
}
finally {
busyLock.leaveBusy();
@@ -1479,10 +1478,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/**
* @param cctx Cache context.
* @param clo Closure.
+ * @param complete If {@code true} and the closure finishes without error, query completion (duration) is recorded as well.
*/
- private <R> R executeQuery(GridCacheContext<?,?> cctx, IgniteOutClosureX<R> clo)
+ public <R> R executeQuery(GridCacheContext<?, ?> cctx, IgniteOutClosureX<R> clo, boolean complete)
throws IgniteCheckedException {
- final long start = U.currentTimeMillis();
+ final long startTime = U.currentTimeMillis();
Throwable err = null;
@@ -1491,6 +1491,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
try {
res = clo.apply();
+ if (res instanceof CacheQueryFuture) {
+ CacheQueryFuture fut = (CacheQueryFuture) res;
+
+ err = fut.error();
+ }
+
return res;
}
catch (GridClosureException e) {
@@ -1504,34 +1510,30 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new IgniteCheckedException(e);
}
finally {
- GridCacheQueryMetricsAdapter metrics = (GridCacheQueryMetricsAdapter)cctx.queries().metrics();
+ cctx.queries().onExecuted(err != null);
- onExecuted(cctx, metrics, res, err, start, U.currentTimeMillis() - start, log);
+ if (complete && err == null)
+ onCompleted(cctx, res, null, startTime, U.currentTimeMillis() - startTime, log);
}
}
/**
* @param cctx Cctx.
- * @param metrics Metrics.
* @param res Result.
* @param err Err.
* @param startTime Start time.
* @param duration Duration.
* @param log Logger.
*/
- public static void onExecuted(GridCacheContext<?, ?> cctx, GridCacheQueryMetricsAdapter metrics,
- Object res, Throwable err, long startTime, long duration, IgniteLogger log) {
+ public static void onCompleted(GridCacheContext<?, ?> cctx, Object res, Throwable err,
+ long startTime, long duration, IgniteLogger log) {
boolean fail = err != null;
- // Update own metrics.
- metrics.onQueryExecute(duration, fail);
-
- // Update metrics in query manager.
- cctx.queries().onMetricsUpdate(duration, fail);
+ cctx.queries().onCompleted(duration, fail);
if (log.isTraceEnabled())
- log.trace("Query execution finished [startTime=" + startTime +
- ", duration=" + duration + ", fail=" + (err != null) + ", res=" + res + ']');
+ log.trace("Query execution completed [startTime=" + startTime +
+ ", duration=" + duration + ", fail=" + fail + ", res=" + res + ']');
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
index a082abf..28eef90 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
@@ -19,14 +19,18 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.query.QueryMetrics;
+import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMetricsAdapter;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -47,6 +51,16 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
startGridsMultiThreaded(gridCnt);
+
+ IgniteCache<String, Integer> cacheA = grid(0).cache("A");
+
+ for (int i = 0; i < 100; i++)
+ cacheA.put(String.valueOf(i), i);
+
+ IgniteCache<String, Integer> cacheB = grid(0).cache("B");
+
+ for (int i = 0; i < 100; i++)
+ cacheB.put(String.valueOf(i), i);
}
/** {@inheritDoc} */
@@ -84,44 +98,30 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
}
/**
- * Test metrics for SQL queries.
+ * Test metrics for SQL fields queries.
*
* @throws Exception In case of error.
*/
public void testSqlFieldsQueryMetrics() throws Exception {
IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
- // Execute query.
SqlFieldsQuery qry = new SqlFieldsQuery("select * from Integer");
- cache.query(qry).getAll();
-
- QueryMetrics m = cache.queryMetrics();
-
- assert m != null;
-
- info("Metrics: " + m);
-
- assertEquals(1, m.executions());
- assertEquals(0, m.fails());
- assertTrue(m.averageTime() >= 0);
- assertTrue(m.maximumTime() >= 0);
- assertTrue(m.minimumTime() >= 0);
-
- // Execute again with the same parameters.
- cache.query(qry).getAll();
-
- m = cache.queryMetrics();
+ testQueryMetrics(cache, qry);
+ }
- assert m != null;
+ /**
+ * Test metrics for SQL fields queries whose results are not fully fetched.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testSqlFieldsQueryNotFullyFetchedMetrics() throws Exception {
+ IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
- info("Metrics: " + m);
+ SqlFieldsQuery qry = new SqlFieldsQuery("select * from Integer");
+ qry.setPageSize(10);
- assertEquals(2, m.executions());
- assertEquals(0, m.fails());
- assertTrue(m.averageTime() >= 0);
- assertTrue(m.maximumTime() >= 0);
- assertTrue(m.minimumTime() >= 0);
+ testQueryNotFullyFetchedMetrics(cache, qry, false);
}
/**
@@ -132,47 +132,22 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
public void testSqlFieldsQueryFailedMetrics() throws Exception {
IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
- // Execute query.
SqlFieldsQuery qry = new SqlFieldsQuery("select * from UNKNOWN");
- try {
- cache.query(qry).getAll();
- }
- catch (Exception e) {
- // No-op.
- }
-
- QueryMetrics m = cache.queryMetrics();
-
- assert m != null;
-
- info("Metrics: " + m);
-
- assertEquals(1, m.executions());
- assertEquals(1, m.fails());
- assertTrue(m.averageTime() >= 0);
- assertTrue(m.maximumTime() >= 0);
- assertTrue(m.minimumTime() >= 0);
-
- // Execute again with the same parameters.
- try {
- cache.query(qry).getAll();
- }
- catch (Exception e) {
- // No-op.
- }
-
- m = cache.queryMetrics();
+ testQueryFailedMetrics(cache, qry);
+ }
- assert m != null;
+ /**
+ * Test metrics for Scan queries.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testScanQueryMetrics() throws Exception {
+ IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
- info("Metrics: " + m);
+ ScanQuery<String, Integer> qry = new ScanQuery<>();
- assertEquals(2, m.executions());
- assertEquals(2, m.fails());
- assertTrue(m.averageTime() >= 0);
- assertTrue(m.maximumTime() >= 0);
- assertTrue(m.minimumTime() >= 0);
+ testQueryMetrics(cache, qry);
}
/**
@@ -180,128 +155,136 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
*
* @throws Exception In case of error.
*/
- public void testScanQueryMetrics() throws Exception {
+ public void testScanQueryNotFullyFetchedMetrics() throws Exception {
IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
- // Execute query.
ScanQuery<String, Integer> qry = new ScanQuery<>();
+ qry.setPageSize(10);
- cache.query(qry).getAll();
+ testQueryNotFullyFetchedMetrics(cache, qry, true);
+ }
- QueryMetrics m = cache.queryMetrics();
+ /**
+ * Test metrics for failed Scan queries.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testScanQueryFailedMetrics() throws Exception {
+ IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
- assert m != null;
+ ScanQuery<String, Integer> qry = new ScanQuery<>(Integer.MAX_VALUE);
- info("Metrics: " + m);
+ testQueryFailedMetrics(cache, qry);
+ }
- assertEquals(1, m.executions());
- assertEquals(0, m.fails());
- assertTrue(m.averageTime() >= 0);
- assertTrue(m.maximumTime() >= 0);
- assertTrue(m.minimumTime() >= 0);
+ /**
+ * Test metrics for SQL cross cache queries.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testSqlCrossCacheQueryMetrics() throws Exception {
+ IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
- // Execute again with the same parameters.
- cache.query(qry).getAll();
+ SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Integer");
- m = cache.queryMetrics();
+ testQueryMetrics(cache, qry);
+ }
- assert m != null;
+ /**
+ * Test metrics for not fully fetched SQL cross cache queries.
+ *
+ * @throws Exception In case of error.
+ */
+ public void testSqlCrossCacheQueryNotFullyFetchedMetrics() throws Exception {
+ IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
- info("Metrics: " + m);
+ SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Integer");
+ qry.setPageSize(10);
- assertEquals(2, m.executions());
- assertEquals(0, m.fails());
- assertTrue(m.averageTime() >= 0);
- assertTrue(m.maximumTime() >= 0);
- assertTrue(m.minimumTime() >= 0);
+ testQueryNotFullyFetchedMetrics(cache, qry, false);
}
/**
- * Test metrics for failed Scan queries.
+ * Test metrics for failed SQL cross cache queries.
*
* @throws Exception In case of error.
*/
- public void testScanQueryFailedMetrics() throws Exception {
+ public void testSqlCrossCacheQueryFailedMetrics() throws Exception {
IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
- // Execute query.
- ScanQuery<String, Integer> qry = new ScanQuery<>(Integer.MAX_VALUE);
+ SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"G\".Integer");
- try {
- cache.query(qry).getAll();
- }
- catch (Exception e) {
- // No-op.
- }
+ testQueryFailedMetrics(cache, qry);
+ }
- QueryMetrics m = cache.queryMetrics();
+ /**
+ * @param cache Cache.
+ * @param qry Query.
+ */
+ private void testQueryMetrics(IgniteCache<String, Integer> cache, Query qry) {
+ cache.query(qry).getAll();
- assert m != null;
+ GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
info("Metrics: " + m);
assertEquals(1, m.executions());
- assertEquals(1, m.fails());
+ assertEquals(1, m.completedExecutions());
+ assertEquals(0, m.fails());
assertTrue(m.averageTime() >= 0);
assertTrue(m.maximumTime() >= 0);
assertTrue(m.minimumTime() >= 0);
// Execute again with the same parameters.
- try {
- cache.query(qry).getAll();
- }
- catch (Exception e) {
- // No-op.
- }
-
- m = cache.queryMetrics();
+ cache.query(qry).getAll();
- assert m != null;
+ m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
info("Metrics: " + m);
assertEquals(2, m.executions());
- assertEquals(2, m.fails());
+ assertEquals(2, m.completedExecutions());
+ assertEquals(0, m.fails());
assertTrue(m.averageTime() >= 0);
assertTrue(m.maximumTime() >= 0);
assertTrue(m.minimumTime() >= 0);
}
/**
- * Test metrics for SQL cross cache queries.
- *
- * @throws Exception In case of error.
+ * @param cache Cache.
+ * @param qry Query.
+ * @param waitingForCompletion Whether to wait for the completed executions count to reach the expected value before checking metrics.
*/
- public void testSqlCrossCacheQueryMetrics() throws Exception {
- IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
+ private void testQueryNotFullyFetchedMetrics(IgniteCache<String, Integer> cache, Query qry,
+ boolean waitingForCompletion) throws IgniteInterruptedCheckedException {
+ cache.query(qry).iterator().next();
- // Execute query.
- SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Integer");
-
- cache.query(qry).getAll();
+ if (waitingForCompletion)
+ waitingForCompletion(cache, 1);
- QueryMetrics m = cache.queryMetrics();
-
- assert m != null;
+ GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
info("Metrics: " + m);
assertEquals(1, m.executions());
+ assertEquals(1, m.completedExecutions());
assertEquals(0, m.fails());
assertTrue(m.averageTime() >= 0);
assertTrue(m.maximumTime() >= 0);
assertTrue(m.minimumTime() >= 0);
// Execute again with the same parameters.
- cache.query(qry).getAll();
+ cache.query(qry).iterator().next();
- m = cache.queryMetrics();
+ if (waitingForCompletion)
+ waitingForCompletion(cache, 2);
- assert m != null;
+ m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
info("Metrics: " + m);
assertEquals(2, m.executions());
+ assertEquals(2, m.completedExecutions());
assertEquals(0, m.fails());
assertTrue(m.averageTime() >= 0);
assertTrue(m.maximumTime() >= 0);
@@ -309,34 +292,27 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
}
/**
- * Test metrics for failed SQL cross cache queries.
- *
- * @throws Exception In case of error.
+ * @param cache Cache.
+ * @param qry Query.
*/
- public void testSqlCrossCacheQueryFailedMetrics() throws Exception {
- IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
-
- // Execute query.
- SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"G\".Integer");
-
+ private void testQueryFailedMetrics(IgniteCache<String, Integer> cache, Query qry) {
try {
cache.query(qry).getAll();
}
catch (Exception e) {
- // No-op
+ // No-op.
}
- QueryMetrics m = cache.queryMetrics();
-
- assert m != null;
+ GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
info("Metrics: " + m);
assertEquals(1, m.executions());
+ assertEquals(0, m.completedExecutions());
assertEquals(1, m.fails());
- assertTrue(m.averageTime() >= 0);
- assertTrue(m.maximumTime() >= 0);
- assertTrue(m.minimumTime() >= 0);
+ assertTrue(m.averageTime() == 0);
+ assertTrue(m.maximumTime() == 0);
+ assertTrue(m.minimumTime() == 0);
// Execute again with the same parameters.
try {
@@ -346,16 +322,29 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
// No-op.
}
- m = cache.queryMetrics();
-
- assert m != null;
+ m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
info("Metrics: " + m);
assertEquals(2, m.executions());
+ assertEquals(0, m.completedExecutions());
assertEquals(2, m.fails());
- assertTrue(m.averageTime() >= 0);
- assertTrue(m.maximumTime() >= 0);
- assertTrue(m.minimumTime() >= 0);
+ assertTrue(m.averageTime() == 0);
+ assertTrue(m.maximumTime() == 0);
+ assertTrue(m.minimumTime() == 0);
+ }
+
+ /**
+ * @param cache Cache.
+ * @param exp Expected number of completed query executions.
+ */
+ private static void waitingForCompletion(final IgniteCache<String, Integer> cache,
+ final int exp) throws IgniteInterruptedCheckedException {
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
+ return m.completedExecutions() == exp;
+ }
+ }, 5000);
}
}
\ No newline at end of file