You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/07/11 09:27:48 UTC
[49/50] ignite git commit: IGNITE-5686 Endless partition eviction during node shutdown
IGNITE-5686 Endless partition eviction during node shutdown
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a668a224
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a668a224
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a668a224
Branch: refs/heads/master
Commit: a668a224a8c69afd618667fa19309f1b7a0c8ca3
Parents: 4f3b69c
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Tue Jul 11 10:40:56 2017 +0200
Committer: Igor Seliverstov <gv...@gmail.com>
Committed: Tue Jul 11 10:40:56 2017 +0200
----------------------------------------------------------------------
.../src/main/scala/org/apache/ignite/spark/IgniteRDD.scala | 6 +++---
.../main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala | 6 +++++-
.../org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java | 5 -----
3 files changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a668a224/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 81509d0..78e2223 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -60,11 +60,11 @@ class IgniteRDD[K, V] (
val qry: ScanQuery[K, V] = new ScanQuery[K, V](part.index)
- val partNodes = ic.ignite().affinity(cache.getName).mapPartitionToPrimaryAndBackups(part.index)
+ val cur = cache.query(qry)
- val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(qry).iterator()
+ TaskContext.get().addTaskCompletionListener((_) ⇒ cur.close())
- new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry ⇒ {
+ new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](cur.iterator(), entry ⇒ {
(entry.getKey, entry.getValue)
})
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a668a224/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
index f074572..f386f26 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
@@ -33,7 +33,11 @@ class IgniteSqlRDD[R: ClassTag, T, K, V](
keepBinary: Boolean
) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg, keepBinary) {
override def compute(split: Partition, context: TaskContext): Iterator[R] = {
- new IgniteQueryIterator[T, R](ensureCache().query(qry).iterator(), conv)
+ val cur = ensureCache().query(qry)
+
+ TaskContext.get().addTaskCompletionListener((_) ⇒ cur.close())
+
+ new IgniteQueryIterator[T, R](cur.iterator(), conv)
}
override protected def getPartitions: Array[Partition] = {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a668a224/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
index 5477d43..49bb1ac 100644
--- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
@@ -100,11 +100,6 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-5690");
- }
-
- /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
}