You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/09 15:41:54 UTC

[47/50] ignite git commit: ignite-1127 Query with result size more then one page doesn't increase Query executions count metric - Fixes #23.

ignite-1127 Query with result size more then one page doesn't increase Query executions count metric - Fixes #23.

Signed-off-by: S.Vladykin <sv...@gridgain.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2311de47
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2311de47
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2311de47

Branch: refs/heads/ignite-1093-2
Commit: 2311de4777bff3a6f97904e547cc028d6ea1e51f
Parents: 6771638
Author: agura <ag...@gridgain.com>
Authored: Wed Sep 9 16:05:56 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Wed Sep 9 16:05:56 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |  27 +-
 .../cache/query/GridCacheLocalQueryFuture.java  |   5 +-
 .../cache/query/GridCacheQueryAdapter.java      |  43 +--
 .../query/GridCacheQueryFutureAdapter.java      |   9 +-
 .../cache/query/GridCacheQueryManager.java      |  11 +-
 .../query/GridCacheQueryMetricsAdapter.java     | 125 +++++----
 .../processors/query/GridQueryProcessor.java    |  50 ++--
 .../CacheAbstractQueryMetricsSelfTest.java      | 279 +++++++++----------
 8 files changed, 284 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 7c88b98..ce0cdd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.GridEmptyIterator;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -439,7 +440,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
      * @return Cursor.
      */
     @SuppressWarnings("unchecked")
