You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 00:09:35 UTC

incubator-ignite git commit: ignite-948 Add Java API for Ignite RDD

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-948 [created] 4944bb482


ignite-948 Add Java API for Ignite RDD


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4944bb48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4944bb48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4944bb48

Branch: refs/heads/ignite-948
Commit: 4944bb4827d5f0c2517b47441bc0f1259f9378bf
Parents: c527a04
Author: agura <ag...@gridgain.com>
Authored: Tue Jun 2 01:09:17 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Tue Jun 2 01:09:17 2015 +0300

----------------------------------------------------------------------
 .../spark/examples/java/ColocationTest.java     |  74 ++++++++++++++
 .../org/apache/ignite/spark/IgniteRDD.scala     |  10 +-
 .../apache/ignite/spark/JavaIgniteContext.scala |  55 ++++++++++
 .../org/apache/ignite/spark/JavaIgniteRDD.scala | 101 +++++++++++++++++++
 4 files changed, 235 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4944bb48/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
new file mode 100644
index 0000000..932f922
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.ignite.spark.examples.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import scala.*;
+
+import java.util.*;
+
+/**
+ * Example that saves the integers [0, 100000) as key-value pairs into the "partitioned" Ignite
+ * cache through the Java Spark API, then prints a locally computed sum and a sum computed over
+ * the cache RDD.
+ */
+public class ColocationTest {
+    /**
+     * Runs the example against a local Spark master.
+     *
+     * @param args Command line arguments (not used).
+     */
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Colocation test");
+
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "Colocation test", conf);
+
+        try {
+            JavaIgniteContext<Integer, Integer> ignite = new JavaIgniteContext<>(sc, new IgniteOutClosure<IgniteConfiguration>() {
+                @Override public IgniteConfiguration apply() {
+                    return ExampleConfiguration.configuration();
+                }
+            });
+
+            try {
+                saveAndSum(sc, ignite);
+            }
+            finally {
+                ignite.close();
+            }
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * Populates the cache and prints both the local and the distributed sum of the stored values.
+     *
+     * @param sc Spark context used to parallelize the input.
+     * @param ignite Ignite context providing the cache RDD.
+     */
+    private static void saveAndSum(JavaSparkContext sc, JavaIgniteContext<Integer, Integer> ignite) {
+        JavaIgniteRDD<Integer, Integer> cache = ignite.fromCache("partitioned");
+
+        List<Integer> seq = new ArrayList<>();
+
+        long sum = 0;
+
+        for (int i = 0; i < 100000; i++) {
+            seq.add(i);
+
+            sum += i;
+        }
+
+        cache.savePairs(sc.parallelize(seq, 48).map(new Function<Integer, Tuple2<Integer, Integer>>() {
+            @Override public Tuple2<Integer, Integer> call(Integer v1) throws Exception {
+                return new Tuple2<>(v1, v1);
+            }
+        }));
+
+        System.out.println("Local sum: " + sum);
+
+        // Execute parallel sum. Spark's Java function interfaces are used because anonymous Java
+        // classes cannot implement Scala function traits compiled before Scala 2.12. Values are
+        // widened to long since the total exceeds Integer.MAX_VALUE. java.lang.Long and Function2
+        // are fully qualified because "import scala.*" makes the simple names ambiguous.
+        long distributedSum = cache.toJavaRDD()
+            .map(new Function<Tuple2<Integer, Integer>, java.lang.Long>() {
+                @Override public java.lang.Long call(Tuple2<Integer, Integer> t) {
+                    return t._2().longValue();
+                }
+            })
+            .reduce(new org.apache.spark.api.java.function.Function2<java.lang.Long, java.lang.Long, java.lang.Long>() {
+                @Override public java.lang.Long call(java.lang.Long a, java.lang.Long b) {
+                    return a + b;
+                }
+            });
+
+        System.out.println("Distributed sum: " + distributedSum);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4944bb48/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index f286b58..05df188 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -39,9 +39,9 @@ import scala.collection.JavaConversions._
  * @tparam V Value type.
  */
 class IgniteRDD[K, V] (
-    ic: IgniteContext[K, V],
-    cacheName: String,
-    cacheCfg: CacheConfiguration[K, V]
+    val ic: IgniteContext[K, V],
+    val cacheName: String,
+    val cacheCfg: CacheConfiguration[K, V]
 ) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
     /**
      * Computes iterator based on given partition.
@@ -69,7 +69,7 @@ class IgniteRDD[K, V] (
      *
      * @return Partitions.
      */
-    override protected def getPartitions: Array[Partition] = {
+    override protected[spark] def getPartitions: Array[Partition] = {
         ensureCache()
 
         val parts = ic.ignite().affinity(cacheName).partitions()
@@ -83,7 +83,7 @@ class IgniteRDD[K, V] (
      * @param split Split partition.
      * @return
      */
-    override protected def getPreferredLocations(split: Partition): Seq[String] = {
+    override protected[spark] def getPreferredLocations(split: Partition): Seq[String] = {
         ensureCache()
 
         ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4944bb48/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
new file mode 100644
index 0000000..992be52
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.lang.IgniteOutClosure
+import org.apache.spark.api.java.JavaSparkContext
+
+import scala.reflect.ClassTag
+
+/**
+ * Java-friendly wrapper around [[IgniteContext]] that is created from a [[JavaSparkContext]]
+ * and produces [[JavaIgniteRDD]] instances.
+ *
+ * @param sc Java Spark context (marked transient, so it is not serialized with this object).
+ * @param cfg Closure producing the Ignite configuration.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class JavaIgniteContext[K, V](
+    @scala.transient val sc: JavaSparkContext,
+    val cfg: IgniteOutClosure[IgniteConfiguration]) extends Serializable {
+
+    /** Underlying Scala Ignite context built on top of `sc.sc`. */
+    val ic: IgniteContext[K, V] = {
+        // Copy the closure into a local so the function below does not reach it through the
+        // `cfg` field of this instance (which would tie this whole context to the function).
+        val cfgClo = cfg
+
+        new IgniteContext[K, V](sc.sc, () => cfgClo.apply())
+    }
+
+    /**
+     * Creates a context whose Ignite configuration is loaded via `IgnitionEx.loadConfiguration`.
+     *
+     * @param sc Java Spark context.
+     * @param springUrl Spring configuration location passed to `IgnitionEx.loadConfiguration`.
+     */
+    def this(sc: JavaSparkContext, springUrl: String) {
+        this(sc, new IgniteOutClosure[IgniteConfiguration] {
+            override def apply() = IgnitionEx.loadConfiguration(springUrl).get1()
+        })
+    }
+
+    /**
+     * Creates an RDD over the cache with the given name (no cache configuration is supplied).
+     *
+     * @param cacheName Cache name.
+     * @return Java Ignite RDD for the cache.
+     */
+    def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+
+    /**
+     * Creates an RDD over the cache described by the given configuration.
+     *
+     * @param cacheCfg Cache configuration; its name is used as the cache name.
+     * @return Java Ignite RDD for the cache.
+     */
+    def fromCache(cacheCfg: CacheConfiguration[K, V]): JavaIgniteRDD[K, V] =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+
+    /** @return Ignite instance obtained from the underlying context. */
+    def ignite(): Ignite = ic.ignite()
+
+    /** Closes the underlying Ignite context. */
+    def close() = ic.close()
+
+    /** Java callers cannot supply real class tags for K and V, so an `AnyRef` tag is used instead. */
+    private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+    implicit val ktag: ClassTag[K] = fakeClassTag
+
+    implicit val vtag: ClassTag[V] = fakeClassTag
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4944bb48/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
new file mode 100644
index 0000000..6944313
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.lang.IgniteBiTuple
+import org.apache.ignite.spark.impl.IgniteAbstractRDD
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Partition, TaskContext}
+
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+
+/**
+ * Java-friendly view of an [[IgniteRDD]]. It is itself an RDD of `(K, V)` pairs that delegates
+ * computation, partitioning and preferred locations to the wrapped RDD, and exposes
+ * `JavaRDD`-based variants of the query and save operations.
+ *
+ * @param rdd Wrapped Ignite RDD.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class JavaIgniteRDD[K, V](val rdd: IgniteRDD[K, V])(implicit val ktag: ClassTag[K], val vtag: ClassTag[V])
+    extends IgniteAbstractRDD[(K, V), K, V](rdd.ic, rdd.cacheName, rdd.cacheCfg) {
+
+    /**
+     * Computes iterator based on given partition.
+     *
+     * @param part Partition to use.
+     * @param context Task context.
+     * @return Partition iterator.
+     */
+    override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] =
+        rdd.compute(part, context)
+
+    /**
+     * Gets partitions for the given cache RDD.
+     *
+     * @return Partitions.
+     */
+    override protected def getPartitions: Array[Partition] = rdd.getPartitions
+
+    /**
+     * Gets preferred locations for the given partition.
+     *
+     * @param split Split partition.
+     * @return Preferred locations reported by the wrapped RDD.
+     */
+    override protected def getPreferredLocations(split: Partition): Seq[String] =
+        rdd.getPreferredLocations(split)
+
+    /**
+     * Runs an SQL query over cached objects of the given type.
+     *
+     * The mapping is done on the Scala RDD: `JavaRDD.map` expects a Spark Java `Function`, and an
+     * inline lambda avoids capturing this RDD through the instance method `tuple2BiTuple`.
+     *
+     * @param typeName Type name to query.
+     * @param sql SQL query.
+     * @param args Query arguments; each element is forwarded as a separate argument.
+     * @return RDD of matching key-value pairs.
+     */
+    @scala.annotation.varargs
+    def objectSql(typeName: String, sql: String, args: Any*): JavaRDD[IgniteBiTuple[K, V]] =
+        rdd.objectSql(typeName, sql, args: _*)
+            .map(t => new IgniteBiTuple[K, V](t._1, t._2))
+            .toJavaRDD()
+
+    /**
+     * Runs an SQL fields query.
+     *
+     * @param sql SQL query.
+     * @param args Query arguments; each element is forwarded as a separate argument.
+     * @return RDD of result rows.
+     */
+    @scala.annotation.varargs
+    def sql(sql: String, args: Any*): JavaRDD[Seq[Any]] = rdd.sql(sql, args: _*).toJavaRDD()
+
+    /** Saves the values of the given Java RDD via `IgniteRDD.saveValues`. */
+    def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(jrdd.rdd)
+
+    /** Saves the pairs of the given Java RDD via `IgniteRDD.savePairs`. */
+    def savePairs(jrdd: JavaRDD[(K, V)]) = rdd.savePairs(jrdd.rdd)
+
+    /** Clears the underlying cache via `IgniteRDD.clear`. */
+    def clear(): Unit = rdd.clear()
+
+    /** Converts a Scala pair into an [[IgniteBiTuple]]. */
+    implicit def tuple2BiTuple[A, B](tuple: (A, B)): IgniteBiTuple[A, B] =
+        new IgniteBiTuple[A, B](tuple._1, tuple._2)
+
+    /** Adapts a Scala iterator of pairs into a Java iterator of [[IgniteBiTuple]]s. */
+    implicit def tupleIt2BiTupleIt[A, B](it: Iterator[(A, B)]): java.util.Iterator[IgniteBiTuple[A, B]] =
+        new java.util.Iterator[IgniteBiTuple[A, B]] {
+            val target: java.util.Iterator[(A, B)] = it
+
+            override def next(): IgniteBiTuple[A, B] = target.next()
+
+            override def remove(): Unit = target.remove()
+
+            override def hasNext: Boolean = target.hasNext
+        }
+}
+
+/** Implicit conversions between [[IgniteRDD]] and its Java-friendly wrapper [[JavaIgniteRDD]]. */
+object JavaIgniteRDD {
+    /** Wraps a Scala Ignite RDD, forwarding the key and value class tags from implicit scope. */
+    implicit def fromIgniteRDD[K: ClassTag, V: ClassTag](rdd: IgniteRDD[K, V]): JavaIgniteRDD[K, V] = {
+        val keyTag = implicitly[ClassTag[K]]
+        val valTag = implicitly[ClassTag[V]]
+
+        new JavaIgniteRDD[K, V](rdd)(keyTag, valTag)
+    }
+
+    /** Returns the Scala Ignite RDD backing the given Java wrapper. */
+    implicit def toIgniteRDD[K, V](rdd: JavaIgniteRDD[K, V]): IgniteRDD[K, V] = {
+        val underlying: IgniteRDD[K, V] = rdd.rdd
+
+        underlying
+    }
+}
+
+/**
+ * Implicit conversions between a Scala `RDD` and the Spark `JavaRDD` wrapper.
+ *
+ * NOTE(review): this object shares its name with the companion of the imported
+ * `org.apache.spark.api.java.JavaRDD`, so unqualified `JavaRDD.xxx` term references in this
+ * package may be ambiguous; prefer `rdd.toJavaRDD()` / `jrdd.rdd` at call sites.
+ */
+object JavaRDD {
+    /** Wraps `rdd` in a `JavaRDD` using the class tag available in implicit scope. */
+    implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = {
+        val tag = implicitly[ClassTag[T]]
+
+        new JavaRDD[T](rdd)(tag)
+    }
+
+    /** Returns the Scala RDD backing the given `JavaRDD`. */
+    implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = {
+        val backing: RDD[T] = rdd.rdd
+
+        backing
+    }
+}