You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/07/03 02:55:22 UTC

[13/50] incubator-ignite git commit: IGNITE-1061 - Fixed as discussed.

IGNITE-1061 - Fixed as discussed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b467822d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b467822d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b467822d

Branch: refs/heads/ignite-428
Commit: b467822d55c8f796de2d7b3c2aa80b46f81811c1
Parents: 68c21ac
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Mon Jun 29 20:33:20 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Mon Jun 29 20:33:20 2015 -0700

----------------------------------------------------------------------
 .../core/src/test/config/spark/spark-config.xml | 46 ++++++++++++++++++
 .../org/apache/ignite/spark/IgniteContext.scala | 50 ++++++++++++++++++--
 .../org/apache/ignite/spark/IgniteRddSpec.scala | 18 +++++++
 3 files changed, 110 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b467822d/modules/core/src/test/config/spark/spark-config.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/spark/spark-config.xml b/modules/core/src/test/config/spark/spark-config.xml
new file mode 100644
index 0000000..4b7ffe1
--- /dev/null
+++ b/modules/core/src/test/config/spark/spark-config.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd">
+    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="localHost" value="127.0.0.1"/>
+
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <value>127.0.0.1:47500</value>
+                                <value>127.0.0.1:47501</value>
+                                <value>127.0.0.1:47502</value>
+                                <value>127.0.0.1:47503</value>
+                                <value>127.0.0.1:47504</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b467822d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index e52555a..5dbb1d3 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -33,11 +33,13 @@ import org.apache.spark.sql.SQLContext
  * @tparam V Value type.
  */
 class IgniteContext[K, V](
-    @scala.transient val sparkContext: SparkContext,
+    @transient val sparkContext: SparkContext,
     cfgF: () ⇒ IgniteConfiguration,
     client: Boolean = true
 ) extends Serializable with Logging {
-    @scala.transient private val driver = true
+    @transient private val driver = true
+
+    private val cfgClo = new Once(cfgF)
 
     if (!client) {
         val workers = sparkContext.getExecutorStorageStatus.length - 1
@@ -51,6 +53,15 @@ class IgniteContext[K, V](
         sparkContext.parallelize(1 to workers, workers).foreach(it ⇒ ignite())
     }
 
+    // Make sure to start Ignite on context creation.
+    ignite()
+
+    /**
+     * Creates an instance of IgniteContext with the given spring configuration.
+     *
+     * @param sc Spark context.
+     * @param springUrl Spring configuration path.
+     */
     def this(
         sc: SparkContext,
         springUrl: String
@@ -58,6 +69,17 @@ class IgniteContext[K, V](
         this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1())
     }
 
+    /**
+     * Creates an instance of IgniteContext with default Ignite configuration.
+     * By default this method will use grid configuration defined in `IGNITE_HOME/config/default-config.xml`
+     * configuration file.
+     *
+     * @param sc Spark context.
+     */
+    def this(sc: SparkContext) {
+        this(sc, IgnitionEx.DFLT_CFG)
+    }
+
     val sqlContext = new SQLContext(sparkContext)
 
     /**
@@ -89,7 +111,7 @@ class IgniteContext[K, V](
      * @return Ignite instance.
      */
     def ignite(): Ignite = {
-        val igniteCfg = cfgF()
+        val igniteCfg = cfgClo()
 
         try {
             Ignition.ignite(igniteCfg.getGridName)
@@ -112,8 +134,28 @@ class IgniteContext[K, V](
      * a no-op.
      */
     def close() = {
-        val igniteCfg = cfgF()
+        val igniteCfg = cfgClo()
 
         Ignition.stop(igniteCfg.getGridName, false)
     }
 }
+
+/**
+ * Auxiliary closure that ensures that passed in closure is executed only once.
+ *
+ * @param clo Closure to wrap.
+ */
+private class Once(clo: () ⇒ IgniteConfiguration) extends Serializable {
+    @transient @volatile var res: IgniteConfiguration = null
+
+    def apply(): IgniteConfiguration = {
+        if (res == null) {
+            this.synchronized {
+                if (res == null)
+                    res = clo()
+            }
+        }
+
+        res
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b467822d/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
index 26ce693..8fa6949 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
@@ -147,6 +147,24 @@ class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be
                 sc.stop()
             }
         }
+
+        it("should successfully start spark context with XML configuration") {
+            val sc = new SparkContext("local[*]", "test")
+
+            try {
+                val ic = new IgniteContext[String, String](sc,
+                    "modules/core/src/test/config/spark/spark-config.xml")
+
+                val cache: IgniteRDD[String, String] = ic.fromCache(PARTITIONED_CACHE_NAME)
+
+                cache.savePairs(sc.parallelize(1 to 1000, 2).map(i ⇒ (String.valueOf(i), "val" + i)))
+
+                assert(1000 == cache.count())
+            }
+            finally {
+                sc.stop()
+            }
+        }
     }
 
     override protected def beforeEach() = {