-    private QueryCursor<Cache.Entry<K,V>> query(Query filter, @Nullable ClusterGroup grp) {
+    private QueryCursor<Cache.Entry<K,V>> query(final Query filter, @Nullable ClusterGroup grp)
+        throws IgniteCheckedException {
         final CacheQuery<Map.Entry<K,V>> qry;
         final CacheQueryFuture<Map.Entry<K,V>> fut;
 
@@ -454,7 +456,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
             if (grp != null)
                 qry.projection(grp);
 
-            fut = qry.execute();
+            fut = ctx.kernalContext().query().executeQuery(ctx,
+                new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
+                    @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException {
+                        return qry.execute();
+                    }
+                }, false);
         }
         else if (filter instanceof TextQuery) {
             TextQuery p = (TextQuery)filter;
@@ -464,7 +471,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
             if (grp != null)
                 qry.projection(grp);
 
-            fut = qry.execute();
+            fut = ctx.kernalContext().query().executeQuery(ctx,
+                new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
+                    @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException {
+                        return qry.execute();
+                    }
+                }, false);
         }
         else if (filter instanceof SpiQuery) {
             qry = ctx.queries().createSpiQuery(isKeepPortable);
@@ -472,7 +484,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
             if (grp != null)
                 qry.projection(grp);
 
-            fut = qry.execute(((SpiQuery)filter).getArgs());
+            fut = ctx.kernalContext().query().executeQuery(ctx,
+                new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
+                    @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException {
+                        return qry.execute(((SpiQuery)filter).getArgs());
+                    }
+                }, false);
         }
         else {
             if (filter instanceof SqlFieldsQuery)
@@ -619,7 +636,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         }
         catch (Exception e) {
             if (e instanceof CacheException)
-                throw e;
+                throw (CacheException)e;
 
             throw new CacheException(e);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
index 91fc194..46af18a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
@@ -46,7 +46,7 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
     protected GridCacheLocalQueryFuture(GridCacheContext<K, V> ctx, GridCacheQueryBean qry) {
         super(ctx, qry, true);
 
-        run = new LocalQueryRunnable<>();
+        run = new LocalQueryRunnable();
     }
 
     /**
@@ -78,7 +78,7 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
     }
 
     /** */
-    private class LocalQueryRunnable<K, V, R> implements GridPlainRunnable {
+    private class LocalQueryRunnable implements GridPlainRunnable {
         /** {@inheritDoc} */
         @Override public void run() {
             try {
@@ -101,7 +101,6 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
          * @return Query info.
          * @throws IgniteCheckedException In case of error.
          */
-        @SuppressWarnings({"unchecked"})
         private GridCacheQueryInfo localQueryInfo() throws IgniteCheckedException {
             GridCacheQueryBean qry = query();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index a016037..3ac5746 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -371,6 +371,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     /**
      * @return Key-value filter.
      */
+    @SuppressWarnings("unchecked")
     @Nullable public <K, V> IgniteBiPredicate<K, V> scanFilter() {
         return (IgniteBiPredicate<K, V>)filter;
     }
@@ -396,8 +397,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
      * @param startTime Start time.
      * @param duration Duration.
      */
-    public void onExecuted(Object res, Throwable err, long startTime, long duration) {
-        GridQueryProcessor.onExecuted(cctx, metrics, res, err, startTime, duration, log);
+    public void onCompleted(Object res, Throwable err, long startTime, long duration) {
+        GridQueryProcessor.onCompleted(cctx, res, err, startTime, duration, log);
     }
 
     /** {@inheritDoc} */
@@ -431,7 +432,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
      * @param args Arguments.
      * @return Future.
      */
-    @SuppressWarnings("IfMayBeConditional")
+    @SuppressWarnings({"IfMayBeConditional", "unchecked"})
     private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer,
         @Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) {
         Collection<ClusterNode> nodes;
@@ -440,13 +441,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
             nodes = nodes();
         }
         catch (IgniteCheckedException e) {
-            return queryErrorFuture(cctx, e, log);
+            return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
         }
 
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
         if (nodes.isEmpty())
-            return queryErrorFuture(cctx, new ClusterGroupEmptyCheckedException(), log);
+            return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new ClusterGroupEmptyCheckedException());
 
         if (log.isDebugEnabled())
             log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']');
@@ -457,7 +458,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                 cctx.deploy().registerClasses(args);
             }
             catch (IgniteCheckedException e) {
-                return queryErrorFuture(cctx, e, log);
+                return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
             }
         }
 
@@ -488,6 +489,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     private Collection<ClusterNode> nodes() throws IgniteCheckedException {
         CacheMode cacheMode = cctx.config().getCacheMode();
 
+        Integer part = partition();
+
         switch (cacheMode) {
             case LOCAL:
                 if (prj != null)
@@ -495,21 +498,21 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                         "(only local node will be queried): " + this);
 
                 if (type == SCAN && cctx.config().getCacheMode() == LOCAL &&
-                    partition() != null && partition() >= cctx.affinity().partitions())
-                    throw new IgniteCheckedException("Invalid partition number: " + partition());
+                    part != null && part >= cctx.affinity().partitions())
+                    throw new IgniteCheckedException("Invalid partition number: " + part);
 
                 return Collections.singletonList(cctx.localNode());
 
             case REPLICATED:
-                if (prj != null || partition() != null)
-                    return nodes(cctx, prj, partition());
+                if (prj != null || part != null)
+                    return nodes(cctx, prj, part);
 
                 return cctx.affinityNode() ?
                     Collections.singletonList(cctx.localNode()) :
-                    Collections.singletonList(F.rand(nodes(cctx, null, partition())));
+                    Collections.singletonList(F.rand(nodes(cctx, null, null)));
 
             case PARTITIONED:
-                return nodes(cctx, prj, partition());
+                return nodes(cctx, prj, part);
 
             default:
                 throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode);
@@ -537,7 +540,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
         return F.view(affNodes, new P1<ClusterNode>() {
             @Override public boolean apply(ClusterNode n) {
-
                 return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
                     (prj == null || prj.node(n.id()) != null) &&
                     (part == null || owners.contains(n));
@@ -545,21 +547,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         });
     }
 
-    /**
-     * @param cctx Cache context.
-     * @param e Exception.
-     * @param log Logger.
-     */
-    private static <T> GridCacheQueryErrorFuture<T> queryErrorFuture(GridCacheContext<?, ?> cctx,
-        Exception e, IgniteLogger log) {
-
-        GridCacheQueryMetricsAdapter metrics = (GridCacheQueryMetricsAdapter)cctx.queries().metrics();
-
-        GridQueryProcessor.onExecuted(cctx, metrics, null, e, 0, 0, log);
-
-        return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheQueryAdapter.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index 9a83ce9..ad9ee39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -155,7 +155,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     @Override public boolean onDone(Collection<R> res, Throwable err) {
         cctx.time().removeTimeoutObject(this);
 
-        qry.query().onExecuted(res, err, startTime(), duration());
+        qry.query().onCompleted(res, err, startTime(), duration());
 
         return super.onDone(res, err);
     }
@@ -413,11 +413,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
                 }
             }
         }
