You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/06/11 07:05:31 UTC
[12/50] incubator-ignite git commit: #IGNITE-389 - More functions on
API.
#IGNITE-389 - More functions on API.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/37a7679d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/37a7679d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/37a7679d
Branch: refs/heads/ignite-sprint-5
Commit: 37a7679df3fd05473840482d0e5c2c9483d02b2a
Parents: 5d6bb53
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Fri May 29 08:46:52 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Fri May 29 08:46:52 2015 -0700
----------------------------------------------------------------------
.../org/apache/ignite/spark/IgniteContext.scala | 13 ++++++++
.../org/apache/ignite/spark/IgniteRDD.scala | 32 +++++++++++++++++++-
2 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37a7679d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index a73405b..5b649db 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -23,6 +23,14 @@ import org.apache.ignite.{Ignition, Ignite}
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.spark.SparkContext
+/**
+ * Ignite context.
+ *
+ * @param sparkContext Spark context.
+ * @param cfgF Configuration factory.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
class IgniteContext[K, V](
@scala.transient val sparkContext: SparkContext,
cfgF: () => IgniteConfiguration
@@ -61,4 +69,9 @@ class IgniteContext[K, V](
}
}
+ def close() = {
+ val igniteCfg = cfgF()
+
+ Ignition.stop(igniteCfg.getGridName, false)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37a7679d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 30efa7a..358fcd4 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -29,12 +29,27 @@ import org.apache.spark.{TaskContext, Partition}
import scala.collection.JavaConversions._
+/**
+ * Ignite RDD. Represents Ignite cache as Spark RDD abstraction.
+ *
+ * @param ic Ignite context to use.
+ * @param cacheName Cache name.
+ * @param cacheCfg Cache configuration.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
class IgniteRDD[K, V] (
ic: IgniteContext[K, V],
cacheName: String,
cacheCfg: CacheConfiguration[K, V]
) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
-
+ /**
+ * Computes iterator based on given partition.
+ *
+ * @param part Partition to use.
+ * @param context Task context.
+ * @return Partition iterator.
+ */
override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
val cache = ensureCache()
@@ -49,6 +64,11 @@ class IgniteRDD[K, V] (
})
}
+ /**
+ * Gets partitions for the given cache RDD.
+ *
+ * @return Partitions.
+ */
override protected def getPartitions: Array[Partition] = {
ensureCache()
@@ -57,6 +77,12 @@ class IgniteRDD[K, V] (
(0 until parts).map(new IgnitePartition(_)).toArray
}
+ /**
+ * Gets preferred locations for the given partition.
+ *
+ * @param split Split partition.
+ * @return Preferred locations for the given partition.
+ */
override protected def getPreferredLocations(split: Partition): Seq[String] = {
ensureCache()
@@ -129,6 +155,10 @@ class IgniteRDD[K, V] (
})
}
+ def clear(): Unit = {
+ ensureCache().removeAll()
+ }
+
private def affinityKeyFunc(value: V, node: ClusterNode): Object = {
IgniteUuid.randomUuid()
}