You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/20 11:46:33 UTC

[09/29] ignite git commit: IGNITE-4526: Add Spark Shared RDD examples Reviewed by Denis Magda

IGNITE-4526: Add Spark Shared RDD examples
Reviewed by Denis Magda <dm...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b461cb47
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b461cb47
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b461cb47

Branch: refs/heads/ignite-2.0
Commit: b461cb47882861356ede58775bd9e253dcf26202
Parents: 79e1e53
Author: Manish Mishra <ma...@knoldus.com>
Authored: Tue Feb 14 16:54:11 2017 -0800
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Feb 14 16:54:11 2017 -0800

----------------------------------------------------------------------
 examples/config/spark/example-shared-rdd.xml    |  83 ++++++++++++++
 examples/pom.xml                                |  27 ++++-
 .../examples/java8/spark/SharedRDDExample.java  | 110 +++++++++++++++++++
 .../examples/spark/ScalarSharedRDDExample.scala |  89 +++++++++++++++
 .../examples/SharedRDDExampleSelfTest.java      |  36 ++++++
 .../IgniteExamplesJ8SelfTestSuite.java          |   3 +
 .../tests/examples/ScalarExamplesSelfTest.scala |   6 +
 .../apache/ignite/spark/JavaIgniteContext.scala |   6 +
 8 files changed, 359 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/config/spark/example-shared-rdd.xml