-        catch (Error e) {
-            onPageError(nodeId, e);
-
-            throw e;
-        }
         catch (Throwable e) {
             onPageError(nodeId, e);
 
@@ -446,6 +441,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
      * @param col Collection.
      * @return Collection with masked {@code null} values.
      */
+    @SuppressWarnings("unchecked")
     private Collection<Object> maskNulls(Collection<Object> col) {
         assert col != null;
 
@@ -460,6 +456,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
      * @param col Collection.
      * @return Collection with unmasked {@code null} values.
      */
+    @SuppressWarnings("unchecked")
     private Collection<Object> unmaskNulls(Collection<Object> col) {
         assert col != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 2041464..1d934d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1875,11 +1875,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
+     * @param fail {@code true} if execution failed.
+     */
+    public void onExecuted(boolean fail) {
+        metrics.onQueryExecute(fail);
+    }
+
+    /**
      * @param duration Execution duration.
      * @param fail {@code true} if execution failed.
      */
-    public void onMetricsUpdate(long duration, boolean fail) {
-        metrics.onQueryExecute(duration, fail);
+    public void onCompleted(long duration, boolean fail) {
+        metrics.onQueryCompleted(duration, fail);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
index fe219a9..1928ea5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import org.apache.ignite.cache.query.QueryMetrics;
+import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jsr166.LongAdder8;
 
 /**
  * Adapter for {@link QueryMetrics}.
@@ -32,79 +34,97 @@ public class GridCacheQueryMetricsAdapter implements QueryMetrics, Externalizabl
     private static final long serialVersionUID = 0L;
 
     /** Minimum time of execution. */
-    private volatile long minTime;
+    private final GridAtomicLong minTime = new GridAtomicLong();
 
     /** Maximum time of execution. */
-    private volatile long maxTime;
+    private final GridAtomicLong maxTime = new GridAtomicLong();
 
-    /** Average time of execution. */
-    private volatile double avgTime;
+    /** Sum of execution time for all completed queries. */
+    private final LongAdder8 sumTime = new LongAdder8();
 
-    /** Number of hits. */
-    private volatile int execs;
+    /** Average time of execution.
+     * If doesn't equal zero then this metrics set is copy from remote node and doesn't actually update.
+     */
+    private double avgTime;
 
-    /** Number of fails. */
-    private volatile int fails;
+    /** Number of executions. */
+    private final LongAdder8 execs = new LongAdder8();
 
-    /** Whether query was executed at least once. */
-    private boolean executed;
+    /** Number of completed executions. */
+    private final LongAdder8 completed = new LongAdder8();
 
-    /** Mutex. */
-    private final Object mux = new Object();
+    /** Number of fails. */
+    private final LongAdder8 fails = new LongAdder8();
 
     /** {@inheritDoc} */
     @Override public long minimumTime() {
-        return minTime;
+        return minTime.get();
     }
 
     /** {@inheritDoc} */
     @Override public long maximumTime() {
-        return maxTime;
+        return maxTime.get();
     }
 
     /** {@inheritDoc} */
     @Override public double averageTime() {
-        return avgTime;
+        if (avgTime > 0)
+            return avgTime;
+        else {
+            long val = completed.sum();
+
+            return val > 0 ? sumTime.sum() / val : 0;
+        }
     }
 
     /** {@inheritDoc} */
     @Override public int executions() {
-        return execs;
+        return execs.intValue();
+    }
+
+    /**
+     * Gets total number of completed executions of query.
+     * This value is actual only for local node.
+     *
+     * @return Number of completed executions.
+     */
+    public int completedExecutions() {
+        return completed.intValue();
     }
 
     /** {@inheritDoc} */
     @Override public int fails() {
-        return fails;
+        return fails.intValue();
     }
 
     /**
      * Callback for query execution.
      *
-     * @param duration Duration of queue execution.
      * @param fail {@code True} query executed unsuccessfully {@code false} otherwise.
      */
-    public void onQueryExecute(long duration, boolean fail) {
-        synchronized (mux) {
-            if (!executed) {
-                minTime = duration;
-                maxTime = duration;
-
-                executed = true;
-            }
-            else {
-                if (minTime > duration)
-                    minTime = duration;
+    public void onQueryExecute(boolean fail) {
+        execs.increment();
 
-                if (maxTime < duration)
-                    maxTime = duration;
-            }
+        if (fail)
+            fails.increment();
+    }
 
-            execs++;
+    /**
+     * Callback for completion of query execution.
+     *
+     * @param duration Duration of queue execution.
+     * @param fail {@code True} query executed unsuccessfully {@code false} otherwise.
+     */
+    public void onQueryCompleted(long duration, boolean fail) {
+        minTime.setIfLess(duration);
+        maxTime.setIfGreater(duration);
 
-            if (fail)
-                fails++;
+        if (fail)
+            fails.increment();
+        else {
+            completed.increment();
 
-            avgTime = (avgTime * (execs - 1) + duration) / execs;
+            sumTime.add(duration);
         }
     }
 
@@ -116,33 +136,34 @@ public class GridCacheQueryMetricsAdapter implements QueryMetrics, Externalizabl
     public GridCacheQueryMetricsAdapter copy() {
         GridCacheQueryMetricsAdapter m = new GridCacheQueryMetricsAdapter();
 
-        synchronized (mux) {
-            m.fails = fails;
-            m.minTime = minTime;
-            m.maxTime = maxTime;
-            m.execs = execs;
-            m.avgTime = avgTime;
-        }
+        // Not synchronized because accuracy isn't critical.
+        m.fails.add(fails.sum());
+        m.minTime.set(minTime.get());
+        m.maxTime.set(maxTime.get());
+        m.execs.add(execs.sum());
+        m.completed.add(completed.sum());
+        m.sumTime.add(sumTime.sum());
+        m.avgTime = avgTime;
 
         return m;
     }
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeLong(minTime);
-        out.writeLong(maxTime);
-        out.writeDouble(avgTime);
-        out.writeInt(execs);
-        out.writeInt(fails);
+        out.writeLong(minTime.get());
+        out.writeLong(maxTime.get());
+        out.writeDouble(averageTime());
+        out.writeInt(execs.intValue());
+        out.writeInt(fails.intValue());
     }
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        minTime = in.readLong();
-        maxTime = in.readLong();
+        minTime.set(in.readLong());
+        maxTime.set(in.readLong());
         avgTime = in.readDouble();
-        execs = in.readInt();
-        fails = in.readInt();
+        execs.add(in.readInt());
+        fails.add(in.readInt());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7370996..84db145 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -56,8 +56,8 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMetricsAdapter;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -581,7 +581,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
                     return idx.query(space, clause, params, type, filters);
                 }
-            });
+            }, false);
         }
         finally {
             busyLock.leaveBusy();
@@ -609,7 +609,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         qry,
                         cctx.keepPortable());
                 }
