You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/03/01 14:32:51 UTC
[08/50] [abbrv] ignite git commit: IGNITE-3710 Upgrade ignite-spark
module to Spark 2.0 (cherry picked from commit 8613c16)
IGNITE-3710 Upgrade ignite-spark module to Spark 2.0
(cherry picked from commit 8613c16)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/caf7b225
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/caf7b225
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/caf7b225
Branch: refs/heads/master
Commit: caf7b225c6ae55a530961a3b79ebde2368b6a24d
Parents: 5b94a7d
Author: Evgenii Zhuravlev <ez...@gridgain.com>
Authored: Mon Feb 20 19:24:42 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Mon Feb 20 19:26:17 2017 +0300
----------------------------------------------------------------------
.../examples/java8/spark/SharedRDDExample.java | 4 +-
modules/spark-2.10/pom.xml | 54 ++++++
modules/spark/pom.xml | 183 ++++++++++++++++++-
.../org/apache/ignite/spark/IgniteContext.scala | 22 ++-
.../spark/JavaEmbeddedIgniteRDDSelfTest.java | 10 +-
.../spark/JavaStandaloneIgniteRDDSelfTest.java | 22 +--
parent/pom.xml | 3 +-
7 files changed, 270 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
index 392180d..5f74a94 100644
--- a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
+++ b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java
@@ -26,7 +26,7 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
import scala.Tuple2;
import java.util.List;
@@ -99,7 +99,7 @@ public class SharedRDDExample {
System.out.println(">>> Executing SQL query over Ignite Shared RDD...");
// Execute SQL query over the Ignite RDD.
- DataFrame df = sharedRDD.sql("select _val from Integer where _key < 9");
+ Dataset df = sharedRDD.sql("select _val from Integer where _key < 9");
// Show the result of the execution.
df.show();
http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark-2.10/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark-2.10/pom.xml b/modules/spark-2.10/pom.xml
index ed42227..58e2860 100644
--- a/modules/spark-2.10/pom.xml
+++ b/modules/spark-2.10/pom.xml
@@ -63,10 +63,64 @@
<dependency>
<groupId>org.apache.spark</groupId>
+ <artifactId>spark-unsafe_2.10</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson2.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-shuffle_2.10</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson2.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_2.10</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_2.10</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.json4s</groupId>
+ <artifactId>json4s-core_2.11</artifactId>
+ <version>3.5.0</version>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index d8cb894..140f637 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -52,7 +52,7 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <version>2.11.7</version>
+ <version>2.11.8</version>
</dependency>
<dependency>
@@ -63,11 +63,53 @@
<dependency>
<groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_2.11</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-common_2.11</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-shuffle_2.11</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson2.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson2.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.json4s</groupId>
+ <artifactId>json4s-core_2.11</artifactId>
+ <version>3.5.0</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-indexing</artifactId>
<version>${project.version}</version>
@@ -89,7 +131,7 @@
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
- <version>2.2.4</version>
+ <version>2.2.6</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -98,6 +140,143 @@
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-unsafe_2.11</artifactId>
+ <version>${spark.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-launcher_2.11</artifactId>
+ <version>${spark.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_2.11</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-unsafe_2.10</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.0.29.Final</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>2.20</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill_2.11</artifactId>
+ <version>0.8.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>3.0.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-json</artifactId>
+ <version>3.1.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.tomcat</groupId>
+ <artifactId>tomcat-servlet-api</artifactId>
+ <version>8.0.23</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
+ <version>2.25</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson2.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-scala_2.11</artifactId>
+ <version>${jackson2.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-asm5-shaded</artifactId>
+ <version>4.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ <version>1.3.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>1.9.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>commons-compiler</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 04139d1..842c459 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -22,7 +22,8 @@ import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.internal.IgnitionEx
import org.apache.ignite.internal.util.IgniteUtils
import org.apache.spark.sql.SQLContext
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.SparkContext
+import org.apache.log4j.Logger
/**
* Ignite context.
@@ -34,7 +35,7 @@ class IgniteContext(
@transient val sparkContext: SparkContext,
cfgF: () ⇒ IgniteConfiguration,
standalone: Boolean = true
- ) extends Serializable with Logging {
+ ) extends Serializable {
private val cfgClo = new Once(cfgF)
private val igniteHome = IgniteUtils.getIgniteHome
@@ -47,7 +48,7 @@ class IgniteContext(
if (workers <= 0)
throw new IllegalStateException("No Spark executors found to start Ignite nodes.")
- logInfo("Will start Ignite nodes on " + workers + " workers")
+ Logging.log.info("Will start Ignite nodes on " + workers + " workers")
// Start ignite server node on each worker in server mode.
sparkContext.parallelize(1 to workers, workers).foreachPartition(it ⇒ ignite())
@@ -126,7 +127,7 @@ class IgniteContext(
val home = IgniteUtils.getIgniteHome
if (home == null && igniteHome != null) {
- logInfo("Setting IGNITE_HOME from driver not as it is not available on this worker: " + igniteHome)
+ Logging.log.info("Setting IGNITE_HOME from driver not as it is not available on this worker: " + igniteHome)
IgniteUtils.nullifyHomeDirectory()
@@ -143,7 +144,7 @@ class IgniteContext(
}
catch {
case e: IgniteException ⇒
- logError("Failed to start Ignite.", e)
+ Logging.log.error("Failed to start Ignite.", e)
throw e
}
@@ -161,7 +162,7 @@ class IgniteContext(
sparkContext.getExecutorStorageStatus.length)
if (workers > 0) {
- logInfo("Will stop Ignite nodes on " + workers + " workers")
+ Logging.log.info("Will stop Ignite nodes on " + workers + " workers")
// Start ignite server node on each worker in server mode.
sparkContext.parallelize(1 to workers, workers).foreachPartition(it ⇒ doClose())
@@ -200,3 +201,12 @@ private class Once(clo: () ⇒ IgniteConfiguration) extends Serializable {
res
}
}
+
+/**
+ * Spark uses log4j by default. Using this logger in IgniteContext as well.
+ *
+ * This object is used to avoid problems with log4j serialization.
+ */
+object Logging extends Serializable {
+ @transient lazy val log = Logger.getLogger(classOf[IgniteContext])
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
index 0c4d556..53aff75 100644
--- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
@@ -35,7 +35,7 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Column;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Tuple2;
@@ -237,12 +237,12 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true);
- DataFrame df =
+ Dataset<Row> df =
cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
df.printSchema();
- Row[] res = df.collect();
+ Row[] res = (Row[])df.collect();
assertEquals("Invalid result length", 1, res.length);
assertEquals("Invalid result", 50, res[0].get(0));
@@ -251,11 +251,11 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
- DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
+ Dataset<Row> df0 = cache.sql("select id, name, salary from Entity").where(exp);
df.printSchema();
- Row[] res0 = df0.collect();
+ Row[] res0 = (Row[])df0.collect();
assertEquals("Invalid result length", 1, res0.length);
assertEquals("Invalid result", 50, res0[0].get(0));
http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
index e9d97a4..1075f96 100644
--- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
@@ -35,7 +35,7 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Column;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Tuple2;
@@ -216,12 +216,12 @@ public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest {
cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
- DataFrame df =
+ Dataset<Row> df =
cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
df.printSchema();
- Row[] res = df.collect();
+ Row[] res = (Row[])df.collect();
assertEquals("Invalid result length", 1, res.length);
assertEquals("Invalid result", 50, res[0].get(0));
@@ -230,11 +230,11 @@ public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest {
Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
- DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
+ Dataset<Row> df0 = cache.sql("select id, name, salary from Entity").where(exp);
df.printSchema();
- Row[] res0 = df0.collect();
+ Row[] res0 = (Row[])df0.collect();
assertEquals("Invalid result length", 1, res0.length);
assertEquals("Invalid result", 50, res0[0].get(0));
@@ -269,25 +269,25 @@ public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest {
Object val = GridTestUtils.getFieldValue(e, fieldName);
- DataFrame df = cache.sql(
+ Dataset<Row> df = cache.sql(
String.format("select %s from EntityTestAllTypeFields where %s = ?", fieldName, fieldName),
val);
if (val instanceof BigDecimal) {
- Object res = df.collect()[0].get(0);
+ Object res = ((Row[])df.collect())[0].get(0);
assertTrue(String.format("+++ Fail on %s field", fieldName),
((Comparable<BigDecimal>)val).compareTo((BigDecimal)res) == 0);
}
else if (val instanceof java.sql.Date)
assertEquals(String.format("+++ Fail on %s field", fieldName),
- val.toString(), df.collect()[0].get(0).toString());
+ val.toString(), ((Row[])df.collect())[0].get(0).toString());
else if (val.getClass().isArray())
assertTrue(String.format("+++ Fail on %s field", fieldName), 1 <= df.count());
else {
- assertTrue(String.format("+++ Fail on %s field", fieldName), df.collect().length > 0);
- assertTrue(String.format("+++ Fail on %s field", fieldName), df.collect()[0].size() > 0);
- assertEquals(String.format("+++ Fail on %s field", fieldName), val, df.collect()[0].get(0));
+ assertTrue(String.format("+++ Fail on %s field", fieldName), ((Row[])df.collect()).length > 0);
+ assertTrue(String.format("+++ Fail on %s field", fieldName), ((Row[])df.collect())[0].size() > 0);
+ assertEquals(String.format("+++ Fail on %s field", fieldName), val, ((Row[])df.collect())[0].get(0));
}
info(String.format("+++ Query on the filed: %s : %s passed", fieldName, f.getType().getSimpleName()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 3514435..a4972d1 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -36,7 +36,7 @@
<properties>
<ignite.edition>fabric</ignite.edition>
<hadoop.version>2.4.1</hadoop.version>
- <spark.version>1.5.2</spark.version>
+ <spark.version>2.1.0</spark.version>
<spring.version>4.1.0.RELEASE</spring.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.build.timestamp.format>MMMM d yyyy</maven.build.timestamp.format>
@@ -99,7 +99,6 @@
<scala211.library.version>2.11.7</scala211.library.version>
<slf4j.version>1.7.7</slf4j.version>
<slf4j16.version>1.6.4</slf4j16.version>
- <spark.version>1.5.2</spark.version>
<spring.version>4.1.0.RELEASE</spring.version>
<spring41.osgi.feature.version>4.1.7.RELEASE_1</spring41.osgi.feature.version>
<tomcat.version>8.0.23</tomcat.version>