You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/06/14 15:09:49 UTC
[10/10] ignite git commit: IGNITE-3215 - Added IgniteRDD.withKeepBinary method
IGNITE-3215 - Added IgniteRDD.withKeepBinary method
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b90d182c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b90d182c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b90d182c
Branch: refs/heads/ignite-3215
Commit: b90d182ca2dc675bf7fd1550ffda648911ed82d6
Parents: 98a0990
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Tue Jun 14 18:09:10 2016 +0300
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Tue Jun 14 18:09:10 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/spark/IgniteContext.scala | 4 +--
.../org/apache/ignite/spark/IgniteRDD.scala | 19 ++++++++---
.../apache/ignite/spark/JavaIgniteContext.scala | 4 +--
.../org/apache/ignite/spark/JavaIgniteRDD.scala | 2 ++
.../ignite/spark/impl/IgniteAbstractRDD.scala | 15 ++++++---
.../apache/ignite/spark/impl/IgniteSqlRDD.scala | 5 +--
.../spark/impl/JavaIgniteAbstractRDD.scala | 34 --------------------
.../org/apache/ignite/spark/IgniteRDDSpec.scala | 25 ++++++++++++--
8 files changed, 58 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index bd61974..1084dbe 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -107,7 +107,7 @@ class IgniteContext[K, V](
* @return `IgniteRDD` instance.
*/
def fromCache(cacheName: String): IgniteRDD[K, V] = {
- new IgniteRDD[K, V](this, cacheName, null)
+ new IgniteRDD[K, V](this, cacheName, null, false)
}
/**
@@ -118,7 +118,7 @@ class IgniteContext[K, V](
* @return `IgniteRDD` instance.
*/
def fromCache(cacheCfg: CacheConfiguration[K, V]) = {
- new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg)
+ new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg, false)
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index fa96212..cad96b9 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -45,8 +45,9 @@ import scala.collection.JavaConversions._
class IgniteRDD[K, V] (
val ic: IgniteContext[K, V],
val cacheName: String,
- val cacheCfg: CacheConfiguration[K, V]
-) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
+ val cacheCfg: CacheConfiguration[K, V],
+ val keepBinary: Boolean
+) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg, keepBinary) {
/**
* Computes iterator based on given partition.
*
@@ -127,7 +128,8 @@ class IgniteRDD[K, V] (
qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
- new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry ⇒ (entry.getKey, entry.getValue))
+ new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry,
+ entry ⇒ (entry.getKey, entry.getValue), keepBinary)
}
/**
@@ -144,7 +146,8 @@ class IgniteRDD[K, V] (
val schema = buildSchema(ensureCache().query(qry).asInstanceOf[QueryCursorEx[java.util.List[_]]].fieldsMeta())
- val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list))
+ val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](
+ ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list), keepBinary)
ic.sqlContext.createDataFrame(rowRdd, schema)
}
@@ -290,6 +293,14 @@ class IgniteRDD[K, V] (
ensureCache().removeAll()
}
+ def withKeepBinary[K1, V1](): IgniteRDD[K1, V1] = {
+ new IgniteRDD[K1, V1](
+ ic.asInstanceOf[IgniteContext[K1, V1]],
+ cacheName,
+ cacheCfg.asInstanceOf[CacheConfiguration[K1, V1]],
+ true)
+ }
+
/**
* Builds spark schema from query metadata.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
index 44b1cd9..25184e7 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -52,10 +52,10 @@ class JavaIgniteContext[K, V](
}
def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
- JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+ JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null, false))
def fromCache(cacheCfg: CacheConfiguration[K, V]) =
- JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+ JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg, false))
def ignite(): Ignite = ic.ignite()
http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
index cac0e15..1efc6ae 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -96,6 +96,8 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
savePairs(jrdd, f, overwrite = false)
def clear(): Unit = rdd.clear()
+
+ def withKeepBinary[K1, V1](): JavaIgniteRDD[K1, V1] = new JavaIgniteRDD[K1, V1](rdd.withKeepBinary[K1, V1]())
}
object JavaIgniteRDD {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
index 25b3b56..9d5171c 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
@@ -27,13 +27,20 @@ import scala.reflect.ClassTag
abstract class IgniteAbstractRDD[R:ClassTag, K, V] (
ic: IgniteContext[K, V],
cacheName: String,
- cacheCfg: CacheConfiguration[K, V]
+ cacheCfg: CacheConfiguration[K, V],
+ keepBinary: Boolean
) extends RDD[R] (ic.sparkContext, deps = Nil) {
protected def ensureCache(): IgniteCache[K, V] = {
// Make sure to deploy the cache
- if (cacheCfg != null)
- ic.ignite().getOrCreateCache(cacheCfg)
+ val cache =
+ if (cacheCfg != null)
+ ic.ignite().getOrCreateCache(cacheCfg)
+ else
+ ic.ignite().getOrCreateCache(cacheName)
+
+ if (keepBinary)
+ cache.withKeepBinary()
else
- ic.ignite().getOrCreateCache(cacheName)
+ cache.asInstanceOf[IgniteCache[K, V]]
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
index 762a6ed..b4579aa 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
@@ -29,8 +29,9 @@ class IgniteSqlRDD[R: ClassTag, T, K, V](
cacheName: String,
cacheCfg: CacheConfiguration[K, V],
qry: Query[T],
- conv: (T) ⇒ R
-) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg) {
+ conv: (T) ⇒ R,
+ keepBinary: Boolean
+) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg, keepBinary) {
override def compute(split: Partition, context: TaskContext): Iterator[R] = {
new IgniteQueryIterator[T, R](ensureCache().query(qry).iterator(), conv)
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
deleted file mode 100644
index 13bd3e8..0000000
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spark.impl
-
-import org.apache.ignite.IgniteCache
-import org.apache.ignite.spark.IgniteRDD
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike}
-
-abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V])
- extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
-
- protected def ensureCache(): IgniteCache[K, V] = {
- // Make sure to deploy the cache
- if (rdd.cacheCfg != null)
- rdd.ic.ignite().getOrCreateCache(rdd.cacheCfg)
- else
- rdd.ic.ignite().getOrCreateCache(rdd.cacheName)
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b90d182c/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
index 61040d9..8a5b355 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala
@@ -18,7 +18,7 @@
package org.apache.ignite.spark
import org.apache.ignite.Ignition
-import org.apache.ignite.cache.query.annotations.{QueryTextField, QuerySqlField}
+import org.apache.ignite.cache.query.annotations.{QuerySqlField, QueryTextField}
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
@@ -26,9 +26,10 @@ import org.apache.spark.SparkContext
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.junit.JUnitRunner
-import scala.collection.JavaConversions._
+import scala.collection.JavaConversions._
import IgniteRDDSpec._
+import org.apache.ignite.binary.BinaryObject
import scala.annotation.meta.field
@@ -267,6 +268,26 @@ class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be
sc.stop()
}
}
+
+ it("should properly work with binary objects") {
+ val sc = new SparkContext("local[*]", "test")
+
+ try {
+ val ic = new IgniteContext[String, Entity](sc, () ⇒ configuration("client", client = true))
+
+ val cache = ic.fromCache(PARTITIONED_CACHE_NAME)
+
+ cache.savePairs(sc.parallelize(0 until 10, 2).map(i ⇒ (String.valueOf(i),
+ new Entity(i, "name" + i, i * 100))))
+
+ val res = cache.withKeepBinary[String, BinaryObject]().map(t ⇒ t._2.field[Int]("salary")).collect()
+
+ println(res)
+ }
+ finally {
+ sc.stop()
+ }
+ }
}
override protected def beforeEach() = {