-            });
+            }, false);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -635,7 +635,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
                     return idx.queryTwoStep(cctx, qry);
                 }
-            });
+            }, true);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -661,7 +661,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
                     return idx.queryTwoStep(cctx, qry);
                 }
-            });
+            }, false);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -731,7 +731,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                             }
                         };
                     }
-                });
+                }, false);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -803,7 +803,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
                     return cursor;
                 }
-            });
+            }, true);
         }
         catch (IgniteCheckedException e) {
             throw new CacheException(e);
@@ -818,7 +818,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param key Key.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    @SuppressWarnings("unchecked")
     public void remove(String space, CacheObject key, CacheObject val) throws IgniteCheckedException {
         assert key != null;
 
@@ -904,7 +903,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         type,
                         filters);
                 }
-            });
+            }, false);
         }
         finally {
             busyLock.leaveBusy();
@@ -933,7 +932,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 @Override public GridQueryFieldsResult applyx() throws IgniteCheckedException {
                     return idx.queryFields(space, clause, params, filters);
                 }
-            });
+            }, false);
         }
         finally {
             busyLock.leaveBusy();
@@ -1479,10 +1478,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /**
      * @param cctx Cache context.
      * @param clo Closure.
+     * @param complete Complete.
      */
-    private <R> R executeQuery(GridCacheContext<?,?> cctx, IgniteOutClosureX<R> clo)
+    public <R> R executeQuery(GridCacheContext<?, ?> cctx, IgniteOutClosureX<R> clo, boolean complete)
         throws IgniteCheckedException {
-        final long start = U.currentTimeMillis();
+        final long startTime = U.currentTimeMillis();
 
         Throwable err = null;
 
@@ -1491,6 +1491,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         try {
             res = clo.apply();
 
+            if (res instanceof CacheQueryFuture) {
+                CacheQueryFuture fut = (CacheQueryFuture) res;
+
+                err = fut.error();
+            }
+
             return res;
         }
         catch (GridClosureException e) {
@@ -1504,34 +1510,30 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             throw new IgniteCheckedException(e);
         }
         finally {
-            GridCacheQueryMetricsAdapter metrics = (GridCacheQueryMetricsAdapter)cctx.queries().metrics();
+            cctx.queries().onExecuted(err != null);
 
-            onExecuted(cctx, metrics, res, err, start, U.currentTimeMillis() - start, log);
+            if (complete && err == null)
+                onCompleted(cctx, res, null, startTime, U.currentTimeMillis() - startTime, log);
         }
     }
 
     /**
      * @param cctx Cctx.
-     * @param metrics Metrics.
      * @param res Result.
      * @param err Err.
      * @param startTime Start time.
      * @param duration Duration.
      * @param log Logger.
      */
-    public static void onExecuted(GridCacheContext<?, ?> cctx, GridCacheQueryMetricsAdapter metrics,
-        Object res, Throwable err, long startTime, long duration, IgniteLogger log) {
+    public static void onCompleted(GridCacheContext<?, ?> cctx, Object res, Throwable err,
+        long startTime, long duration, IgniteLogger log) {
         boolean fail = err != null;
 
-        // Update own metrics.
-        metrics.onQueryExecute(duration, fail);
-
-        // Update metrics in query manager.
-        cctx.queries().onMetricsUpdate(duration, fail);
+        cctx.queries().onCompleted(duration, fail);
 
         if (log.isTraceEnabled())
-            log.trace("Query execution finished [startTime=" + startTime +
-                    ", duration=" + duration + ", fail=" + (err != null) + ", res=" + res + ']');
+            log.trace("Query execution completed [startTime=" + startTime +
+                ", duration=" + duration + ", fail=" + fail + ", res=" + res + ']');
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
index a082abf..28eef90 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java
@@ -19,14 +19,18 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.query.QueryMetrics;
+import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMetricsAdapter;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -47,6 +51,16 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         startGridsMultiThreaded(gridCnt);
+
+        IgniteCache<String, Integer> cacheA = grid(0).cache("A");
+
+        for (int i = 0; i < 100; i++)
+            cacheA.put(String.valueOf(i), i);
+
+        IgniteCache<String, Integer> cacheB = grid(0).cache("B");
+
+        for (int i = 0; i < 100; i++)
+            cacheB.put(String.valueOf(i), i);
     }
 
     /** {@inheritDoc} */
@@ -84,44 +98,30 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
     }
 
     /**
-     * Test metrics for SQL queries.
+     * Test metrics for SQL fields queries.
      *
      * @throws Exception In case of error.
      */
     public void testSqlFieldsQueryMetrics() throws Exception {
         IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
 
-        // Execute query.
         SqlFieldsQuery qry = new SqlFieldsQuery("select * from Integer");
 
-        cache.query(qry).getAll();
-
-        QueryMetrics m = cache.queryMetrics();
-
-        assert m != null;
-
-        info("Metrics: " + m);
-
-        assertEquals(1, m.executions());
-        assertEquals(0, m.fails());
-        assertTrue(m.averageTime() >= 0);
-        assertTrue(m.maximumTime() >= 0);
-        assertTrue(m.minimumTime() >= 0);
-
-        // Execute again with the same parameters.
-        cache.query(qry).getAll();
-
-        m = cache.queryMetrics();
+        testQueryMetrics(cache, qry);
+    }
 
-        assert m != null;
+    /**
+     * Test metrics for SQL fields queries.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testSqlFieldsQueryNotFullyFetchedMetrics() throws Exception {
+        IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
 
-        info("Metrics: " + m);
+        SqlFieldsQuery qry = new SqlFieldsQuery("select * from Integer");
+        qry.setPageSize(10);
 
-        assertEquals(2, m.executions());
-        assertEquals(0, m.fails());
-        assertTrue(m.averageTime() >= 0);
-        assertTrue(m.maximumTime() >= 0);
-        assertTrue(m.minimumTime() >= 0);
+        testQueryNotFullyFetchedMetrics(cache, qry, false);
     }
 
     /**
@@ -132,47 +132,22 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
     public void testSqlFieldsQueryFailedMetrics() throws Exception {
         IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
 
-        // Execute query.
         SqlFieldsQuery qry = new SqlFieldsQuery("select * from UNKNOWN");
 
-        try {
-            cache.query(qry).getAll();
-        }
-        catch (Exception e) {
-            // No-op.
-        }
-
-        QueryMetrics m = cache.queryMetrics();
-
-        assert m != null;
-
-        info("Metrics: " + m);
-
-        assertEquals(1, m.executions());
-        assertEquals(1, m.fails());
-        assertTrue(m.averageTime() >= 0);
-        assertTrue(m.maximumTime() >= 0);
-        assertTrue(m.minimumTime() >= 0);
-
-        // Execute again with the same parameters.
-        try {
-            cache.query(qry).getAll();
-        }
-        catch (Exception e) {
-            // No-op.
-        }
-
-        m = cache.queryMetrics();
+        testQueryFailedMetrics(cache, qry);
+    }
 
-        assert m != null;
+    /**
+     * Test metrics for Scan queries.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testScanQueryMetrics() throws Exception {
+        IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
 
-        info("Metrics: " + m);
+        ScanQuery<String, Integer> qry = new ScanQuery<>();
 
-        assertEquals(2, m.executions());
-        assertEquals(2, m.fails());
-        assertTrue(m.averageTime() >= 0);
-        assertTrue(m.maximumTime() >= 0);
-        assertTrue(m.minimumTime() >= 0);
+        testQueryMetrics(cache, qry);
     }
 
     /**
@@ -180,128 +155,136 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
      *
      * @throws Exception In case of error.
      */
-    public void testScanQueryMetrics() throws Exception {
+    public void testScanQueryNotFullyFetchedMetrics() throws Exception {
         IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
 
-        // Execute query.
         ScanQuery<String, Integer> qry = new ScanQuery<>();
+        qry.setPageSize(10);
 
-        cache.query(qry).getAll();
+        testQueryNotFullyFetchedMetrics(cache, qry, true);
+    }
 
-        QueryMetrics m = cache.queryMetrics();
+    /**
+     * Test metrics for failed Scan queries.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testScanQueryFailedMetrics() throws Exception {
+        IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
 
-        assert m != null;
+        ScanQuery<String, Integer> qry = new ScanQuery<>(Integer.MAX_VALUE);
 
-        info("Metrics: " + m);
+        testQueryFailedMetrics(cache, qry);
+    }
 
-        assertEquals(1, m.executions());
-        assertEquals(0, m.fails());
-        assertTrue(m.averageTime() >= 0);
-        assertTrue(m.maximumTime() >= 0);
-        assertTrue(m.minimumTime() >= 0);
+    /**
+     * Test metrics for SQL cross cache queries.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testSqlCrossCacheQueryMetrics() throws Exception {
+        IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
 
-        // Execute again with the same parameters.
-        cache.query(qry).getAll();
+        SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Integer");
 
-        m = cache.queryMetrics();
+        testQueryMetrics(cache, qry);
+    }
 
-        assert m != null;
+    /**
+     * Test metrics for SQL cross cache queries.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testSqlCrossCacheQueryNotFullyFetchedMetrics() throws Exception {
+        IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
 
-        info("Metrics: " + m);
+        SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Integer");
+        qry.setPageSize(10);
 
-        assertEquals(2, m.executions());
-        assertEquals(0, m.fails());
-        assertTrue(m.averageTime() >= 0);
-        assertTrue(m.maximumTime() >= 0);
-        assertTrue(m.minimumTime() >= 0);
+        testQueryNotFullyFetchedMetrics(cache, qry, false);
     }
 
     /**
-     * Test metrics for failed Scan queries.
+     * Test metrics for failed SQL cross cache queries.
      *
      * @throws Exception In case of error.
      */
-    public void testScanQueryFailedMetrics() throws Exception {
+    public void testSqlCrossCacheQueryFailedMetrics() throws Exception {
         IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
 
-        // Execute query.
-        ScanQuery<String, Integer> qry = new ScanQuery<>(Integer.MAX_VALUE);
+        SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"G\".Integer");
 
-        try {
-            cache.query(qry).getAll();
-        }
-        catch (Exception e) {
-            // No-op.
-        }
+        testQueryFailedMetrics(cache, qry);
+    }
 
-        QueryMetrics m = cache.queryMetrics();
+    /**
+     * @param cache Cache.
+     * @param qry Query.
+     */
+    private void testQueryMetrics(IgniteCache<String, Integer> cache, Query qry) {
+        cache.query(qry).getAll();
 
-        assert m != null;
+        GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
 
         info("Metrics: " + m);
 
         assertEquals(1, m.executions());
-        assertEquals(1, m.fails());
+        assertEquals(1, m.completedExecutions());
+        assertEquals(0, m.fails());
         assertTrue(m.averageTime() >= 0);
         assertTrue(m.maximumTime() >= 0);
         assertTrue(m.minimumTime() >= 0);
 
         // Execute again with the same parameters.
-        try {
-            cache.query(qry).getAll();
-        }
-        catch (Exception e) {
-            // No-op.
-        }
-
-        m = cache.queryMetrics();
+        cache.query(qry).getAll();
 
-        assert m != null;
+        m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
 
         info("Metrics: " + m);
 
         assertEquals(2, m.executions());
-        assertEquals(2, m.fails());
+        assertEquals(2, m.completedExecutions());
+        assertEquals(0, m.fails());
         assertTrue(m.averageTime() >= 0);
         assertTrue(m.maximumTime() >= 0);
         assertTrue(m.minimumTime() >= 0);
     }
 
     /**
-     * Test metrics for SQL cross cache queries.
-     *
-     * @throws Exception In case of error.
+     * @param cache Cache.
+     * @param qry Query.
+     * @param waitingForCompletion Waiting for query completion.
      */
-    public void testSqlCrossCacheQueryMetrics() throws Exception {
-        IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
+    private void testQueryNotFullyFetchedMetrics(IgniteCache<String, Integer> cache, Query qry,
+        boolean waitingForCompletion) throws IgniteInterruptedCheckedException {
+        cache.query(qry).iterator().next();
 
-        // Execute query.
-        SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Integer");
-
-        cache.query(qry).getAll();
+        if (waitingForCompletion)
+            waitingForCompletion(cache, 1);
 
-        QueryMetrics m = cache.queryMetrics();
-
-        assert m != null;
+        GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
 
         info("Metrics: " + m);
 
         assertEquals(1, m.executions());
+        assertEquals(1, m.completedExecutions());
         assertEquals(0, m.fails());
         assertTrue(m.averageTime() >= 0);
         assertTrue(m.maximumTime() >= 0);
         assertTrue(m.minimumTime() >= 0);
 
         // Execute again with the same parameters.
-        cache.query(qry).getAll();
+        cache.query(qry).iterator().next();
 
-        m = cache.queryMetrics();
+        if (waitingForCompletion)
+            waitingForCompletion(cache, 2);
 
-        assert m != null;
+        m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
 
         info("Metrics: " + m);
 
         assertEquals(2, m.executions());
+        assertEquals(2, m.completedExecutions());
         assertEquals(0, m.fails());
         assertTrue(m.averageTime() >= 0);
         assertTrue(m.maximumTime() >= 0);
@@ -309,34 +292,27 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
     }
 
     /**
-     * Test metrics for failed SQL cross cache queries.
-     *
-     * @throws Exception In case of error.
+     * @param cache Cache.
+     * @param qry Query.
      */
-    public void testSqlCrossCacheQueryFailedMetrics() throws Exception {
-        IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A");
-
-        // Execute query.
-        SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"G\".Integer");
-
+    private void testQueryFailedMetrics(IgniteCache<String, Integer> cache, Query qry) {
         try {
             cache.query(qry).getAll();
         }
         catch (Exception e) {
-            // No-op
+            // No-op.
         }
 
-        QueryMetrics m = cache.queryMetrics();
-
-        assert m != null;
+        GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
 
         info("Metrics: " + m);
 
         assertEquals(1, m.executions());
+        assertEquals(0, m.completedExecutions());
         assertEquals(1, m.fails());
-        assertTrue(m.averageTime() >= 0);
-        assertTrue(m.maximumTime() >= 0);
-        assertTrue(m.minimumTime() >= 0);
+        assertTrue(m.averageTime() == 0);
+        assertTrue(m.maximumTime() == 0);
+        assertTrue(m.minimumTime() == 0);
 
         // Execute again with the same parameters.
         try {
@@ -346,16 +322,29 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra
             // No-op.
         }
 
-        m = cache.queryMetrics();
-
-        assert m != null;
+        m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
 
         info("Metrics: " + m);
 
         assertEquals(2, m.executions());
+        assertEquals(0, m.completedExecutions());
         assertEquals(2, m.fails());
-        assertTrue(m.averageTime() >= 0);
-        assertTrue(m.maximumTime() >= 0);
-        assertTrue(m.minimumTime() >= 0);
+        assertTrue(m.averageTime() == 0);
+        assertTrue(m.maximumTime() == 0);
+        assertTrue(m.minimumTime() == 0);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param exp Expected.
+     */
+    private static void waitingForCompletion(final IgniteCache<String, Integer> cache,
+        final int exp) throws IgniteInterruptedCheckedException {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
+                return m.completedExecutions() == exp;
+            }
+        }, 5000);
     }
 }
\ No newline at end of file