You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 15:12:29 UTC

[03/50] incubator-ignite git commit: #IGNITE-389 - More functions on API.

#IGNITE-389 - More functions on API.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/37a7679d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/37a7679d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/37a7679d

Branch: refs/heads/ignite-745
Commit: 37a7679df3fd05473840482d0e5c2c9483d02b2a
Parents: 5d6bb53
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Fri May 29 08:46:52 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Fri May 29 08:46:52 2015 -0700

----------------------------------------------------------------------
 .../org/apache/ignite/spark/IgniteContext.scala | 13 ++++++++
 .../org/apache/ignite/spark/IgniteRDD.scala     | 32 +++++++++++++++++++-
 2 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37a7679d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index a73405b..5b649db 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -23,6 +23,14 @@ import org.apache.ignite.{Ignition, Ignite}
 import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
 import org.apache.spark.SparkContext
 
+/**
+ * Ignite context.
+ *
+ * @param sparkContext Spark context.
+ * @param cfgF Configuration factory.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
 class IgniteContext[K, V](
     @scala.transient val sparkContext: SparkContext,
     cfgF: () => IgniteConfiguration
@@ -61,4 +69,9 @@ class IgniteContext[K, V](
         }
     }
 
+    def close() = {
+        val igniteCfg = cfgF()
+
+        Ignition.stop(igniteCfg.getGridName, false)
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37a7679d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 30efa7a..358fcd4 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -29,12 +29,27 @@ import org.apache.spark.{TaskContext, Partition}
 
 import scala.collection.JavaConversions._
 
+/**
+ * Ignite RDD. Represents Ignite cache as Spark RDD abstraction.
+ *
+ * @param ic Ignite context to use.
+ * @param cacheName Cache name.
+ * @param cacheCfg Cache configuration.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
 class IgniteRDD[K, V] (
     ic: IgniteContext[K, V],
     cacheName: String,
     cacheCfg: CacheConfiguration[K, V]
 ) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
-
+    /**
+     * Computes iterator based on given partition.
+     *
+     * @param part Partition to use.
+     * @param context Task context.
+     * @return Partition iterator.
+     */
     override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
         val cache = ensureCache()
 
@@ -49,6 +64,11 @@ class IgniteRDD[K, V] (
         })
     }
 
+    /**
+     * Gets partitions for the given cache RDD.
+     *
+     * @return Partitions.
+     */
     override protected def getPartitions: Array[Partition] = {
         ensureCache()
 
@@ -57,6 +77,12 @@ class IgniteRDD[K, V] (
         (0 until parts).map(new IgnitePartition(_)).toArray
     }
 
+    /**
+     * Gets preferred locations for the given partition.
+     *
+     * @param split Split partition.
+     * @return Preferred locations for the given partition.
+     */
     override protected def getPreferredLocations(split: Partition): Seq[String] = {
         ensureCache()
 
@@ -129,6 +155,10 @@ class IgniteRDD[K, V] (
         })
     }
 
+    def clear(): Unit = {
+        ensureCache().removeAll()
+    }
+
     private def affinityKeyFunc(value: V, node: ClusterNode): Object = {
         IgniteUuid.randomUuid()
     }