You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/05/23 03:49:44 UTC
[2/2] incubator-ignite git commit: #IGNITE-389 - Changing API
#IGNITE-389 - Changing API
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7439b5b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7439b5b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7439b5b7
Branch: refs/heads/ignite-389
Commit: 7439b5b7ddb5f6f771c927a2a6f5bdb8b3405392
Parents: 520cd03
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Fri May 22 18:49:33 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Fri May 22 18:49:33 2015 -0700
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 1 -
.../processors/cache/GridCacheProcessor.java | 30 ++++--
.../cache/IgniteDynamicCacheStartSelfTest.java | 22 ++--
.../apache/ignite/spark/IgniteAbstractRDD.scala | 38 +++++++
.../org/apache/ignite/spark/IgniteContext.scala | 97 ++---------------
.../org/apache/ignite/spark/IgniteRDD.scala | 106 ++++++++++++++++---
.../spark/examples/IgniteProcessExample.scala | 13 ++-
.../spark/examples/IgniteStoreExample.scala | 4 +-
.../ignite/spark/impl/IgniteQueryIterator.scala | 17 ++-
.../apache/ignite/spark/impl/IgniteSqlRDD.scala | 43 ++++++++
.../spark/util/SerializablePredicate2.scala | 32 ------
11 files changed, 232 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index df6b2ee..692f1e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -774,7 +774,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
*
* @param loadPrevVal Load previous value flag.
* @return {@code this} for chaining.
- * @return {@code this} for chaining.
*/
public CacheConfiguration setLoadPreviousValue(boolean loadPrevVal) {
this.loadPrevVal = loadPrevVal;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0e1a9c2..3c4c7d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1931,7 +1931,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.cacheType(cacheType);
- return F.first(initiateCacheChanges(F.asList(req)));
+ return F.first(initiateCacheChanges(F.asList(req), failIfExists));
}
/**
@@ -1941,14 +1941,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
public IgniteInternalFuture<?> dynamicStopCache(String cacheName) {
DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId(), true);
- return F.first(initiateCacheChanges(F.asList(t)));
+ return F.first(initiateCacheChanges(F.asList(t), false));
}
/**
* @param reqs Requests.
* @return Collection of futures.
*/
- public Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest> reqs) {
+ @SuppressWarnings("TypeMayBeWeakened")
+ private Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest> reqs,
+ boolean failIfExists) {
Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size());
Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size());
@@ -1981,9 +1983,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
maskNull(req.cacheName()), fut);
if (old != null) {
- if (req.start() && !req.clientStartOnly()) {
- fut.onDone(new CacheExistsException("Failed to start cache " +
- "(a cache with the same name is already being started or stopped): " + req.cacheName()));
+ if (req.start()) {
+ if (!req.clientStartOnly()) {
+ if (failIfExists)
+ fut.onDone(new CacheExistsException("Failed to start cache " +
+ "(a cache with the same name is already being started or stopped): " +
+ req.cacheName()));
+ else {
+ fut = old;
+
+ continue;
+ }
+ }
+ else {
+ fut = old;
+
+ continue;
+ }
}
else {
fut = old;
@@ -2617,7 +2633,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.clientStartOnly(true);
- F.first(initiateCacheChanges(F.asList(req))).get();
+ F.first(initiateCacheChanges(F.asList(req), false)).get();
IgniteCache cache = jCacheProxies.get(masked);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index adece63..698ee03 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -793,23 +793,19 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testGetOrCreateMultiNodeTemplate() throws Exception {
- for (int i = 0; i < 100; i++) {
- info(">>> Iteration " + i);
+ final AtomicInteger idx = new AtomicInteger();
- final AtomicInteger idx = new AtomicInteger();
-
- GridTestUtils.runMultiThreaded(new Callable<Object>() {
- @Override public Object call() throws Exception {
- int idx0 = idx.getAndIncrement();
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int idx0 = idx.getAndIncrement();
- ignite(idx0 % nodeCount()).getOrCreateCache(DYNAMIC_CACHE_NAME);
+ ignite(idx0 % nodeCount()).getOrCreateCache(DYNAMIC_CACHE_NAME);
- return null;
- }
- }, nodeCount() * 4, "runner");
+ return null;
+ }
+ }, nodeCount() * 4, "runner");
- ignite(0).destroyCache(DYNAMIC_CACHE_NAME);
- }
+ ignite(0).destroyCache(DYNAMIC_CACHE_NAME);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteAbstractRDD.scala
new file mode 100644
index 0000000..63232be
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteAbstractRDD.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.IgniteCache
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+abstract class IgniteAbstractRDD[R:ClassTag, K, V] (
+ ic: IgniteContext[K, V],
+ cacheName: String,
+ cacheCfg: CacheConfiguration[K, V]
+) extends RDD[R] (ic.sparkContext, deps = Nil) {
+ protected def ensureCache(): IgniteCache[K, V] = {
+ // Make sure to deploy the cache
+ if (cacheCfg != null)
+ ic.ignite().getOrCreateCache(cacheCfg)
+ else
+ ic.ignite().getOrCreateCache(cacheName)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 56d2a05..9d9f9a7 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -17,98 +17,29 @@
package org.apache.ignite.spark
-import javax.cache.Cache
-import org.apache.ignite.cache.query.{Query, ScanQuery}
-import org.apache.ignite.cluster.ClusterNode
import org.apache.ignite.internal.IgnitionEx
-import org.apache.ignite.lang.IgniteUuid
-import org.apache.ignite.spark.util.SerializablePredicate2
-import org.apache.ignite.{Ignition, IgniteCache, Ignite}
+import org.apache.ignite.{Ignition, Ignite}
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
class IgniteContext[K, V](
- @scala.transient sc: SparkContext,
- cfgF: () => IgniteConfiguration,
- val cacheName: String,
- cacheCfg: CacheConfiguration[K, V]
+ @scala.transient val sparkContext: SparkContext,
+ cfgF: () => IgniteConfiguration
) extends Serializable {
- type ScanRDD[K1, V1] = IgniteRDD[Cache.Entry[K1, V1], K1, V1]
-
- def this(
- sc: SparkContext,
- springUrl: String,
- cacheName: String
- ) {
- this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1(), cacheName, null)
- }
-
- def this(
- sc: SparkContext,
- cfgF: () => IgniteConfiguration,
- cacheName: String
- ) {
- this(sc, cfgF, cacheName, null)
- }
-
- def this(
- sc: SparkContext,
- cfgF: () => IgniteConfiguration,
- cacheCfg: CacheConfiguration[K, V]
- ) {
- this(sc, cfgF, cacheCfg.getName, cacheCfg)
- }
-
def this(
sc: SparkContext,
- springUrl: String,
- cacheCfg: CacheConfiguration[K, V]
+ springUrl: String
) {
- this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1(), cacheCfg.getName, cacheCfg)
+ this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1())
}
- def sparkContext() = sc
-
- def scan(p: (K, V) => Boolean = (_, _) => true): ScanRDD[K, V] = {
- new ScanRDD(this, new ScanQuery[K, V](new SerializablePredicate2[K, V](p)))
+ def fromCache(cacheName: String): IgniteRDD[K, V] = {
+ new IgniteRDD[K, V](this, cacheName, null)
}
- def scan[R:ClassTag](qry: Query[R]): IgniteRDD[R, K, V] = {
- new IgniteRDD[R, K, V](this, qry)
- }
-
- def saveToIgnite[T](rdd: RDD[V], keyFunc: (IgniteContext[K, V], V, ClusterNode) => T = affinityKeyFunc(_: IgniteContext[K, V], _:V, _: ClusterNode)) = {
- rdd.foreachPartition(it => {
- println("Using scala version: " + scala.util.Properties.versionString)
- // Make sure to deploy the cache
- igniteCache()
-
- val ig = ignite()
-
- val locNode = ig.cluster().localNode()
-
- val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
-
- val streamer = ignite().dataStreamer[T, V](cacheName)
-
- try {
- it.foreach(value => {
- val key: T = keyFunc(this, value, node.orNull)
-
- println("Saving: " + key + ", " + value)
-
- streamer.addData(key, value)
- })
- }
- finally {
- streamer.close()
- }
- })
+ def fromCache(cacheCfg: CacheConfiguration[K, V]) = {
+ new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg)
}
def ignite(): Ignite = {
@@ -130,14 +61,4 @@ class IgniteContext[K, V](
}
}
- private def igniteCache(): IgniteCache[K, V] = {
- if (cacheCfg == null)
- ignite().getOrCreateCache(cacheName)
- else
- ignite().getOrCreateCache(cacheCfg)
- }
-
- private def affinityKeyFunc(ic: IgniteContext[K, V], value: V, node: ClusterNode): Object = {
- IgniteUuid.randomUuid()
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 4018c53..ce51e9c 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -14,32 +14,114 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.ignite.spark
-import org.apache.ignite.cache.query.Query
-import org.apache.ignite.spark.impl.{IgniteQueryIterator, IgnitePartition}
-import org.apache.spark.{TaskContext, Partition}
+import javax.cache.Cache
+
+import org.apache.ignite.cache.query.{SqlQuery, ScanQuery}
+import org.apache.ignite.cluster.ClusterNode
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.lang.IgniteUuid
+import org.apache.ignite.spark.impl.{IgniteSqlRDD, IgnitePartition, IgniteQueryIterator}
import org.apache.spark.rdd.RDD
+import org.apache.spark.{TaskContext, Partition}
import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-class IgniteRDD[R:ClassTag, K, V](
+class IgniteRDD[K, V] (
ic: IgniteContext[K, V],
- qry: Query[R]
-) extends RDD[R] (ic.sparkContext(), deps = Nil) {
- override def compute(part: Partition, context: TaskContext): Iterator[R] = {
- new IgniteQueryIterator[R, K, V](ic, part, qry)
+ cacheName: String,
+ cacheCfg: CacheConfiguration[K, V]
+) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
+
+ override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+ val cache = ensureCache()
+
+ val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(new ScanQuery[K, V]()).iterator()
+
+ new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry => {
+ (entry.getKey, entry.getValue)
+ })
}
override protected def getPartitions: Array[Partition] = {
- val parts = ic.ignite().affinity(ic.cacheName).partitions()
+ ensureCache()
+
+ val parts = ic.ignite().affinity(cacheName).partitions()
(0 until parts).map(new IgnitePartition(_)).toArray
}
override protected def getPreferredLocations(split: Partition): Seq[String] = {
- ic.ignite().affinity(ic.cacheName).mapPartitionToPrimaryAndBackups(split.index).map(_.addresses()).flatten.toList
+ ensureCache()
+
+ ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index).map(_.addresses()).flatten.toList
+ }
+
+ def query(typeName: String, sql: String, args: Any*): RDD[(K, V)] = {
+ val qry: SqlQuery[K, V] = new SqlQuery[K, V](typeName, sql)
+
+ qry.setArgs(args)
+
+ new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry => (entry.getKey, entry.getValue))
+ }
+
+ def saveValues(rdd: RDD[V]) = {
+ rdd.foreachPartition(it => {
+ println("Using scala version: " + scala.util.Properties.versionString)
+ val ig = ic.ignite()
+
+ ensureCache()
+
+ val locNode = ig.cluster().localNode()
+
+ val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
+
+ val streamer = ig.dataStreamer[Object, V](cacheName)
+
+ try {
+ it.foreach(value => {
+ val key = affinityKeyFunc(value, node.orNull)
+
+ println("Saving: " + key + ", " + value)
+
+ streamer.addData(key, value)
+ })
+ }
+ finally {
+ streamer.close()
+ }
+ })
+ }
+
+ def save(rdd: RDD[(K, V)]) = {
+ rdd.foreachPartition(it => {
+ println("Using scala version: " + scala.util.Properties.versionString)
+ val ig = ic.ignite()
+
+ // Make sure to deploy the cache
+ ensureCache()
+
+ val locNode = ig.cluster().localNode()
+
+ val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
+
+ val streamer = ig.dataStreamer[K, V](cacheName)
+
+ try {
+ it.foreach(tup => {
+ println("Saving: " + tup._1 + ", " + tup._2)
+
+ streamer.addData(tup._1, tup._2)
+ })
+ }
+ finally {
+ streamer.close()
+ }
+ })
+ }
+
+ private def affinityKeyFunc(value: V, node: ClusterNode): Object = {
+ IgniteUuid.randomUuid()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
index 3932a26..cdb41d2 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
@@ -25,20 +25,23 @@ object IgniteProcessExample {
val conf = new SparkConf().setAppName("Ignite processing example")
val sc = new SparkContext(conf)
- val partitioned = new IgniteContext[Object, String](sc, ExampleConfiguration.configuration _, "partitioned")
+ val ignite = new IgniteContext[Object, String](sc, ExampleConfiguration.configuration _)
// Search for lines containing "Ignite".
- val scanRdd = partitioned.scan((k, v) => v.contains("Ignite"))
+ val scanRdd = ignite.fromCache("partitioned")
val processedRdd = scanRdd.filter(line => {
println("Analyzing line: " + line)
+ line._2.contains("Ignite")
true
- }).map(_.getValue)
+ }).map(_._2)
// Create a new cache for results.
- val results = new IgniteContext[Object, String](sc, ExampleConfiguration.configuration _, "results")
+ val results = ignite.fromCache("results")
- results.saveToIgnite(processedRdd)
+ results.saveValues(processedRdd)
+
+ ignite.fromCache("indexed").query("Person", "age > ?", 20).collect()
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
index a7823f4..c74804e 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
@@ -27,7 +27,7 @@ object IgniteStoreExample {
val conf = new SparkConf().setAppName("Ignite store example")
val sc = new SparkContext(conf)
- val ignite = new IgniteContext[String, String](sc, ExampleConfiguration.configuration _, "partitioned")
+ val ignite = new IgniteContext[String, String](sc, ExampleConfiguration.configuration _)
val lines: RDD[String] = sc.textFile(args(0)).filter(line => {
println("Read line: " + line)
@@ -35,6 +35,6 @@ object IgniteStoreExample {
true
})
- ignite.saveToIgnite(lines)
+ ignite.fromCache("partitioned").saveValues(lines)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
index 07b24a9..b24ba50 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
@@ -17,16 +17,11 @@
package org.apache.ignite.spark.impl
-import org.apache.ignite.cache.query.Query
-import org.apache.ignite.spark.IgniteContext
-import org.apache.spark.Partition
+class IgniteQueryIterator[T, R] (
+ cur: java.util.Iterator[T],
+ conv: (T) => R
+) extends Iterator[R] {
+ override def hasNext: Boolean = cur.hasNext
-class IgniteQueryIterator[R, K, V] (
- ic: IgniteContext[K, V],
- part: Partition,
- qry: Query[R]
- ) extends Iterator[R] {
- override def hasNext: Boolean = ???
-
- override def next(): R = ???
+ override def next(): R = conv(cur.next())
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
new file mode 100644
index 0000000..e347c85
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.cache.query.Query
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.spark.{IgniteAbstractRDD, IgniteContext}
+import org.apache.spark.{TaskContext, Partition}
+
+import scala.reflect.ClassTag
+
+class IgniteSqlRDD[R: ClassTag, T, K, V](
+ ic: IgniteContext[K, V],
+ cacheName: String,
+ cacheCfg: CacheConfiguration[K, V],
+ qry: Query[T],
+ conv: (T) => R
+) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg) {
+ override def compute(split: Partition, context: TaskContext): Iterator[R] = {
+ val it: java.util.Iterator[T] = ensureCache().query(qry).iterator()
+
+ new IgniteQueryIterator[T, R](it, conv)
+ }
+
+ override protected def getPartitions: Array[Partition] = {
+ Array(new IgnitePartition(0))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
deleted file mode 100644
index 484d0df..0000000
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spark.util
-
-import org.apache.ignite.lang.IgniteBiPredicate
-
-/**
- * Peer deploy aware adapter for Java's `GridPredicate2`.
- */
-class SerializablePredicate2[T1, T2](private val p: (T1, T2) => Boolean) extends IgniteBiPredicate[T1, T2] {
- assert(p != null)
-
- /**
- * Delegates to passed in function.
- */
- def apply(e1: T1, e2: T2) = p(e1, e2)
-}
\ No newline at end of file