You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/01/15 17:17:48 UTC

incubator-ignite git commit: #IGNITE-53: Refactoring IgniteQueryStorage

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-53 d6372ea5a -> 224f7e2fc


#IGNITE-53: Refactoring IgniteQueryStorage


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/224f7e2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/224f7e2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/224f7e2f

Branch: refs/heads/ignite-53
Commit: 224f7e2fcac84302e31478de8faaaecbc290c250
Parents: d6372ea
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu Jan 15 19:16:57 2015 +0400
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu Jan 15 19:16:57 2015 +0400

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |  69 ++----
 .../cache/datastructures/GridCacheSetImpl.java  | 168 ++-----------
 .../IgniteQueryAbstractStorage.java             | 233 +++++++++++++++++++
 .../IgniteQueryFutureStorage.java               | 190 ---------------
 4 files changed, 276 insertions(+), 384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/224f7e2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index cda62b2..b0ae913 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -24,7 +24,6 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
 import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.kernal.processors.cache.datastructures.*;
@@ -63,8 +62,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
     /** Projection. */
     private GridCacheProjectionImpl<K, V> prj;
 
-    /** Query future storage */
-    private final IgniteQueryFutureStorage queryStorage;
+    /** Query storage */
+    private final IgniteQueryStorage queryStorage;
 
     /**
      * @param ctx Context.
@@ -85,7 +84,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
         this.delegate = delegate;
         this.prj = prj;
 
-        this.queryStorage = new IgniteQueryFutureStorage(ctx);
+        this.queryStorage = new IgniteQueryStorage(ctx);
 
         gate = ctx.gate();
     }
@@ -983,7 +982,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            return new IgniteCacheIterator(delegate.queries().createScanQuery(null).execute(), queryStorage);
+            return queryStorage.iterator(delegate.queries().createScanQuery(null).execute());
         }
         finally {
             gate.leave(prev);
@@ -1236,55 +1235,37 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
     }
 
     /**
-     * Iterator over the cache.
+     * Query storage exposing scan query results as cache entries.
      */
-    private class IgniteCacheIterator implements Iterator<Cache.Entry<K, V>> {
-        /** Iterator over the cache*/
-        IgniteQueryFutureStorage.Iterator<Map.Entry<K, V>> iter;
-
-        IgniteCacheIterator(GridCacheQueryFuture<Map.Entry<K, V>> fut, IgniteQueryFutureStorage storage) {
-            iter = storage.iterator(fut);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNext() {
-            try {
-                return iter.onHasNext();
-            } catch (IgniteCheckedException e) {
-                throw cacheException(e);
-            }
+    private class IgniteQueryStorage extends IgniteQueryAbstractStorage<Entry<K, V>, Map.Entry<K, V>> {
+        /**
+         * @param ctx Cache context.
+         */
+        public IgniteQueryStorage(GridCacheContext ctx) {
+            super(ctx);
         }
 
         /** {@inheritDoc} */
-        @Override public Entry<K, V> next() {
-            try {
-                final Map.Entry<K, V> cur = iter.onNext();
-                return new Cache.Entry<K, V>() {
-                    @Override public K getKey() {
-                        return cur.getKey();
-                    }
-
-                    @Override public V getValue() {
-                        return cur.getValue();
-                    }
-
-                    @Override public <T> T unwrap(Class<T> clazz) {
-                        throw new IllegalArgumentException();
-                    }
-                };
-            }
-            catch (IgniteCheckedException e) {
-                throw cacheException(e);
-            }
+        @Override protected Cache.Entry<K, V> convert(final Map.Entry<K, V> v) {
+            return new Cache.Entry<K, V>() {
+                @Override public K getKey() {
+                    return v.getKey();
+                }
 
+                @Override public V getValue() {
+                    return v.getValue();
+                }
 
+                @Override public <T> T unwrap(Class<T> clazz) {
+                    throw new IllegalArgumentException();
+                }
+            };
         }
 
         /** {@inheritDoc} */
-        @Override public void remove() {
-            Map.Entry<K, V> curEntry = iter.itemToRemove();
+        @Override protected void remove(Entry<K, V> item) {
             try {
-                delegate.removex(curEntry.getKey());
+                delegate.removex(item.getKey());
             }
             catch (IgniteCheckedException e) {
                 throw cacheException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/224f7e2f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java
index 9519ad8..23eca52 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java
@@ -20,7 +20,6 @@ package org.gridgain.grid.kernal.processors.cache.datastructures;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.affinity.*;
 import org.gridgain.grid.cache.datastructures.*;
@@ -31,11 +30,9 @@ import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.lang.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
-import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
-import java.lang.ref.*;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -54,9 +51,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
     /** Cache. */
     private final GridCache<GridCacheSetItemKey, Boolean> cache;
 
-    /** Logger. */
-    private final IgniteLogger log;
-
     /** Set name. */
     private final String name;
 
@@ -72,11 +66,8 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
     /** Removed flag. */
     private volatile boolean rmvd;
 
-    /** Iterators weak references queue. */
-    private final ReferenceQueue<SetIterator<?>> itRefQueue = new ReferenceQueue<>();
-
-    /** Iterators futures. */
-    private final Map<WeakReference<SetIterator<?>>, GridCacheQueryFuture<?>> itFuts = new ConcurrentHashMap8<>();
+    /** Query storage */
+    private final IgniteQueryStorage queryStorage;
 
     /**
      * @param ctx Cache context.
@@ -92,9 +83,9 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
 
         cache = ctx.cache();
 
-        log = ctx.logger(GridCacheSetImpl.class);
-
         hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name));
+
+        queryStorage = new IgniteQueryStorage(ctx);
     }
 
     /** {@inheritDoc} */
@@ -349,16 +340,10 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
 
             qry.projection(ctx.grid().forNodes(nodes));
 
-            GridCacheQueryFuture<T> fut = qry.execute();
-
-            SetIterator<T> it = new SetIterator<>(fut);
-
-            itFuts.put(it.weakReference(), fut);
+            IgniteQueryAbstractStorage.IgniteIterator it = queryStorage.iterator(qry.execute());
 
             if (rmvd) {
-                itFuts.remove(it.weakReference());
-
-                it.close();
+               queryStorage.removeIterator(it);
 
                 checkRemoved();
             }
@@ -444,18 +429,8 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
 
         this.rmvd = rmvd;
 
-        if (rmvd) {
-            for (GridCacheQueryFuture<?> fut : itFuts.values()) {
-                try {
-                    fut.cancel();
-                }
-                catch (IgniteCheckedException e) {
-                    log.error("Failed to close iterator.", e);
-                }
-            }
-
-            itFuts.clear();
-        }
+        if (rmvd)
+            queryStorage.clearQueries();
     }
 
     /**
@@ -467,29 +442,10 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
     }
 
     /**
-     * Closes unreachable iterators.
-     */
-    private void checkWeakQueue() {
-        for (Reference<? extends SetIterator<?>> itRef = itRefQueue.poll(); itRef != null; itRef = itRefQueue.poll()) {
-            try {
-                WeakReference<SetIterator<?>> weakRef = (WeakReference<SetIterator<?>>)itRef;
-
-                GridCacheQueryFuture<?> fut = itFuts.remove(weakRef);
-
-                if (fut != null)
-                    fut.cancel();
-            }
-            catch (IgniteCheckedException e) {
-                log.error("Failed to close iterator.", e);
-            }
-        }
-    }
-
-    /**
      * Checks if set was removed and handles iterators weak reference queue.
      */
     private void onAccess() {
-        checkWeakQueue();
+        queryStorage.checkWeakQueue();
 
         checkRemoved();
     }
@@ -522,112 +478,24 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
     }
 
     /**
-     *
+     * Query storage whose iterators yield the keys of the query result entries.
      */
-    private class SetIterator<T> extends GridCloseableIteratorAdapter<T> {
-        /** */
-        private static final long serialVersionUID = -1460570789166994846L;
-
-        /** Query future. */
-        private final GridCacheQueryFuture<T> fut;
-
-        /** Init flag. */
-        private boolean init;
-
-        /** Next item. */
-        private T next;
-
-        /** Current item. */
-        private T cur;
-
-        /** Weak reference. */
-        private final WeakReference<SetIterator<?>> weakRef;
-
+    private class IgniteQueryStorage extends IgniteQueryAbstractStorage<T, Map.Entry<T, ?>> {
         /**
-         * @param fut Query future.
+         * @param ctx Cache context.
          */
-        private SetIterator(GridCacheQueryFuture<T> fut) {
-            this.fut = fut;
-
-            weakRef = new WeakReference<SetIterator<?>>(this, itRefQueue);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected T onNext() throws IgniteCheckedException {
-            init();
-
-            if (next == null) {
-                clearWeakReference();
-
-                throw new NoSuchElementException();
-            }
-
-            cur = next;
-
-            Map.Entry e = (Map.Entry)fut.next();
-
-            next = e != null ? (T)e.getKey() : null;
-
-            if (next == null)
-                clearWeakReference();
-
-            return cur;
+        public IgniteQueryStorage(GridCacheContext ctx) {
+            super(ctx);
         }
 
         /** {@inheritDoc} */
-        @Override protected boolean onHasNext() throws IgniteCheckedException {
-            init();
-
-            boolean hasNext = next != null;
-
-            if (!hasNext)
-                clearWeakReference();
-
-            return hasNext;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void onClose() throws IgniteCheckedException {
-            fut.cancel();
-
-            clearWeakReference();
+        @Override protected T convert(Map.Entry<T, ?> v) {
+            return v != null ? (T) v.getKey() : null;
         }
 
         /** {@inheritDoc} */
-        @Override protected void onRemove() throws IgniteCheckedException {
-            if (cur == null)
-                throw new NoSuchElementException();
-
-            GridCacheSetImpl.this.remove(cur);
-        }
-
-        /**
-         * @throws IgniteCheckedException If failed.
-         */
-        private void init() throws IgniteCheckedException {
-            if (!init) {
-                Map.Entry e = (Map.Entry)fut.next();
-
-                next = e != null ? (T)e.getKey() : null;
-
-                init = true;
-            }
-        }
-
-        /**
-         * @return Iterator weak reference.
-         */
-        WeakReference<SetIterator<?>> weakReference() {
-            return weakRef;
-        }
-
-        /**
-         * Clears weak reference.
-         */
-        private void clearWeakReference() {
-            weakRef.clear(); // Do not need to enqueue.
-
-            itFuts.remove(weakRef);
+        @Override protected void remove(T item) {
+            GridCacheSetImpl.this.remove(item);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/224f7e2f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java
new file mode 100644
index 0000000..443fb86
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.gridgain.grid.kernal.processors.cache.datastructures;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.cache.query.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.util.*;
+import org.jdk8.backport.*;
+
+import java.lang.ref.*;
+import java.util.*;
+
+/**
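+ * Storage that tracks the {@link GridCacheQueryFuture}s backing open iterators so they can be cancelled.
+ * @param <T> Type of items returned by the iterator.
+ * @param <V> Type of items produced by the cache query future.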
+ */
+public abstract class IgniteQueryAbstractStorage<T, V> {
+    /** Iterators weak references queue. */
+    private final ReferenceQueue<IgniteIterator> refQueue = new ReferenceQueue<>();
+
+    /** Iterators futures. */
+    private final Map<WeakReference<IgniteIterator>, GridCacheQueryFuture<V>> futs = new ConcurrentHashMap8<>();
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * @param ctx Cache context.
+     */
+    public IgniteQueryAbstractStorage(GridCacheContext ctx) {
+        log = ctx.logger(IgniteQueryAbstractStorage.class);
+    }
+
+    /**
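+     * Creates an iterator over the results of the given query future and registers it in this storage.
+     * @param fut Query future to iterate over.
+     * @return Iterator over the converted results of the future.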
+     */
+    public IgniteIterator iterator(GridCacheQueryFuture<V> fut) {
+        IgniteIterator it = new IgniteIterator(fut);
+
+        futs.put(it.weakReference(), fut);
+
+        return it;
+    }
+
+    public void removeIterator(IgniteIterator it) throws IgniteCheckedException {
+        futs.remove(it.weakReference());
+
+        it.close();
+    }
+
+    /**
+     * Closes unreachable iterators.
+     */
+    public void checkWeakQueue() {
+        for (Reference<? extends IgniteIterator> itRef = refQueue.poll(); itRef != null; itRef = refQueue.poll()) {
+            try {
+                WeakReference<IgniteIterator> weakRef = (WeakReference<IgniteIterator>)itRef;
+
+                GridCacheQueryFuture<?> fut = futs.remove(weakRef);
+
+                if (fut != null)
+                    fut.cancel();
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to close iterator.", e);
+            }
+        }
+    }
+
+    /**
+     * Handles the iterators weak reference queue; see {@link #checkWeakQueue()}.
+     */
+    public void onAccess() throws IgniteCheckedException {
+        checkWeakQueue();
+    }
+
+    /**
+     * Cancels every tracked cache query future and empties the storage.
+     */
+    protected void clearQueries(){
+        for (GridCacheQueryFuture<?> fut : futs.values()) {
+            try {
+                fut.cancel();
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to close iterator.", e);
+            }
+        }
+
+        futs.clear();
+    }
+
+    /**
+     * Converts a query result of type V to an iterator item of type T.
+     * @param v Item to convert.
+     * @return Converted item.
+     */
+    protected abstract T convert(V v);
+
+    /**
+     * Remove item from the cache.
+     * @param item Item to remove.
+     */
+    protected abstract void remove(T item);
+
+    /**
+     * Iterator over the cache.
+     */
+    public class IgniteIterator extends GridCloseableIteratorAdapter<T> {
+        /** Query future. */
+        private final GridCacheQueryFuture<V> fut;
+
+        /** Weak reference. */
+        private final WeakReference<IgniteIterator> weakRef;
+
+        /** Init flag. */
+        private boolean init;
+
+        /** Next item. */
+        private T next;
+
+        /** Current item. */
+        private T cur;
+
+        /**
+         * @param fut GridCacheQueryFuture to iterate
+         */
+        IgniteIterator(GridCacheQueryFuture<V> fut) {
+            this.fut = fut;
+
+            this.weakRef = new WeakReference<IgniteIterator>(this, refQueue);
+        }
+
+        /**
+         * @return Iterator weak reference.
+         */
+        public WeakReference<IgniteIterator> weakReference() {
+            return weakRef;
+        }
+
+        /** {@inheritDoc} */
+        @Override public T onNext() throws IgniteCheckedException {
+            init();
+
+            if (next == null) {
+                clearWeakReference();
+
+                throw new NoSuchElementException();
+            }
+
+            cur = next;
+
+            V futNext = fut.next();
+
+            if (futNext == null)
+                clearWeakReference();
+
+            next = futNext != null ? convert(futNext) : null;
+
+            return cur;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onHasNext() throws IgniteCheckedException {
+            init();
+
+            boolean hasNext = next != null;
+
+            if (!hasNext)
+                clearWeakReference();
+
+            return hasNext;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onClose() throws IgniteCheckedException {
+            fut.cancel();
+
+            clearWeakReference();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onRemove() throws IgniteCheckedException {
+            if (cur == null)
+                throw new IllegalStateException();
+
+            IgniteQueryAbstractStorage.this.remove(cur);
+
+            cur = null;
+        }
+
+        /**
+         * Clears weak reference.
+         */
+        private void clearWeakReference() {
+            weakRef.clear();
+
+            futs.remove(weakRef);
+        }
+
+        /**
+         * @throws IgniteCheckedException If failed.
+         */
+        private void init() throws IgniteCheckedException {
+            if (!init) {
+                V futNext = fut.next();
+
+                next = futNext != null ? convert(futNext) : null;
+
+                init = true;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/224f7e2f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java
deleted file mode 100644
index d24045f..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package org.gridgain.grid.kernal.processors.cache.datastructures;
-
-import org.apache.ignite.*;
-import org.gridgain.grid.cache.query.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.jdk8.backport.*;
-
-import java.lang.ref.*;
-import java.util.*;
-
-/**
- * Storage for GridCacheQueryFuture.
- */
-public class IgniteQueryFutureStorage {
-    /** Iterators weak references queue. */
-    private final ReferenceQueue<Iterator<?>> refQueue = new ReferenceQueue<>();
-
-    /** Iterators futures. */
-    private final Map<WeakReference<Iterator<?>>, GridCacheQueryFuture<?>> futs = new ConcurrentHashMap8<>();
-
-    /** Logger. */
-    private final IgniteLogger log;
-
-    /**
-     * @param ctx Cache context.
-     */
-    public IgniteQueryFutureStorage(GridCacheContext ctx) {
-        log = ctx.logger(GridCacheSetImpl.class);
-    }
-
-    /**
-     * Iterator over the cache.
-     * @param fut Query to iterate
-     * @return iterator
-     */
-    public <T> Iterator<T> iterator(GridCacheQueryFuture<T> fut) {
-        Iterator<T> it = new Iterator<>(fut);
-
-        futs.put(it.weakReference(), fut);
-
-        return it;
-    }
-
-    /**
-     * Closes unreachable iterators.
-     */
-    private void checkWeakQueue() throws IgniteCheckedException {
-        for (Reference<? extends Iterator<?>> itRef = refQueue.poll(); itRef != null; itRef = refQueue.poll()) {
-            WeakReference<Iterator<?>> weakRef = (WeakReference<Iterator<?>>) itRef;
-
-            GridCacheQueryFuture<?> fut = futs.remove(weakRef);
-
-            if (fut != null)
-                fut.cancel();
-
-        }
-    }
-
-    /**
-     * Checks if set was removed and handles iterators weak reference queue.
-     */
-    public void onAccess() throws IgniteCheckedException {
-        checkWeakQueue();
-    }
-
-    /**
-     * Cancel all cache queries
-     * @throws IgniteCheckedException
-     */
-    protected void clearQueries() throws IgniteCheckedException {
-        for (GridCacheQueryFuture<?> fut : futs.values()) {
-            try {
-                fut.cancel();
-            }
-            catch (IgniteCheckedException e) {
-                log.error("Failed to close iterator.", e);
-            }
-        }
-
-        futs.clear();
-    }
-
-    /**
-     * Iterator over the cache
-     */
-    public class Iterator<T> {
-        /** Query future. */
-        private final GridCacheQueryFuture<T> fut;
-
-        /** Weak reference. */
-        private final WeakReference<Iterator<?>> weakRef;
-
-        /** Init flag. */
-        private boolean init;
-
-        /** Next item. */
-        private T next;
-
-        /** Current item. */
-        private T cur;
-
-        /**
-         * @param fut GridCacheQueryFuture to iterate
-         */
-        Iterator(GridCacheQueryFuture<T> fut) {
-            this.fut = fut;
-
-            this.weakRef = new WeakReference<Iterator<?>>(this, refQueue);
-        }
-
-
-        /**
-         * @throws IgniteCheckedException If failed.
-         */
-        private void init() throws IgniteCheckedException {
-            if (!init) {
-                next = fut.next();
-
-                init = true;
-            }
-        }
-
-        /**
-         * @return Iterator weak reference.
-         */
-        WeakReference<Iterator<?>> weakReference() {
-            return weakRef;
-        }
-
-        /**
-         * Clears weak reference.
-         */
-        private void clearWeakReference() {
-            weakRef.clear();
-
-            futs.remove(weakRef);
-        }
-
-        /**
-         * The same as Iterator.next()
-         */
-        public T onNext() throws IgniteCheckedException {
-            init();
-
-            if (next == null) {
-                clearWeakReference();
-
-                throw new NoSuchElementException();
-            }
-
-            cur = next;
-            next = fut.next();
-
-            if (next == null)
-                clearWeakReference();
-
-            return cur;
-        }
-
-        /**
-         * The same as Iterator.hasNext()
-         */
-        public boolean onHasNext() throws IgniteCheckedException {
-            init();
-
-            boolean hasNext = next != null;
-
-            if (!hasNext)
-                clearWeakReference();
-
-            return hasNext;
-        }
-
-        /**
-         * @return current item to remove
-         * @throws IllegalStateException if the {@code onNext} method has not
-         *         yet been called, or the {@code itemToRemove} method has already
-         *         been called after the last call to the {@code onNext}
-         *         method
-         */
-        public T itemToRemove() {
-            if (cur == null)
-                throw new IllegalStateException();
-            T res = cur;
-            cur = null;
-            return res;
-        }
-    }
-
-}