----------------------------------------------------------------------
diff --git a/examples/config/spark/example-shared-rdd.xml b/examples/config/spark/example-shared-rdd.xml
new file mode 100644
index 0000000..83de6a3
--- /dev/null
+++ b/examples/config/spark/example-shared-rdd.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    Ignite Spring configuration file to startup Ignite cache.
+
+    This file demonstrates how to configure cache using Spring. Provided cache
+    will be created on node startup.
+
+    When starting a standalone node, you need to execute the following command:
+    {IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/spark/example-shared-rdd.xml
+
+    When starting Ignite from Java IDE, pass path to this file to Ignition:
+    Ignition.start("examples/config/spark/example-shared-rdd.xml");
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="cacheConfiguration">
+            <!-- SharedRDD cache example configuration (Atomic mode). -->
+            <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                <!-- Set a cache name. -->
+                <property name="name" value="sharedRDD"/>
+                <!-- Set a cache mode. -->
+                <property name="cacheMode" value="PARTITIONED"/>
+                <!-- Index Integer pairs used in the example. -->
+                <property name="indexedTypes">
+                    <list>
+                        <value>java.lang.Integer</value>
+                        <value>java.lang.Integer</value>
+                    </list>
+                </property>
+                <!-- Set atomicity mode. -->
+                <property name="atomicityMode" value="ATOMIC"/>
+                <!-- Configure a number of backups. -->
+                <property name="backups" value="1"/>
+            </bean>
+        </property>
+
+        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <!--
+                        Ignite provides several options for automatic discovery that can be used
+                        instead of static IP based discovery. For information on all options refer
+                        to our documentation: http://apacheignite.readme.io/docs/cluster-config
+                    -->
+                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
+                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <!-- In distributed environment, replace with actual host IP address. -->
+                                <value>127.0.0.1:47500..47509</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>

http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 3a6a026..1c4ad25 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -17,7 +17,8 @@
   limitations under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -138,6 +139,18 @@
                         </exclusion>
                     </exclusions>
                 </dependency>
+
+                <dependency>
+                    <groupId>org.apache.ignite</groupId>
+                    <artifactId>ignite-spark</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.jboss.netty</groupId>
+                    <artifactId>netty</artifactId>
+                    <version>3.2.9.Final</version>
+                </dependency>
             </dependencies>
 
             <build>
@@ -172,6 +185,18 @@
                         </exclusion>
                     </exclusions>
                 </dependency>
+
+                <dependency>
+                    <groupId>org.apache.ignite</groupId>
+                    <artifactId>ignite-spark_2.10</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.jboss.netty</groupId>
+                    <artifactId>netty</artifactId>
+                    <version>3.2.9.Final</version>
+                </dependency>
             </dependencies>
 
             <build>

http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
new file mode 100644
index 0000000..392180d
--- /dev/null
+++ b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.spark;
+
+import org.apache.ignite.spark.JavaIgniteContext;
+import org.apache.ignite.spark.JavaIgniteRDD;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.DataFrame;
+import scala.Tuple2;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * This example demonstrates how to create a JavaIgniteRDD and share it with multiple Spark workers. The goal of this
+ * particular example is to provide the simplest code example of this logic.
+ * <p>
+ * This example will start Ignite in the embedded mode and will start a JavaIgniteContext on each Spark worker node.
+ * <p>
+ * The example can work in the standalone mode as well that can be enabled by setting JavaIgniteContext's
+ * {@code standalone} property to {@code true} and running an Ignite node separately with
+ * `examples/config/spark/example-shared-rdd.xml` config.
+ */
+public class SharedRDDExample {
+    /**
+     * Runs the example: stores pairs in the Ignite cache through Spark, then iterates, filters and queries them.
+     * @param args Command line arguments, none required.
+     */
+    public static void main(String[] args) {
+        // Spark Configuration.
+        SparkConf sparkConf = new SparkConf()
+            .setAppName("JavaIgniteRDDExample")
+            .setMaster("local")
+            .set("spark.executor.instances", "2");
+
+        // Spark context.
+        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
+
+        // Adjust the logger to exclude the logs of no interest.
+        Logger.getRootLogger().setLevel(Level.ERROR);
+        Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);
+
+        // Creates Ignite context with specific configuration and runs Ignite in the embedded mode.
+        JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
+            sparkContext, "examples/config/spark/example-shared-rdd.xml", false);
+
+        // Create a Java Ignite RDD of Type (Int,Int) Integer Pair.
+        JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.fromCache("sharedRDD");
+
+        // Define data to be stored in the Ignite RDD (cache).
+        List<Integer> data = IntStream.range(0, 20).boxed().collect(Collectors.toList());
+
+        // Preparing a Java RDD.
+        JavaRDD<Integer> javaRDD = sparkContext.parallelize(data);
+
+        // Fill the Ignite RDD in with Int pairs. Here Pairs are represented as Scala Tuple2.
+        PairFunction<Integer, Integer, Integer> toPair = val -> new Tuple2<>(val, val);
+        sharedRDD.savePairs(javaRDD.mapToPair(toPair));
+
+        System.out.println(">>> Iterating over Ignite Shared RDD...");
+
+        // Iterate over the Ignite RDD.
+        sharedRDD.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")"));
+
+        System.out.println(">>> Transforming values stored in Ignite Shared RDD...");
+
+        // Keep only the pairs with even values as a transformed RDD.
+        JavaPairRDD<Integer, Integer> transformedValues =
+            sharedRDD.filter((Tuple2<Integer, Integer> pair) -> pair._2() % 2 == 0);
+
+        // Print out the transformed values.
+        transformedValues.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")"));
+
+        System.out.println(">>> Executing SQL query over Ignite Shared RDD...");
+
+        // Execute SQL query over the Ignite RDD.
+        DataFrame df = sharedRDD.sql("select _val from Integer where _key < 9");
+
+        // Show the result of the execution.
+        df.show();
+
+        // Close IgniteContext on all the workers.
+        igniteContext.close(true);
+
+        // Stop SparkContext so that it does not stay active in this JVM after the example finishes.
+        sparkContext.stop();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala
new file mode 100644
index 0000000..18662e8
--- /dev/null
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.scalar.examples.spark
+
+import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+  * This example demonstrates how to create an IgniteRDD and share it with multiple Spark workers.
+  * The goal of this particular example is to provide the simplest code example of this logic.
+  * <p>
+  * This example will start Ignite in the embedded mode and will start an IgniteContext on each Spark worker node.
+  * <p>
+  * The example can work in the standalone mode as well that can be enabled by setting IgniteContext's {@code isClient}
+  * property to {@code true} and running an Ignite node separately with `examples/config/spark/
+  * example-shared-rdd.xml` config.
+  * <p>
+  */
+object ScalarSharedRDDExample extends App {
+    // Spark Configuration.
+    private val conf = new SparkConf()
+        .setAppName("IgniteRDDExample")
+        .setMaster("local")
+        .set("spark.executor.instances", "2")
+
+    // Spark context.
+    val sparkContext = new SparkContext(conf)
+
+    // Adjust the logger to exclude the logs of no interest.
+    Logger.getRootLogger.setLevel(Level.ERROR)
+    Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
+
+    // Defines spring cache Configuration path.
+    private val CONFIG = "examples/config/spark/example-shared-rdd.xml"
+
+    // Creates Ignite context with above configuration.
+    val igniteContext = new IgniteContext(sparkContext, CONFIG, false)
+
+    // Creates an Ignite Shared RDD of Type (Int,Int) Integer Pair.
+    val sharedRDD: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD")
+
+    // Fill the Ignite Shared RDD in with Int pairs.
+    sharedRDD.savePairs(sparkContext.parallelize(1 to 100000, 10).map(i => (i, i)))
+
+    // Spark transformations (e.g. mapValues) return a new RDD and do not modify the cache,
+    // so the RDD retrieved below still contains the (i, i) pairs saved above.
+
+    // Retrieve sharedRDD back from the Cache.
+    val transformedValues: IgniteRDD[Int, Int] = igniteContext.fromCache("sharedRDD")
+
+    // Pair every key with the square root of its cached value.
+    val squareAndRootPair = transformedValues.map { case (x, y) => (x, Math.sqrt(y.toDouble)) }
+
+    println(">>> Transforming values stored in Ignite Shared RDD...")
+
+    // Keep only the pairs whose square root is less than 100,
+    // then take the first five elements of the result and print them.
+    squareAndRootPair.filter(_._2 < 100.0).take(5).foreach(println)
+
+    println(">>> Executing SQL query over Ignite Shared RDD...")
+
+    // Execute a SQL query over the Ignite Shared RDD.
+    val df = transformedValues.sql("select _val from Integer where _val < 100 and _val > 9 ")
+
+    // Show up to ten rows from the result set.
+    df.show(10)
+
+    // Close IgniteContext on all workers.
+    igniteContext.close(true)
+
+    // Stop SparkContext.
+    sparkContext.stop()
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java b/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java
new file mode 100644
index 0000000..0fafb4d
--- /dev/null
+++ b/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.java8.examples;
+
+import junit.framework.TestCase;
+import org.apache.ignite.examples.java8.spark.SharedRDDExample;
+import org.junit.Test;
+
+/**
+ * Self test for the Spark shared RDD example. Extends {@link TestCase} so that a JUnit 3 style
+ * {@code new TestSuite(SharedRDDExampleSelfTest.class)} registration can discover the test method.
+ */
+public class SharedRDDExampleSelfTest extends TestCase {
+    /** Empty command line arguments passed to the example. */
+    static final String[] EMPTY_ARGS = new String[0];
+
+    /** @throws Exception If the example fails. */
+    @Test
+    public void testSharedRDDExample() throws Exception {
+        SharedRDDExample.main(EMPTY_ARGS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java b/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java
index 949324c..c32339f 100644
--- a/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java
+++ b/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.java8.examples.EventsExamplesMultiNodeSelfTest;
 import org.apache.ignite.java8.examples.EventsExamplesSelfTest;
 import org.apache.ignite.java8.examples.IndexingBridgeMethodTest;
 import org.apache.ignite.java8.examples.MessagingExamplesSelfTest;
+import org.apache.ignite.java8.examples.SharedRDDExampleSelfTest;
 import org.apache.ignite.testframework.GridTestUtils;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP;
@@ -49,6 +50,8 @@ public class IgniteExamplesJ8SelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(IndexingBridgeMethodTest.class));
         suite.addTest(new TestSuite(CacheExamplesSelfTest.class));
         suite.addTest(new TestSuite(BasicExamplesSelfTest.class));
+        suite.addTest(new TestSuite(SharedRDDExampleSelfTest.class));
+
 //        suite.addTest(new TestSuite(ContinuationExamplesSelfTest.class));
 //        suite.addTest(new TestSuite(ContinuousMapperExamplesSelfTest.class));
 //        suite.addTest(new TestSuite(DeploymentExamplesSelfTest.class));

http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
index 94c41ad..28e509e 100644
--- a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
+++ b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
@@ -18,6 +18,7 @@
 package org.apache.ignite.scalar.tests.examples
 
 import org.apache.ignite.scalar.examples._
+import org.apache.ignite.scalar.examples.spark._
 import org.apache.ignite.scalar.scalar
 import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest
 import org.scalatest.junit.JUnitSuiteLike
@@ -95,4 +96,9 @@ class ScalarExamplesSelfTest extends GridAbstractExamplesTest with JUnitSuiteLik
     def testScalarSnowflakeSchemaExample() {
         ScalarSnowflakeSchemaExample.main(EMPTY_ARGS)
     }
+
+    /** Runs the Spark shared RDD example with no command line arguments. */
+    def testScalarSharedRDDExample() {
+        ScalarSharedRDDExample.main(EMPTY_ARGS)
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
index 689a22d..d8a521b 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -51,6 +51,12 @@ class JavaIgniteContext[K, V](
         })
     }
 
+    /** Creates a context whose configuration is loaded from `springUrl` when the closure is applied. */
+    def this(sc: JavaSparkContext, springUrl: String, standalone: Boolean) =
+        this(sc, new IgniteOutClosure[IgniteConfiguration] {
+            override def apply(): IgniteConfiguration = IgnitionEx.loadConfiguration(springUrl).get1()
+        }, standalone)
+
     def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
         JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null, false))