You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/03/01 14:32:51 UTC

[08/50] [abbrv] ignite git commit: IGNITE-3710 Upgrade ignite-spark module to Spark 2.0 (cherry picked from commit 8613c16)

IGNITE-3710 Upgrade ignite-spark module to Spark 2.0
(cherry picked from commit 8613c16)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/caf7b225
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/caf7b225
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/caf7b225

Branch: refs/heads/master
Commit: caf7b225c6ae55a530961a3b79ebde2368b6a24d
Parents: 5b94a7d
Author: Evgenii Zhuravlev <ez...@gridgain.com>
Authored: Mon Feb 20 19:24:42 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Feb 20 19:26:17 2017 +0300

----------------------------------------------------------------------
 .../examples/java8/spark/SharedRDDExample.java  |   4 +-
 modules/spark-2.10/pom.xml                      |  54 ++++++
 modules/spark/pom.xml                           | 183 ++++++++++++++++++-
 .../org/apache/ignite/spark/IgniteContext.scala |  22 ++-
 .../spark/JavaEmbeddedIgniteRDDSelfTest.java    |  10 +-
 .../spark/JavaStandaloneIgniteRDDSelfTest.java  |  22 +--
 parent/pom.xml                                  |   3 +-
 7 files changed, 270 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
index 392180d..5f74a94 100644
--- a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
+++ b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
@@ -26,7 +26,7 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import scala.Tuple2;
 
 import java.util.List;
