You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/01/15 17:17:48 UTC
incubator-ignite git commit: #IGNITE-53: Refactoring
IgniteQueryStorage
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-53 d6372ea5a -> 224f7e2fc
#IGNITE-53: Refactoring IgniteQueryStorage
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/224f7e2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/224f7e2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/224f7e2f
Branch: refs/heads/ignite-53
Commit: 224f7e2fcac84302e31478de8faaaecbc290c250
Parents: d6372ea
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu Jan 15 19:16:57 2015 +0400
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu Jan 15 19:16:57 2015 +0400
----------------------------------------------------------------------
.../processors/cache/IgniteCacheProxy.java | 69 ++----
.../cache/datastructures/GridCacheSetImpl.java | 168 ++-----------
.../IgniteQueryAbstractStorage.java | 233 +++++++++++++++++++
.../IgniteQueryFutureStorage.java | 190 ---------------
4 files changed, 276 insertions(+), 384 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/224f7e2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index cda62b2..b0ae913 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -24,7 +24,6 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.query.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.datastructures.*;
@@ -63,8 +62,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
/** Projection. */
private GridCacheProjectionImpl<K, V> prj;
- /** Query future storage */
- private final IgniteQueryFutureStorage queryStorage;
+ /** Query storage */
+ private final IgniteQueryStorage queryStorage;
/**
* @param ctx Context.
@@ -85,7 +84,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
this.delegate = delegate;
this.prj = prj;
- this.queryStorage = new IgniteQueryFutureStorage(ctx);
+ this.queryStorage = new IgniteQueryStorage(ctx);
gate = ctx.gate();
}
@@ -983,7 +982,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
- return new IgniteCacheIterator(delegate.queries().createScanQuery(null).execute(), queryStorage);
+ return queryStorage.iterator(delegate.queries().createScanQuery(null).execute());
}
finally {
gate.leave(prev);
@@ -1236,55 +1235,37 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
}
/**
- * Iterator over the cache.
+ * Queries' storage
*/
- private class IgniteCacheIterator implements Iterator<Cache.Entry<K, V>> {
- /** Iterator over the cache*/
- IgniteQueryFutureStorage.Iterator<Map.Entry<K, V>> iter;
-
- IgniteCacheIterator(GridCacheQueryFuture<Map.Entry<K, V>> fut, IgniteQueryFutureStorage storage) {
- iter = storage.iterator(fut);
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- try {
- return iter.onHasNext();
- } catch (IgniteCheckedException e) {
- throw cacheException(e);
- }
+ private class IgniteQueryStorage extends IgniteQueryAbstractStorage<Entry<K, V>, Map.Entry<K, V>> {
+ /**
+ * @param ctx Cache context.
+ */
+ public IgniteQueryStorage(GridCacheContext ctx) {
+ super(ctx);
}
/** {@inheritDoc} */
- @Override public Entry<K, V> next() {
- try {
- final Map.Entry<K, V> cur = iter.onNext();
- return new Cache.Entry<K, V>() {
- @Override public K getKey() {
- return cur.getKey();
- }
-
- @Override public V getValue() {
- return cur.getValue();
- }
-
- @Override public <T> T unwrap(Class<T> clazz) {
- throw new IllegalArgumentException();
- }
- };
- }
- catch (IgniteCheckedException e) {
- throw cacheException(e);
- }
+ @Override protected Cache.Entry<K, V> convert(final Map.Entry<K, V> v) {
+ return new Cache.Entry<K, V>() {
+ @Override public K getKey() {
+ return v.getKey();
+ }
+ @Override public V getValue() {
+ return v.getValue();
+ }
+ @Override public <T> T unwrap(Class<T> clazz) {
+ throw new IllegalArgumentException();
+ }
+ };
}
/** {@inheritDoc} */
- @Override public void remove() {
- Map.Entry<K, V> curEntry = iter.itemToRemove();
+ @Override protected void remove(Entry<K, V> item) {
try {
- delegate.removex(curEntry.getKey());
+ delegate.removex(item.getKey());
}
catch (IgniteCheckedException e) {
throw cacheException(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/224f7e2f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java
index 9519ad8..23eca52 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java
@@ -20,7 +20,6 @@ package org.gridgain.grid.kernal.processors.cache.datastructures;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.affinity.*;
import org.gridgain.grid.cache.datastructures.*;
@@ -31,11 +30,9 @@ import org.gridgain.grid.util.*;
import org.gridgain.grid.util.lang.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
-import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
import java.io.*;
-import java.lang.ref.*;
import java.util.*;
import java.util.concurrent.*;
@@ -54,9 +51,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
/** Cache. */
private final GridCache<GridCacheSetItemKey, Boolean> cache;
- /** Logger. */
- private final IgniteLogger log;
-
/** Set name. */
private final String name;
@@ -72,11 +66,8 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
/** Removed flag. */
private volatile boolean rmvd;
- /** Iterators weak references queue. */
- private final ReferenceQueue<SetIterator<?>> itRefQueue = new ReferenceQueue<>();
-
- /** Iterators futures. */
- private final Map<WeakReference<SetIterator<?>>, GridCacheQueryFuture<?>> itFuts = new ConcurrentHashMap8<>();
+ /** Query storage */
+ private final IgniteQueryStorage queryStorage;
/**
* @param ctx Cache context.
@@ -92,9 +83,9 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
cache = ctx.cache();
- log = ctx.logger(GridCacheSetImpl.class);
-
hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name));
+
+ queryStorage = new IgniteQueryStorage(ctx);
}
/** {@inheritDoc} */
@@ -349,16 +340,10 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
qry.projection(ctx.grid().forNodes(nodes));
- GridCacheQueryFuture<T> fut = qry.execute();
-
- SetIterator<T> it = new SetIterator<>(fut);
-
- itFuts.put(it.weakReference(), fut);
+ IgniteQueryAbstractStorage.IgniteIterator it = queryStorage.iterator(qry.execute());
if (rmvd) {
- itFuts.remove(it.weakReference());
-
- it.close();
+ queryStorage.removeIterator(it);
checkRemoved();
}
@@ -444,18 +429,8 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
this.rmvd = rmvd;
- if (rmvd) {
- for (GridCacheQueryFuture<?> fut : itFuts.values()) {
- try {
- fut.cancel();
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to close iterator.", e);
- }
- }
-
- itFuts.clear();
- }
+ if (rmvd)
+ queryStorage.clearQueries();
}
/**
@@ -467,29 +442,10 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
}
/**
- * Closes unreachable iterators.
- */
- private void checkWeakQueue() {
- for (Reference<? extends SetIterator<?>> itRef = itRefQueue.poll(); itRef != null; itRef = itRefQueue.poll()) {
- try {
- WeakReference<SetIterator<?>> weakRef = (WeakReference<SetIterator<?>>)itRef;
-
- GridCacheQueryFuture<?> fut = itFuts.remove(weakRef);
-
- if (fut != null)
- fut.cancel();
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to close iterator.", e);
- }
- }
- }
-
- /**
* Checks if set was removed and handles iterators weak reference queue.
*/
private void onAccess() {
- checkWeakQueue();
+ queryStorage.checkWeakQueue();
checkRemoved();
}
@@ -522,112 +478,24 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa
}
/**
- *
+ * Queries' storage.
*/
- private class SetIterator<T> extends GridCloseableIteratorAdapter<T> {
- /** */
- private static final long serialVersionUID = -1460570789166994846L;
-
- /** Query future. */
- private final GridCacheQueryFuture<T> fut;
-
- /** Init flag. */
- private boolean init;
-
- /** Next item. */
- private T next;
-
- /** Current item. */
- private T cur;
-
- /** Weak reference. */
- private final WeakReference<SetIterator<?>> weakRef;
-
+ private class IgniteQueryStorage extends IgniteQueryAbstractStorage<T, Map.Entry<T, ?>> {
/**
- * @param fut Query future.
+ * @param ctx Cache context.
*/
- private SetIterator(GridCacheQueryFuture<T> fut) {
- this.fut = fut;
-
- weakRef = new WeakReference<SetIterator<?>>(this, itRefQueue);
- }
-
- /** {@inheritDoc} */
- @Override protected T onNext() throws IgniteCheckedException {
- init();
-
- if (next == null) {
- clearWeakReference();
-
- throw new NoSuchElementException();
- }
-
- cur = next;
-
- Map.Entry e = (Map.Entry)fut.next();
-
- next = e != null ? (T)e.getKey() : null;
-
- if (next == null)
- clearWeakReference();
-
- return cur;
+ public IgniteQueryStorage(GridCacheContext ctx) {
+ super(ctx);
}
/** {@inheritDoc} */
- @Override protected boolean onHasNext() throws IgniteCheckedException {
- init();
-
- boolean hasNext = next != null;
-
- if (!hasNext)
- clearWeakReference();
-
- return hasNext;
- }
-
- /** {@inheritDoc} */
- @Override protected void onClose() throws IgniteCheckedException {
- fut.cancel();
-
- clearWeakReference();
+ @Override protected T convert(Map.Entry<T, ?> v) {
+ return v != null ? (T) v.getKey() : null;
}
/** {@inheritDoc} */
- @Override protected void onRemove() throws IgniteCheckedException {
- if (cur == null)
- throw new NoSuchElementException();
-
- GridCacheSetImpl.this.remove(cur);
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- private void init() throws IgniteCheckedException {
- if (!init) {
- Map.Entry e = (Map.Entry)fut.next();
-
- next = e != null ? (T)e.getKey() : null;
-
- init = true;
- }
- }
-
- /**
- * @return Iterator weak reference.
- */
- WeakReference<SetIterator<?>> weakReference() {
- return weakRef;
- }
-
- /**
- * Clears weak reference.
- */
- private void clearWeakReference() {
- weakRef.clear(); // Do not need to enqueue.
-
- itFuts.remove(weakRef);
+ @Override protected void remove(T item) {
+ GridCacheSetImpl.this.remove(item);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/224f7e2f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java
new file mode 100644
index 0000000..443fb86
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.gridgain.grid.kernal.processors.cache.datastructures;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.cache.query.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.util.*;
+import org.jdk8.backport.*;
+
+import java.lang.ref.*;
+import java.util.*;
+
+/**
+ * Storage for GridCacheQueryFuture.
+ * @param <T> Type for iterator.
+ * @param <V> Type for cache query future.
+ */
+public abstract class IgniteQueryAbstractStorage<T, V> {
+ /** Iterators weak references queue. */
+ private final ReferenceQueue<IgniteIterator> refQueue = new ReferenceQueue<>();
+
+ /** Iterators futures. */
+ private final Map<WeakReference<IgniteIterator>, GridCacheQueryFuture<V>> futs = new ConcurrentHashMap8<>();
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /**
+ * @param ctx Cache context.
+ */
+ public IgniteQueryAbstractStorage(GridCacheContext ctx) {
+ log = ctx.logger(IgniteQueryAbstractStorage.class);
+ }
+
+ /**
+ * Iterator over the cache.
+ * @param fut Query to iterate
+ * @return iterator
+ */
+ public IgniteIterator iterator(GridCacheQueryFuture<V> fut) {
+ IgniteIterator it = new IgniteIterator(fut);
+
+ futs.put(it.weakReference(), fut);
+
+ return it;
+ }
+
+ public void removeIterator(IgniteIterator it) throws IgniteCheckedException {
+ futs.remove(it.weakReference());
+
+ it.close();
+ }
+
+ /**
+ * Closes unreachable iterators.
+ */
+ public void checkWeakQueue() {
+ for (Reference<? extends IgniteIterator> itRef = refQueue.poll(); itRef != null; itRef = refQueue.poll()) {
+ try {
+ WeakReference<IgniteIterator> weakRef = (WeakReference<IgniteIterator>)itRef;
+
+ GridCacheQueryFuture<?> fut = futs.remove(weakRef);
+
+ if (fut != null)
+ fut.cancel();
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed to close iterator.", e);
+ }
+ }
+ }
+
+ /**
+ * Checks if set was removed and handles iterators weak reference queue.
+ */
+ public void onAccess() throws IgniteCheckedException {
+ checkWeakQueue();
+ }
+
+ /**
+ * Cancel all cache queries
+ */
+ protected void clearQueries(){
+ for (GridCacheQueryFuture<?> fut : futs.values()) {
+ try {
+ fut.cancel();
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed to close iterator.", e);
+ }
+ }
+
+ futs.clear();
+ }
+
+ /**
+ * Convert class V to class T.
+ * @param v Item to convert.
+ * @return Converted item.
+ */
+ protected abstract T convert(V v);
+
+ /**
+ * Remove item from the cache.
+ * @param item Item to remove.
+ */
+ protected abstract void remove(T item);
+
+ /**
+ * Iterator over the cache.
+ */
+ public class IgniteIterator extends GridCloseableIteratorAdapter<T> {
+ /** Query future. */
+ private final GridCacheQueryFuture<V> fut;
+
+ /** Weak reference. */
+ private final WeakReference<IgniteIterator> weakRef;
+
+ /** Init flag. */
+ private boolean init;
+
+ /** Next item. */
+ private T next;
+
+ /** Current item. */
+ private T cur;
+
+ /**
+ * @param fut GridCacheQueryFuture to iterate
+ */
+ IgniteIterator(GridCacheQueryFuture<V> fut) {
+ this.fut = fut;
+
+ this.weakRef = new WeakReference<IgniteIterator>(this, refQueue);
+ }
+
+ /**
+ * @return Iterator weak reference.
+ */
+ public WeakReference<IgniteIterator> weakReference() {
+ return weakRef;
+ }
+
+ /** {@inheritDoc} */
+ @Override public T onNext() throws IgniteCheckedException {
+ init();
+
+ if (next == null) {
+ clearWeakReference();
+
+ throw new NoSuchElementException();
+ }
+
+ cur = next;
+
+ V futNext = fut.next();
+
+ if (futNext == null)
+ clearWeakReference();
+
+ next = futNext != null ? convert(futNext) : null;
+
+ return cur;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onHasNext() throws IgniteCheckedException {
+ init();
+
+ boolean hasNext = next != null;
+
+ if (!hasNext)
+ clearWeakReference();
+
+ return hasNext;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onClose() throws IgniteCheckedException {
+ fut.cancel();
+
+ clearWeakReference();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onRemove() throws IgniteCheckedException {
+ if (cur == null)
+ throw new IllegalStateException();
+
+ IgniteQueryAbstractStorage.this.remove(cur);
+
+ cur = null;
+ }
+
+ /**
+ * Clears weak reference.
+ */
+ private void clearWeakReference() {
+ weakRef.clear();
+
+ futs.remove(weakRef);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void init() throws IgniteCheckedException {
+ if (!init) {
+ V futNext = fut.next();
+
+ next = futNext != null ? convert(futNext) : null;
+
+ init = true;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/224f7e2f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java
deleted file mode 100644
index d24045f..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package org.gridgain.grid.kernal.processors.cache.datastructures;
-
-import org.apache.ignite.*;
-import org.gridgain.grid.cache.query.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.jdk8.backport.*;
-
-import java.lang.ref.*;
-import java.util.*;
-
-/**
- * Storage for GridCacheQueryFuture.
- */
-public class IgniteQueryFutureStorage {
- /** Iterators weak references queue. */
- private final ReferenceQueue<Iterator<?>> refQueue = new ReferenceQueue<>();
-
- /** Iterators futures. */
- private final Map<WeakReference<Iterator<?>>, GridCacheQueryFuture<?>> futs = new ConcurrentHashMap8<>();
-
- /** Logger. */
- private final IgniteLogger log;
-
- /**
- * @param ctx Cache context.
- */
- public IgniteQueryFutureStorage(GridCacheContext ctx) {
- log = ctx.logger(GridCacheSetImpl.class);
- }
-
- /**
- * Iterator over the cache.
- * @param fut Query to iterate
- * @return iterator
- */
- public <T> Iterator<T> iterator(GridCacheQueryFuture<T> fut) {
- Iterator<T> it = new Iterator<>(fut);
-
- futs.put(it.weakReference(), fut);
-
- return it;
- }
-
- /**
- * Closes unreachable iterators.
- */
- private void checkWeakQueue() throws IgniteCheckedException {
- for (Reference<? extends Iterator<?>> itRef = refQueue.poll(); itRef != null; itRef = refQueue.poll()) {
- WeakReference<Iterator<?>> weakRef = (WeakReference<Iterator<?>>) itRef;
-
- GridCacheQueryFuture<?> fut = futs.remove(weakRef);
-
- if (fut != null)
- fut.cancel();
-
- }
- }
-
- /**
- * Checks if set was removed and handles iterators weak reference queue.
- */
- public void onAccess() throws IgniteCheckedException {
- checkWeakQueue();
- }
-
- /**
- * Cancel all cache queries
- * @throws IgniteCheckedException
- */
- protected void clearQueries() throws IgniteCheckedException {
- for (GridCacheQueryFuture<?> fut : futs.values()) {
- try {
- fut.cancel();
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to close iterator.", e);
- }
- }
-
- futs.clear();
- }
-
- /**
- * Iterator over the cache
- */
- public class Iterator<T> {
- /** Query future. */
- private final GridCacheQueryFuture<T> fut;
-
- /** Weak reference. */
- private final WeakReference<Iterator<?>> weakRef;
-
- /** Init flag. */
- private boolean init;
-
- /** Next item. */
- private T next;
-
- /** Current item. */
- private T cur;
-
- /**
- * @param fut GridCacheQueryFuture to iterate
- */
- Iterator(GridCacheQueryFuture<T> fut) {
- this.fut = fut;
-
- this.weakRef = new WeakReference<Iterator<?>>(this, refQueue);
- }
-
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- private void init() throws IgniteCheckedException {
- if (!init) {
- next = fut.next();
-
- init = true;
- }
- }
-
- /**
- * @return Iterator weak reference.
- */
- WeakReference<Iterator<?>> weakReference() {
- return weakRef;
- }
-
- /**
- * Clears weak reference.
- */
- private void clearWeakReference() {
- weakRef.clear();
-
- futs.remove(weakRef);
- }
-
- /**
- * The same as Iterator.next()
- */
- public T onNext() throws IgniteCheckedException {
- init();
-
- if (next == null) {
- clearWeakReference();
-
- throw new NoSuchElementException();
- }
-
- cur = next;
- next = fut.next();
-
- if (next == null)
- clearWeakReference();
-
- return cur;
- }
-
- /**
- * The same as Iterator.hasNext()
- */
- public boolean onHasNext() throws IgniteCheckedException {
- init();
-
- boolean hasNext = next != null;
-
- if (!hasNext)
- clearWeakReference();
-
- return hasNext;
- }
-
- /**
- * @return current item to remove
- * @throws IllegalStateException if the {@code onNext} method has not
- * yet been called, or the {@code itemToRemove} method has already
- * been called after the last call to the {@code onNext}
- * method
- */
- public T itemToRemove() {
- if (cur == null)
- throw new IllegalStateException();
- T res = cur;
- cur = null;
- return res;
- }
- }
-
-}