You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/06/06 11:57:15 UTC

[16/32] ignite git commit: IGNITE-3232 - Inline transformers for IgniteRDD.savePairs and IgniteRDD.saveValues

IGNITE-3232 - Inline transformers for IgniteRDD.savePairs and IgniteRDD.saveValues


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9df1b905
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9df1b905
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9df1b905

Branch: refs/heads/ignite-3212
Commit: 9df1b905cd86384c1d191785d70a8e5c8e741e48
Parents: 91862c7
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Thu Jun 2 16:03:12 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Fri Jun 3 20:12:11 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/spark/IgniteRDD.scala     | 75 +++++++++++++++++++
 .../org/apache/ignite/spark/JavaIgniteRDD.scala |  9 +++
 .../org/apache/ignite/spark/IgniteRDDSpec.scala | 77 +++++++++++++++++++-
 3 files changed, 160 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9df1b905/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 0d8e730..036dfe6 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -180,6 +180,39 @@ class IgniteRDD[K, V] (
     }
 
     /**
+     * Saves values produced by applying f to the given RDD into Ignite. A unique key is generated for each value.
+     *
+     * @param rdd RDD instance to save values from.
+     * @param f Transformation function that converts an RDD element into the value to store.
+     */
+    def saveValues[T](rdd: RDD[T], f: (T, IgniteContext[K, V]) \u21d2 V) = {
+        rdd.foreachPartition(it \u21d2 {
+            val ig = ic.ignite()
+
+            ensureCache()
+
+            val locNode = ig.cluster().localNode()
+
+            val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
+
+            val streamer = ig.dataStreamer[Object, V](cacheName)
+
+            try {
+                it.foreach(t \u21d2 {
+                    val value = f(t, ic)
+
+                    val key = affinityKeyFunc(value, node.orNull)
+
+                    streamer.addData(key, value)
+                })
+            }
+            finally {
+                streamer.close()
+            }
+        })
+    }
+
+    /**
      * Saves values from the given key-value RDD into Ignite.
      *
      * @param rdd RDD instance to save values from.
@@ -209,6 +242,48 @@ class IgniteRDD[K, V] (
     }
 
     /**
+     * Saves key-value pairs produced by applying f to each element of the given RDD into Ignite.
+     *
+     * @param rdd RDD instance to save values from.
+     * @param f Transformation function that converts an RDD element into the key-value pair to store.
+     * @param overwrite Boolean flag indicating whether the call on this method should overwrite existing
+     *      values in Ignite cache.
+     */
+    def savePairs[T](rdd: RDD[T], f: (T, IgniteContext[K, V]) \u21d2 (K, V), overwrite: Boolean) = {
+        rdd.foreachPartition(it \u21d2 {
+            val ig = ic.ignite()
+
+            // Make sure to deploy the cache
+            ensureCache()
+
+            val streamer = ig.dataStreamer[K, V](cacheName)
+
+            try {
+                streamer.allowOverwrite(overwrite)
+
+                it.foreach(t \u21d2 {
+                    val tup = f(t, ic)
+
+                    streamer.addData(tup._1, tup._2)
+                })
+            }
+            finally {
+                streamer.close()
+            }
+        })
+    }
+
+    /**
+     * Saves key-value pairs produced by applying f to the given RDD into Ignite without overwriting existing values.
+     *
+     * @param rdd RDD instance to save values from.
+     * @param f Transformation function that converts an RDD element into the key-value pair to store.
+     */
+    def savePairs[T](rdd: RDD[T], f: (T, IgniteContext[K, V]) \u21d2 (K, V)): Unit = {
+        savePairs(rdd, f, overwrite = false)
+    }
+
+    /**
      * Removes all values from the underlying Ignite cache.
      */
     def clear(): Unit = {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df1b905/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
index 2e8702e..cac0e15 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -80,12 +80,21 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
 
     def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd))
 
+    def saveValues[T](jrdd: JavaRDD[T], f: (T, IgniteContext[K, V]) \u21d2 V) = rdd.saveValues(JavaRDD.toRDD(jrdd), f)
+
     def savePairs(jrdd: JavaPairRDD[K, V]) = {
         val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd)
 
         rdd.savePairs(rrdd)
     }
 
