You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/02/16 17:36:41 UTC
[40/50] [abbrv] ignite git commit: IGNITE-4526: Add Spark Shared RDD
examples Reviewed by Denis Magda
IGNITE-4526: Add Spark Shared RDD examples
Reviewed by Denis Magda <dm...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b461cb47
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b461cb47
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b461cb47
Branch: refs/heads/ignite-comm-balance-master
Commit: b461cb47882861356ede58775bd9e253dcf26202
Parents: 79e1e53
Author: Manish Mishra <ma...@knoldus.com>
Authored: Tue Feb 14 16:54:11 2017 -0800
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Feb 14 16:54:11 2017 -0800
----------------------------------------------------------------------
examples/config/spark/example-shared-rdd.xml | 83 ++++++++++++++
examples/pom.xml | 27 ++++-
.../examples/java8/spark/SharedRDDExample.java | 110 +++++++++++++++++++
.../examples/spark/ScalarSharedRDDExample.scala | 89 +++++++++++++++
.../examples/SharedRDDExampleSelfTest.java | 36 ++++++
.../IgniteExamplesJ8SelfTestSuite.java | 3 +
.../tests/examples/ScalarExamplesSelfTest.scala | 6 +
.../apache/ignite/spark/JavaIgniteContext.scala | 6 +
8 files changed, 359 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/config/spark/example-shared-rdd.xml
----------------------------------------------------------------------
diff --git a/examples/config/spark/example-shared-rdd.xml b/examples/config/spark/example-shared-rdd.xml
new file mode 100644
index 0000000..83de6a3
--- /dev/null
+++ b/examples/config/spark/example-shared-rdd.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Ignite Spring configuration file to startup Ignite cache.
+
+ This file demonstrates how to configure cache using Spring. Provided cache
+ will be created on node startup.
+
+ When starting a standalone node, you need to execute the following command:
+ {IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/spark/example-shared-rdd.xml
+
+ When starting Ignite from a Java IDE, pass the path to this file to Ignition:
+ Ignition.start("examples/config/spark/example-shared-rdd.xml");
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="cacheConfiguration">
+ <!-- SharedRDD cache example configuration (Atomic mode). -->
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <!-- Set a cache name. -->
+ <property name="name" value="sharedRDD"/>
+ <!-- Set a cache mode. -->
+ <property name="cacheMode" value="PARTITIONED"/>
+ <!-- Index Integer pairs used in the example. -->
+ <property name="indexedTypes">
+ <list>
+ <value>java.lang.Integer</value>
+ <value>java.lang.Integer</value>
+ </list>
+ </property>
+ <!-- Set atomicity mode. -->
+ <property name="atomicityMode" value="ATOMIC"/>
+ <!-- Configure a number of backups. -->
+ <property name="backups" value="1"/>
+ </bean>
+ </property>
+
+ <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <!--
+ Ignite provides several options for automatic discovery that can be used
+ instead of static IP based discovery. For information on all options refer
+ to our documentation: http://apacheignite.readme.io/docs/cluster-config
+ -->
+ <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
+ <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+ <property name="addresses">
+ <list>
+ <!-- In distributed environment, replace with actual host IP address. -->
+ <value>127.0.0.1:47500..47509</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 3a6a026..1c4ad25 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -17,7 +17,8 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -138,6 +139,18 @@
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spark</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.2.9.Final</version>
+ </dependency>
</dependencies>
<build>
@@ -172,6 +185,18 @@
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spark_2.10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.2.9.Final</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
new file mode 100644
index 0000000..392180d
--- /dev/null
+++ b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.spark;
+
+import org.apache.ignite.spark.JavaIgniteContext;
+import org.apache.ignite.spark.JavaIgniteRDD;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.DataFrame;
+import scala.Tuple2;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * This example demonstrates how to create a JavaIgniteRDD and share it with multiple Spark workers. The goal of this
+ * particular example is to provide the simplest code example of this logic.
+ * <p>
+ * This example will start Ignite in the embedded mode and will start a JavaIgniteContext on each Spark worker node.
+ * <p>
+ * The example can work in the standalone mode as well that can be enabled by setting JavaIgniteContext's
+ * {@code standalone} property to {@code true} and running an Ignite node separately with
+ * {@code examples/config/spark/example-shared-rdd.xml} config.
+ */
+public class SharedRDDExample {
+    /**
+     * Executes the example: fills the {@code sharedRDD} cache with Integer pairs, iterates over it, filters it,
+     * runs an SQL query against it and finally releases both the Ignite and the Spark contexts.
+     *
+     * @param args Command line arguments, none required.
+     */
+    public static void main(String args[]) {
+        // Spark Configuration.
+        SparkConf sparkConf = new SparkConf()
+            .setAppName("JavaIgniteRDDExample")
+            .setMaster("local")
+            .set("spark.executor.instances", "2");
+
+        // Spark context.
+        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
+
+        try {
+            // Adjust the logger to exclude the logs of no interest.
+            Logger.getRootLogger().setLevel(Level.ERROR);
+            Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);
+
+            // Creates Ignite context with specific configuration and runs Ignite in the embedded mode.
+            JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
+                sparkContext, "examples/config/spark/example-shared-rdd.xml", false);
+
+            try {
+                // Create a Java Ignite RDD of Type (Int,Int) Integer Pair.
+                JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD");
+
+                // Define data to be stored in the Ignite RDD (cache).
+                List<Integer> data = IntStream.range(0, 20).boxed().collect(Collectors.toList());
+
+                // Preparing a Java RDD.
+                JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data);
+
+                // Fill the Ignite RDD in with Int pairs. Here Pairs are represented as Scala Tuple2.
+                sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() {
+                    @Override public Tuple2<Integer, Integer> call(Integer val) throws Exception {
+                        return new Tuple2<Integer, Integer>(val, val);
+                    }
+                }));
+
+                System.out.println(">>> Iterating over Ignite Shared RDD...");
+
+                // Iterate over the Ignite RDD.
+                sharedRDD.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")"));
+
+                System.out.println(">>> Transforming values stored in Ignite Shared RDD...");
+
+                // Keep only the pairs whose value is even as a transformed RDD.
+                JavaPairRDD<Integer, Integer> transformedValues =
+                    sharedRDD.filter((Tuple2<Integer, Integer> pair) -> pair._2() % 2 == 0);
+
+                // Print out the transformed values.
+                transformedValues.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")"));
+
+                System.out.println(">>> Executing SQL query over Ignite Shared RDD...");
+
+                // Execute SQL query over the Ignite RDD.
+                DataFrame df = sharedRDD.sql("select _val from Integer where _key < 9");
+
+                // Show the result of the execution.
+                df.show();
+            }
+            finally {
+                // Close IgniteContext on all the workers, even if the example failed midway.
+                igniteContext.close(true);
+            }
+        }
+        finally {
+            // Stop SparkContext (mirrors the Scala example) so it does not leak into
+            // whatever runs next in the same JVM, e.g. other tests of the suite.
+            sparkContext.stop();
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala
new file mode 100644
index 0000000..18662e8
--- /dev/null
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.scalar.examples.spark
+
+import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+ * This example demonstrates how to create an IgniteRDD and share it with multiple Spark workers.
+ * The goal of this particular example is to provide the simplest code example of this logic.
+ * <p>
+ * This example will start Ignite in the embedded mode and will start an IgniteContext on each Spark worker node.
+ * <p>
+ * The example can work in the standalone mode as well that can be enabled by setting IgniteContext's {@code isClient}
+ * property to {@code true} and running an Ignite node separately with
+ * `examples/config/spark/example-shared-rdd.xml` config.
+ */
+object ScalarSharedRDDExample extends App {
+    // Spark Configuration.
+    private val conf = new SparkConf()
+        .setAppName("IgniteRDDExample")
+        .setMaster("local")
+        .set("spark.executor.instances", "2")
+
+    // Spark context.
+    val sparkContext = new SparkContext(conf)
+
+    // Adjust the logger to exclude the logs of no interest.
+    Logger.getRootLogger.setLevel(Level.ERROR)
+    Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
+
+    // Defines spring cache Configuration path.
+    private val CONFIG = "examples/config/spark/example-shared-rdd.xml"
+
+    // Creates Ignite context with above configuration.
+    val igniteContext = new IgniteContext(sparkContext, CONFIG, false)
+
+    // Creates an Ignite Shared RDD of Type (Int,Int) Integer Pair.
+    val sharedRDD: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD")
+
+    // Fill the Ignite Shared RDD in with Int pairs.
+    sharedRDD.savePairs(sparkContext.parallelize(1 to 100000, 10).map(i => (i, i)))
+
+    // Transform the pairs to contain their squared value. A Spark transformation returns a NEW RDD and leaves
+    // the underlying cache untouched, so the result must be kept and used (it was silently discarded before).
+    // The square is computed as Long because it does not fit into Int for keys above 46340.
+    private val squaredValues = sharedRDD.mapValues(x => x.toLong * x)
+
+    // Retrieve sharedRDD back from the Cache (it still holds the original, unsquared pairs).
+    val transformedValues: IgniteRDD[Int, Int] = igniteContext.fromCache("sharedRDD")
+
+    // Pair every key with the square root of its squared value.
+    val squareAndRootPair = squaredValues.map { case (x, y) => (x, Math.sqrt(y.toDouble)) }
+
+    println(">>> Transforming values stored in Ignite Shared RDD...")
+
+    // Keep only the pairs whose square root is less than 100 and
+    // take the first five elements from the transformed RDD and print them.
+    squareAndRootPair.filter(_._2 < 100.0).take(5).foreach(println)
+
+    println(">>> Executing SQL query over Ignite Shared RDD...")
+
+    // Execute a SQL query over the Ignite Shared RDD.
+    val df = transformedValues.sql("select _val from Integer where _val < 100 and _val > 9 ")
+
+    // Show ten rows from the result set.
+    df.show(10)
+
+    // Close IgniteContext on all workers.
+    igniteContext.close(true)
+
+    // Stop SparkContext.
+    sparkContext.stop()
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java b/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java
new file mode 100644
index 0000000..0fafb4d
--- /dev/null
+++ b/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.java8.examples;
+
+import org.apache.ignite.examples.java8.spark.SharedRDDExample;
+import org.junit.Test;
+
+/**
+ * SharedRDD examples self test.
+ * <p>
+ * The class extends {@code junit.framework.TestCase} because it is registered in the examples suite through
+ * {@code new TestSuite(SharedRDDExampleSelfTest.class)}: a JUnit 3 suite only collects {@code test*} methods from
+ * classes assignable to {@code junit.framework.Test} and otherwise reports a failing "No tests found" warning
+ * instead of running the example.
+ */
+public class SharedRDDExampleSelfTest extends junit.framework.TestCase {
+    /** Empty command line arguments passed to the example. */
+    static final String[] EMPTY_ARGS = new String[0];
+
+    /**
+     * Runs {@link SharedRDDExample#main(String[])}; the test passes if the example finishes without throwing.
+     * The {@code @Test} annotation is kept for tools that look for it; the JUnit 3 suite finds the method by
+     * its {@code test} name prefix.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSharedRDDExample() throws Exception {
+        SharedRDDExample.main(EMPTY_ARGS);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java b/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java
index 949324c..c32339f 100644
--- a/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java
+++ b/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.java8.examples.EventsExamplesMultiNodeSelfTest;
import org.apache.ignite.java8.examples.EventsExamplesSelfTest;
import org.apache.ignite.java8.examples.IndexingBridgeMethodTest;
import org.apache.ignite.java8.examples.MessagingExamplesSelfTest;
+import org.apache.ignite.java8.examples.SharedRDDExampleSelfTest;
import org.apache.ignite.testframework.GridTestUtils;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP;
@@ -49,6 +50,8 @@ public class IgniteExamplesJ8SelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(IndexingBridgeMethodTest.class));
suite.addTest(new TestSuite(CacheExamplesSelfTest.class));
suite.addTest(new TestSuite(BasicExamplesSelfTest.class));
+ suite.addTest(new TestSuite(SharedRDDExampleSelfTest.class));
+
// suite.addTest(new TestSuite(ContinuationExamplesSelfTest.class));
// suite.addTest(new TestSuite(ContinuousMapperExamplesSelfTest.class));
// suite.addTest(new TestSuite(DeploymentExamplesSelfTest.class));
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
index 94c41ad..28e509e 100644
--- a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
+++ b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
@@ -18,6 +18,7 @@
package org.apache.ignite.scalar.tests.examples
import org.apache.ignite.scalar.examples._
+import org.apache.ignite.scalar.examples.spark._
import org.apache.ignite.scalar.scalar
import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest
import org.scalatest.junit.JUnitSuiteLike
@@ -95,4 +96,9 @@ class ScalarExamplesSelfTest extends GridAbstractExamplesTest with JUnitSuiteLik
def testScalarSnowflakeSchemaExample() {
ScalarSnowflakeSchemaExample.main(EMPTY_ARGS)
}
+
+ /** Runs ScalarSharedRDDExample through its `main` entry point with EMPTY_ARGS; there are no assertions, so the test passes if no exception is thrown. */
+ def testScalarSharedRDDExample() {
+ ScalarSharedRDDExample.main(EMPTY_ARGS)
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
index 689a22d..d8a521b 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -51,6 +51,12 @@ class JavaIgniteContext[K, V](
})
}
+ /**
+  * Creates a Java Ignite context whose Ignite configuration is loaded from a Spring configuration file.
+  *
+  * @param sc Java Spark context, forwarded to the (sc, closure, standalone) constructor.
+  * @param springUrl Location of the Spring configuration, handed to `IgnitionEx.loadConfiguration`.
+  * @param standalone Forwarded unchanged to the (sc, closure, standalone) constructor.
+  */
+ def this(sc: JavaSparkContext, springUrl: String, standalone: Boolean) {
+ this(sc, new IgniteOutClosure[IgniteConfiguration] {
+ // The configuration is loaded each time the closure is applied, not once at construction time.
+ // NOTE(review): get1() presumably selects the IgniteConfiguration from the returned tuple - confirm.
+ override def apply() = IgnitionEx.loadConfiguration(springUrl).get1()
+ }, standalone)
+ }
+
def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null, false))