You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/11 12:31:23 UTC
[07/50] incubator-ignite git commit: #IGNITE-389 - Adding test,
fixing colocation.
#IGNITE-389 - Adding test, fixing colocation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f00a9e99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f00a9e99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f00a9e99
Branch: refs/heads/ignite-929
Commit: f00a9e9980aefeba2d80969730552e5c5651f1c6
Parents: d151244
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed May 27 23:30:49 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed May 27 23:30:49 2015 -0700
----------------------------------------------------------------------
modules/spark/pom.xml | 8 ----
.../org/apache/ignite/spark/IgniteRDD.scala | 8 ++--
.../ignite/spark/examples/ColocationTest.scala | 40 ++++++++++++++++++++
3 files changed, 45 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f00a9e99/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index dc01c76..84055d6 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -49,14 +49,6 @@
</dependency>
<dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-indexing</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f00a9e99/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index ee0e9b3..30efa7a 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -23,6 +23,7 @@ import org.apache.ignite.cluster.ClusterNode
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.lang.IgniteUuid
import org.apache.ignite.spark.impl.{IgniteAbstractRDD, IgniteSqlRDD, IgnitePartition, IgniteQueryIterator}
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode
import org.apache.spark.rdd.RDD
import org.apache.spark.{TaskContext, Partition}
@@ -37,9 +38,9 @@ class IgniteRDD[K, V] (
override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
val cache = ensureCache()
- val qry: ScanQuery[K, V] = new ScanQuery[K, V]()
+ val qry: ScanQuery[K, V] = new ScanQuery[K, V](part.index)
- qry.setPartition(part.index)
+ val partNodes = ic.ignite().affinity(cache.getName).mapPartitionToPrimaryAndBackups(part.index)
val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(qry).iterator()
@@ -59,7 +60,8 @@ class IgniteRDD[K, V] (
override protected def getPreferredLocations(split: Partition): Seq[String] = {
ensureCache()
- ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index).map(_.addresses()).flatten.toList
+ ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)
+ .map(_.asInstanceOf[TcpDiscoveryNode].socketAddresses()).flatten.map(_.getHostName).toList
}
def objectSql(typeName: String, sql: String, args: Any*): RDD[(K, V)] = {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f00a9e99/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala
new file mode 100644
index 0000000..a0814fa
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/ColocationTest.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples
+
+import org.apache.ignite.configuration.IgniteConfiguration
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.{SparkContext, SparkConf}
+
+object ColocationTest {
+ def main(args: Array[String]) {
+ val conf = new SparkConf().setAppName("Colocation test")
+ val sc = new SparkContext(conf)
+
+ val ignite = new IgniteContext[Int, Int](sc, () ⇒ new IgniteConfiguration())
+
+ // Search for lines containing "Ignite".
+ val cache = ignite.fromCache("partitioned")
+
+ cache.saveTuples(sc.parallelize((1 to 100000).toSeq, 48).map(i => (i, i)))
+
+ // Execute parallel sum.
+ println("Local sum: " + (1 to 100000).sum)
+ println("Distributed sum: " + cache.map(_._2).sum())
+ }
+}
\ No newline at end of file