You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/06/14 15:09:49 UTC

[10/10] ignite git commit: IGNITE-3215 - Added IgniteRDD.withKeepBinary method

IGNITE-3215 - Added IgniteRDD.withKeepBinary method


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b90d182c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b90d182c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b90d182c

Branch: refs/heads/ignite-3215
Commit: b90d182ca2dc675bf7fd1550ffda648911ed82d6
Parents: 98a0990
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Tue Jun 14 18:09:10 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Tue Jun 14 18:09:10 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/spark/IgniteContext.scala |  4 +--
 .../org/apache/ignite/spark/IgniteRDD.scala     | 19 ++++++++---
 .../apache/ignite/spark/JavaIgniteContext.scala |  4 +--
 .../org/apache/ignite/spark/JavaIgniteRDD.scala |  2 ++
 .../ignite/spark/impl/IgniteAbstractRDD.scala   | 15 ++++++---
 .../apache/ignite/spark/impl/IgniteSqlRDD.scala |  5 +--
 .../spark/impl/JavaIgniteAbstractRDD.scala      | 34 --------------------
 .../org/apache/ignite/spark/IgniteRDDSpec.scala | 25 ++++++++++++--
 8 files changed, 58 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index bd61974..1084dbe 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -107,7 +107,7 @@ class IgniteContext[K, V](
      * @return `IgniteRDD` instance.
      */
     def fromCache(cacheName: String): IgniteRDD[K, V] = {
-        new IgniteRDD[K, V](this, cacheName, null)
+        new IgniteRDD[K, V](this, cacheName, null, false)
     }
 
     /**
@@ -118,7 +118,7 @@ class IgniteContext[K, V](
      * @return `IgniteRDD` instance.
      */
     def fromCache(cacheCfg: CacheConfiguration[K, V]) = {
-        new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg)
+        new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg, false)
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index fa96212..cad96b9 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -45,8 +45,9 @@ import scala.collection.JavaConversions._
 class IgniteRDD[K, V] (
     val ic: IgniteContext[K, V],
     val cacheName: String,
-    val cacheCfg: CacheConfiguration[K, V]
-) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
+    val cacheCfg: CacheConfiguration[K, V],
+    val keepBinary: Boolean
+) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg, keepBinary) {
     /**
      * Computes iterator based on given partition.
      *
@@ -127,7 +128,8 @@ class IgniteRDD[K, V] (
 
         qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
 
-        new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry ⇒ (entry.getKey, entry.getValue))
+        new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry,
+            entry ⇒ (entry.getKey, entry.getValue), keepBinary)
     }
 
     /**
@@ -144,7 +146,8 @@ class IgniteRDD[K, V] (
 
         val schema = buildSchema(ensureCache().query(qry).asInstanceOf[QueryCursorEx[java.util.List[_]]].fieldsMeta())
 
-        val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list))
+        val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](
+            ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list), keepBinary)
 
         ic.sqlContext.createDataFrame(rowRdd, schema)
     }
@@ -290,6 +293,14 @@ class IgniteRDD[K, V] (
         ensureCache().removeAll()
     }
 
+    def withKeepBinary[K1, V1](): IgniteRDD[K1, V1] = {
+        new IgniteRDD[K1, V1](
+            ic.asInstanceOf[IgniteContext[K1, V1]],
+            cacheName,
+            cacheCfg.asInstanceOf[CacheConfiguration[K1, V1]],
+            true)
+    }
+
     /**
      * Builds spark schema from query metadata.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
index 44b1cd9..25184e7 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -52,10 +52,10 @@ class JavaIgniteContext[K, V](
     }
 
     def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
-        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null, false))
 
     def fromCache(cacheCfg: CacheConfiguration[K, V]) =
-        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg, false))
 
     def ignite(): Ignite = ic.ignite()
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
index cac0e15..1efc6ae 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -96,6 +96,8 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
         savePairs(jrdd, f, overwrite = false)
 
     def clear(): Unit = rdd.clear()
+
+    def withKeepBinary[K1, V1](): JavaIgniteRDD[K1, V1] = new JavaIgniteRDD[K1, V1](rdd.withKeepBinary[K1, V1]())
 }
 
 object JavaIgniteRDD {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
index 25b3b56..9d5171c 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
@@ -27,13 +27,20 @@ import scala.reflect.ClassTag
 abstract class IgniteAbstractRDD[R:ClassTag, K, V] (
     ic: IgniteContext[K, V],
     cacheName: String,
-    cacheCfg: CacheConfiguration[K, V]
+    cacheCfg: CacheConfiguration[K, V],
+    keepBinary: Boolean
 ) extends RDD[R] (ic.sparkContext, deps = Nil) {
     protected def ensureCache(): IgniteCache[K, V] = {
         // Make sure to deploy the cache
-        if (cacheCfg != null)
-            ic.ignite().getOrCreateCache(cacheCfg)
+        val cache =
+            if (cacheCfg != null)
+                ic.ignite().getOrCreateCache(cacheCfg)
+            else
+                ic.ignite().getOrCreateCache(cacheName)
+
+        if (keepBinary)
+            cache.withKeepBinary()
         else
-            ic.ignite().getOrCreateCache(cacheName)
+            cache.asInstanceOf[IgniteCache[K, V]]
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
index 762a6ed..b4579aa 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
@@ -29,8 +29,9 @@ class IgniteSqlRDD[R: ClassTag, T, K, V](
     cacheName: String,
     cacheCfg: CacheConfiguration[K, V],
     qry: Query[T],
-    conv: (T) ⇒ R
-) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg) {
+    conv: (T) ⇒ R,
+    keepBinary: Boolean
+) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg, keepBinary) {
     override def compute(split: Partition, context: TaskContext): Iterator[R] = {
         new IgniteQueryIterator[T, R](ensureCache().query(qry).iterator(), conv)
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
deleted file mode 100644
index 13bd3e8..0000000
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spark.impl
-
-import org.apache.ignite.IgniteCache
-import org.apache.ignite.spark.IgniteRDD
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike}
-
-abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V])
-    extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
-
-    protected def ensureCache(): IgniteCache[K, V] = {
-        // Make sure to deploy the cache
-        if (rdd.cacheCfg != null)
-            rdd.ic.ignite().getOrCreateCache(rdd.cacheCfg)
-        else
-            rdd.ic.ignite().getOrCreateCache(rdd.cacheName)
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
index 61040d9..8a5b355 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
@@ -18,7 +18,7 @@
 package org.apache.ignite.spark
 
 import org.apache.ignite.Ignition
-import org.apache.ignite.cache.query.annotations.{QueryTextField, QuerySqlField}
+import org.apache.ignite.cache.query.annotations.{QuerySqlField, QueryTextField}
 import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
@@ -26,9 +26,10 @@ import org.apache.spark.SparkContext
 import org.junit.runner.RunWith
 import org.scalatest._
 import org.scalatest.junit.JUnitRunner
-import scala.collection.JavaConversions._
 
+import scala.collection.JavaConversions._
 import IgniteRDDSpec._
+import org.apache.ignite.binary.BinaryObject
 
 import scala.annotation.meta.field
 
@@ -267,6 +268,26 @@ class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be
                 sc.stop()
             }
         }
+
+        it("should properly work with binary objects") {
+            val sc = new SparkContext("local[*]", "test")
+
+            try {
+                val ic = new IgniteContext[String, Entity](sc, () ⇒ configuration("client", client = true))
+
+                val cache = ic.fromCache(PARTITIONED_CACHE_NAME)
+
+                cache.savePairs(sc.parallelize(0 until 10, 2).map(i ⇒ (String.valueOf(i),
+                    new Entity(i, "name" + i, i * 100))))
+
+                val res = cache.withKeepBinary[String, BinaryObject]().map(t ⇒ t._2.field[Int]("salary")).collect()
+
+                println(res)
+            }
+            finally {
+                sc.stop()
+            }
+        }
     }
 
     override protected def beforeEach() = {