You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/06/06 11:57:15 UTC
[16/32] ignite git commit: IGNITE-3232 - Inline transformers for
IgniteRDD.savePairs and IgniteRDD.saveValues
IGNITE-3232 - Inline transformers for IgniteRDD.savePairs and IgniteRDD.saveValues
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9df1b905
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9df1b905
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9df1b905
Branch: refs/heads/ignite-3212
Commit: 9df1b905cd86384c1d191785d70a8e5c8e741e48
Parents: 91862c7
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Thu Jun 2 16:03:12 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Fri Jun 3 20:12:11 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/spark/IgniteRDD.scala | 75 +++++++++++++++++++
.../org/apache/ignite/spark/JavaIgniteRDD.scala | 9 +++
.../org/apache/ignite/spark/IgniteRDDSpec.scala | 77 +++++++++++++++++++-
3 files changed, 160 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9df1b905/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 0d8e730..036dfe6 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -180,6 +180,39 @@ class IgniteRDD[K, V] (
}
/**
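+ * Transforms each element of the given RDD with `f` and saves the results into Ignite. A unique key will be generated for each saved value.
+ *
+ * @param rdd RDD instance to save values from.
+ * @param f Transformation function applied to each RDD element (together with the Ignite context) to produce the value to save.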
+ */
+ def saveValues[T](rdd: RDD[T], f: (T, IgniteContext[K, V]) \u21d2 V) = {
+ rdd.foreachPartition(it \u21d2 {
+ val ig = ic.ignite()
+
+ ensureCache()
+
+ val locNode = ig.cluster().localNode()
+ // First node on the local node's host that is not the local node itself (reference check), if any.
+ val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
+
+ val streamer = ig.dataStreamer[Object, V](cacheName)
+
+ try {
+ it.foreach(t \u21d2 {
+ val value = f(t, ic)
+ // Key comes from affinityKeyFunc; it receives null when no other node was found on this host.
+ val key = affinityKeyFunc(value, node.orNull)
+
+ streamer.addData(key, value)
+ })
+ }
+ finally {
+ streamer.close() // Closed even if f or addData throws.
+ }
+ })
+ }
+
+ /**
* Saves values from the given key-value RDD into Ignite.
*
* @param rdd RDD instance to save values from.
@@ -209,6 +242,48 @@ class IgniteRDD[K, V] (
}
/**
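+ * Transforms each element of the given RDD into a key-value pair with `f` and saves the pairs into Ignite.
+ *
+ * @param rdd RDD instance to save values from.
+ * @param f Transformation function applied to each RDD element (together with the Ignite context) to produce the key-value pair to save.
+ * @param overwrite Boolean flag indicating whether this call should overwrite existing
+ * values in Ignite cache.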
+ */
+ def savePairs[T](rdd: RDD[T], f: (T, IgniteContext[K, V]) \u21d2 (K, V), overwrite: Boolean) = {
+ rdd.foreachPartition(it \u21d2 {
+ val ig = ic.ignite()
+
+ // Make sure to deploy the cache
+ ensureCache()
+
+ val streamer = ig.dataStreamer[K, V](cacheName)
+
+ try {
+ streamer.allowOverwrite(overwrite)
+ // Apply f to every element of the partition and stream the resulting (key, value) pair.
+ it.foreach(t \u21d2 {
+ val tup = f(t, ic)
+
+ streamer.addData(tup._1, tup._2)
+ })
+ }
+ finally {
+ streamer.close() // Closed even if f or addData throws.
+ }
+ })
+ }
+
+ /**
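+ * Transforms each element of the given RDD into a key-value pair with `f` and saves the pairs into Ignite with `overwrite` set to `false`.
+ *
+ * @param rdd RDD instance to save values from.
+ * @param f Transformation function applied to each RDD element (together with the Ignite context) to produce the key-value pair to save.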
+ */
+ def savePairs[T](rdd: RDD[T], f: (T, IgniteContext[K, V]) \u21d2 (K, V)): Unit = {
+ savePairs(rdd, f, overwrite = false) // Delegates to the three-argument overload with overwrite disabled.
+ }
+
+ /**
* Removes all values from the underlying Ignite cache.
*/
def clear(): Unit = {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9df1b905/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
index 2e8702e..cac0e15 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -80,12 +80,21 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd))
+ def saveValues[T](jrdd: JavaRDD[T], f: (T, IgniteContext[K, V]) \u21d2 V) = rdd.saveValues(JavaRDD.toRDD(jrdd), f) // Converts jrdd to an RDD and delegates to the wrapped IgniteRDD.
+
def savePairs(jrdd: JavaPairRDD[K, V]) = {
val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd)
rdd.savePairs(rrdd)
}
+ def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext[K, V]) \u21d2 (K, V), overwrite: Boolean = false) = {
+ rdd.savePairs(JavaRDD.toRDD(jrdd), f, overwrite) // Converts jrdd to an RDD and delegates to the wrapped IgniteRDD.
+ }
+
+ def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext[K, V]) \u21d2 (K, V)): Unit =
+ savePairs(jrdd, f, overwrite = false) // NOTE(review): explicit overload, presumably for Java callers that cannot use the Scala default argument - confirm.
+
def clear(): Unit = rdd.clear()
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9df1b905/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
index 8e36275..15a51ae 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkContext
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.junit.JUnitRunner
+import scala.collection.JavaConversions._
import IgniteRDDSpec._
@@ -34,7 +35,7 @@ import scala.annotation.meta.field
@RunWith(classOf[JUnitRunner])
class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {
describe("IgniteRDD") {
- it("should successfully store data to ignite") {
+ it("should successfully store data to ignite using savePairs") {
val sc = new SparkContext("local[*]", "test")
try {
@@ -59,6 +60,80 @@ class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be
}
}
+ it("should successfully store data to ignite using savePairs with inline transformation") {
+ val sc = new SparkContext("local[*]", "test")
+
+ try {
+ val ic = new IgniteContext[String, String](sc,
+ () \u21d2 configuration("client", client = true))
+
+ // Save pairs ("0", "val0"), ("1", "val1"), ... to Ignite cache, building each pair inline from the Int element.
+ ic.fromCache(PARTITIONED_CACHE_NAME).savePairs(
+ sc.parallelize(0 to 10000, 2), (i: Int, ic) \u21d2 (String.valueOf(i), "val" + i))
+
+ // Check cache contents through the Ignite instance named "grid-0".
+ val ignite = Ignition.ignite("grid-0")
+ // Every key "0".."10000" must be present and map to "val" + key.
+ for (i \u2190 0 to 10000) {
+ val res = ignite.cache[String, String](PARTITIONED_CACHE_NAME).get(String.valueOf(i))
+
+ assert(res != null, "Value was not put to cache for key: " + i)
+ assert("val" + i == res, "Invalid value stored for key: " + i)
+ }
+ }
+ finally {
+ sc.stop()
+ }
+ }
+
+ it("should successfully store data to ignite using saveValues") {
+ val sc = new SparkContext("local[*]", "test")
+
+ try {
+ val ic = new IgniteContext[String, String](sc,
+ () \u21d2 configuration("client", client = true))
+
+ // Save values "val0", "val1", ... to Ignite cache; no keys are supplied by this test.
+ ic.fromCache(PARTITIONED_CACHE_NAME).saveValues(
+ sc.parallelize(0 to 10000, 2).map(i \u21d2 "val" + i))
+
+ // Check cache contents by value: collect all cached values and search for each expected one.
+ val ignite = Ignition.ignite("grid-0")
+
+ val values = ignite.cache[String, String](PARTITIONED_CACHE_NAME).toList.map(e \u21d2 e.getValue)
+
+ for (i \u2190 0 to 10000)
+ assert(values.contains("val" + i), "Value not found for index: " + i)
+ }
+ finally {
+ sc.stop()
+ }
+ }
+
+ it("should successfully store data to ignite using saveValues with inline transformation") {
+ val sc = new SparkContext("local[*]", "test")
+
+ try {
+ val ic = new IgniteContext[String, String](sc,
+ () \u21d2 configuration("client", client = true))
+
+ // Save values "val0", "val1", ... to Ignite cache, building each value inline from the Int element.
+ ic.fromCache(PARTITIONED_CACHE_NAME).saveValues(
+ sc.parallelize(0 to 10000, 2), (i: Int, ic) \u21d2 "val" + i)
+
+ // Check cache contents by value: collect all cached values and search for each expected one.
+ val ignite = Ignition.ignite("grid-0")
+
+ val values = ignite.cache[String, String](PARTITIONED_CACHE_NAME).toList.map(e \u21d2 e.getValue)
+
+ for (i \u2190 0 to 10000)
+ assert(values.contains("val" + i), "Value not found for index: " + i)
+ }
+ finally {
+ sc.stop()
+ }
+ }
+
it("should successfully read data from ignite") {
val sc = new SparkContext("local[*]", "test")