You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 12:13:35 UTC
[2/7] incubator-ignite git commit: ignite-sprint-6: merge from
ignite-sprint-5
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/licenses/apache-2.0.txt
----------------------------------------------------------------------
diff --git a/modules/spark/licenses/apache-2.0.txt b/modules/spark/licenses/apache-2.0.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/modules/spark/licenses/apache-2.0.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/licenses/scala-bsd-license.txt
----------------------------------------------------------------------
diff --git a/modules/spark/licenses/scala-bsd-license.txt b/modules/spark/licenses/scala-bsd-license.txt
new file mode 100644
index 0000000..b2be111
--- /dev/null
+++ b/modules/spark/licenses/scala-bsd-license.txt
@@ -0,0 +1,18 @@
+Copyright (c) 2002-2014 EPFL
+Copyright (c) 2011-2014 Typesafe, Inc.
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or
+other materials provided with the distribution. Neither the name of the EPFL nor the names of its contributors may be used to endorse or promote products
+derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
+BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
+GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
new file mode 100644
index 0000000..8900a10
--- /dev/null
+++ b/modules/spark/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-spark</artifactId>
+ <version>1.1.1-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.11.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ <version>1.3.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.11</artifactId>
+ <version>1.3.1</version>
+ </dependency>
+
+ <!-- Test dependencies -->
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.11</artifactId>
+ <version>2.2.2</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-indexing</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-beans</artifactId>
+ <version>${spring.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <version>${spring.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
new file mode 100644
index 0000000..e52555a
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.{Ignition, Ignite}
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.sql.SQLContext
+
+/**
+ * Ignite context.
+ *
+ * @param sparkContext Spark context.
+ * @param cfgF Configuration factory.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class IgniteContext[K, V](
+ @scala.transient val sparkContext: SparkContext,
+ cfgF: () ⇒ IgniteConfiguration,
+ client: Boolean = true
+) extends Serializable with Logging {
+ @scala.transient private val driver = true
+
+ if (!client) {
+ val workers = sparkContext.getExecutorStorageStatus.length - 1
+
+ if (workers <= 0)
+ throw new IllegalStateException("No Spark executors found to start Ignite nodes.")
+
+ logInfo("Will start Ignite nodes on " + workers + " workers")
+
+ // Start ignite server node on each worker in server mode.
+ sparkContext.parallelize(1 to workers, workers).foreach(it ⇒ ignite())
+ }
+
+ def this(
+ sc: SparkContext,
+ springUrl: String
+ ) {
+ this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1())
+ }
+
+ val sqlContext = new SQLContext(sparkContext)
+
+ /**
+ * Creates an `IgniteRDD` instance from the given cache name. If the cache does not exist, it will be
+ * automatically started from template on the first invoked RDD action.
+ *
+ * @param cacheName Cache name.
+ * @return `IgniteRDD` instance.
+ */
+ def fromCache(cacheName: String): IgniteRDD[K, V] = {
+ new IgniteRDD[K, V](this, cacheName, null)
+ }
+
+ /**
+ * Creates an `IgniteRDD` instance from the given cache configuration. If the cache does not exist, it will be
+ * automatically started using the configuration provided on the first invoked RDD action.
+ *
+ * @param cacheCfg Cache configuration to use.
+ * @return `IgniteRDD` instance.
+ */
+ def fromCache(cacheCfg: CacheConfiguration[K, V]) = {
+ new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg)
+ }
+
+ /**
+ * Gets an Ignite instance supporting this context. Ignite instance will be started
+ * if it has not been started yet.
+ *
+ * @return Ignite instance.
+ */
+ def ignite(): Ignite = {
+ val igniteCfg = cfgF()
+
+ try {
+ Ignition.ignite(igniteCfg.getGridName)
+ }
+ catch {
+ case e: Exception ⇒
+ try {
+ igniteCfg.setClientMode(client || driver)
+
+ Ignition.start(igniteCfg)
+ }
+ catch {
+ case e: Exception ⇒ Ignition.ignite(igniteCfg.getGridName)
+ }
+ }
+ }
+
+ /**
+ * Stops supporting ignite instance. If ignite instance has been already stopped, this operation will be
+ * a no-op.
+ */
+ def close() = {
+ val igniteCfg = cfgF()
+
+ Ignition.stop(igniteCfg.getGridName, false)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
new file mode 100644
index 0000000..2146acb
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spark
+
+import javax.cache.Cache
+
+import org.apache.ignite.cache.query._
+import org.apache.ignite.cluster.ClusterNode
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata
+import org.apache.ignite.lang.IgniteUuid
+import org.apache.ignite.spark.impl._
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types._
+import org.apache.spark.sql._
+import org.apache.spark._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Ignite RDD. Represents Ignite cache as Spark RDD abstraction.
+ *
+ * @param ic Ignite context to use.
+ * @param cacheName Cache name.
+ * @param cacheCfg Cache configuration.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class IgniteRDD[K, V] (
+ val ic: IgniteContext[K, V],
+ val cacheName: String,
+ val cacheCfg: CacheConfiguration[K, V]
+) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
+ /**
+ * Computes iterator based on given partition.
+ *
+ * @param part Partition to use.
+ * @param context Task context.
+ * @return Partition iterator.
+ */
+ override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+ val cache = ensureCache()
+
+ val qry: ScanQuery[K, V] = new ScanQuery[K, V](part.index)
+
+ val partNodes = ic.ignite().affinity(cache.getName).mapPartitionToPrimaryAndBackups(part.index)
+
+ val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(qry).iterator()
+
+ new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry ⇒ {
+ (entry.getKey, entry.getValue)
+ })
+ }
+
+ /**
+ * Gets partitions for the given cache RDD.
+ *
+ * @return Partitions.
+ */
+ override protected[spark] def getPartitions: Array[Partition] = {
+ ensureCache()
+
+ val parts = ic.ignite().affinity(cacheName).partitions()
+
+ (0 until parts).map(new IgnitePartition(_)).toArray
+ }
+
+ /**
+ * Gets preferred locations for the given partition.
+ *
+ * @param split Split partition.
+ * @return
+ */
+ override protected[spark] def getPreferredLocations(split: Partition): Seq[String] = {
+ ensureCache()
+
+ ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)
+ .map(_.asInstanceOf[TcpDiscoveryNode].socketAddresses()).flatten.map(_.getHostName).toList
+ }
+
+ /**
+ * Runs an object SQL on corresponding Ignite cache.
+ *
+ * @param typeName Type name to run SQL against.
+ * @param sql SQL query to run.
+ * @param args Optional SQL query arguments.
+ * @return RDD with query results.
+ */
+ def objectSql(typeName: String, sql: String, args: Any*): RDD[(K, V)] = {
+ val qry: SqlQuery[K, V] = new SqlQuery[K, V](typeName, sql)
+
+ qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
+
+ new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry ⇒ (entry.getKey, entry.getValue))
+ }
+
+ /**
+ * Runs an SQL fields query.
+ *
+ * @param sql SQL statement to run.
+ * @param args Optional SQL query arguments.
+ * @return `DataFrame` instance with the query results.
+ */
+ def sql(sql: String, args: Any*): DataFrame = {
+ val qry = new SqlFieldsQuery(sql)
+
+ qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
+
+ val schema = buildSchema(ensureCache().query(qry).asInstanceOf[QueryCursorEx[java.util.List[_]]].fieldsMeta())
+
+ val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list))
+
+ ic.sqlContext.createDataFrame(rowRdd, schema)
+ }
+
+ /**
+ * Saves values from given RDD into Ignite. A unique key will be generated for each value of the given RDD.
+ *
+ * @param rdd RDD instance to save values from.
+ */
+ def saveValues(rdd: RDD[V]) = {
+ rdd.foreachPartition(it ⇒ {
+ val ig = ic.ignite()
+
+ ensureCache()
+
+ val locNode = ig.cluster().localNode()
+
+ val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
+
+ val streamer = ig.dataStreamer[Object, V](cacheName)
+
+ try {
+ it.foreach(value ⇒ {
+ val key = affinityKeyFunc(value, node.orNull)
+
+ streamer.addData(key, value)
+ })
+ }
+ finally {
+ streamer.close()
+ }
+ })
+ }
+
+ /**
+ * Saves values from the given key-value RDD into Ignite.
+ *
+ * @param rdd RDD instance to save values from.
+ * @param overwrite Boolean flag indicating whether the call on this method should overwrite existing
+ * values in Ignite cache.
+ */
+ def savePairs(rdd: RDD[(K, V)], overwrite: Boolean = false) = {
+ rdd.foreachPartition(it ⇒ {
+ val ig = ic.ignite()
+
+ // Make sure to deploy the cache
+ ensureCache()
+
+ val streamer = ig.dataStreamer[K, V](cacheName)
+
+ try {
+ streamer.allowOverwrite(overwrite)
+
+ it.foreach(tup ⇒ {
+ streamer.addData(tup._1, tup._2)
+ })
+ }
+ finally {
+ streamer.close()
+ }
+ })
+ }
+
+ /**
+ * Removes all values from the underlying Ignite cache.
+ */
+ def clear(): Unit = {
+ ensureCache().removeAll()
+ }
+
+ /**
+ * Builds spark schema from query metadata.
+ *
+ * @param fieldsMeta Fields metadata.
+ * @return Spark schema.
+ */
+ private def buildSchema(fieldsMeta: java.util.List[GridQueryFieldMetadata]): StructType = {
+ new StructType(fieldsMeta.map(i ⇒ new StructField(i.fieldName(), dataType(i.fieldTypeName()), nullable = true))
+ .toArray)
+ }
+
+ /**
+ * Gets Spark data type based on type name.
+ *
+ * @param typeName Type name.
+ * @return Spark data type.
+ */
+ private def dataType(typeName: String): DataType = typeName match {
+ case "java.lang.Boolean" ⇒ BooleanType
+ case "java.lang.Byte" ⇒ ByteType
+ case "java.lang.Short" ⇒ ShortType
+ case "java.lang.Integer" ⇒ IntegerType
+ case "java.lang.Long" ⇒ LongType
+ case "java.lang.Float" ⇒ FloatType
+ case "java.lang.Double" ⇒ DoubleType
+ case "java.lang.String" ⇒ StringType
+ case "java.util.Date" ⇒ DateType
+ case "java.sql.Timestamp" ⇒ TimestampType
+ case "[B" ⇒ BinaryType
+
+ case _ ⇒ StructType(new Array[StructField](0))
+ }
+
+ /**
+ * Generates affinity key for given cluster node.
+ *
+ * @param value Value to generate key for.
+ * @param node Node to generate key for.
+ * @return Affinity key.
+ */
+ private def affinityKeyFunc(value: V, node: ClusterNode): IgniteUuid = {
+ val aff = ic.ignite().affinity[IgniteUuid](cacheName)
+
+ Stream.from(1, 1000).map(_ ⇒ IgniteUuid.randomUuid()).find(node == null || aff.mapKeyToNode(_).eq(node))
+ .getOrElse(IgniteUuid.randomUuid())
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
new file mode 100644
index 0000000..e2d57bf
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.lang.IgniteOutClosure
+import org.apache.spark.api.java.JavaSparkContext
+
+import scala.reflect.ClassTag
+
+/**
+ * Java-friendly Ignite context wrapper.
+ *
+ * @param sc Java Spark context.
+ * @param cfgF Configuration factory.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class JavaIgniteContext[K, V](
+ @scala.transient val sc: JavaSparkContext,
+ val cfgF: IgniteOutClosure[IgniteConfiguration]) extends Serializable {
+
+ @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply())
+
+ def this(sc: JavaSparkContext, springUrl: String) {
+ this(sc, new IgniteOutClosure[IgniteConfiguration] {
+ override def apply() = IgnitionEx.loadConfiguration(springUrl).get1()
+ })
+ }
+
+ def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
+ JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+
+ def fromCache(cacheCfg: CacheConfiguration[K, V]) =
+ JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+
+ def ignite(): Ignite = ic.ignite()
+
+ def close() = ic.close()
+
+ private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+ implicit val ktag: ClassTag[K] = fakeClassTag
+
+ implicit val vtag: ClassTag[V] = fakeClassTag
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
new file mode 100644
index 0000000..2e8702e
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import java.util
+
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.{Partition, TaskContext}
+
+import scala.annotation.varargs
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+/**
+ * Java-friendly Ignite RDD wrapper. Represents Ignite cache as Java Spark RDD abstraction.
+ *
+ * @param rdd Ignite RDD instance.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
+ extends JavaPairRDD[K, V](rdd)(JavaIgniteRDD.fakeClassTag, JavaIgniteRDD.fakeClassTag) {
+
+ override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
+
+ override val classTag: ClassTag[(K, V)] = JavaIgniteRDD.fakeClassTag
+
+ /**
+ * Computes iterator based on given partition.
+ *
+ * @param part Partition to use.
+ * @param context Task context.
+ * @return Partition iterator.
+ */
+ def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+ rdd.compute(part, context)
+ }
+
+ /**
+ * Gets partitions for the given cache RDD.
+ *
+ * @return Partitions.
+ */
+ protected def getPartitions: java.util.List[Partition] = {
+ new util.ArrayList[Partition](rdd.getPartitions.toSeq)
+ }
+
+ /**
+ * Gets preferred locations for the given partition.
+ *
+ * @param split Split partition.
+ * @return
+ */
+ protected def getPreferredLocations(split: Partition): Seq[String] = {
+ rdd.getPreferredLocations(split)
+ }
+
+ @varargs def objectSql(typeName: String, sql: String, args: Any*): JavaPairRDD[K, V] =
+ JavaPairRDD.fromRDD(rdd.objectSql(typeName, sql, args:_*))
+
+ @varargs def sql(sql: String, args: Any*): DataFrame = rdd.sql(sql, args:_*)
+
+ def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd))
+
+ def savePairs(jrdd: JavaPairRDD[K, V]) = {
+ val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd)
+
+ rdd.savePairs(rrdd)
+ }
+
+ def clear(): Unit = rdd.clear()
+}
+
+object JavaIgniteRDD {
+ implicit def fromIgniteRDD[K: ClassTag, V: ClassTag](rdd: IgniteRDD[K, V]): JavaIgniteRDD[K, V] =
+ new JavaIgniteRDD[K, V](rdd)
+
+ implicit def toIgniteRDD[K, V](rdd: JavaIgniteRDD[K, V]): IgniteRDD[K, V] = rdd.rdd
+
+ def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
new file mode 100644
index 0000000..25b3b56
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.IgniteCache
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+abstract class IgniteAbstractRDD[R:ClassTag, K, V] (
+ ic: IgniteContext[K, V],
+ cacheName: String,
+ cacheCfg: CacheConfiguration[K, V]
+) extends RDD[R] (ic.sparkContext, deps = Nil) {
+ protected def ensureCache(): IgniteCache[K, V] = {
+ // Make sure to deploy the cache
+ if (cacheCfg != null)
+ ic.ignite().getOrCreateCache(cacheCfg)
+ else
+ ic.ignite().getOrCreateCache(cacheName)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
new file mode 100644
index 0000000..2def636
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.spark.Partition
+
+class IgnitePartition(idx: Int) extends Partition {
+ override def index: Int = idx
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
new file mode 100644
index 0000000..4165fd3
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+class IgniteQueryIterator[T, R] (
+ cur: java.util.Iterator[T],
+ conv: (T) ⇒ R
+) extends Iterator[R] {
+ override def hasNext: Boolean = cur.hasNext
+
+ override def next(): R = conv(cur.next())
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
new file mode 100644
index 0000000..762a6ed
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.cache.query.Query
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.{TaskContext, Partition}
+
+import scala.reflect.ClassTag
+
+class IgniteSqlRDD[R: ClassTag, T, K, V](
+ ic: IgniteContext[K, V],
+ cacheName: String,
+ cacheCfg: CacheConfiguration[K, V],
+ qry: Query[T],
+ conv: (T) ⇒ R
+) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg) {
+ override def compute(split: Partition, context: TaskContext): Iterator[R] = {
+ new IgniteQueryIterator[T, R](ensureCache().query(qry).iterator(), conv)
+ }
+
+ override protected def getPartitions: Array[Partition] = {
+ Array(new IgnitePartition(0))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
new file mode 100644
index 0000000..13bd3e8
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.IgniteCache
+import org.apache.ignite.spark.IgniteRDD
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike}
+
+abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V])
+ extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+
+ protected def ensureCache(): IgniteCache[K, V] = {
+ // Make sure to deploy the cache
+ if (rdd.cacheCfg != null)
+ rdd.ic.ignite().getOrCreateCache(rdd.cacheCfg)
+ else
+ rdd.ic.ignite().getOrCreateCache(rdd.cacheName)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
new file mode 100644
index 0000000..e14abfc
--- /dev/null
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.*;
+
+import scala.*;
+
+import java.util.*;
+
+/**
+ * Tests for {@link JavaIgniteRDD}.
+ */
+public class JavaIgniteRDDSelfTest extends GridCommonAbstractTest {
+ /** Grid count. */
+ private static final int GRID_CNT = 3;
+
+ /** Keys count. */
+ private static final int KEYS_CNT = 10000;
+
+ /** Cache name. */
+ private static final String PARTITIONED_CACHE_NAME = "partitioned";
+
+ /** Ip finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Sum function. */
+ private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() {
+ public Integer call(Integer x, Integer y) {
+ return x + y;
+ }
+ };
+
+ /** To pair function. */
+ private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() {
+ /** {@inheritDoc} */
+ @Override public Tuple2<String, String> call(Integer i) {
+ return new Tuple2<>(String.valueOf(i), "val" + i);
+ }
+ };
+
+ /** (String, Integer); pair to Integer value function. */
+ private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>();
+
+ /** (String, Entity) pair to Entity value function. */
+ private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F =
+ new PairToValueFunction<>();
+
+ /** Integer to entity function. */
+ private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F =
+ new PairFunction<Integer, String, Entity>() {
+ @Override public Tuple2<String, Entity> call(Integer i) throws Exception {
+ return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100));
+ }
+ };
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ Ignition.stop("client", false);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ for (int i = 0; i < GRID_CNT; i++)
+ Ignition.start(getConfiguration("grid-" + i, false));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ for (int i = 0; i < GRID_CNT; i++)
+ Ignition.stop("grid-" + i, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStoreDataToIgnite() throws Exception {
+ JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+ try {
+ JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+ ic.fromCache(PARTITIONED_CACHE_NAME)
+ .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F));
+
+ Ignite ignite = Ignition.ignite("grid-0");
+
+ IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ String val = cache.get(String.valueOf(i));
+
+ assertNotNull("Value was not put to cache for key: " + i, val);
+ assertEquals("Invalid value stored for key: " + i, "val" + i, val);
+ }
+ }
+ finally {
+ sc.stop();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReadDataFromIgnite() throws Exception {
+ JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+ try {
+ JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+ Ignite ignite = Ignition.ignite("grid-0");
+
+ IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.put(String.valueOf(i), i);
+
+ JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F);
+
+ int sum = values.fold(0, SUM_F);
+
+ int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT;
+
+ assertEquals(expSum, sum);
+ }
+ finally {
+ sc.stop();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueryObjectsFromIgnite() throws Exception {
+ JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+ try {
+ JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+ JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+ cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
+
+ List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
+ .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
+
+ assertEquals("Invalid result length", 1, res.size());
+ assertEquals("Invalid result", 50, res.get(0).id());
+ assertEquals("Invalid result", "name50", res.get(0).name());
+ assertEquals("Invalid result", 5000, res.get(0).salary());
+ assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count());
+ }
+ finally {
+ sc.stop();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueryFieldsFromIgnite() throws Exception {
+ JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+ try {
+ JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+ JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+ cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
+
+ DataFrame df =
+ cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
+
+ df.printSchema();
+
+ Row[] res = df.collect();
+
+ assertEquals("Invalid result length", 1, res.length);
+ assertEquals("Invalid result", 50, res[0].get(0));
+ assertEquals("Invalid result", "name50", res[0].get(1));
+ assertEquals("Invalid result", 5000, res[0].get(2));
+
+ Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
+
+ DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
+
+ df.printSchema();
+
+ Row[] res0 = df0.collect();
+
+ assertEquals("Invalid result length", 1, res0.length);
+ assertEquals("Invalid result", 50, res0[0].get(0));
+ assertEquals("Invalid result", "name50", res0[0].get(1));
+ assertEquals("Invalid result", 5000, res0[0].get(2));
+
+ assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count());
+ }
+ finally {
+ sc.stop();
+ }
+
+ }
+
+ /**
+ * @param gridName Grid name.
+ * @param client Client.
+ */
+ private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception {
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ cfg.setCacheConfiguration(cacheConfiguration());
+
+ cfg.setClientMode(client);
+
+ cfg.setGridName(gridName);
+
+ return cfg;
+ }
+
+ /**
+ * Creates cache configuration.
+ */
+ private static CacheConfiguration<Object, Object> cacheConfiguration() {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setBackups(1);
+
+ ccfg.setName(PARTITIONED_CACHE_NAME);
+
+ ccfg.setIndexedTypes(String.class, Entity.class);
+
+ return ccfg;
+ }
+
+ /**
+ * Ignite configiration provider.
+ */
+ static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> {
+ /** {@inheritDoc} */
+ @Override public IgniteConfiguration apply() {
+ try {
+ return getConfiguration("client", true);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * @param <K>
+ * @param <V>
+ */
+ static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> {
+ /** {@inheritDoc} */
+ @Override public V call(Tuple2<K, V> t) throws Exception {
+ return t._2();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/test/scala/org/apache/ignite/spark/Entity.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/Entity.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/Entity.scala
new file mode 100644
index 0000000..00beac6
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/Entity.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.spark.IgniteRddSpec.ScalarCacheQuerySqlField
+
+class Entity (
+ @ScalarCacheQuerySqlField(index = true) val id: Int,
+ @ScalarCacheQuerySqlField(index = true) val name: String,
+ @ScalarCacheQuerySqlField(index = true) val salary: Int
+) extends Serializable {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
new file mode 100644
index 0000000..26ce693
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignition
+import org.apache.ignite.cache.query.annotations.{QueryTextField, QuerySqlField}
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
+import org.apache.spark.SparkContext
+import org.junit.runner.RunWith
+import org.scalatest._
+import org.scalatest.junit.JUnitRunner
+
+import IgniteRddSpec._
+
+import scala.annotation.meta.field
+
+@RunWith(classOf[JUnitRunner])
+class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {
+ describe("IgniteRDD") {
+ it("should successfully store data to ignite") {
+ val sc = new SparkContext("local[*]", "test")
+
+ try {
+ val ic = new IgniteContext[String, String](sc,
+ () ⇒ configuration("client", client = true))
+
+ // Save pairs ("0", "val0"), ("1", "val1"), ... to Ignite cache.
+ ic.fromCache(PARTITIONED_CACHE_NAME).savePairs(sc.parallelize(0 to 10000, 2).map(i ⇒ (String.valueOf(i), "val" + i)))
+
+ // Check cache contents.
+ val ignite = Ignition.ignite("grid-0")
+
+ for (i ← 0 to 10000) {
+ val res = ignite.cache[String, String](PARTITIONED_CACHE_NAME).get(String.valueOf(i))
+
+ assert(res != null, "Value was not put to cache for key: " + i)
+ assert("val" + i == res, "Invalid value stored for key: " + i)
+ }
+ }
+ finally {
+ sc.stop()
+ }
+ }
+
+ it("should successfully read data from ignite") {
+ val sc = new SparkContext("local[*]", "test")
+
+ try {
+ val cache = Ignition.ignite("grid-0").cache[String, Int](PARTITIONED_CACHE_NAME)
+
+ val num = 10000
+
+ for (i ← 0 to num) {
+ cache.put(String.valueOf(i), i)
+ }
+
+ val ic = new IgniteContext[String, Int](sc,
+ () ⇒ configuration("client", client = true))
+
+ val res = ic.fromCache(PARTITIONED_CACHE_NAME).map(_._2).sum()
+
+ assert(res == (0 to num).sum)
+ }
+ finally {
+ sc.stop()
+ }
+ }
+
+ it("should successfully query objects from ignite") {
+ val sc = new SparkContext("local[*]", "test")
+
+ try {
+ val ic = new IgniteContext[String, Entity](sc,
+ () ⇒ configuration("client", client = true))
+
+ val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME)
+
+ cache.savePairs(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100))))
+
+ val res: Array[Entity] = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000).map(_._2).collect()
+
+ assert(res.length == 1, "Invalid result length")
+ assert(50 == res(0).id, "Invalid result")
+ assert("name50" == res(0).name, "Invalid result")
+ assert(5000 == res(0).salary)
+
+ assert(500 == cache.objectSql("Entity", "id > 500").count(), "Invalid count")
+ }
+ finally {
+ sc.stop()
+ }
+ }
+
+ it("should successfully query fields from ignite") {
+ val sc = new SparkContext("local[*]", "test")
+
+ try {
+ val ic = new IgniteContext[String, Entity](sc,
+ () ⇒ configuration("client", client = true))
+
+ val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME)
+
+ import ic.sqlContext.implicits._
+
+ cache.savePairs(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100))))
+
+ val df = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000)
+
+ df.printSchema()
+
+ val res = df.collect()
+
+ assert(res.length == 1, "Invalid result length")
+ assert(50 == res(0)(0), "Invalid result")
+ assert("name50" == res(0)(1), "Invalid result")
+ assert(5000 == res(0)(2), "Invalid result")
+
+ val df0 = cache.sql("select id, name, salary from Entity").where('NAME === "name50" and 'SALARY === 5000)
+
+ val res0 = df0.collect()
+
+ assert(res0.length == 1, "Invalid result length")
+ assert(50 == res0(0)(0), "Invalid result")
+ assert("name50" == res0(0)(1), "Invalid result")
+ assert(5000 == res0(0)(2), "Invalid result")
+
+ assert(500 == cache.sql("select id from Entity where id > 500").count(), "Invalid count")
+ }
+ finally {
+ sc.stop()
+ }
+ }
+ }
+
+ override protected def beforeEach() = {
+ Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll()
+ }
+
+ override protected def afterEach() = {
+ Ignition.stop("client", false)
+ }
+
+ override protected def beforeAll() = {
+ for (i ← 0 to 3) {
+ Ignition.start(configuration("grid-" + i, client = false))
+ }
+ }
+
+ override protected def afterAll() = {
+ for (i ← 0 to 3) {
+ Ignition.stop("grid-" + i, false)
+ }
+ }
+}
+
+/**
+ * Constants and utility methods.
+ */
+object IgniteRddSpec {
+ /** IP finder for the test. */
+ val IP_FINDER = new TcpDiscoveryVmIpFinder(true)
+
+ /** Partitioned cache name. */
+ val PARTITIONED_CACHE_NAME = "partitioned"
+
+ /** Type alias for `QuerySqlField`. */
+ type ScalarCacheQuerySqlField = QuerySqlField @field
+
+ /** Type alias for `QueryTextField`. */
+ type ScalarCacheQueryTextField = QueryTextField @field
+
+ /**
+ * Gets ignite configuration.
+ *
+ * @param gridName Grid name.
+ * @param client Client mode flag.
+ * @return Ignite configuration.
+ */
+ def configuration(gridName: String, client: Boolean): IgniteConfiguration = {
+ val cfg = new IgniteConfiguration
+
+ val discoSpi = new TcpDiscoverySpi
+
+ discoSpi.setIpFinder(IgniteRddSpec.IP_FINDER)
+
+ cfg.setDiscoverySpi(discoSpi)
+
+ cfg.setCacheConfiguration(cacheConfiguration(gridName))
+
+ cfg.setClientMode(client)
+
+ cfg.setGridName(gridName)
+
+ cfg
+ }
+
+ /**
+ * Gets cache configuration for the given grid name.
+ *
+ * @param gridName Grid name.
+ * @return Cache configuration.
+ */
+ def cacheConfiguration(gridName: String): CacheConfiguration[Object, Object] = {
+ val ccfg = new CacheConfiguration[Object, Object]()
+
+ ccfg.setBackups(1)
+
+ ccfg.setName(PARTITIONED_CACHE_NAME)
+
+ ccfg.setIndexedTypes(classOf[String], classOf[Entity])
+
+ ccfg
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/visor-console-2.10/README.txt
----------------------------------------------------------------------
diff --git a/modules/visor-console-2.10/README.txt b/modules/visor-console-2.10/README.txt
new file mode 100644
index 0000000..1a018b9
--- /dev/null
+++ b/modules/visor-console-2.10/README.txt
@@ -0,0 +1,4 @@
+Apache Ignite Visor Console Module
+---------------------------
+
+Apache Ignite Visor Console module to be build with Scala 2.10.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/visor-console-2.10/pom.xml
----------------------------------------------------------------------
diff --git a/modules/visor-console-2.10/pom.xml b/modules/visor-console-2.10/pom.xml
new file mode 100644
index 0000000..f0df657
--- /dev/null
+++ b/modules/visor-console-2.10/pom.xml
@@ -0,0 +1,174 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-visor-console_2.10</artifactId>
+ <version>1.1.1-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-ssh</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-core</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-beans</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-expression</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.10.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>jline</artifactId>
+ <version>2.10.4</version>
+ </dependency>
+ <!-- Third party dependencies -->
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-indexing</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.10</artifactId>
+ <version>2.2.2</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- Test dependencies -->
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>../visor-console/src/main/scala</directory>
+ <excludes>
+ <exclude>**/*.scala</exclude>
+ </excludes>
+ </resource>
+ </resources>
+
+ <testResources>
+ <testResource>
+ <directory>../visor-console/src/test/scala</directory>
+ <excludes>
+ <exclude>**/*.scala</exclude>
+ </excludes>
+ </testResource>
+ </testResources>
+
+ <!-- TODO IGNITE-956 FIX scaladocs plugins-->
+ <!--<plugin>-->
+ <!--<groupId>net.alchim31.maven</groupId>-->
+ <!--<artifactId>scala-maven-plugin</artifactId>-->
+ <!--<executions>-->
+ <!--<execution>-->
+ <!--<id>scaladoc</id>-->
+ <!--<phase>prepare-package</phase>-->
+ <!--<goals>-->
+ <!--<goal>doc</goal>-->
+ <!--</goals>-->
+ <!--<configuration>-->
+ <!--<doctitle>Ignite Console Visor</doctitle>-->
+ <!--<windowtitle>Ignite Console Visor</windowtitle>-->
+ <!--</configuration>-->
+ <!--</execution>-->
+ <!--</executions>-->
+ <!--</plugin>-->
+
+ <!--<plugin>-->
+ <!--<groupId>org.apache.maven.plugins</groupId>-->
+ <!--<artifactId>maven-antrun-plugin</artifactId>-->
+ <!--<version>1.7</version>-->
+ <!--<executions>-->
+ <!--<execution>-->
+ <!--<id>javadoc-zip</id>-->
+ <!--<goals>-->
+ <!--<goal>run</goal>-->
+ <!--</goals>-->
+ <!--<phase>prepare-package</phase>-->
+ <!--<configuration>-->
+ <!--<target>-->
+ <!--<zip destfile="target/ignite-visor-console-${project.version}-javadoc.jar" basedir="target/site/scaladocs" encoding="UTF-8" />-->
+ <!--</target>-->
+ <!--</configuration>-->
+ <!--</execution>-->
+ <!--</executions>-->
+ <!--</plugin>-->
+ <!--/plugins-->
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index a514e35..f5b73df 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -321,6 +321,10 @@
<title>Mesos Framework</title>
<packages>org.apache.ignite.mesos*</packages>
</group>
+ <group>
+ <title>Spark Integration</title>
+ <packages>org.apache.ignite.spark.examples.java</packages>
+ </group>
</groups>
<header>
<![CDATA[
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6f8524f..a08dd6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -409,18 +409,34 @@
<id>scala</id>
<activation>
- <activeByDefault>true</activeByDefault>
- <jdk>[1.7,)</jdk>
+ <property><name>!scala-2.10</name></property>
</activation>
<modules>
<module>modules/scalar</module>
+ <module>modules/spark</module>
<module>modules/visor-console</module>
<module>modules/visor-plugins</module>
</modules>
</profile>
<profile>
+ <id>scala-2.10</id>
+
+ <activation>
+ <property><name>scala-2.10</name></property>
+ </activation>
+
+ <modules>
+ <module>modules/scalar-2.10</module>
+ <module>modules/spark-2.10</module>
+ <module>modules/visor-console-2.10</module>
+ <module>modules/visor-plugins</module>
+ </modules>
+ </profile>
+
+
+ <profile>
<id>lgpl</id>
<modules>
<module>modules/hibernate</module>