Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/06/03 08:21:56 UTC
incubator-zeppelin git commit: [ZEPPELIN-23] Set version of default spark interpreter build profile from 1.1 to 1.3
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master fa61a054f -> a90908d72
[ZEPPELIN-23] Set version of default spark interpreter build profile from 1.1 to 1.3
I updated the version of Spark used by default throughout the dependency listing to 1.3.0, matching the version used in the 1.3 profile; the squashed commits below then bump the spark-1.3 profile's default to 1.3.1 and activate that profile by default.
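For orientation, the test changes in this patch gate version-specific behaviour on a two-digit encoding of the Spark version. A minimal sketch of that encoding, with a literal version string standing in for the value of sc.version():

    // "1.3.1" -> 13: major becomes the tens digit, minor the ones digit,
    // so "supports DataFrame" reduces to a plain integer comparison.
    String[] parts = "1.3.1".split("\\.");  // split on a literal dot, not the regex "."
    int version = Integer.parseInt(parts[0]) * 10 + Integer.parseInt(parts[1]);
    boolean dataFrameSupported = version >= 13;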
Author: Lee moon soo <mo...@apache.org>
Author: Ilya Ganelin <il...@capitalone.com>
This patch had conflicts when merged, resolved by
Committer: Lee moon soo <mo...@apache.org>
Closes #71 from ilganeli/ZEPPELIN-23 and squashes the following commits:
d1d642e [Ilya Ganelin] Merge pull request #1 from Leemoonsoo/ZEPPELIN-23
0108781 [Lee moon soo] update isDataFrameSupported()
ac4e8bc [Lee moon soo] check spark version as a numerical number in tests
405da99 [Lee moon soo] Fix indentation
28dd7bf [Lee moon soo] Make unittest pass with spark-1.3 profile
cf2ed0c [Lee moon soo] Activate spark-1.3 profile by default. Set spark 1.3.1 as a default version of spark-1.3 profile
13bf6c1 [Lee moon soo] remove z.load() error for spark 1.2, spark 1.3
1402df2 [Ilya Ganelin] Updated default spark version to 1.3
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/a90908d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/a90908d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/a90908d7
Branch: refs/heads/master
Commit: a90908d7213c3fda5d0ac9e8f96c7a8bde458741
Parents: fa61a05
Author: Lee moon soo <mo...@apache.org>
Authored: Tue Jun 2 17:11:22 2015 -0700
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Jun 3 14:26:50 2015 +0900
----------------------------------------------------------------------
 conf/log4j.properties                           |   1 +
 pom.xml                                         |   9 +-
 .../zeppelin/spark/dep/DependencyResolver.java  |  30 +++-
 .../zeppelin/spark/SparkInterpreterTest.java    | 129 +++++++------
 .../zeppelin/spark/SparkSqlInterpreterTest.java | 180 +++++++++++--------
 5 files changed, 212 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/conf/log4j.properties b/conf/log4j.properties
index a7ef28b..b132ce1 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -21,6 +21,7 @@ log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
+log4j.appender.dailyfile.DatePattern=.yyyy-MM-dd
log4j.appender.dailyfile.Threshold = INFO
log4j.appender.dailyfile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.dailyfile.File = ${zeppelin.log.file}
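(Aside on the hunk above: DailyRollingFileAppender computes its rollover schedule from DatePattern, so without the added line the daily log file never rolls. A hedged programmatic equivalent of the same appender, with a placeholder path standing in for ${zeppelin.log.file}:)

    import org.apache.log4j.DailyRollingFileAppender;
    import org.apache.log4j.PatternLayout;

    // The three-argument constructor throws IOException if the file can't be opened.
    DailyRollingFileAppender dailyfile = new DailyRollingFileAppender(
        new PatternLayout("%5p [%d] ({%t} %F[%M]:%L) - %m%n"),
        "/tmp/zeppelin.log",   // placeholder for ${zeppelin.log.file}
        ".yyyy-MM-dd");        // roll the file at each date boundary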
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 62fe12b..fcd1a25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1397,7 +1397,7 @@
<id>spark-1.3</id>
<properties>
<akka.version>2.3.4-spark</akka.version>
- <spark.version>1.3.0</spark.version>
+ <spark.version>1.3.1</spark.version>
<mesos.version>0.21.0</mesos.version>
<hbase.version>0.98.7</hbase.version>
<hbase.artifact>hbase</hbase.artifact>
@@ -1419,17 +1419,18 @@
<snappy.version>1.1.1.6</snappy.version>
<mesos.version>0.21.0</mesos.version>
</properties>
-
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
<dependencies>
</dependencies>
-
</profile>
<profile>
<id>cassandra-spark-1.3</id>
<properties>
<akka.version>2.3.4-spark</akka.version>
- <spark.version>1.3.0</spark.version>
+ <spark.version>1.3.1</spark.version>
<mesos.version>0.21.0</mesos.version>
<hbase.version>0.98.7</hbase.version>
<hbase.artifact>hbase</hbase.artifact>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
index 06a4022..0702948 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
@@ -143,8 +143,9 @@ public class DependencyResolver {
// Until spark 1.1.x
// check https://github.com/apache/spark/commit/191d7cf2a655d032f160b9fa181730364681d0e7
- private void updateRuntimeClassPath(URL[] urls) throws SecurityException, IllegalAccessException,
- IllegalArgumentException, InvocationTargetException, NoSuchMethodException {
+ private void updateRuntimeClassPath_1_x(URL[] urls) throws SecurityException,
+ IllegalAccessException, IllegalArgumentException,
+ InvocationTargetException, NoSuchMethodException {
ClassLoader cl = intp.classLoader().getParent();
Method addURL;
addURL = cl.getClass().getDeclaredMethod("addURL", new Class[] {URL.class});
@@ -154,6 +155,18 @@ public class DependencyResolver {
}
}
+ private void updateRuntimeClassPath_2_x(URL[] urls) throws SecurityException,
+ IllegalAccessException, IllegalArgumentException,
+ InvocationTargetException, NoSuchMethodException {
+ ClassLoader cl = intp.classLoader().getParent();
+ Method addURL;
+ addURL = cl.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class});
+ addURL.setAccessible(true);
+ for (URL url : urls) {
+ addURL.invoke(cl, url);
+ }
+ }
+
private MergedClassPath<AbstractFile> mergeUrlsIntoClassPath(JavaPlatform platform, URL[] urls) {
IndexedSeq<ClassPath<AbstractFile>> entries =
((MergedClassPath<AbstractFile>) platform.classPath()).entries();
@@ -217,8 +230,11 @@ public class DependencyResolver {
intp.global().new Run();
- updateRuntimeClassPath(new URL[] {jarFile.toURI().toURL()});
- updateCompilerClassPath(new URL[] {jarFile.toURI().toURL()});
+ if (sc.version().startsWith("1.1")) {
+ updateRuntimeClassPath_1_x(new URL[] {jarFile.toURI().toURL()});
+ } else {
+ updateRuntimeClassPath_2_x(new URL[] {jarFile.toURI().toURL()});
+ }
if (addSparkContext) {
sc.addJar(jarFile.getAbsolutePath());
@@ -261,7 +277,11 @@ public class DependencyResolver {
}
intp.global().new Run();
- updateRuntimeClassPath(newClassPathList.toArray(new URL[0]));
+ if (sc.version().startsWith("1.1")) {
+ updateRuntimeClassPath_1_x(newClassPathList.toArray(new URL[0]));
+ } else {
+ updateRuntimeClassPath_2_x(newClassPathList.toArray(new URL[0]));
+ }
updateCompilerClassPath(newClassPathList.toArray(new URL[0]));
if (addSparkContext) {
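(The two reflective helpers in this file differ only in the method they look up on the REPL class loader: addURL up to Spark 1.1.x, addNewUrl afterwards. A condensed sketch of the combined dispatch, assuming the committed code's class-loader layout; the merged helper name and the sc field are illustrative, not the committed API:)

    private void updateRuntimeClassPath(URL[] urls) throws ReflectiveOperationException {
      ClassLoader cl = intp.classLoader().getParent();
      // Spark renamed the injection hook after 1.1.x, hence the version switch.
      String name = sc.version().startsWith("1.1") ? "addURL" : "addNewUrl";
      Method add = cl.getClass().getDeclaredMethod(name, URL.class);
      add.setAccessible(true);   // the hook is not public
      for (URL url : urls) {
        add.invoke(cl, url);
      }
    }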
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 87df793..c49f1e1 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -45,6 +45,21 @@ public class SparkInterpreterTest {
private InterpreterContext context;
private File tmpDir;
+
+ /**
+ * Get spark version number as a numerical value.
+ * e.g. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
+ */
+ public static int getSparkVersionNumber() {
+ if (repl == null) {
+ return 0;
+ }
+
+ String[] split = repl.getSparkContext().version().split("\\.");
+ int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
+ return version;
+ }
+
@Before
public void setUp() throws Exception {
tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
@@ -52,18 +67,19 @@ public class SparkInterpreterTest {
tmpDir.mkdirs();
- if (repl == null) {
- Properties p = new Properties();
+ if (repl == null) {
+ Properties p = new Properties();
- repl = new SparkInterpreter(p);
- repl.open();
- }
+ repl = new SparkInterpreter(p);
+ repl.open();
+ }
- InterpreterGroup intpGroup = new InterpreterGroup();
- context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ context = new InterpreterContext("id", "title", "text",
+ new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
+ intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
- }
+ }
@After
public void tearDown() throws Exception {
@@ -83,52 +99,55 @@ public class SparkInterpreterTest {
}
}
- @Test
- public void testBasicIntp() {
- assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val a = 1\nval b = 2", context).code());
-
- // when interpret incomplete expression
- InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
- assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
- assertTrue(incomplete.message().length()>0); // expecting some error message
- /*
- assertEquals(1, repl.getValue("a"));
- assertEquals(2, repl.getValue("b"));
- repl.interpret("val ver = sc.version");
- assertNotNull(repl.getValue("ver"));
- assertEquals("HELLO\n", repl.interpret("println(\"HELLO\")").message());
- */
- }
-
- @Test
- public void testEndWithComment() {
- assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
- }
-
- @Test
- public void testSparkSql(){
- repl.interpret("case class Person(name:String, age:Int)\n", context);
- repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
- assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
-
- // create new interpreter
- Properties p = new Properties();
- SparkInterpreter repl2 = new SparkInterpreter(p);
- repl2.open();
-
- repl.interpret("case class Man(name:String, age:Int)", context);
- repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
- assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code());
- repl2.getSparkContext().stop();
- }
-
- @Test
- public void testReferencingUndefinedVal(){
- InterpreterResult result = repl.interpret("def category(min: Int) = {" +
- " if (0 <= value) \"error\"" +
- "}", context);
- assertEquals(Code.ERROR, result.code());
- }
+ @Test
+ public void testBasicIntp() {
+ assertEquals(InterpreterResult.Code.SUCCESS,
+ repl.interpret("val a = 1\nval b = 2", context).code());
+
+ // when interpret incomplete expression
+ InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
+ assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
+ assertTrue(incomplete.message().length() > 0); // expecting some error
+ // message
+ /*
+ * assertEquals(1, repl.getValue("a")); assertEquals(2, repl.getValue("b"));
+ * repl.interpret("val ver = sc.version");
+ * assertNotNull(repl.getValue("ver")); assertEquals("HELLO\n",
+ * repl.interpret("println(\"HELLO\")").message());
+ */
+ }
+
+ @Test
+ public void testEndWithComment() {
+ assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
+ }
+
+ @Test
+ public void testSparkSql(){
+ repl.interpret("case class Person(name:String, age:Int)\n", context);
+ repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
+ assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
+
+
+ if (getSparkVersionNumber() <= 11) { // spark 1.2 and later do not allow creating multiple SparkContexts in the same JVM by default.
+ // create new interpreter
+ Properties p = new Properties();
+ SparkInterpreter repl2 = new SparkInterpreter(p);
+ repl2.open();
+
+ repl.interpret("case class Man(name:String, age:Int)", context);
+ repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
+ assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code());
+ repl2.getSparkContext().stop();
+ }
+ }
+
+ @Test
+ public void testReferencingUndefinedVal() {
+ InterpreterResult result = repl.interpret("def category(min: Int) = {"
+ + " if (0 <= value) \"error\"" + "}", context);
+ assertEquals(Code.ERROR, result.code());
+ }
@Test
public void testZContextDependencyLoading() {
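(The guard in testSparkSql above reflects that, since Spark 1.2, constructing a second SparkContext in one JVM fails fast unless explicitly allowed. For reference, a hedged sketch of the opt-in flag Spark itself provides:)

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Without this flag, a second context in the same JVM throws on Spark 1.2+.
    SparkConf conf = new SparkConf()
        .setMaster("local")
        .setAppName("second-context")
        .set("spark.driver.allowMultipleContexts", "true");
    JavaSparkContext second = new JavaSparkContext(conf);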
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 27198b3..eaa0a8a 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -36,81 +36,115 @@ import org.junit.Test;
public class SparkSqlInterpreterTest {
- private SparkSqlInterpreter sql;
+ private SparkSqlInterpreter sql;
private SparkInterpreter repl;
private InterpreterContext context;
private InterpreterGroup intpGroup;
- @Before
- public void setUp() throws Exception {
- Properties p = new Properties();
-
- if (repl == null) {
-
- if (SparkInterpreterTest.repl == null) {
- repl = new SparkInterpreter(p);
- repl.open();
- SparkInterpreterTest.repl = repl;
- } else {
- repl = SparkInterpreterTest.repl;
- }
-
- sql = new SparkSqlInterpreter(p);
-
- intpGroup = new InterpreterGroup();
- intpGroup.add(repl);
- intpGroup.add(sql);
- sql.setInterpreterGroup(intpGroup);
- sql.open();
- }
- context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null),
- new LinkedList<InterpreterContextRunner>());
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public void test() {
- repl.interpret("case class Test(name:String, age:Int)", context);
- repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
- repl.interpret("test.registerAsTable(\"test\")", context);
-
- InterpreterResult ret = sql.interpret("select name, age from test where age < 40", context);
- assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals(Type.TABLE, ret.type());
- assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
-
- assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code());
- assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from people", context).code());
- }
-
- @Test
- public void testStruct(){
- repl.interpret("case class Person(name:String, age:Int)", context);
- repl.interpret("case class People(group:String, person:Person)", context);
- repl.interpret("val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))", context);
- repl.interpret("gr.registerAsTable(\"gr\")", context);
- InterpreterResult ret = sql.interpret("select * from gr", context);
- assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- }
-
- @Test
- public void test_null_value_in_row() {
- repl.interpret("import org.apache.spark.sql._", context);
- repl.interpret("def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}", context);
- repl.interpret("val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))", context);
- repl.interpret("val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))", context);
- repl.interpret("val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))", context);
- repl.interpret("val people = z.sqlContext.applySchema(raw, schema)", context);
- repl.interpret("people.registerTempTable(\"people\")", context);
-
- InterpreterResult ret = sql.interpret("select name, age from people where name = 'gates'", context);
- System.err.println("RET=" + ret.message());
- assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals(Type.TABLE, ret.type());
- assertEquals("name\tage\ngates\tnull\n", ret.message());
- }
+ @Before
+ public void setUp() throws Exception {
+ Properties p = new Properties();
+
+ if (repl == null) {
+
+ if (SparkInterpreterTest.repl == null) {
+ repl = new SparkInterpreter(p);
+ repl.open();
+ SparkInterpreterTest.repl = repl;
+ } else {
+ repl = SparkInterpreterTest.repl;
+ }
+
+ sql = new SparkSqlInterpreter(p);
+
+ intpGroup = new InterpreterGroup();
+ intpGroup.add(repl);
+ intpGroup.add(sql);
+ sql.setInterpreterGroup(intpGroup);
+ sql.open();
+ }
+ context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ boolean isDataFrameSupported() {
+ return SparkInterpreterTest.getSparkVersionNumber() >= 13;
+ }
+
+ @Test
+ public void test() {
+ repl.interpret("case class Test(name:String, age:Int)", context);
+ repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
+ if (isDataFrameSupported()) {
+ repl.interpret("test.toDF.registerTempTable(\"test\")", context);
+ } else {
+ repl.interpret("test.registerTempTable(\"test\")", context);
+ }
+
+ InterpreterResult ret = sql.interpret("select name, age from test where age < 40", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals(Type.TABLE, ret.type());
+ assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
+
+ assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code());
+ assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code());
+ }
+
+ @Test
+ public void testStruct() {
+ repl.interpret("case class Person(name:String, age:Int)", context);
+ repl.interpret("case class People(group:String, person:Person)", context);
+ repl.interpret(
+ "val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))",
+ context);
+ if (isDataFrameSupported()) {
+ repl.interpret("gr.toDF.registerTempTable(\"gr\")", context);
+ } else {
+ repl.interpret("gr.registerTempTable(\"gr\")", context);
+ }
+
+ InterpreterResult ret = sql.interpret("select * from gr", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ }
+
+ @Test
+ public void test_null_value_in_row() {
+ repl.interpret("import org.apache.spark.sql._", context);
+ if (isDataFrameSupported()) {
+ repl.interpret(
+ "import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}",
+ context);
+ }
+ repl.interpret(
+ "def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}",
+ context);
+ repl.interpret(
+ "val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))",
+ context);
+ repl.interpret(
+ "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))",
+ context);
+ repl.interpret(
+ "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))",
+ context);
+ repl.interpret("val people = z.sqlContext.applySchema(raw, schema)",
+ context);
+ if (isDataFrameSupported()) {
+ repl.interpret("people.toDF.registerTempTable(\"people\")", context);
+ } else {
+ repl.interpret("people.registerTempTable(\"people\")", context);
+ }
+
+ InterpreterResult ret = sql.interpret(
+ "select name, age from people where name = 'gates'", context);
+ System.err.println("RET=" + ret.message());
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals(Type.TABLE, ret.type());
+ assertEquals("name\tage\ngates\tnull\n", ret.message());
+ }
}
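(Postscript on the SQL tests above: isDataFrameSupported() papers over the Spark 1.3 API change that replaced SchemaRDD with DataFrame, so registering an RDD of case classes as a table differs by version. Schematically, in the interpreted snippets:)

    // Spark <= 1.2: an RDD of case classes acts as a SchemaRDD directly:
    //   test.registerTempTable("test")
    // Spark >= 1.3: it must first be converted to a DataFrame:
    //   test.toDF.registerTempTable("test")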