You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/07/11 09:27:48 UTC

[49/50] ignite git commit: IGNITE-5686 Endless partition eviction during node shutdown

IGNITE-5686 Endless partition eviction during node shutdown


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a668a224
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a668a224
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a668a224

Branch: refs/heads/master
Commit: a668a224a8c69afd618667fa19309f1b7a0c8ca3
Parents: 4f3b69c
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Tue Jul 11 10:40:56 2017 +0200
Committer: Igor Seliverstov <gv...@gmail.com>
Committed: Tue Jul 11 10:40:56 2017 +0200

----------------------------------------------------------------------
 .../src/main/scala/org/apache/ignite/spark/IgniteRDD.scala     | 6 +++---
 .../main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala | 6 +++++-
 .../org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java | 5 -----
 3 files changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a668a224/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 81509d0..78e2223 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -60,11 +60,11 @@ class IgniteRDD[K, V] (
 
         val qry: ScanQuery[K, V] = new ScanQuery[K, V](part.index)
 
-        val partNodes = ic.ignite().affinity(cache.getName).mapPartitionToPrimaryAndBackups(part.index)
+        val cur = cache.query(qry)
 
-        val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(qry).iterator()
+        TaskContext.get().addTaskCompletionListener((_) ⇒ cur.close())
 
-        new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry ⇒ {
+        new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](cur.iterator(), entry ⇒ {
             (entry.getKey, entry.getValue)
         })
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a668a224/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
index f074572..f386f26 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
@@ -33,7 +33,11 @@ class IgniteSqlRDD[R: ClassTag, T, K, V](
     keepBinary: Boolean
 ) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg, keepBinary) {
     override def compute(split: Partition, context: TaskContext): Iterator[R] = {
-        new IgniteQueryIterator[T, R](ensureCache().query(qry).iterator(), conv)
+        val cur = ensureCache().query(qry)
+
+        TaskContext.get().addTaskCompletionListener((_) ⇒ cur.close())
+
+        new IgniteQueryIterator[T, R](cur.iterator(), conv)
     }
 
     override protected def getPartitions: Array[Partition] = {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a668a224/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
index 5477d43..49bb1ac 100644
--- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
@@ -100,11 +100,6 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-5690");
-    }
-
-    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
     }