You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 12:31:44 UTC
[28/50] incubator-ignite git commit: ignite-948 Add Java API for
Ignite RDD
ignite-948 Add Java API for Ignite RDD
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4d36d123
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4d36d123
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4d36d123
Branch: refs/heads/ignite-929
Commit: 4d36d12361b78aa79517addce2a33fd772a0201e
Parents: ac9dd30
Author: agura <ag...@gridgain.com>
Authored: Tue Jun 2 01:09:17 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Thu Jun 4 22:15:42 2015 +0300
----------------------------------------------------------------------
examples/config/example-ignite.xml | 4 +-
modules/spark/pom.xml | 14 +
.../spark/examples/java/ColocationTest.java | 89 ++++++
.../examples/java/ExampleConfiguration.java | 31 ++
.../examples/java/IgniteProcessExample.java | 80 +++++
.../spark/examples/java/IgniteStoreExample.java | 68 +++++
.../spark/examples/java/package-info.java | 21 ++
.../org/apache/ignite/spark/IgniteRDD.scala | 10 +-
.../apache/ignite/spark/JavaIgniteContext.scala | 63 ++++
.../org/apache/ignite/spark/JavaIgniteRDD.scala | 99 ++++++
.../ignite/spark/examples/ColocationTest.scala | 5 +-
.../spark/impl/JavaIgniteAbstractRDD.scala | 34 +++
.../ignite/spark/JavaIgniteRDDSelfTest.java | 298 +++++++++++++++++++
parent/pom.xml | 4 +
14 files changed, 811 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/examples/config/example-ignite.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-ignite.xml b/examples/config/example-ignite.xml
index e746e59..dcb2ba8 100644
--- a/examples/config/example-ignite.xml
+++ b/examples/config/example-ignite.xml
@@ -30,14 +30,16 @@
http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Set to true to enable distributed class loading for examples, default is false. -->
+<!--
<property name="peerClassLoadingEnabled" value="true"/>
<property name="marshaller">
<bean class="org.apache.ignite.marshaller.optimized.OptimizedMarshaller">
- <!-- Set to false to allow non-serializable objects in examples, default is true. -->
+ <!– Set to false to allow non-serializable objects in examples, default is true. –>
<property name="requireSerializable" value="false"/>
</bean>
</property>
+-->
<!-- Enable task execution events for examples. -->
<property name="includeEventTypes">
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index c22a52b..8900a10 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -87,6 +87,20 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-beans</artifactId>
+ <version>${spring.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <version>${spring.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
new file mode 100644
index 0000000..20d6e88
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+
+import scala.Tuple2;
+
+import java.util.*;
+
+/**
+ * Colocation test example.
+ */
+public class ColocationTest {
+    /** Keys count. */
+    private static final int KEYS_CNT = 10000;
+
+    /** To pair function: maps a key to the {@code (key, key)} pair. */
+    private static final IgniteClosure<Integer, Tuple2<Integer, Integer>> TO_PAIR_F =
+        new IgniteClosure<Integer, Tuple2<Integer, Integer>>() {
+            /** {@inheritDoc} */
+            @Override public Tuple2<Integer, Integer> apply(Integer i) {
+                return new Tuple2<>(i, i);
+            }
+        };
+
+    /** To value function: extracts the value from a key-value pair. */
+    private static final Function<Tuple2<Integer, Integer>, Integer> TO_VALUE_F =
+        new Function<Tuple2<Integer, Integer>, Integer>() {
+            /** {@inheritDoc} */
+            @Override public Integer call(Tuple2<Integer, Integer> t) throws Exception {
+                return t._2();
+            }
+        };
+
+    /** Sum function. */
+    private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() {
+        /** {@inheritDoc} */
+        @Override public Integer call(Integer x, Integer y) {
+            return x + y;
+        }
+    };
+
+    /**
+     * Saves integer keys (each mapped to itself) into the {@code "partitioned"} cache. It then prints the sum
+     * of the saved values twice: once computed locally and once computed by Spark over the cache.
+     *
+     * @param args Args (unused).
+     */
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Colocation test");
+
+        JavaSparkContext sc = new JavaSparkContext(conf);
+
+        JavaIgniteContext<Integer, Integer> ignite = new JavaIgniteContext<>(sc, new ExampleConfiguration());
+
+        JavaIgniteRDD<Integer, Integer> cache = ignite.fromCache("partitioned");
+
+        // GridFunc.range(fromIncl, toExcl) excludes the upper bound, so this yields KEYS_CNT keys.
+        List<Integer> seq = F.range(0, KEYS_CNT);
+
+        JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(F.transformList(seq, TO_PAIR_F), 48);
+
+        cache.savePairs(rdd);
+
+        // Derive the expected sum from the very keys that were saved, so the two printed sums
+        // cannot drift apart if the key range changes.
+        int sum = 0;
+
+        for (Integer i : seq)
+            sum += i;
+
+        System.out.println("Local sum: " + sum);
+
+        // Execute parallel sum.
+        System.out.println("Distributed sum: " + cache.map(TO_VALUE_F).fold(0, SUM_F));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ExampleConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ExampleConfiguration.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ExampleConfiguration.java
new file mode 100644
index 0000000..5d769f2
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ExampleConfiguration.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+
+/**
+ * Ignite example configuration provider.
+ */
+public class ExampleConfiguration implements IgniteOutClosure<IgniteConfiguration> {
+ /** {@inheritDoc} */
+ @Override public IgniteConfiguration apply() {
+ return org.apache.ignite.spark.examples.ExampleConfiguration.configuration();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
new file mode 100644
index 0000000..8994355
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.spark.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.*;
+
+import scala.*;
+
+import java.lang.Boolean;
+
+/**
+ * Ignite process example.
+ */
+public class IgniteProcessExample {
+    /** Keeps entries whose value contains "Ignite", logging every value it inspects. */
+    private static final Function<Tuple2<Object, String>, Boolean> FILTER_F =
+        new Function<Tuple2<Object, String>, Boolean>() {
+            @Override public Boolean call(Tuple2<Object, String> entry) throws Exception {
+                String line = entry._2();
+
+                System.out.println("Analyzing line: " + line);
+
+                return line.contains("Ignite");
+            }
+        };
+
+    /** Projects a key-value entry onto its value. */
+    private static final Function<Tuple2<Object, String>, String> TO_VALUE_F =
+        new Function<Tuple2<Object, String>, String>() {
+            @Override public String call(Tuple2<Object, String> entry) throws Exception {
+                return entry._2();
+            }
+        };
+
+    /**
+     * Runs the example in three steps:
+     * <ol>
+     *     <li>Filters the {@code "partitioned"} cache for values containing "Ignite".</li>
+     *     <li>Stores the matching values into the {@code "results"} cache.</li>
+     *     <li>Issues an object SQL query and a fields SQL query against the {@code "indexed"} cache.</li>
+     * </ol>
+     *
+     * @param args Args (unused).
+     */
+    public static void main(String[] args) {
+        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Ignite processing example"));
+
+        JavaIgniteContext<Object, String> ignite = new JavaIgniteContext<>(sc, new ExampleConfiguration());
+
+        // Search for lines containing "Ignite".
+        JavaIgniteRDD<Object, String> source = ignite.fromCache("partitioned");
+
+        JavaPairRDD<Object, String> matches = source.filter(FILTER_F);
+
+        JavaRDD<String> matchedLines = matches.map(TO_VALUE_F);
+
+        // Create a new cache for results.
+        ignite.fromCache("results").saveValues(matchedLines);
+
+        // SQL query
+        ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId = ?", 20, 12).collect();
+
+        // SQL fields query
+        DataFrame persons = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
new file mode 100644
index 0000000..24ae77f
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.spark.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.*;
+
+import scala.*;
+
+import java.lang.Boolean;
+
+/**
+ * Ignite store example.
+ */
+public class IgniteStoreExample {
+    /** Keeps lines containing "Ignite", logging every line it reads. */
+    private static final Function<String, Boolean> PREDICATE = new Function<String, Boolean>() {
+        /** {@inheritDoc} */
+        @Override public Boolean call(String s) throws Exception {
+            System.out.println("Read line: " + s);
+
+            return s.contains("Ignite");
+        }
+    };
+
+    /** To pair function: maps a line to the {@code (line, line)} pair. */
+    private static final PairFunction<String, String, String> TO_PAIR_F = new PairFunction<String, String, String>() {
+        /** {@inheritDoc} */
+        @Override public Tuple2<String, String> call(String s) throws Exception {
+            return new Tuple2<>(s, s);
+        }
+    };
+
+    /**
+     * Reads the text file given as the first argument and keeps the lines containing "Ignite". It then
+     * stores them into the {@code "partitioned"} cache twice: once via {@code saveValues} and once as
+     * {@code (line, line)} pairs via {@code savePairs}.
+     *
+     * @param args Args: {@code args[0]} is the path of the text file to read.
+     */
+    public static void main(String[] args) {
+        // Fail with a usage hint instead of an ArrayIndexOutOfBoundsException when no file is given.
+        if (args.length < 1) {
+            System.err.println("Usage: IgniteStoreExample <path-to-text-file>");
+
+            return;
+        }
+
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Ignite store example");
+
+        JavaSparkContext sc = new JavaSparkContext(conf);
+
+        JavaIgniteContext<String, String> ignite = new JavaIgniteContext<>(sc, new ExampleConfiguration());
+
+        JavaRDD<String> lines = sc.textFile(args[0]).filter(PREDICATE);
+
+        ignite.fromCache("partitioned").saveValues(lines);
+
+        ignite.fromCache("partitioned").savePairs(lines.mapToPair(TO_PAIR_F));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java
new file mode 100644
index 0000000..e3243bf
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Demonstrates usage of Ignite and Spark from Java.
+ */
+package org.apache.ignite.spark.examples.java;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 0b8e845..742d7ee 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -43,9 +43,9 @@ import scala.collection.JavaConversions._
* @tparam V Value type.
*/
class IgniteRDD[K, V] (
- ic: IgniteContext[K, V],
- cacheName: String,
- cacheCfg: CacheConfiguration[K, V]
+ val ic: IgniteContext[K, V],
+ val cacheName: String,
+ val cacheCfg: CacheConfiguration[K, V]
) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
/**
* Computes iterator based on given partition.
@@ -73,7 +73,7 @@ class IgniteRDD[K, V] (
*
* @return Partitions.
*/
- override protected def getPartitions: Array[Partition] = {
+ override protected[spark] def getPartitions: Array[Partition] = {
ensureCache()
val parts = ic.ignite().affinity(cacheName).partitions()
@@ -87,7 +87,7 @@ class IgniteRDD[K, V] (
* @param split Split partition.
* @return
*/
- override protected def getPreferredLocations(split: Partition): Seq[String] = {
+ override protected[spark] def getPreferredLocations(split: Partition): Seq[String] = {
ensureCache()
ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
new file mode 100644
index 0000000..e2d57bf
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.lang.IgniteOutClosure
+import org.apache.spark.api.java.JavaSparkContext
+
+import scala.reflect.ClassTag
+
+/**
+ * Java-friendly Ignite context wrapper.
+ *
+ * @param sc Java Spark context.
+ * @param cfgF Configuration factory.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class JavaIgniteContext[K, V](
+    @scala.transient val sc: JavaSparkContext,
+    val cfgF: IgniteOutClosure[IgniteConfiguration]) extends Serializable {
+
+    /**
+     * Underlying Scala Ignite context, built from the Spark context and the configuration factory.
+     * It is marked transient, so it is not serialized together with this wrapper.
+     */
+    @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply())
+
+    /**
+     * Creates a context that loads the Ignite configuration from a Spring XML file.
+     *
+     * @param sc Java Spark context.
+     * @param springUrl Spring configuration path or URL, passed to `IgnitionEx.loadConfiguration`.
+     */
+    def this(sc: JavaSparkContext, springUrl: String) {
+        this(sc, new IgniteOutClosure[IgniteConfiguration] {
+            // The configuration is the first element of the pair returned by loadConfiguration.
+            override def apply() = IgnitionEx.loadConfiguration(springUrl).get1()
+        })
+    }
+
+    /**
+     * Creates a Java RDD backed by the Ignite cache with the given name.
+     *
+     * @param cacheName Cache name.
+     * @return Java Ignite RDD for the cache.
+     */
+    def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+
+    /**
+     * Creates a Java RDD backed by the Ignite cache described by the given configuration.
+     * The cache name is taken from `cacheCfg.getName`.
+     *
+     * @param cacheCfg Cache configuration.
+     * @return Java Ignite RDD for the cache.
+     */
+    def fromCache(cacheCfg: CacheConfiguration[K, V]): JavaIgniteRDD[K, V] =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+
+    /**
+     * @return Ignite instance obtained from the underlying Ignite context.
+     */
+    def ignite(): Ignite = ic.ignite()
+
+    /**
+     * Closes the underlying Ignite context.
+     */
+    def close() = ic.close()
+
+    /**
+     * Placeholder class tag: the `AnyRef` tag cast to `ClassTag[T]`, used because no real class tags
+     * are available for `K` and `V` when called from Java.
+     */
+    private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+    /** Implicit key class tag, required by `JavaIgniteRDD.fromIgniteRDD`. */
+    implicit val ktag: ClassTag[K] = fakeClassTag
+
+    /** Implicit value class tag, required by `JavaIgniteRDD.fromIgniteRDD`. */
+    implicit val vtag: ClassTag[V] = fakeClassTag
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
new file mode 100644
index 0000000..2e8702e
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import java.util
+
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.{Partition, TaskContext}
+
+import scala.annotation.varargs
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+/**
+ * Java-friendly Ignite RDD wrapper. Represents Ignite cache as Java Spark RDD abstraction.
+ *
+ * @param rdd Ignite RDD instance.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
+ extends JavaPairRDD[K, V](rdd)(JavaIgniteRDD.fakeClassTag, JavaIgniteRDD.fakeClassTag) {
+
+ /**
+ * Wraps the result of a transformation. Transformations yield a plain `JavaPairRDD`
+ * (via `JavaPairRDD.fromRDD`), not a `JavaIgniteRDD`. Ignite-specific methods such as `sql` or
+ * `savePairs` are therefore not available on transformed RDDs.
+ */
+ override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
+
+ /**
+ * Element class tag. This is the `AnyRef` placeholder from `JavaIgniteRDD.fakeClassTag`, because no
+ * real class tags for `K` and `V` are available here.
+ */
+ override val classTag: ClassTag[(K, V)] = JavaIgniteRDD.fakeClassTag
+
+ /**
+ * Computes iterator based on given partition. Delegates to the wrapped Ignite RDD.
+ *
+ * @param part Partition to use.
+ * @param context Task context.
+ * @return Partition iterator.
+ */
+ def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+ rdd.compute(part, context)
+ }
+
+ /**
+ * Gets partitions for the given cache RDD, copied into a Java list.
+ *
+ * NOTE(review): this is declared without `override`, so it does not override any concrete
+ * `JavaPairRDD` member; Spark itself will not call it. Confirm whether it is still needed.
+ *
+ * @return Partitions.
+ */
+ protected def getPartitions: java.util.List[Partition] = {
+ new util.ArrayList[Partition](rdd.getPartitions.toSeq)
+ }
+
+ /**
+ * Gets preferred locations for the given partition. Delegates to the wrapped Ignite RDD.
+ *
+ * @param split Split partition.
+ * @return Preferred locations reported by the wrapped Ignite RDD for `split`.
+ */
+ protected def getPreferredLocations(split: Partition): Seq[String] = {
+ rdd.getPreferredLocations(split)
+ }
+
+ /**
+ * Runs an SQL query over objects of the given type via `IgniteRDD.objectSql`. The result is wrapped as
+ * a `JavaPairRDD`. `@varargs` generates a Java varargs overload, so Java callers can pass query
+ * arguments directly.
+ */
+ @varargs def objectSql(typeName: String, sql: String, args: Any*): JavaPairRDD[K, V] =
+ JavaPairRDD.fromRDD(rdd.objectSql(typeName, sql, args:_*))
+
+ /**
+ * Runs an SQL fields query via `IgniteRDD.sql` and returns the resulting `DataFrame`.
+ * `@varargs` generates a Java varargs overload for the query arguments.
+ */
+ @varargs def sql(sql: String, args: Any*): DataFrame = rdd.sql(sql, args:_*)
+
+ /** Saves the values of the given Java RDD through `IgniteRDD.saveValues`. */
+ def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd))
+
+ /** Saves the key-value pairs of the given Java pair RDD through `IgniteRDD.savePairs`. */
+ def savePairs(jrdd: JavaPairRDD[K, V]) = {
+ val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd)
+
+ rdd.savePairs(rrdd)
+ }
+
+ /** Delegates to `IgniteRDD.clear` (presumably clears the backing cache; confirm against IgniteRDD). */
+ def clear(): Unit = rdd.clear()
+}
+
+object JavaIgniteRDD {
+    /**
+     * Implicitly wraps an Ignite RDD into its Java-friendly counterpart.
+     *
+     * @param rdd Ignite RDD to wrap.
+     * @return Java Ignite RDD delegating to `rdd`.
+     */
+    implicit def fromIgniteRDD[K: ClassTag, V: ClassTag](rdd: IgniteRDD[K, V]): JavaIgniteRDD[K, V] = {
+        val wrapped = new JavaIgniteRDD[K, V](rdd)
+
+        wrapped
+    }
+
+    /**
+     * Implicitly unwraps a Java Ignite RDD to the Ignite RDD it delegates to.
+     *
+     * @param rdd Java Ignite RDD.
+     * @return The wrapped Ignite RDD.
+     */
+    implicit def toIgniteRDD[K, V](rdd: JavaIgniteRDD[K, V]): IgniteRDD[K, V] = {
+        rdd.rdd
+    }
+
+    /**
+     * Returns the `AnyRef` class tag cast to `ClassTag[T]`. It stands in where no real class tag is available.
+     */
+    def fakeClassTag[T]: ClassTag[T] = {
+        val anyRefTag: ClassTag[AnyRef] = ClassTag.AnyRef
+
+        anyRefTag.asInstanceOf[ClassTag[T]]
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala
index e1d3d8e..29587e4 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala
@@ -17,16 +17,15 @@
package org.apache.ignite.spark.examples
-import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}
object ColocationTest {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Colocation test")
val sc = new SparkContext(conf)
- val ignite = new IgniteContext[Int, Int](sc, () ⇒ new IgniteConfiguration())
+ val ignite = new IgniteContext[Int, Int](sc, ExampleConfiguration.configuration _)
// Search for lines containing "Ignite".
val cache = ignite.fromCache("partitioned")
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
new file mode 100644
index 0000000..13bd3e8
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.IgniteCache
+import org.apache.ignite.spark.IgniteRDD
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike}
+
+abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V])
+    extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+
+    /**
+     * Makes sure the cache backing `rdd` is deployed. If `rdd.cacheCfg` is present, the cache is obtained
+     * or created from that configuration; otherwise it is obtained or created by `rdd.cacheName`.
+     *
+     * @return The cache instance.
+     */
+    protected def ensureCache(): IgniteCache[K, V] = {
+        val ignite = rdd.ic.ignite()
+
+        Option(rdd.cacheCfg) match {
+            case Some(cfg) => ignite.getOrCreateCache(cfg)
+            case None => ignite.getOrCreateCache(rdd.cacheName)
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
new file mode 100644
index 0000000..e14abfc
--- /dev/null
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.*;
+
+import scala.*;
+
+import java.util.*;
+
+/**
+ * Tests for {@link JavaIgniteRDD}.
+ */
+public class JavaIgniteRDDSelfTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 3;
+
+    /** Keys count. */
+    private static final int KEYS_CNT = 10000;
+
+    /** Cache name. */
+    private static final String PARTITIONED_CACHE_NAME = "partitioned";
+
+    /** IP finder shared by every node configuration created in this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Sum function. */
+    private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() {
+        /** {@inheritDoc} */
+        @Override public Integer call(Integer x, Integer y) {
+            return x + y;
+        }
+    };
+
+    /** To pair function. */
+    private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() {
+        /** {@inheritDoc} */
+        @Override public Tuple2<String, String> call(Integer i) {
+            return new Tuple2<>(String.valueOf(i), "val" + i);
+        }
+    };
+
+    /** (String, Integer) pair to Integer value function. */
+    private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>();
+
+    /** (String, Entity) pair to Entity value function. */
+    private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F =
+        new PairToValueFunction<>();
+
+    /** Integer to entity function. */
+    private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F =
+        new PairFunction<Integer, String, Entity>() {
+            /** {@inheritDoc} */
+            @Override public Tuple2<String, Entity> call(Integer i) throws Exception {
+                return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100));
+            }
+        };
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Start every test from an empty cache.
+        Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Stop the node named "client" (the name used by IgniteConfigProvider's configuration).
+        Ignition.stop("client", false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++)
+            Ignition.start(getConfiguration("grid-" + i, false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++)
+            Ignition.stop("grid-" + i, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStoreDataToIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            ic.fromCache(PARTITIONED_CACHE_NAME)
+                .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F));
+
+            Ignite ignite = Ignition.ignite("grid-0");
+
+            IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; i++) {
+                String val = cache.get(String.valueOf(i));
+
+                assertNotNull("Value was not put to cache for key: " + i, val);
+                assertEquals("Invalid value stored for key: " + i, "val" + i, val);
+            }
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadDataFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            Ignite ignite = Ignition.ignite("grid-0");
+
+            IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; i++)
+                cache.put(String.valueOf(i), i);
+
+            JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F);
+
+            int sum = values.fold(0, SUM_F);
+
+            // Sum of the stored values 0 .. KEYS_CNT - 1.
+            int expSum = KEYS_CNT * (KEYS_CNT - 1) / 2;
+
+            assertEquals(expSum, sum);
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryObjectsFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+            cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
+
+            List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
+                .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
+
+            assertEquals("Invalid result length", 1, res.size());
+            assertEquals("Invalid result", 50, res.get(0).id());
+            assertEquals("Invalid result", "name50", res.get(0).name());
+            assertEquals("Invalid result", 5000, res.get(0).salary());
+            assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count());
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryFieldsFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+            cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
+
+            DataFrame df =
+                cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
+
+            df.printSchema();
+
+            Row[] res = df.collect();
+
+            assertEquals("Invalid result length", 1, res.length);
+            assertEquals("Invalid result", 50, res[0].get(0));
+            assertEquals("Invalid result", "name50", res[0].get(1));
+            assertEquals("Invalid result", 5000, res[0].get(2));
+
+            Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
+
+            DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
+
+            // Print the schema of the filtered frame (previously the first frame was printed twice).
+            df0.printSchema();
+
+            Row[] res0 = df0.collect();
+
+            assertEquals("Invalid result length", 1, res0.length);
+            assertEquals("Invalid result", 50, res0[0].get(0));
+            assertEquals("Invalid result", "name50", res0[0].get(1));
+            assertEquals("Invalid result", 5000, res0[0].get(2));
+
+            assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count());
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * Creates a node configuration that uses the shared IP finder and the partitioned cache.
+     *
+     * @param gridName Grid name.
+     * @param client Client mode flag.
+     * @return Ignite configuration.
+     * @throws Exception If failed.
+     */
+    private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        cfg.setCacheConfiguration(cacheConfiguration());
+
+        cfg.setClientMode(client);
+
+        cfg.setGridName(gridName);
+
+        return cfg;
+    }
+
+    /**
+     * Creates cache configuration.
+     *
+     * @return Configuration of the partitioned cache with one backup and {@code String -> Entity} indexing.
+     */
+    private static CacheConfiguration<Object, Object> cacheConfiguration() {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setBackups(1);
+
+        ccfg.setName(PARTITIONED_CACHE_NAME);
+
+        ccfg.setIndexedTypes(String.class, Entity.class);
+
+        return ccfg;
+    }
+
+    /**
+     * Ignite configuration provider that supplies the client node configuration.
+     */
+    static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> {
+        /** {@inheritDoc} */
+        @Override public IgniteConfiguration apply() {
+            try {
+                return getConfiguration("client", true);
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Function that extracts the value from a key-value pair.
+     *
+     * @param <K> Key type.
+     * @param <V> Value type.
+     */
+    static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> {
+        /** {@inheritDoc} */
+        @Override public V call(Tuple2<K, V> t) throws Exception {
+            return t._2();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d36d123/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index a514e35..f5b73df 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -321,6 +321,10 @@
<title>Mesos Framework</title>
<packages>org.apache.ignite.mesos*</packages>
</group>
+ <group>
+ <title>Spark Integration</title>
+ <packages>org.apache.ignite.spark.examples.java</packages>
+ </group>
</groups>
<header>
<![CDATA[