Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/06/03 08:21:56 UTC

incubator-zeppelin git commit: [ZEPPELIN-23] Set version of default spark interpreter build profile from 1.1 to 1.3

Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master fa61a054f -> a90908d72


[ZEPPELIN-23] Set version of default spark interpreter build profile from 1.1 to 1.3

I updated the version of Spark used by default throughout the dependency listing to 1.3.0 (to match the version used in the 1.3 profile).

Author: Lee moon soo <mo...@apache.org>
Author: Ilya Ganelin <il...@capitalone.com>

This patch had conflicts when merged, resolved by
Committer: Lee moon soo <mo...@apache.org>

Closes #71 from ilganeli/ZEPPELIN-23 and squashes the following commits:

d1d642e [Ilya Ganelin] Merge pull request #1 from Leemoonsoo/ZEPPELIN-23
0108781 [Lee moon soo] update isDataFrameSupported()
ac4e8bc [Lee moon soo] check spark version as a numerical number in tests
405da99 [Lee moon soo] Fix indentation
28dd7bf [Lee moon soo] Make unittest pass with spark-1.3 profile
cf2ed0c [Lee moon soo] Activate spark-1.3 profile by default. Set spark 1.3.1 as a default version of spark-1.3 profile
13bf6c1 [Lee moon soo] remove z.load() error for spark 1.2, spark 1.3
1402df2 [Ilya Ganelin] Updated default spark version to 1.3


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/a90908d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/a90908d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/a90908d7

Branch: refs/heads/master
Commit: a90908d7213c3fda5d0ac9e8f96c7a8bde458741
Parents: fa61a05
Author: Lee moon soo <mo...@apache.org>
Authored: Tue Jun 2 17:11:22 2015 -0700
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Jun 3 14:26:50 2015 +0900

----------------------------------------------------------------------
 conf/log4j.properties                           |   1 +
 pom.xml                                         |   9 +-
 .../zeppelin/spark/dep/DependencyResolver.java  |  30 +++-
 .../zeppelin/spark/SparkInterpreterTest.java    | 129 +++++++------
 .../zeppelin/spark/SparkSqlInterpreterTest.java | 180 +++++++++++--------
 5 files changed, 212 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/conf/log4j.properties b/conf/log4j.properties
index a7ef28b..b132ce1 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -21,6 +21,7 @@ log4j.appender.stdout = org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
 
+log4j.appender.dailyfile.DatePattern=.yyyy-MM-dd
 log4j.appender.dailyfile.Threshold = INFO
 log4j.appender.dailyfile = org.apache.log4j.DailyRollingFileAppender
 log4j.appender.dailyfile.File = ${zeppelin.log.file}
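
The one-line log4j change above sets an explicit DatePattern on the dailyfile appender; for DailyRollingFileAppender the pattern controls both the rollover schedule (here: daily) and the suffix appended to rotated files, e.g. zeppelin.log.2015-06-03. A minimal programmatic sketch of the equivalent appender setup (log4j 1.x API; the literal file name is a placeholder for ${zeppelin.log.file}):

    import org.apache.log4j.DailyRollingFileAppender;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.log4j.PatternLayout;

    public class LogSetup {
      public static void main(String[] args) throws Exception {
        DailyRollingFileAppender dailyfile = new DailyRollingFileAppender(
            new PatternLayout("%5p [%d] ({%t} %F[%M]:%L) - %m%n"),
            "zeppelin.log",   // placeholder for ${zeppelin.log.file}
            ".yyyy-MM-dd");   // same DatePattern as the property above
        dailyfile.setThreshold(Level.INFO);
        Logger.getRootLogger().addAppender(dailyfile);
      }
    }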

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 62fe12b..fcd1a25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1397,7 +1397,7 @@
       <id>spark-1.3</id>
       <properties>
         <akka.version>2.3.4-spark</akka.version>
-        <spark.version>1.3.0</spark.version>
+        <spark.version>1.3.1</spark.version>
         <mesos.version>0.21.0</mesos.version>
         <hbase.version>0.98.7</hbase.version>
         <hbase.artifact>hbase</hbase.artifact>
@@ -1419,17 +1419,18 @@
         <snappy.version>1.1.1.6</snappy.version>
         <mesos.version>0.21.0</mesos.version>
       </properties>
-
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <dependencies>
       </dependencies>
-
     </profile>
 
     <profile>
       <id>cassandra-spark-1.3</id>
       <properties>
         <akka.version>2.3.4-spark</akka.version>
-        <spark.version>1.3.0</spark.version>
+        <spark.version>1.3.1</spark.version>
         <mesos.version>0.21.0</mesos.version>
         <hbase.version>0.98.7</hbase.version>
         <hbase.artifact>hbase</hbase.artifact>
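
With <activeByDefault>true</activeByDefault> on the spark-1.3 profile, a plain "mvn package" now builds against Spark 1.3.1. One Maven subtlety worth noting: a default-active profile is switched off as soon as any other profile in the same POM is activated explicitly, so building against another Spark line is still just e.g. "mvn package -Pspark-1.1" (assuming the remaining spark-1.x profiles keep the naming shown above).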

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
index 06a4022..0702948 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
@@ -143,8 +143,9 @@ public class DependencyResolver {
 
   // Until spark 1.1.x
   // check https://github.com/apache/spark/commit/191d7cf2a655d032f160b9fa181730364681d0e7
-  private void updateRuntimeClassPath(URL[] urls) throws SecurityException, IllegalAccessException,
-      IllegalArgumentException, InvocationTargetException, NoSuchMethodException {
+  private void updateRuntimeClassPath_1_x(URL[] urls) throws SecurityException,
+      IllegalAccessException, IllegalArgumentException,
+      InvocationTargetException, NoSuchMethodException {
     ClassLoader cl = intp.classLoader().getParent();
     Method addURL;
     addURL = cl.getClass().getDeclaredMethod("addURL", new Class[] {URL.class});
@@ -154,6 +155,18 @@ public class DependencyResolver {
     }
   }
 
+  private void updateRuntimeClassPath_2_x(URL[] urls) throws SecurityException,
+      IllegalAccessException, IllegalArgumentException,
+      InvocationTargetException, NoSuchMethodException {
+    ClassLoader cl = intp.classLoader().getParent();
+    Method addURL;
+    addURL = cl.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class});
+    addURL.setAccessible(true);
+    for (URL url : urls) {
+      addURL.invoke(cl, url);
+    }
+  }
+
   private MergedClassPath<AbstractFile> mergeUrlsIntoClassPath(JavaPlatform platform, URL[] urls) {
     IndexedSeq<ClassPath<AbstractFile>> entries =
         ((MergedClassPath<AbstractFile>) platform.classPath()).entries();
@@ -217,8 +230,11 @@ public class DependencyResolver {
 
     intp.global().new Run();
 
-    updateRuntimeClassPath(new URL[] {jarFile.toURI().toURL()});
-    updateCompilerClassPath(new URL[] {jarFile.toURI().toURL()});
+    if (sc.version().startsWith("1.1")) {
+      updateRuntimeClassPath_1_x(new URL[] {jarFile.toURI().toURL()});
+    } else {
+      updateRuntimeClassPath_2_x(new URL[] {jarFile.toURI().toURL()});
+    }
 
     if (addSparkContext) {
       sc.addJar(jarFile.getAbsolutePath());
@@ -261,7 +277,11 @@ public class DependencyResolver {
     }
 
     intp.global().new Run();
-    updateRuntimeClassPath(newClassPathList.toArray(new URL[0]));
+    if (sc.version().startsWith("1.1")) {
+      updateRuntimeClassPath_1_x(newClassPathList.toArray(new URL[0]));
+    } else {
+      updateRuntimeClassPath_2_x(newClassPathList.toArray(new URL[0]));
+    }
     updateCompilerClassPath(newClassPathList.toArray(new URL[0]));
 
     if (addSparkContext) {
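
Both updateRuntimeClassPath_1_x and updateRuntimeClassPath_2_x above share one reflection pattern; only the name of the non-public url-adding method on the REPL classloader changed between Spark releases (see the linked Spark commit). A condensed sketch of that shared shape, as a hypothetical helper that is not part of this patch:

    import java.lang.reflect.Method;
    import java.net.URL;

    class ReflectiveClassPath {
      // methodName is "addURL" on Spark 1.1.x and "addNewUrl" on later
      // releases, mirroring the sc.version().startsWith("1.1") dispatch above.
      static void addUrls(ClassLoader cl, String methodName, URL[] urls)
          throws ReflectiveOperationException {
        Method add = cl.getClass().getDeclaredMethod(methodName, new Class[] {URL.class});
        add.setAccessible(true); // the method is not public on the REPL classloader
        for (URL url : urls) {
          add.invoke(cl, url);
        }
      }
    }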

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 87df793..c49f1e1 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -45,6 +45,21 @@ public class SparkInterpreterTest {
   private InterpreterContext context;
   private File tmpDir;
 
+
+  /**
+   * Get spark version number as a numerical value.
+   * e.g. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
+   */
+  public static int getSparkVersionNumber() {
+    if (repl == null) {
+      return 0;
+    }
+
+    String[] split = repl.getSparkContext().version().split("\\.");
+    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
+    return version;
+  }
+
   @Before
   public void setUp() throws Exception {
     tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
@@ -52,18 +67,19 @@ public class SparkInterpreterTest {
 
     tmpDir.mkdirs();
 
-	  if (repl == null) {
-		  Properties p = new Properties();
+    if (repl == null) {
+      Properties p = new Properties();
 
-	    repl = new SparkInterpreter(p);
-  	  repl.open();
-	  }
+      repl = new SparkInterpreter(p);
+      repl.open();
+    }
 
-	  InterpreterGroup intpGroup = new InterpreterGroup();
-    context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    context = new InterpreterContext("id", "title", "text",
+        new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
+            intpGroup.getId(), null),
         new LinkedList<InterpreterContextRunner>());
-	}
+  }
 
   @After
   public void tearDown() throws Exception {
@@ -83,52 +99,55 @@ public class SparkInterpreterTest {
     }
   }
 
-	@Test
-	public void testBasicIntp() {
-		assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val a = 1\nval b = 2", context).code());
-
-		// when interpret incomplete expression
-		InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
-		assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
-		assertTrue(incomplete.message().length()>0); // expecting some error message
-		/*
-		assertEquals(1, repl.getValue("a"));
-		assertEquals(2, repl.getValue("b"));
-		repl.interpret("val ver = sc.version");
-		assertNotNull(repl.getValue("ver"));
-		assertEquals("HELLO\n", repl.interpret("println(\"HELLO\")").message());
-		*/
-	}
-
-	@Test
-	public void testEndWithComment() {
-		assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
-	}
-
-	@Test
-	public void testSparkSql(){
-		repl.interpret("case class Person(name:String, age:Int)\n", context);
-		repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
-		assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
-
-		// create new interpreter
-		Properties p = new Properties();
-		SparkInterpreter repl2 = new SparkInterpreter(p);
-		repl2.open();
-
-		repl.interpret("case class Man(name:String, age:Int)", context);
-		repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
-		assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code());
-		repl2.getSparkContext().stop();
-	}
-
-	@Test
-	public void testReferencingUndefinedVal(){
-		InterpreterResult result = repl.interpret("def category(min: Int) = {" +
-				       "    if (0 <= value) \"error\"" +
-                       "}", context);
-		assertEquals(Code.ERROR, result.code());
-	}
+  @Test
+  public void testBasicIntp() {
+    assertEquals(InterpreterResult.Code.SUCCESS,
+        repl.interpret("val a = 1\nval b = 2", context).code());
+
+    // when interpret incomplete expression
+    InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
+    assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
+    assertTrue(incomplete.message().length() > 0); // expecting some error
+                                                   // message
+    /*
+     * assertEquals(1, repl.getValue("a")); assertEquals(2, repl.getValue("b"));
+     * repl.interpret("val ver = sc.version");
+     * assertNotNull(repl.getValue("ver")); assertEquals("HELLO\n",
+     * repl.interpret("println(\"HELLO\")").message());
+     */
+  }
+
+  @Test
+  public void testEndWithComment() {
+    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
+  }
+
+  @Test
+  public void testSparkSql(){
+    repl.interpret("case class Person(name:String, age:Int)\n", context);
+    repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
+    assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
+
+
+    if (getSparkVersionNumber() <= 11) { // Spark 1.2 and later do not allow creating multiple SparkContexts in the same JVM by default.
+      // create new interpreter
+      Properties p = new Properties();
+      SparkInterpreter repl2 = new SparkInterpreter(p);
+      repl2.open();
+
+      repl.interpret("case class Man(name:String, age:Int)", context);
+      repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
+      assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code());
+      repl2.getSparkContext().stop();
+    }
+  }
+
+  @Test
+  public void testReferencingUndefinedVal() {
+    InterpreterResult result = repl.interpret("def category(min: Int) = {"
+        + "    if (0 <= value) \"error\"" + "}", context);
+    assertEquals(Code.ERROR, result.code());
+  }
 
   @Test
   public void testZContextDependencyLoading() {
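
The new getSparkVersionNumber() collapses the first two version components into one comparable integer, which is what the <= 11 guard in testSparkSql() and the >= 13 check in SparkSqlInterpreterTest rely on. A standalone sketch of the mapping (not part of the patch):

    // "1.3.1".split("\\.") yields {"1", "3", "1"}, so 1 * 10 + 3 = 13;
    // "1.1.0" maps to 11, "1.2.2" to 12, and so on.
    static int versionNumber(String sparkVersion) {
      String[] parts = sparkVersion.split("\\.");
      return Integer.parseInt(parts[0]) * 10 + Integer.parseInt(parts[1]);
    }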

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 27198b3..eaa0a8a 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -36,81 +36,115 @@ import org.junit.Test;
 
 public class SparkSqlInterpreterTest {
 
-	private SparkSqlInterpreter sql;
+  private SparkSqlInterpreter sql;
   private SparkInterpreter repl;
   private InterpreterContext context;
   private InterpreterGroup intpGroup;
 
-	@Before
-	public void setUp() throws Exception {
-		Properties p = new Properties();
-
-		if (repl == null) {
-
-		  if (SparkInterpreterTest.repl == null) {
-		    repl = new SparkInterpreter(p);
-		    repl.open();
-		    SparkInterpreterTest.repl = repl;
-		  } else {
-		    repl = SparkInterpreterTest.repl;
-		  }
-
-  		sql = new SparkSqlInterpreter(p);
-
-  		intpGroup = new InterpreterGroup();
-		  intpGroup.add(repl);
-		  intpGroup.add(sql);
-		  sql.setInterpreterGroup(intpGroup);
-		  sql.open();
-		}
-		context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
-		    new AngularObjectRegistry(intpGroup.getId(), null),
-		    new LinkedList<InterpreterContextRunner>());
-	}
-
-	@After
-	public void tearDown() throws Exception {
-	}
-
-	@Test
-	public void test() {
-		repl.interpret("case class Test(name:String, age:Int)", context);
-		repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
-		repl.interpret("test.registerAsTable(\"test\")", context);
-
-		InterpreterResult ret = sql.interpret("select name, age from test where age < 40", context);
-		assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-		assertEquals(Type.TABLE, ret.type());
-		assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
-
-	  assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code());
-		assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from people", context).code());
-	}
-
-	@Test
-	public void testStruct(){
-		repl.interpret("case class Person(name:String, age:Int)", context);
-		repl.interpret("case class People(group:String, person:Person)", context);
-		repl.interpret("val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))", context);
-		repl.interpret("gr.registerAsTable(\"gr\")", context);
-		InterpreterResult ret = sql.interpret("select * from gr", context);
-		assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-	}
-
-	@Test
-	public void test_null_value_in_row() {
-		repl.interpret("import org.apache.spark.sql._", context);
-		repl.interpret("def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}", context);
-		repl.interpret("val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))", context);
-		repl.interpret("val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))", context);
-		repl.interpret("val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))", context);
-		repl.interpret("val people = z.sqlContext.applySchema(raw, schema)", context);
-		repl.interpret("people.registerTempTable(\"people\")", context);
-
-		InterpreterResult ret = sql.interpret("select name, age from people where name = 'gates'", context);
-		System.err.println("RET=" + ret.message());
-		assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-		assertEquals(Type.TABLE, ret.type());
-		assertEquals("name\tage\ngates\tnull\n", ret.message());
-	}
+  @Before
+  public void setUp() throws Exception {
+    Properties p = new Properties();
+
+    if (repl == null) {
+
+      if (SparkInterpreterTest.repl == null) {
+        repl = new SparkInterpreter(p);
+        repl.open();
+        SparkInterpreterTest.repl = repl;
+      } else {
+        repl = SparkInterpreterTest.repl;
+      }
+
+      sql = new SparkSqlInterpreter(p);
+
+      intpGroup = new InterpreterGroup();
+      intpGroup.add(repl);
+      intpGroup.add(sql);
+      sql.setInterpreterGroup(intpGroup);
+      sql.open();
+    }
+    context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LinkedList<InterpreterContextRunner>());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  boolean isDataFrameSupported() {
+    return SparkInterpreterTest.getSparkVersionNumber() >= 13;
+  }
+
+  @Test
+  public void test() {
+    repl.interpret("case class Test(name:String, age:Int)", context);
+    repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
+    if (isDataFrameSupported()) {
+      repl.interpret("test.toDF.registerTempTable(\"test\")", context);
+    } else {
+      repl.interpret("test.registerTempTable(\"test\")", context);
+    }
+
+    InterpreterResult ret = sql.interpret("select name, age from test where age < 40", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(Type.TABLE, ret.type());
+    assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
+
+    assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code());
+    assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code());
+  }
+
+  @Test
+  public void testStruct() {
+    repl.interpret("case class Person(name:String, age:Int)", context);
+    repl.interpret("case class People(group:String, person:Person)", context);
+    repl.interpret(
+        "val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))",
+        context);
+    if (isDataFrameSupported()) {
+      repl.interpret("gr.toDF.registerTempTable(\"gr\")", context);
+    } else {
+      repl.interpret("gr.registerTempTable(\"gr\")", context);
+    }
+
+    InterpreterResult ret = sql.interpret("select * from gr", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+  }
+
+  @Test
+  public void test_null_value_in_row() {
+    repl.interpret("import org.apache.spark.sql._", context);
+    if (isDataFrameSupported()) {
+      repl.interpret(
+          "import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}",
+          context);
+    }
+    repl.interpret(
+        "def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}",
+        context);
+    repl.interpret(
+        "val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))",
+        context);
+    repl.interpret(
+        "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))",
+        context);
+    repl.interpret(
+        "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))",
+        context);
+    repl.interpret("val people = z.sqlContext.applySchema(raw, schema)",
+        context);
+    if (isDataFrameSupported()) {
+      repl.interpret("people.toDF.registerTempTable(\"people\")", context);
+    } else {
+      repl.interpret("people.registerTempTable(\"people\")", context);
+    }
+
+    InterpreterResult ret = sql.interpret(
+        "select name, age from people where name = 'gates'", context);
+    System.err.println("RET=" + ret.message());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(Type.TABLE, ret.type());
+    assertEquals("name\tage\ngates\tnull\n", ret.message());
+  }
 }
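
The repeated isDataFrameSupported() branches exist because Spark 1.3 replaced SchemaRDD with DataFrame: an RDD of case classes now needs an explicit toDF before registerTempTable, whereas earlier releases register the RDD directly (via the SQL context implicits). A hypothetical helper that would collapse the three branches, assuming the same repl/context plumbing as the tests above:

    // Builds and runs the table-registration snippet in the Scala interpreter.
    private void registerTable(String rddName, String tableName) {
      String target = isDataFrameSupported() ? rddName + ".toDF" : rddName;
      repl.interpret(target + ".registerTempTable(\"" + tableName + "\")", context);
    }

Usage inside a test would then be, e.g., registerTable("people", "people");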