Posted to commits@ignite.apache.org by ag...@apache.org on 2015/05/28 08:30:54 UTC

incubator-ignite git commit: #IGNITE-389 - Adding test, fixing colocation.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-389 d151244ee -> f00a9e998


#IGNITE-389 - Adding test, fixing colocation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f00a9e99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f00a9e99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f00a9e99

Branch: refs/heads/ignite-389
Commit: f00a9e9980aefeba2d80969730552e5c5651f1c6
Parents: d151244
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed May 27 23:30:49 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed May 27 23:30:49 2015 -0700

----------------------------------------------------------------------
 modules/spark/pom.xml                           |  8 ----
 .../org/apache/ignite/spark/IgniteRDD.scala     |  8 ++--
 .../ignite/spark/examples/ColocationTest.scala  | 40 ++++++++++++++++++++
 3 files changed, 45 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f00a9e99/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index dc01c76..84055d6 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -49,14 +49,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-indexing</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
             <version>2.10.4</version>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f00a9e99/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index ee0e9b3..30efa7a 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -23,6 +23,7 @@ import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.configuration.CacheConfiguration
 import org.apache.ignite.lang.IgniteUuid
 import org.apache.ignite.spark.impl.{IgniteAbstractRDD, IgniteSqlRDD, IgnitePartition, IgniteQueryIterator}
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode
 import org.apache.spark.rdd.RDD
 import org.apache.spark.{TaskContext, Partition}
 
@@ -37,9 +38,9 @@ class IgniteRDD[K, V] (
     override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
         val cache = ensureCache()
 
-        val qry: ScanQuery[K, V] = new ScanQuery[K, V]()
+        val qry: ScanQuery[K, V] = new ScanQuery[K, V](part.index)
 
-        qry.setPartition(part.index)
+        val partNodes = ic.ignite().affinity(cache.getName).mapPartitionToPrimaryAndBackups(part.index)
 
         val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(qry).iterator()
 
@@ -59,7 +60,8 @@ class IgniteRDD[K, V] (
     override protected def getPreferredLocations(split: Partition): Seq[String] = {
         ensureCache()
 
-        ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index).map(_.addresses()).flatten.toList
+        ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)
+            .map(_.asInstanceOf[TcpDiscoveryNode].socketAddresses()).flatten.map(_.getHostName).toList
     }
 
     def objectSql(typeName: String, sql: String, args: Any*): RDD[(K, V)] = {

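For context, the core of the colocation fix is the getPreferredLocations change above: each Spark partition now reports the host names of the Ignite nodes that own the corresponding cache partition, and Spark's scheduler tries to place that partition's task on one of those hosts, which makes the per-partition ScanQuery a local read. Below is a minimal sketch (not part of this commit) of the Spark contract being relied on; the LocalityAwareRDD class and hostsPerPartition parameter are hypothetical names used only for illustration.

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkContext, TaskContext}

// Hypothetical RDD showing the scheduling contract IgniteRDD satisfies:
// Spark tries to run each partition's task on one of the hosts returned
// by getPreferredLocations.
class LocalityAwareRDD(sc: SparkContext, hostsPerPartition: Seq[Seq[String]])
    extends RDD[Int](sc, Nil) {

    override protected def getPartitions: Array[Partition] =
        hostsPerPartition.indices.map(i =>
            new Partition { override def index: Int = i }: Partition).toArray

    // Spark matches these strings against executor host names when assigning
    // tasks, so each partition should report the hosts that own its data.
    override protected def getPreferredLocations(split: Partition): Seq[String] =
        hostsPerPartition(split.index)

    override def compute(split: Partition, context: TaskContext): Iterator[Int] =
        Iterator(split.index)
}

Spark compares preferred locations against executor host names, which is why the commit maps each node's socket addresses to getHostName instead of returning raw addresses.
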
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f00a9e99/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala
new file mode 100644
index 0000000..a0814fa
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples
+
+import org.apache.ignite.configuration.IgniteConfiguration
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.{SparkContext, SparkConf}
+
+object ColocationTest {
+    def main(args: Array[String]) {
+        val conf = new SparkConf().setAppName("Colocation test")
+        val sc = new SparkContext(conf)
+
+        val ignite = new IgniteContext[Int, Int](sc, () ⇒ new IgniteConfiguration())
+
+        // Get an Ignite RDD backed by the "partitioned" cache.
+        val cache = ignite.fromCache("partitioned")
+
+        cache.saveTuples(sc.parallelize((1 to 100000).toSeq, 48).map(i => (i, i)))
+
+        // Compare local and distributed sums (sum as Long: the Int sum overflows).
+        println("Local sum: " + (1 to 100000).map(_.toLong).sum)
+        println("Distributed sum: " + cache.map(_._2).sum())
+    }
+}
\ No newline at end of file
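
As a sanity check on the test's expected output (a sketch, not part of the commit): both sums should equal n * (n + 1) / 2 for n = 100000.

object ExpectedSum {
    def main(args: Array[String]): Unit = {
        val n = 100000L
        // Closed form for 1 + 2 + ... + n; prints 5000050000.
        println(n * (n + 1) / 2)
    }
}

Note that the distributed sum comes back as a Double from RDD.sum(), so it prints in scientific notation (5.00005E9) rather than as 5000050000.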