Posted to commits@toree.apache.org by lb...@apache.org on 2016/05/06 18:46:25 UTC
[1/2] incubator-toree git commit: [TOREE 278]: Replaced RDD Magic
With DataFrame Added DataFrame magic to replace RDD magic Added support to
output DataFrames in different types
Repository: incubator-toree
Updated Branches:
refs/heads/master 31b2039ee -> 4463bf460
[TOREE 278]: Replaced RDD Magic With DataFrame
Added DataFrame magic to replace RDD magic
Added support to output DataFrames in different types
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/5a2b79e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/5a2b79e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/5a2b79e2
Branch: refs/heads/master
Commit: 5a2b79e25e75f24c64780361147aeae40bc13f47
Parents: 31b2039
Author: Corey A. Stubbs <cs...@us.ibm.com>
Authored: Wed May 4 15:06:00 2016 -0500
Committer: Corey A. Stubbs <cs...@us.ibm.com>
Committed: Fri May 6 12:58:36 2016 -0500
----------------------------------------------------------------------
etc/examples/notebooks/magic-tutorial.ipynb | 156 +++++++++++++++
.../apache/toree/magic/builtin/Dataframe.scala | 149 ++++++++++++++
.../org/apache/toree/magic/builtin/RDD.scala | 64 ------
.../apache/toree/utils/DataFrameConverter.scala | 76 +++++++
.../org/apache/toree/utils/json/RddToJson.scala | 42 ----
.../toree/magic/builtin/DataFrameSpec.scala | 196 +++++++++++++++++++
.../apache/toree/magic/builtin/RDDSpec.scala | 118 -----------
.../toree/utils/DataFrameConverterSpec.scala | 79 ++++++++
.../apache/toree/utils/json/RddToJsonSpec.scala | 56 ------
9 files changed, 656 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/etc/examples/notebooks/magic-tutorial.ipynb
----------------------------------------------------------------------
diff --git a/etc/examples/notebooks/magic-tutorial.ipynb b/etc/examples/notebooks/magic-tutorial.ipynb
index 8725868..3d3ca28 100644
--- a/etc/examples/notebooks/magic-tutorial.ipynb
+++ b/etc/examples/notebooks/magic-tutorial.ipynb
@@ -514,6 +514,162 @@
},
{
"cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### %%DataFrame\n",
+ "The `%%DataFrame` magic is used to convert a Spark SQL DataFrame into various formats. Currently, `json`, `html`, and `csv` are supported. The magic takes an expression that evaluates to a DataFrame and performs the conversion, so we first need to create a DataFrame object to reference."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {
+ "collapsed": true
+ },
+ "outputs": [],
+ "source": [
+ "case class DFRecord(key: String, value: Int)\n",
+ "val sqlc = sqlContext\n",
+ "import sqlc.implicits._\n",
+ "val df = sc.parallelize(1 to 10).map(x => DFRecord(x.toString, x)).toDF()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Running the magic on an empty cell prints its usage. The default output format is `html`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {
+ "collapsed": false
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "%%dataframe [arguments]\n",
+ "DATAFRAME_CODE\n",
+ "\n",
+ "DATAFRAME_CODE can be any number of lines of code, as long as the\n",
+ "last line is a reference to a variable which is a DataFrame.\n",
+ " Option Description \n",
+ "------ ----------- \n",
+ "--help Displays the help and usage text for \n",
+ " this magic. \n",
+ "--limit The number of records to return \n",
+ " (default: 10) \n",
+ "--output The type of the output: html, \n",
+ " csv, json (default: html)\n"
+ ]
+ },
+ "execution_count": 1,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "%%dataframe"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {
+ "collapsed": false
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<table><tr><th>key</th><th>value</th></tr><tr><td>1</td><td>1</td></tr><tr><td>2</td><td>2</td></tr><tr><td>3</td><td>3</td></tr><tr><td>4</td><td>4</td></tr><tr><td>5</td><td>5</td></tr><tr><td>6</td><td>6</td></tr><tr><td>7</td><td>7</td></tr><tr><td>8</td><td>8</td></tr><tr><td>9</td><td>9</td></tr><tr><td>10</td><td>10</td></tr></table>"
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "%%dataframe\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "You can specify the `--output` argument to change the output type."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {
+ "collapsed": false
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "key,value\n",
+ "1,1\n",
+ "2,2\n",
+ "3,3\n",
+ "4,4\n",
+ "5,5\n",
+ "6,6\n",
+ "7,7\n",
+ "8,8\n",
+ "9,9\n",
+ "10,10"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "%%dataframe --output=csv\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "There is also an option to limit the number of records returned."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {
+ "collapsed": false
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<table><tr><th>key</th><th>value</th></tr><tr><td>1</td><td>1</td></tr><tr><td>2</td><td>2</td></tr><tr><td>3</td><td>3</td></tr></table>"
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "%%dataframe --limit=3\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
"metadata": {
"collapsed": true
},
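The cells above pass options on the first line of the cell (`--output=csv`, `--limit=3`), with `html` and `10` as the defaults. In `Dataframe.scala` below these are declared through the magic's `parser` (`accepts(...).withRequiredArg().defaultsTo(...)`, which follows the joptsimple API). As a dependency-free illustration of the same convention only, assuming `--name=value` tokens and nothing else, the option line could be read like this; `DataFrameArgs` and `DataFrameArgsSketch` are hypothetical names, not part of Toree.

```scala
// Hypothetical, dependency-free sketch of reading the `%%dataframe` option line.
// Only `--name=value` tokens are recognised; defaults mirror the help text.
final case class DataFrameArgs(output: String = "html", limit: Int = 10)

object DataFrameArgsSketch {
  def parseDataFrameArgs(line: String): DataFrameArgs =
    line.trim.split("\\s+").filter(_.startsWith("--")).foldLeft(DataFrameArgs()) {
      (args, token) =>
        token.drop(2).split("=", 2) match {
          case Array("output", value) => args.copy(output = value)
          case Array("limit", value)  => args.copy(limit = value.toInt)
          case _                      => args // unknown flag: keep current values
        }
    }
}
```

For example, `DataFrameArgsSketch.parseDataFrameArgs("--output=csv --limit=3")` yields `DataFrameArgs("csv", 3)`, and an empty line yields the defaults. Unlike joptsimple, this sketch silently ignores unknown flags, and a non-numeric `--limit` throws `NumberFormatException`.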
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/main/scala/org/apache/toree/magic/builtin/Dataframe.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/Dataframe.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/Dataframe.scala
new file mode 100644
index 0000000..3f00f82
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/Dataframe.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.toree.magic.builtin
+
+import java.io.{PrintStream, StringWriter}
+
+import org.apache.toree.interpreter.{ExecuteAborted, ExecuteError, ExecuteFailure, Results}
+import org.apache.toree.kernel.protocol.v5._
+import org.apache.toree.magic._
+import org.apache.toree.magic.dependencies.{IncludeKernelInterpreter, IncludeOutputStream}
+import org.apache.toree.plugins.annotations.{Event, Init}
+import org.apache.toree.utils.{ArgumentParsingSupport, DataFrameConverter, LogLike}
+
+import scala.util.Try
+
+
+class DFConversionException extends Exception{}
+
+
+object DataFrameResponses {
+ val MagicAborted = s"${classOf[DataFrame].getSimpleName} magic aborted!"
+
+ def ErrorMessage(outputType: String, error: String) = {
+ s"An error occurred converting DataFrame to ${outputType}.\n${error}"
+ }
+
+ def NoVariableFound(name: String) = {
+ s"No variable found with the name ${name}!"
+ }
+
+ val Incomplete = "DataFrame code was an incomplete code snippet"
+
+ val Usage =
+ """%%dataframe [arguments]
+ |DATAFRAME_CODE
+ |
+ |DATAFRAME_CODE can be any number of lines of code, as long as the
+ |last line is a reference to a variable which is a DataFrame.
+ """.stripMargin
+}
+
+class DataFrame extends CellMagic with IncludeKernelInterpreter
+ with IncludeOutputStream with ArgumentParsingSupport with LogLike {
+ private var _dataFrameConverter: DataFrameConverter = _
+ private val outputTypeMap = Map[String, String](
+ "html" -> MIMEType.TextHtml,
+ "csv" -> MIMEType.PlainText,
+ "json" -> MIMEType.ApplicationJson
+ )
+
+ @Init def initMethod(dataFrameConverter: DataFrameConverter) = {
+ _dataFrameConverter = dataFrameConverter
+ }
+ private def printStream = new PrintStream(outputStream)
+
+ private val _outputType = parser.accepts(
+ "output", "The type of the output: html, csv, json"
+ ).withRequiredArg().defaultsTo("html")
+
+ private val _limit = parser.accepts(
+ "limit", "The number of records to return"
+ ).withRequiredArg().defaultsTo("10")
+
+ private def outputType(): String = {
+ _outputType.getOrElse("html")
+ }
+ private def limit(): Int = {
+ _limit.getOrElse("10").toInt
+ }
+
+ private def outputTypeToMimeType(): String = {
+ outputTypeMap.getOrElse(outputType, MIMEType.PlainText)
+ }
+
+ private def convertToJson(rddCode: String): CellMagicOutput = {
+ val (result, message) = kernelInterpreter.interpret(rddCode)
+ result match {
+ case Results.Success =>
+ val rddVarName = kernelInterpreter.lastExecutionVariableName.get
+ kernelInterpreter.read(rddVarName).map(variableVal => {
+ _dataFrameConverter.convert(
+ variableVal.asInstanceOf[org.apache.spark.sql.DataFrame],
+ outputType,
+ limit
+ ).map(output =>
+ CellMagicOutput(outputTypeToMimeType -> output)
+ ).get
+ }).getOrElse(CellMagicOutput(MIMEType.PlainText -> DataFrameResponses.NoVariableFound(rddVarName)))
+ case Results.Aborted =>
+ logger.error(DataFrameResponses.ErrorMessage(outputType, DataFrameResponses.MagicAborted))
+ CellMagicOutput(
+ MIMEType.PlainText -> DataFrameResponses.ErrorMessage(outputType, DataFrameResponses.MagicAborted)
+ )
+ case Results.Error =>
+ val error = message.right.get.asInstanceOf[ExecuteError]
+ val errorMessage = DataFrameResponses.ErrorMessage(outputType, error.value)
+ logger.error(errorMessage)
+ CellMagicOutput(MIMEType.PlainText -> errorMessage)
+ case Results.Incomplete =>
+ logger.error(DataFrameResponses.Incomplete)
+ CellMagicOutput(MIMEType.PlainText -> DataFrameResponses.Incomplete)
+ }
+ }
+
+ private def helpToCellMagicOutput(optionalException: Option[Exception] = None): CellMagicOutput = {
+ val stringWriter = new StringWriter()
+ stringWriter.append(optionalException.map(e => {
+ s"ERROR: ${e.getMessage}\n"
+ }).getOrElse(""))
+ stringWriter.write(DataFrameResponses.Usage)
+ parser.printHelpOn(stringWriter)
+ CellMagicOutput(MIMEType.PlainText -> stringWriter.toString)
+ }
+
+ @Event(name = "dataframe")
+ override def execute(code: String): CellMagicOutput = {
+ val lines = code.trim.split("\n")
+ Try({
+ val res: CellMagicOutput = if (lines.length == 1 && lines.head.length == 0){
+ helpToCellMagicOutput()
+ } else if (lines.length == 1) {
+ parseArgs("")
+ convertToJson(lines.head)
+ } else {
+ parseArgs(lines.head)
+ convertToJson(lines.drop(1).mkString("\n"))
+ }
+ res
+ }).recover({
+ case e: Exception =>
+ helpToCellMagicOutput(Some(e))
+ }).get
+ }
+}
\ No newline at end of file
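`execute` above shows usage for an empty cell, treats a single-line cell as DataFrame code with default options, and otherwise reads the first line as options and the remaining lines as code; `outputTypeToMimeType` then maps the output type through `outputTypeMap`. A Spark-free sketch of that dispatch follows. It assumes the `MIMEType` constants correspond to the strings `text/html`, `text/plain` and `application/json`, and `CellSketch`, `splitCell` and `mimeTypeFor` are hypothetical names. The sketch joins the code lines with `\n`, so a multi-line body such as `val x = df` followed by `x` keeps its line break.

```scala
// Simplified sketch of splitting and dispatching a `%%dataframe` cell body.
// Plain strings stand in for Toree's MIMEType constants (an assumption).
object CellSketch {
  private val outputTypeMap = Map(
    "html" -> "text/html",
    "csv"  -> "text/plain",
    "json" -> "application/json"
  )

  // Unknown output types fall back to plain text, as outputTypeToMimeType does.
  def mimeTypeFor(outputType: String): String =
    outputTypeMap.getOrElse(outputType, "text/plain")

  // Returns (option line, code), or None for an empty cell (the help case).
  def splitCell(code: String): Option[(String, String)] = {
    val lines = code.trim.split("\n")
    if (lines.length == 1 && lines.head.isEmpty) None
    else if (lines.length == 1) Some(("", lines.head))
    else Some((lines.head, lines.drop(1).mkString("\n")))
  }
}
```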
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala
deleted file mode 100644
index b165ac8..0000000
--- a/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.toree.magic.builtin
-
-import org.apache.toree.interpreter.{ExecuteFailure, Results, ExecuteAborted, ExecuteError}
-import org.apache.toree.kernel.protocol.v5.MIMEType
-import org.apache.toree.magic._
-import org.apache.toree.magic.dependencies.{IncludeKernelInterpreter, IncludeInterpreter}
-import org.apache.toree.utils.LogLike
-import org.apache.toree.utils.json.RddToJson
-import org.apache.spark.sql.SchemaRDD
-import org.apache.toree.plugins.annotations.Event
-
-/**
- * Temporary magic to show an RDD as JSON
- */
-class RDD extends CellMagic with IncludeKernelInterpreter with LogLike {
-
- private def convertToJson(code: String) = {
- val (result, message) = kernelInterpreter.interpret(code)
- result match {
- case Results.Success =>
- val rddVarName = kernelInterpreter.lastExecutionVariableName.getOrElse("")
- kernelInterpreter.read(rddVarName).map(rddVal => {
- try{
- CellMagicOutput(MIMEType.ApplicationJson -> RddToJson.convert(rddVal.asInstanceOf[SchemaRDD]))
- } catch {
- case _: Throwable =>
- CellMagicOutput(MIMEType.PlainText -> s"Could not convert RDD to JSON: ${rddVarName}->${rddVal}")
- }
- }).getOrElse(CellMagicOutput(MIMEType.PlainText -> "No RDD Value found!"))
- case _ =>
- val errorMessage = message.right.toOption match {
- case Some(executeFailure) => executeFailure match {
- case _: ExecuteAborted => throw new Exception("RDD magic aborted!")
- case executeError: ExecuteError => throw new Exception(executeError.value)
- }
- case _ => "No error information available!"
- }
- logger.error(s"Error retrieving RDD value: ${errorMessage}")
- CellMagicOutput(MIMEType.PlainText ->
- (s"An error occurred converting RDD to JSON.\n${errorMessage}"))
- }
- }
-
- @Event(name = "rdd")
- override def execute(code: String): CellMagicOutput =
- convertToJson(code)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/main/scala/org/apache/toree/utils/DataFrameConverter.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/utils/DataFrameConverter.scala b/kernel/src/main/scala/org/apache/toree/utils/DataFrameConverter.scala
new file mode 100644
index 0000000..b28879e
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/utils/DataFrameConverter.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.toree.utils
+
+import org.apache.spark.sql.DataFrame
+import org.apache.toree.plugins.Plugin
+import play.api.libs.json.{JsObject, Json}
+
+import scala.util.{Failure, Try}
+import org.apache.toree.plugins.annotations.Init
+
+class DataFrameConverter extends Plugin with LogLike {
+ @Init def init() = {
+ register(this)
+ }
+
+ def convert(
+ df: DataFrame, outputType: String, limit: Int = 10
+ ): Try[String] = {
+ Try(
+ outputType.toLowerCase() match {
+ case "html" =>
+ convertToHtml(df = df, limit = limit)
+ case "json" =>
+ convertToJson(df = df, limit = limit)
+ case "csv" =>
+ convertToCsv(df = df, limit = limit)
+ }
+ )
+ }
+
+ private def convertToHtml(df: DataFrame, limit: Int = 10): String = {
+ val columnFields = df.schema.fieldNames.map(columnName => {
+ s"<th>${columnName}</th>"
+ }).mkString
+ val columns = s"<tr>${columnFields}</tr>"
+ val rows = df.map(row => {
+ val fieldValues = row.toSeq.map(field => {
+ s"<td>${String.valueOf(field)}</td>"
+ }).mkString
+ s"<tr>${fieldValues}</tr>"
+ }).take(limit).mkString
+ s"<table>${columns}${rows}</table>"
+ }
+
+ private def convertToJson(df: DataFrame, limit: Int = 10): String = {
+ JsObject(Seq(
+ "columns" -> Json.toJson(df.schema.fieldNames),
+ "rows" -> Json.toJson(df.map(row =>
+ row.toSeq.map(field => String.valueOf(field)).toArray).take(limit))
+ )).toString()
+ }
+
+ private def convertToCsv(df: DataFrame, limit: Int = 10): String = {
+ val headers = df.schema.fieldNames.mkString(",")
+ val rows = df.map(row => {
+ row.toSeq.map(field => String.valueOf(field)).mkString(",")
+ }).take(limit).mkString("\n")
+ s"${headers}\n${rows}"
+ }
+}
\ No newline at end of file
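Each converter method above renders the column names, maps every row to a string, keeps the first `limit` rows, and concatenates the pieces. The sketch below reproduces the HTML and CSV layouts on plain Scala collections, assuming the rows have already been collected as `Seq[Seq[Any]]` (the converter itself works on `df.map(...).take(limit)`); `TableSketch` is a hypothetical helper. It uses `mkString`, so an empty result still yields a header-only table, and renders `null` fields as `null`. Like the converter, it performs no HTML escaping or CSV quoting.

```scala
// Spark-free sketch of the HTML and CSV layouts produced by DataFrameConverter.
// Rows are assumed to be already collected; `limit` mirrors the --limit option.
object TableSketch {
  // Null-safe field rendering: null becomes the string "null".
  private def cell(field: Any): String =
    if (field == null) "null" else field.toString

  def toHtml(columns: Seq[String], rows: Seq[Seq[Any]], limit: Int = 10): String = {
    val header = columns.map(c => s"<th>$c</th>").mkString
    val body = rows.take(limit)
      .map(row => row.map(f => s"<td>${cell(f)}</td>").mkString)
      .map(cells => s"<tr>$cells</tr>")
      .mkString
    s"<table><tr>$header</tr>$body</table>"
  }

  def toCsv(columns: Seq[String], rows: Seq[Seq[Any]], limit: Int = 10): String = {
    // Header line first, then one comma-separated line per retained row.
    val lines = columns.mkString(",") +: rows.take(limit).map(_.map(cell).mkString(","))
    lines.mkString("\n")
  }
}
```

For instance, `TableSketch.toCsv(Seq("key", "value"), Seq(Seq("1", 1), Seq("2", 2)), limit = 1)` gives `key,value` followed by `1,1` on the next line, the same shape as the CSV cell output in the tutorial notebook.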
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala b/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala
deleted file mode 100644
index d3c25fb..0000000
--- a/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.toree.utils.json
-
-import org.apache.spark.sql.{DataFrame, SchemaRDD}
-import play.api.libs.json.{JsObject, JsString, Json}
-
-/**
- * Utility to convert RDD to JSON.
- */
-object RddToJson {
-
- /**
- * Converts a SchemaRDD to a JSON table format.
- *
- * @param rdd The schema rdd (now a dataframe) to convert
- *
- * @return The resulting string representing the JSON
- */
- def convert(rdd: DataFrame, limit: Int = 10): String =
- JsObject(Seq(
- "type" -> JsString("rdd/schema"),
- "columns" -> Json.toJson(rdd.schema.fieldNames),
- "rows" -> Json.toJson(rdd.map(row =>
- row.toSeq.map(_.toString).toArray).take(limit))
- )).toString()
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/test/scala/org/apache/toree/magic/builtin/DataFrameSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/magic/builtin/DataFrameSpec.scala b/kernel/src/test/scala/org/apache/toree/magic/builtin/DataFrameSpec.scala
new file mode 100644
index 0000000..e6c011f
--- /dev/null
+++ b/kernel/src/test/scala/org/apache/toree/magic/builtin/DataFrameSpec.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.toree.magic.builtin
+
+import org.apache.toree.interpreter._
+import org.apache.toree.kernel.protocol.v5.MIMEType
+import org.apache.toree.magic.dependencies.IncludeKernelInterpreter
+import org.apache.toree.utils.DataFrameConverter
+import org.mockito.Matchers._
+import org.mockito.Matchers.{eq => mockEq}
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
+import scala.util.Success
+
+class DataFrameSpec extends FunSpec with Matchers with MockitoSugar with BeforeAndAfter {
+
+ def createMocks = {
+ val interpreter = mock[Interpreter]
+ val converter = mock[DataFrameConverter]
+ val magic = new DataFrame with IncludeKernelInterpreter {
+ override val kernelInterpreter: Interpreter = interpreter
+ }
+ magic.initMethod(converter)
+ (magic, interpreter, converter)
+ }
+
+ describe("DataFrame") {
+ describe("#execute") {
+ it("should return a plain text error message on aborted execution"){
+ val (magic, interpreter, _) = createMocks
+ val message: Either[ExecuteOutput, ExecuteFailure] = Right(mock[ExecuteAborted])
+ val code = "code"
+ doReturn((Results.Aborted,message)).when(interpreter).interpret(code, false)
+ val output = magic.execute(code)
+ output.contains(MIMEType.PlainText) should be(true)
+ output(MIMEType.PlainText) should be(DataFrameResponses.ErrorMessage(
+ "html",
+ DataFrameResponses.MagicAborted
+ ))
+ }
+
+ it("should return a plain text error message on execution errors"){
+ val (magic, interpreter, _) = createMocks
+ val mockExecuteError = mock[ExecuteError]
+ val mockError = "error"
+ doReturn(mockError).when(mockExecuteError).value
+ val message: Either[ExecuteOutput, ExecuteFailure] = Right(mockExecuteError)
+ val code = "code"
+ doReturn((Results.Error,message)).when(interpreter).interpret(code, false)
+ val output = magic.execute(code)
+ output.contains(MIMEType.PlainText) should be(true)
+ output(MIMEType.PlainText) should be(DataFrameResponses.ErrorMessage(
+ "html",
+ mockError
+ ))
+ }
+
+ it("should return a plain text message when there is no variable reference"){
+ val (magic, interpreter, _) = createMocks
+ val outputText = "test output"
+ val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText)
+ val variableName = "variable"
+ doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false)
+ doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName
+ // read returns None: no value is bound to the last execution variable
+ doReturn(None).when(interpreter).read(variableName)
+ val output = magic.execute(variableName)
+ output.contains(MIMEType.PlainText) should be(true)
+ output(MIMEType.PlainText) should be(
+ DataFrameResponses.NoVariableFound(variableName)
+ )
+ }
+
+ it("should return a plain text message with help when there are no args"){
+ val (magic, _, _) = createMocks
+ val code = ""
+ val output = magic.execute(code)
+ output.contains(MIMEType.PlainText) should be(true)
+ output(MIMEType.PlainText).contains(DataFrameResponses.Usage) should be(true)
+ }
+
+ it("should return a json message when json is the selected output"){
+ val (magic, interpreter, converter) = createMocks
+ val outputText = "test output"
+ val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText)
+ val mockDataFrame = mock[org.apache.spark.sql.DataFrame]
+ val variableName = "variable"
+ val executeCode =s"""--output=json
+ |${variableName}
+ """.stripMargin
+ doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false)
+ doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName
+ doReturn(Some(mockDataFrame)).when(interpreter).read(variableName)
+ doReturn(Success(outputText)).when(converter).convert(
+ mockDataFrame,"json", 10
+ )
+ val output = magic.execute(executeCode)
+ output.contains(MIMEType.ApplicationJson) should be(true)
+ output(MIMEType.ApplicationJson).contains(outputText) should be(true)
+ }
+
+ it("should return an html message when html is the selected output"){
+ val (magic, interpreter, converter) = createMocks
+ val outputText = "test output"
+ val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText)
+ val mockDataFrame = mock[org.apache.spark.sql.DataFrame]
+ val variableName = "variable"
+ val executeCode =s"""--output=html
+ |${variableName}
+ """.stripMargin
+ doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false)
+ doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName
+ doReturn(Some(mockDataFrame)).when(interpreter).read(variableName)
+ doReturn(Success(outputText)).when(converter).convert(
+ mockDataFrame,"html", 10
+ )
+ val output = magic.execute(executeCode)
+ output.contains(MIMEType.TextHtml) should be(true)
+ output(MIMEType.TextHtml).contains(outputText) should be(true)
+ }
+
+ it("should return a csv message when csv is the selected output"){
+ val (magic, interpreter, converter) = createMocks
+ val outputText = "test output"
+ val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText)
+ val mockDataFrame = mock[org.apache.spark.sql.DataFrame]
+ val variableName = "variable"
+ val executeCode =s"""--output=csv
+ |${variableName}
+ """.stripMargin
+ doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false)
+ doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName
+ doReturn(Some(mockDataFrame)).when(interpreter).read(variableName)
+ doReturn(Success(outputText)).when(converter).convert(
+ mockDataFrame,"csv", 10
+ )
+ val output = magic.execute(executeCode)
+ output.contains(MIMEType.PlainText) should be(true)
+ output(MIMEType.PlainText).contains(outputText) should be(true)
+ }
+
+ it("should pass the limit argument to the converter"){
+ val (magic, interpreter, converter) = createMocks
+ val outputText = "test output"
+ val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText)
+ val mockDataFrame = mock[org.apache.spark.sql.DataFrame]
+ val variableName = "variable"
+ val executeCode =s"""--output=html --limit=3
+ |${variableName}
+ """.stripMargin
+ doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false)
+ doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName
+ doReturn(Some(mockDataFrame)).when(interpreter).read(variableName)
+ doReturn(Success(outputText)).when(converter).convert(
+ mockDataFrame,"html", 3
+ )
+ magic.execute(executeCode)
+ verify(converter).convert(any(), anyString(), mockEq(3))
+ }
+
+ it("should return a plain text message with help when the converter throws an exception"){
+ val (magic, interpreter, converter) = createMocks
+ val outputText = "test output"
+ val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText)
+ val mockDataFrame = mock[org.apache.spark.sql.DataFrame]
+ val code = "variable"
+ doReturn((Results.Success,message)).when(interpreter).interpret(code, false)
+ doReturn(Some(code)).when(interpreter).lastExecutionVariableName
+ doReturn(Some(mockDataFrame)).when(interpreter).read(code)
+ doThrow(new RuntimeException()).when(converter).convert(
+ mockDataFrame,"html", 10
+ )
+ val output = magic.execute(code)
+ output.contains(MIMEType.PlainText) should be(true)
+ output(MIMEType.PlainText).contains(DataFrameResponses.Usage) should be(true)
+
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/test/scala/org/apache/toree/magic/builtin/RDDSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/magic/builtin/RDDSpec.scala b/kernel/src/test/scala/org/apache/toree/magic/builtin/RDDSpec.scala
deleted file mode 100644
index 3062036..0000000
--- a/kernel/src/test/scala/org/apache/toree/magic/builtin/RDDSpec.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.toree.magic.builtin
-
-import org.apache.toree.interpreter.Results.Result
-import org.apache.toree.interpreter.{Results, ExecuteAborted, ExecuteError, Interpreter}
-import org.apache.toree.kernel.protocol.v5.MIMEType
-import org.apache.toree.magic.dependencies.{IncludeKernelInterpreter, IncludeInterpreter}
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.StructType
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
-import play.api.libs.json.Json
-
-class RDDSpec extends FunSpec with Matchers with MockitoSugar with BeforeAndAfter {
-
- val resOutput = "res1: org.apache.spark.sql.SchemaRDD ="
-
- val mockInterpreter = mock[Interpreter]
- val mockDataFrame = mock[DataFrame]
- val mockRdd = mock[org.apache.spark.rdd.RDD[Any]]
- val mockStruct = mock[StructType]
- val columns = Seq("foo", "bar").toArray
- val rows = Array( Array("a", "b"), Array("c", "d") )
-
- doReturn(mockStruct).when(mockDataFrame).schema
- doReturn(columns).when(mockStruct).fieldNames
- doReturn(mockRdd).when(mockDataFrame).map(any())(any())
- doReturn(rows).when(mockRdd).take(anyInt())
-
- val rddMagic = new RDD with IncludeKernelInterpreter {
- override val kernelInterpreter: Interpreter = mockInterpreter
- }
-
- before {
- doReturn(Some("someRDD")).when(mockInterpreter).lastExecutionVariableName
- doReturn(Some(mockDataFrame)).when(mockInterpreter).read(anyString())
- doReturn((Results.Success, Left(resOutput)))
- .when(mockInterpreter).interpret(anyString(), anyBoolean())
- }
-
- describe("RDD") {
- describe("#execute") {
- it("should return valid JSON when the executed code evaluates to a " +
- "SchemaRDD") {
- val magicOutput = rddMagic.execute("schemaRDD")
- magicOutput.contains(MIMEType.ApplicationJson) should be (true)
- Json.parse(magicOutput(MIMEType.ApplicationJson))
- }
-
- it("should return normally when the executed code does not evaluate to " +
- "a SchemaRDD") {
- doReturn((mock[Result], Left("foo"))).when(mockInterpreter)
- .interpret(anyString(), anyBoolean())
- val magicOutput = rddMagic.execute("")
- magicOutput.contains(MIMEType.PlainText) should be (true)
- }
-
- it("should return error message when the interpreter does not return " +
- "SchemaRDD as expected") {
- doReturn(Some("foo")).when(mockInterpreter).read(anyString())
- val magicOutput = rddMagic.execute("")
- magicOutput.contains(MIMEType.PlainText) should be (true)
- }
-
- it("should throw a Throwable if the interpreter returns an ExecuteError"){
- val expected = "some error message"
- val mockExecuteError = mock[ExecuteError]
- doReturn(expected).when(mockExecuteError).value
-
- doReturn((mock[Result], Right(mockExecuteError))).when(mockInterpreter)
- .interpret(anyString(), anyBoolean())
- val actual = {
- val exception = intercept[Throwable] {
- rddMagic.execute("")
- }
- exception.getLocalizedMessage
- }
-
- actual should be (expected)
- }
-
- it("should throw a Throwable if the interpreter returns an " +
- "ExecuteAborted") {
- val expected = "RDD magic aborted!"
- val mockExecuteAborted = mock[ExecuteAborted]
-
- doReturn((mock[Result], Right(mockExecuteAborted)))
- .when(mockInterpreter).interpret(anyString(), anyBoolean())
- val actual = {
- val exception = intercept[Throwable] {
- rddMagic.execute("")
- }
- exception.getLocalizedMessage
- }
-
- actual should be (expected)
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/test/scala/org/apache/toree/utils/DataFrameConverterSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/utils/DataFrameConverterSpec.scala b/kernel/src/test/scala/org/apache/toree/utils/DataFrameConverterSpec.scala
new file mode 100644
index 0000000..d45ecce
--- /dev/null
+++ b/kernel/src/test/scala/org/apache/toree/utils/DataFrameConverterSpec.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.toree.utils
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.StructType
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSpec, Matchers}
+import play.api.libs.json.{JsArray, JsString, Json}
+
+class DataFrameConverterSpec extends FunSpec with MockitoSugar with Matchers {
+ val dataFrameConverter: DataFrameConverter = new DataFrameConverter
+ val mockDataFrame = mock[DataFrame]
+ val mockRdd = mock[RDD[Any]]
+ val mockStruct = mock[StructType]
+ val columns = Seq("foo", "bar").toArray
+ val rowsOfArrays = Array( Array("a", "b"), Array("c", "d") )
+ val rowsOfStrings = Array("test1","test2")
+ val rowsOfString = Array("test1")
+
+ doReturn(mockStruct).when(mockDataFrame).schema
+ doReturn(columns).when(mockStruct).fieldNames
+ doReturn(mockRdd).when(mockDataFrame).map(any())(any())
+ doReturn(rowsOfArrays).when(mockRdd).take(anyInt())
+
+ describe("DataFrameConverter") {
+ describe("#convert") {
+ it("should convert to a valid JSON object") {
+ val someJson = dataFrameConverter.convert(mockDataFrame, "json")
+ val jsValue = Json.parse(someJson.get)
+ jsValue \ "columns" should be (JsArray(Seq(JsString("foo"), JsString("bar"))))
+ jsValue \ "rows" should be (JsArray(Seq(
+ JsArray(Seq(JsString("a"), JsString("b"))),
+ JsArray(Seq(JsString("c"), JsString("d")))))
+ )
+ }
+ it("should convert to csv") {
+ doReturn(rowsOfStrings).when(mockRdd).take(anyInt())
+ val csv = dataFrameConverter.convert(mockDataFrame, "csv").get
+ val values = csv.split("\n").map(_.split(","))
+ values(0) should contain allOf ("foo","bar")
+ }
+ it("should convert to html") {
+ doReturn(rowsOfStrings).when(mockRdd).take(anyInt())
+ val html = dataFrameConverter.convert(mockDataFrame, "html").get
+ html.contains("<th>foo</th>") should be(true)
+ html.contains("<th>bar</th>") should be(true)
+ }
+ it("should limit the selection") {
+ doReturn(rowsOfString).when(mockRdd).take(1)
+ val someLimited = dataFrameConverter.convert(mockDataFrame, "csv", 1)
+ val limitedLines = someLimited.get.split("\n")
+ limitedLines.length should be(2)
+ }
+ it("should return a Failure for invalid types") {
+ val result = dataFrameConverter.convert(mockDataFrame, "Invalid Type")
+ result.isFailure should be(true)
+ }
+ }
+ }
+}
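The spec above pins down the observable contract of `DataFrameConverter.convert`. It takes an output type of `json`, `csv` or `html` and an optional row limit, and returns a `Try[String]` that is a `Failure` for unknown types. The sketch below mirrors that contract without Spark so the behaviour can be read and run in isolation. It is not the committed implementation. `DataFrameConverterSketch` is a hypothetical name, plain `Seq[String]` columns and `Seq[Seq[Any]]` rows stand in for `schema.fieldNames` and the rows fetched with `take(limit)`, and the default limit of 10 is an assumption.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical, Spark-free sketch of the behaviour DataFrameConverterSpec tests.
// `columns` stands in for schema.fieldNames; `rows` for the collected row values.
object DataFrameConverterSketch {

  // Minimal JSON string quoting: escapes backslashes and double quotes only.
  private def jsonString(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // {"columns":[...],"rows":[[...],...]}, the shape the JSON test asserts on.
  private def toJson(columns: Seq[String], rows: Seq[Seq[Any]]): String = {
    val cols = columns.map(jsonString).mkString("[", ",", "]")
    val data = rows
      .map(r => r.map(v => jsonString(String.valueOf(v))).mkString("[", ",", "]"))
      .mkString("[", ",", "]")
    s"""{"columns":$cols,"rows":$data}"""
  }

  // Header line followed by one comma-separated line per row.
  private def toCsv(columns: Seq[String], rows: Seq[Seq[Any]]): String =
    (columns.mkString(",") +: rows.map(_.mkString(","))).mkString("\n")

  // A bare HTML table with a <th> per column and a <td> per cell.
  private def toHtml(columns: Seq[String], rows: Seq[Seq[Any]]): String = {
    val header = columns.map(c => s"<th>$c</th>").mkString("<tr>", "", "</tr>")
    val body = rows.map(_.map(v => s"<td>$v</td>").mkString("<tr>", "", "</tr>")).mkString
    s"<table>$header$body</table>"
  }

  // Applies the row limit, then dispatches on the requested output type.
  def convert(columns: Seq[String], rows: Seq[Seq[Any]],
              outputType: String, limit: Int = 10): Try[String] = {
    val limited = rows.take(limit)
    outputType.toLowerCase match {
      case "json" => Success(toJson(columns, limited))
      case "csv"  => Success(toCsv(columns, limited))
      case "html" => Success(toHtml(columns, limited))
      case other  => Failure(new IllegalArgumentException(s"Unsupported output type: $other"))
    }
  }
}
```

With a limit of 1, the CSV output has exactly two lines (header plus one row). This matches what the "should limit the selection" test expects.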
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/test/scala/org/apache/toree/utils/json/RddToJsonSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/utils/json/RddToJsonSpec.scala b/kernel/src/test/scala/org/apache/toree/utils/json/RddToJsonSpec.scala
deleted file mode 100644
index e2b403e..0000000
--- a/kernel/src/test/scala/org/apache/toree/utils/json/RddToJsonSpec.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.toree.utils.json
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.StructType
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSpec}
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import play.api.libs.json.{JsArray, JsString, Json}
-
-class RddToJsonSpec extends FunSpec with MockitoSugar with Matchers {
-
- val mockDataFrame = mock[DataFrame]
- val mockRdd = mock[RDD[Any]]
- val mockStruct = mock[StructType]
- val columns = Seq("foo", "bar").toArray
- val rows = Array( Array("a", "b"), Array("c", "d") )
-
- doReturn(mockStruct).when(mockDataFrame).schema
- doReturn(columns).when(mockStruct).fieldNames
- doReturn(mockRdd).when(mockDataFrame).map(any())(any())
- doReturn(rows).when(mockRdd).take(anyInt())
-
- describe("RddToJson") {
- describe("#convert(SchemaRDD)") {
- it("should convert to valid JSON object") {
-
- val json = RddToJson.convert(mockDataFrame)
- val jsValue = Json.parse(json)
-
- jsValue \ "columns" should be (JsArray(Seq(JsString("foo"), JsString("bar"))))
- jsValue \ "rows" should be (JsArray(Seq(
- JsArray(Seq(JsString("a"), JsString("b"))),
- JsArray(Seq(JsString("c"), JsString("d"))))))
- }
- }
- }
-}
[2/2] incubator-toree git commit: [skip ci] Added dataframe to the
TOC of the sample notebook
Posted by lb...@apache.org.
[skip ci] Added dataframe to the TOC of the sample notebook
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/4463bf46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/4463bf46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/4463bf46
Branch: refs/heads/master
Commit: 4463bf460fd5bb04e2fe818e9127b4ec79678564
Parents: 5a2b79e
Author: Gino Bustelo <lb...@apache.org>
Authored: Fri May 6 13:42:48 2016 -0500
Committer: Gino Bustelo <lb...@apache.org>
Committed: Fri May 6 13:42:48 2016 -0500
----------------------------------------------------------------------
etc/examples/notebooks/magic-tutorial.ipynb | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/4463bf46/etc/examples/notebooks/magic-tutorial.ipynb
----------------------------------------------------------------------
diff --git a/etc/examples/notebooks/magic-tutorial.ipynb b/etc/examples/notebooks/magic-tutorial.ipynb
index 3d3ca28..e6d6a62 100644
--- a/etc/examples/notebooks/magic-tutorial.ipynb
+++ b/etc/examples/notebooks/magic-tutorial.ipynb
@@ -16,6 +16,7 @@
" 1. [AddJar](#addjar)\n",
" 1. [AddDeps](#adddeps)\n",
"1. [Cell Magics](#cell-magics)\n",
+ " 1. [DataFrame](#dataframe)\n",
" 1. [Html](#html)\n",
" 1. [JavaScript](#javascript)\n",
" 1. [PySpark](#pyspark)\n",
@@ -516,7 +517,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "### %%DataFrame\n",
+ "### %%DataFrame<a name=\"dataframe\"></a><span style=\"float: right; font-size: 0.5em\"><a href=\"#top\">Top</a></span>\n",
"The `%%DataFrame` magic is used to convert a Spark SQL DataFrame into various formats. Currently, `json`, `html`, and `csv` are supported. The magic takes an expression, which evaluates to a DataFrame, to perform the conversion. So, we first need to create a DataFrame object for reference."
]
},