You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 00:09:35 UTC
incubator-ignite git commit: ignite-948 Add Java API for Ignite RDD
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-948 [created] 4944bb482
ignite-948 Add Java API for Ignite RDD
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4944bb48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4944bb48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4944bb48
Branch: refs/heads/ignite-948
Commit: 4944bb4827d5f0c2517b47441bc0f1259f9378bf
Parents: c527a04
Author: agura <ag...@gridgain.com>
Authored: Tue Jun 2 01:09:17 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Tue Jun 2 01:09:17 2015 +0300
----------------------------------------------------------------------
.../spark/examples/java/ColocationTest.java | 74 ++++++++++++++
.../org/apache/ignite/spark/IgniteRDD.scala | 10 +-
.../apache/ignite/spark/JavaIgniteContext.scala | 55 ++++++++++
.../org/apache/ignite/spark/JavaIgniteRDD.scala | 101 +++++++++++++++++++
4 files changed, 235 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4944bb48/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
new file mode 100644
index 0000000..932f922
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.ignite.spark.examples.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import scala.*;
+
+import java.util.*;
+
+/**
+ * Example that stores integer pairs into the "partitioned" Ignite cache through
+ * the Java RDD API and prints the sum of the stored values computed locally.
+ */
+public class ColocationTest {
+    /**
+     * Runs the example.
+     *
+     * @param args Command line arguments (not used).
+     */
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Colocation test");
+
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "Colocation test", conf);
+
+        try {
+            JavaIgniteContext<Integer, Integer> ignite = new JavaIgniteContext<>(sc, new IgniteOutClosure<IgniteConfiguration>() {
+                @Override public IgniteConfiguration apply() {
+                    return ExampleConfiguration.configuration();
+                }
+            });
+
+            try {
+                JavaIgniteRDD<Integer, Integer> cache = ignite.fromCache("partitioned");
+
+                List<Integer> seq = new ArrayList<>();
+
+                long sum = 0;
+
+                // Build the input sequence and accumulate the expected sum on the driver.
+                for (int i = 0; i < 100000; i++) {
+                    seq.add(i);
+
+                    sum += i;
+                }
+
+                // Store every number as a (key, value) pair with identical key and value.
+                cache.savePairs(sc.parallelize(seq, 48).map(new Function<Integer, Tuple2<Integer, Integer>>() {
+                    @Override public Tuple2<Integer, Integer> call(Integer v1) throws Exception {
+                        return new Tuple2<>(v1, v1);
+                    }
+                }));
+
+                // Print the locally computed sum.
+                System.out.println("Local sum: " + sum);
+
+                // Extracts the value component of a cached pair. AbstractFunction1 is used because a
+                // Java anonymous class cannot implement scala.Function1 directly on Scala versions
+                // where the trait declares more abstract members than apply().
+                Function1<Tuple2<Integer, Integer>, Integer> f = new scala.runtime.AbstractFunction1<Tuple2<Integer, Integer>, Integer>() {
+                    @Override public Integer apply(Tuple2<Integer, Integer> t) {
+                        return t._2();
+                    }
+                };
+
+                // TODO: the distributed sum is disabled in this revision; 'f' is only referenced here.
+                //System.out.println("Distributed sum: " + cache.map(f).sum())
+            }
+            finally {
+                // Release the Ignite context even if the job fails.
+                ignite.close();
+            }
+        }
+        finally {
+            // Always stop the Spark context so the local executor threads are released.
+            sc.stop();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4944bb48/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index f286b58..05df188 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -39,9 +39,9 @@ import scala.collection.JavaConversions._
* @tparam V Value type.
*/
class IgniteRDD[K, V] (
- ic: IgniteContext[K, V],
- cacheName: String,
- cacheCfg: CacheConfiguration[K, V]
+ val ic: IgniteContext[K, V],
+ val cacheName: String,
+ val cacheCfg: CacheConfiguration[K, V]
) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
/**
* Computes iterator based on given partition.
@@ -69,7 +69,7 @@ class IgniteRDD[K, V] (
*
* @return Partitions.
*/
- override protected def getPartitions: Array[Partition] = {
+ override protected[spark] def getPartitions: Array[Partition] = {
ensureCache()
val parts = ic.ignite().affinity(cacheName).partitions()
@@ -83,7 +83,7 @@ class IgniteRDD[K, V] (
* @param split Split partition.
* @return
*/
- override protected def getPreferredLocations(split: Partition): Seq[String] = {
+ override protected[spark] def getPreferredLocations(split: Partition): Seq[String] = {
ensureCache()
ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4944bb48/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
new file mode 100644
index 0000000..992be52
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.lang.IgniteOutClosure
+import org.apache.spark.api.java.JavaSparkContext
+
+import scala.reflect.ClassTag
+
+/**
+ * Java-friendly wrapper around [[IgniteContext]].
+ *
+ * @param sc Java Spark context; marked transient.
+ * @param cfg Closure that produces the Ignite configuration.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class JavaIgniteContext[K, V](
+    @scala.transient val sc: JavaSparkContext,
+    val cfg: IgniteOutClosure[IgniteConfiguration]) extends Serializable {
+
+    /** Underlying Scala context, created from the wrapped Spark context and a function delegating to `cfg`. */
+    val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfg.apply())
+
+    /**
+     * Creates a context whose configuration is loaded with `IgnitionEx.loadConfiguration`.
+     *
+     * @param sc Java Spark context.
+     * @param springUrl Location passed to `IgnitionEx.loadConfiguration`.
+     */
+    def this(sc: JavaSparkContext, springUrl: String) =
+        this(sc, new IgniteOutClosure[IgniteConfiguration] {
+            override def apply(): IgniteConfiguration = IgnitionEx.loadConfiguration(springUrl).get1()
+        })
+
+    /**
+     * Creates a Java RDD over the cache with the given name.
+     *
+     * @param cacheName Cache name.
+     * @return Java wrapper over an [[IgniteRDD]] created with a `null` cache configuration.
+     */
+    def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+
+    /**
+     * Creates a Java RDD over the cache described by the given configuration.
+     *
+     * @param cacheCfg Cache configuration; its name is used as the cache name.
+     * @return Java wrapper over an [[IgniteRDD]] created with that configuration.
+     */
+    def fromCache(cacheCfg: CacheConfiguration[K, V]): JavaIgniteRDD[K, V] =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+
+    /** @return Ignite instance obtained from the underlying context. */
+    def ignite(): Ignite = ic.ignite()
+
+    /** Closes the underlying context; the result type is whatever `IgniteContext.close()` returns. */
+    def close() = ic.close()
+
+    /**
+     * Produces a placeholder class tag by casting `ClassTag.AnyRef`, since the Java API
+     * supplies no class tags of its own.
+     */
+    private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+    /** Implicit key class tag consumed by the `fromCache` conversions. */
+    implicit val ktag: ClassTag[K] = fakeClassTag
+
+    /** Implicit value class tag consumed by the `fromCache` conversions. */
+    implicit val vtag: ClassTag[V] = fakeClassTag
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4944bb48/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
new file mode 100644
index 0000000..6944313
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.lang.IgniteBiTuple
+import org.apache.ignite.spark.impl.IgniteAbstractRDD
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Partition, TaskContext}
+
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+
+/**
+ * Java-oriented RDD that delegates every operation to a wrapped [[IgniteRDD]].
+ *
+ * @param rdd Wrapped Ignite RDD.
+ * @param ktag Key class tag.
+ * @param vtag Value class tag.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class JavaIgniteRDD[K, V](val rdd: IgniteRDD[K, V])(implicit val ktag: ClassTag[K], implicit val vtag: ClassTag[V])
+    extends IgniteAbstractRDD[(K, V), K, V](rdd.ic, rdd.cacheName, rdd.cacheCfg) {
+
+    /**
+     * Computes iterator based on given partition.
+     *
+     * @param part Partition to use.
+     * @param context Task context.
+     * @return Partition iterator.
+     */
+    override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+        rdd.compute(part, context)
+    }
+
+    /**
+     * Gets partitions for the given cache RDD.
+     *
+     * @return Partitions.
+     */
+    override protected def getPartitions: Array[Partition] = {
+        rdd.getPartitions
+    }
+
+    /**
+     * Gets preferred locations for the given partition.
+     *
+     * @param split Split partition.
+     * @return Preferred locations reported by the wrapped RDD.
+     */
+    override protected def getPreferredLocations(split: Partition): Seq[String] = {
+        rdd.getPreferredLocations(split)
+    }
+
+    /**
+     * Runs an object SQL query on the wrapped RDD and converts each resulting pair to an `IgniteBiTuple`.
+     *
+     * @param typeName Type name passed to the wrapped RDD.
+     * @param sql SQL text passed to the wrapped RDD.
+     * @param args Query arguments, forwarded one by one.
+     * @return Java RDD of key-value tuples.
+     */
+    def objectSql(typeName: String, sql: String, args: Any*): JavaRDD[IgniteBiTuple[K, V]] =
+        // `args: _*` expands the sequence; passing `args` directly would send the whole Seq as one argument.
+        JavaRDD.fromRDD(rdd.objectSql(typeName, sql, args: _*)).map(tuple2BiTuple[K, V](_))
+
+    /**
+     * Runs an SQL query on the wrapped RDD.
+     *
+     * @param sql SQL text passed to the wrapped RDD.
+     * @param args Query arguments, forwarded one by one.
+     * @return Java RDD of result rows.
+     */
+    def sql(sql: String, args: Any*): JavaRDD[Seq[Any]] = JavaRDD.fromRDD(rdd.sql(sql, args: _*))
+
+    /** Saves the values of the given Java RDD through the wrapped RDD. */
+    def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd))
+
+    /** Saves the key-value pairs of the given Java RDD through the wrapped RDD. */
+    def savePairs(jrdd: JavaRDD[(K, V)]) = rdd.savePairs(JavaRDD.toRDD(jrdd))
+
+    /** Delegates to `clear()` of the wrapped RDD. */
+    def clear(): Unit = rdd.clear()
+
+    /** Converts a Scala pair to an `IgniteBiTuple` holding the same two components. */
+    implicit def tuple2BiTuple[A, B](tuple: (A, B)): IgniteBiTuple[A, B] =
+        new IgniteBiTuple[A, B](tuple._1, tuple._2)
+
+    /** Adapts a Scala iterator of pairs to a Java iterator of `IgniteBiTuple`s. */
+    implicit def tupleIt2BiTupleIt[A, B](it: Iterator[(A, B)]): java.util.Iterator[IgniteBiTuple[A, B]] =
+        new java.util.Iterator[IgniteBiTuple[A, B]] {
+            // The Scala iterator is viewed as a Java iterator through the imported implicit conversions.
+            val target: java.util.Iterator[(A, B)] = it
+
+            // The pair is converted to IgniteBiTuple by the implicit tuple2BiTuple.
+            override def next(): IgniteBiTuple[A, B] = target.next()
+
+            override def remove(): Unit = target.remove()
+
+            override def hasNext: Boolean = target.hasNext
+        }
+}
+
+/** Implicit conversions between [[IgniteRDD]] and its Java wrapper [[JavaIgniteRDD]]. */
+object JavaIgniteRDD {
+    /** Wraps the given Ignite RDD, passing the key and value class tags explicitly. */
+    implicit def fromIgniteRDD[K, V](rdd: IgniteRDD[K, V])(implicit kt: ClassTag[K], vt: ClassTag[V]): JavaIgniteRDD[K, V] =
+        new JavaIgniteRDD[K, V](rdd)(kt, vt)
+
+    /** Returns the Ignite RDD held by the given wrapper. */
+    implicit def toIgniteRDD[K, V](rdd: JavaIgniteRDD[K, V]): IgniteRDD[K, V] = {
+        rdd.rdd
+    }
+}
+
+/** Implicit conversions between Spark's `RDD` and `JavaRDD`. */
+object JavaRDD {
+    /** Wraps the given RDD in a `JavaRDD`, passing the element class tag explicitly. */
+    implicit def fromRDD[T](rdd: RDD[T])(implicit tag: ClassTag[T]): JavaRDD[T] =
+        new JavaRDD[T](rdd)(tag)
+
+    /** Returns the RDD held by the given `JavaRDD`. */
+    implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = {
+        rdd.rdd
+    }
+}