@@ -99,7 +99,7 @@ public class SharedRDDExample {
         System.out.println(">>> Executing SQL query over Ignite Shared RDD...");
 
         // Execute SQL query over the Ignite RDD.
-        DataFrame df = sharedRDD.sql("select _val from Integer where _key < 9");
+        Dataset df = sharedRDD.sql("select _val from Integer where _key < 9");
 
         // Show the result of the execution.
         df.show();

http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark-2.10/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark-2.10/pom.xml b/modules/spark-2.10/pom.xml
index ed42227..58e2860 100644
--- a/modules/spark-2.10/pom.xml
+++ b/modules/spark-2.10/pom.xml
@@ -63,10 +63,64 @@
 
         <dependency>
             <groupId>org.apache.spark</groupId>
+            <artifactId>spark-unsafe_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.10</artifactId>
             <version>${spark.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson2.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-network-shuffle_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${jackson2.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-tags_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.json4s</groupId>
+            <artifactId>json4s-core_2.11</artifactId>
+            <version>3.5.0</version>
+        </dependency>
+
         <!-- Test dependencies -->
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index d8cb894..140f637 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -52,7 +52,7 @@
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
-            <version>2.11.7</version>
+            <version>2.11.8</version>
         </dependency>
 
         <dependency>
@@ -63,11 +63,53 @@
 
         <dependency>
             <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_2.11</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.11</artifactId>
             <version>${spark.version}</version>
         </dependency>
 
         <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-network-common_2.11</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-network-shuffle_2.11</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson2.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${jackson2.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.json4s</groupId>
+            <artifactId>json4s-core_2.11</artifactId>
+            <version>3.5.0</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-indexing</artifactId>
             <version>${project.version}</version>
@@ -89,7 +131,7 @@
         <dependency>
             <groupId>org.scalatest</groupId>
             <artifactId>scalatest_2.11</artifactId>
-            <version>2.2.4</version>
+            <version>2.2.6</version>
             <scope>test</scope>
             <exclusions>
                 <exclusion>
@@ -98,6 +140,143 @@
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-unsafe_2.11</artifactId>
+            <version>${spark.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-launcher_2.11</artifactId>
+            <version>${spark.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-tags_2.11</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-unsafe_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.5</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>4.0.29.Final</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.esotericsoftware.kryo</groupId>
+            <artifactId>kryo</artifactId>
+            <version>2.20</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>chill_2.11</artifactId>
+            <version>0.8.1</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.codahale.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>3.0.2</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-json</artifactId>
+            <version>3.1.2</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tomcat</groupId>
+            <artifactId>tomcat-servlet-api</artifactId>
+            <version>8.0.23</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId>
+            <artifactId>jersey-container-servlet-core</artifactId>
+            <version>2.25</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson2.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-scala_2.11</artifactId>
+            <version>${jackson2.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.xbean</groupId>
+            <artifactId>xbean-asm5-shaded</artifactId>
+            <version>4.5</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>net.jpountz.lz4</groupId>
+            <artifactId>lz4</artifactId>
+            <version>1.3.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+            <version>1.9.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
+            <version>3.0.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>commons-compiler</artifactId>
+            <version>3.0.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 04139d1..842c459 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -22,7 +22,8 @@ import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
 import org.apache.ignite.internal.IgnitionEx
 import org.apache.ignite.internal.util.IgniteUtils
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.SparkContext
+import org.apache.log4j.Logger
 
 /**
  * Ignite context.
@@ -34,7 +35,7 @@ class IgniteContext(
     @transient val sparkContext: SparkContext,
     cfgF: () \u21d2 IgniteConfiguration,
     standalone: Boolean = true
-    ) extends Serializable with Logging {
+    ) extends Serializable {
     private val cfgClo = new Once(cfgF)
 
     private val igniteHome = IgniteUtils.getIgniteHome
@@ -47,7 +48,7 @@ class IgniteContext(
         if (workers <= 0)
             throw new IllegalStateException("No Spark executors found to start Ignite nodes.")
 
-        logInfo("Will start Ignite nodes on " + workers + " workers")
+        Logging.log.info("Will start Ignite nodes on " + workers + " workers")
 
         // Start ignite server node on each worker in server mode.
         sparkContext.parallelize(1 to workers, workers).foreachPartition(it \u21d2 ignite())
@@ -126,7 +127,7 @@ class IgniteContext(
         val home = IgniteUtils.getIgniteHome
 
         if (home == null && igniteHome != null) {
-            logInfo("Setting IGNITE_HOME from driver not as it is not available on this worker: " + igniteHome)
+            Logging.log.info("Setting IGNITE_HOME from driver not as it is not available on this worker: " + igniteHome)
 
             IgniteUtils.nullifyHomeDirectory()
 
@@ -143,7 +144,7 @@ class IgniteContext(
         }
         catch {
             case e: IgniteException \u21d2
-                logError("Failed to start Ignite.", e)
+                Logging.log.error("Failed to start Ignite.", e)
 
                 throw e
         }
@@ -161,7 +162,7 @@ class IgniteContext(
                 sparkContext.getExecutorStorageStatus.length)
 
             if (workers > 0) {
-                logInfo("Will stop Ignite nodes on " + workers + " workers")
+                Logging.log.info("Will stop Ignite nodes on " + workers + " workers")
 
                 // Start ignite server node on each worker in server mode.
                 sparkContext.parallelize(1 to workers, workers).foreachPartition(it \u21d2 doClose())
@@ -200,3 +201,12 @@ private class Once(clo: () \u21d2 IgniteConfiguration) extends Serializable {
         res
     }
 }
+
+/**
+  * Spark uses log4j by default. Using this logger in IgniteContext as well.
+  *
+  * This object is used to avoid problems with log4j serialization.
+  */
+object Logging extends Serializable {
+    @transient lazy val log = Logger.getLogger(classOf[IgniteContext])
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
index 0c4d556..53aff75 100644
--- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
@@ -35,7 +35,7 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Column;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import scala.Tuple2;
 
@@ -237,12 +237,12 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
 
             cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true);
 
-            DataFrame df =
+            Dataset<Row> df =
                 cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
 
             df.printSchema();
 
-            Row[] res = df.collect();
+            Row[] res = (Row[])df.collect();
 
             assertEquals("Invalid result length", 1, res.length);
             assertEquals("Invalid result", 50, res[0].get(0));
@@ -251,11 +251,11 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
 
             Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
 
-            DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
+            Dataset<Row> df0 = cache.sql("select id, name, salary from Entity").where(exp);
 
             df.printSchema();
 
-            Row[] res0 = df0.collect();
+            Row[] res0 = (Row[])df0.collect();
 
             assertEquals("Invalid result length", 1, res0.length);
             assertEquals("Invalid result", 50, res0[0].get(0));

http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
index e9d97a4..1075f96 100644
--- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
@@ -35,7 +35,7 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Column;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import scala.Tuple2;
 
@@ -216,12 +216,12 @@ public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest {
 
             cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
 
-            DataFrame df =
+            Dataset<Row> df =
                 cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
 
             df.printSchema();
 
-            Row[] res = df.collect();
+            Row[] res = (Row[])df.collect();
 
             assertEquals("Invalid result length", 1, res.length);
             assertEquals("Invalid result", 50, res[0].get(0));
@@ -230,11 +230,11 @@ public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest {
 
             Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
 
-            DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
+            Dataset<Row> df0 = cache.sql("select id, name, salary from Entity").where(exp);
 
             df.printSchema();
 
-            Row[] res0 = df0.collect();
+            Row[] res0 = (Row[])df0.collect();
 
             assertEquals("Invalid result length", 1, res0.length);
             assertEquals("Invalid result", 50, res0[0].get(0));
@@ -269,25 +269,25 @@ public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest {
 
                 Object val = GridTestUtils.getFieldValue(e, fieldName);
 
-                DataFrame df = cache.sql(
+                Dataset<Row> df = cache.sql(
                     String.format("select %s from EntityTestAllTypeFields where %s = ?", fieldName, fieldName),
                     val);
 
                 if (val instanceof BigDecimal) {
-                    Object res = df.collect()[0].get(0);
+                    Object res = ((Row[])df.collect())[0].get(0);
 
                     assertTrue(String.format("+++ Fail on %s field", fieldName),
                         ((Comparable<BigDecimal>)val).compareTo((BigDecimal)res) == 0);
                 }
                 else if (val instanceof java.sql.Date)
                     assertEquals(String.format("+++ Fail on %s field", fieldName),
-                        val.toString(), df.collect()[0].get(0).toString());
+                        val.toString(), ((Row[])df.collect())[0].get(0).toString());
                 else if (val.getClass().isArray())
                     assertTrue(String.format("+++ Fail on %s field", fieldName), 1 <= df.count());
                 else {
-                    assertTrue(String.format("+++ Fail on %s field", fieldName), df.collect().length > 0);
-                    assertTrue(String.format("+++ Fail on %s field", fieldName), df.collect()[0].size() > 0);
-                    assertEquals(String.format("+++ Fail on %s field", fieldName), val, df.collect()[0].get(0));
+                    assertTrue(String.format("+++ Fail on %s field", fieldName), ((Row[])df.collect()).length > 0);
+                    assertTrue(String.format("+++ Fail on %s field", fieldName), ((Row[])df.collect())[0].size() > 0);
+                    assertEquals(String.format("+++ Fail on %s field", fieldName), val, ((Row[])df.collect())[0].get(0));
                 }
 
                 info(String.format("+++ Query on the filed: %s : %s passed", fieldName, f.getType().getSimpleName()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 3514435..a4972d1 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -36,7 +36,7 @@
     <properties>
         <ignite.edition>fabric</ignite.edition>
         <hadoop.version>2.4.1</hadoop.version>
-        <spark.version>1.5.2</spark.version>
+        <spark.version>2.1.0</spark.version>
         <spring.version>4.1.0.RELEASE</spring.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <maven.build.timestamp.format>MMMM d yyyy</maven.build.timestamp.format>
@@ -99,7 +99,6 @@
         <scala211.library.version>2.11.7</scala211.library.version>
         <slf4j.version>1.7.7</slf4j.version>
         <slf4j16.version>1.6.4</slf4j16.version>
-        <spark.version>1.5.2</spark.version>
         <spring.version>4.1.0.RELEASE</spring.version>
         <spring41.osgi.feature.version>4.1.7.RELEASE_1</spring41.osgi.feature.version>
         <tomcat.version>8.0.23</tomcat.version>