+    def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext[K, V]) \u21d2 (K, V), overwrite: Boolean = false) = {
+        rdd.savePairs(JavaRDD.toRDD(jrdd), f, overwrite)
+    }
+
+    def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext[K, V]) \u21d2 (K, V)): Unit =
+        savePairs(jrdd, f, overwrite = false)
+
     def clear(): Unit = rdd.clear()
 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9df1b905/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
index 8e36275..15a51ae 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkContext
 import org.junit.runner.RunWith
 import org.scalatest._
 import org.scalatest.junit.JUnitRunner
+import scala.collection.JavaConversions._
 
 import IgniteRDDSpec._
 
@@ -34,7 +35,7 @@ import scala.annotation.meta.field
 @RunWith(classOf[JUnitRunner])
 class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {
     describe("IgniteRDD") {
-        it("should successfully store data to ignite") {
+        it("should successfully store data to ignite using savePairs") {
             val sc = new SparkContext("local[*]", "test")
 
             try {
@@ -59,6 +60,80 @@ class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be
             }
         }
 
+        it("should successfully store data to ignite using savePairs with inline transformation") {
+            val sc = new SparkContext("local[*]", "test")
+
+            try {
+                val ic = new IgniteContext[String, String](sc,
+                    () \u21d2 configuration("client", client = true))
+
+                // Save pairs ("0", "val0"), ("1", "val1"), ... to Ignite cache.
+                ic.fromCache(PARTITIONED_CACHE_NAME).savePairs(
+                    sc.parallelize(0 to 10000, 2), (i: Int, ic) \u21d2 (String.valueOf(i), "val" + i))
+
+                // Check cache contents.
+                val ignite = Ignition.ignite("grid-0")
+
+                for (i \u2190 0 to 10000) {
+                    val res = ignite.cache[String, String](PARTITIONED_CACHE_NAME).get(String.valueOf(i))
+
+                    assert(res != null, "Value was not put to cache for key: " + i)
+                    assert("val" + i == res, "Invalid value stored for key: " + i)
+                }
+            }
+            finally {
+                sc.stop()
+            }
+        }
+
+        it("should successfully store data to ignite using saveValues") {
+            val sc = new SparkContext("local[*]", "test")
+
+            try {
+                val ic = new IgniteContext[String, String](sc,
+                    () \u21d2 configuration("client", client = true))
+
+                // Save values "val0", "val1", ... to Ignite cache (keys are generated by saveValues).
+                ic.fromCache(PARTITIONED_CACHE_NAME).saveValues(
+                    sc.parallelize(0 to 10000, 2).map(i \u21d2 "val" + i))
+
+                // Check cache contents.
+                val ignite = Ignition.ignite("grid-0")
+
+                val values = ignite.cache[String, String](PARTITIONED_CACHE_NAME).toList.map(e \u21d2 e.getValue)
+
+                for (i \u2190 0 to 10000)
+                    assert(values.contains("val" + i), "Value not found for index: " + i)
+            }
+            finally {
+                sc.stop()
+            }
+        }
+
+        it("should successfully store data to ignite using saveValues with inline transformation") {
+            val sc = new SparkContext("local[*]", "test")
+
+            try {
+                val ic = new IgniteContext[String, String](sc,
+                    () \u21d2 configuration("client", client = true))
+
+                // Transform 0, 1, ... into values "val0", "val1", ... inline and save them to Ignite cache.
+                ic.fromCache(PARTITIONED_CACHE_NAME).saveValues(
+                    sc.parallelize(0 to 10000, 2), (i: Int, ic) \u21d2 "val" + i)
+
+                // Check cache contents.
+                val ignite = Ignition.ignite("grid-0")
+
+                val values = ignite.cache[String, String](PARTITIONED_CACHE_NAME).toList.map(e \u21d2 e.getValue)
+
+                for (i \u2190 0 to 10000)
+                    assert(values.contains("val" + i), "Value not found for index: " + i)
+            }
+            finally {
+                sc.stop()
+            }
+        }
+
         it("should successfully read data from ignite") {
             val sc = new SparkContext("local[*]", "test")