Posted to commits@toree.apache.org by lb...@apache.org on 2016/05/06 18:46:25 UTC

[1/2] incubator-toree git commit: [TOREE 278]: Replaced RDD Magic With DataFrame

Repository: incubator-toree
Updated Branches:
  refs/heads/master 31b2039ee -> 4463bf460


[TOREE 278]: Replaced RDD Magic With DataFrame
Added DataFrame magic to replace RDD magic
Added support to output DataFrames in different types
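
For example, with a DataFrame bound to a variable `df`, a notebook cell invokes the
new magic like this (an illustrative sketch mirroring the tutorial notebook in this
diff; `df` is whatever DataFrame the preceding cells defined):

    %%dataframe --output=csv --limit=3
    df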


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/5a2b79e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/5a2b79e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/5a2b79e2

Branch: refs/heads/master
Commit: 5a2b79e25e75f24c64780361147aeae40bc13f47
Parents: 31b2039
Author: Corey A. Stubbs <cs...@us.ibm.com>
Authored: Wed May 4 15:06:00 2016 -0500
Committer: Corey A. Stubbs <cs...@us.ibm.com>
Committed: Fri May 6 12:58:36 2016 -0500

----------------------------------------------------------------------
 etc/examples/notebooks/magic-tutorial.ipynb     | 156 +++++++++++++++
 .../apache/toree/magic/builtin/Dataframe.scala  | 149 ++++++++++++++
 .../org/apache/toree/magic/builtin/RDD.scala    |  64 ------
 .../apache/toree/utils/DataFrameConverter.scala |  76 +++++++
 .../org/apache/toree/utils/json/RddToJson.scala |  42 ----
 .../toree/magic/builtin/DataFrameSpec.scala     | 196 +++++++++++++++++++
 .../apache/toree/magic/builtin/RDDSpec.scala    | 118 -----------
 .../toree/utils/DataFrameConverterSpec.scala    |  79 ++++++++
 .../apache/toree/utils/json/RddToJsonSpec.scala |  56 ------
 9 files changed, 656 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/etc/examples/notebooks/magic-tutorial.ipynb
----------------------------------------------------------------------
diff --git a/etc/examples/notebooks/magic-tutorial.ipynb b/etc/examples/notebooks/magic-tutorial.ipynb
index 8725868..3d3ca28 100644
--- a/etc/examples/notebooks/magic-tutorial.ipynb
+++ b/etc/examples/notebooks/magic-tutorial.ipynb
@@ -514,6 +514,162 @@
   },
   {
    "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### %%DataFrame\n",
+    "The `%%DataFrame` magic is used to convert a Spark SQL DataFrame into various formats. Currently, `json`, `html`, and `csv` are supported. The magic takes an expression, which evauluates to a dataframe, to perform the conversion. So, we first need to create a DataFrame object for reference."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "case class DFRecord(key: String, value: Int)\n",
+    "val sqlc = sqlContext\n",
+    "import sqlc.implicits._\n",
+    "val df = sc.parallelize(1 to 10).map(x => DFRecord(x.toString, x)).toDF()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The default output is `html`"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "%%dataframe [arguments]\n",
+       "DATAFRAME_CODE\n",
+       "\n",
+       "DATAGRAME_CODE can be any numbered lines of code, as long as the\n",
+       "last line is a reference to a variable which is a DataFrame.\n",
+       "    Option    Description                           \n",
+       "------    -----------                           \n",
+       "--help    Displays the help and usage text for  \n",
+       "            this magic.                         \n",
+       "--limit   The type of the output: html          \n",
+       "            (default), csv, json (default: 10)  \n",
+       "--output  The type of the output: html          \n",
+       "            (default), csv, json (default: html)\n"
+      ]
+     },
+     "execution_count": 1,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "%%dataframe"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "<table><tr><th>key</th><th>value</th></tr><tr><td>1</td><td>1</td></tr><tr><td>2</td><td>2</td></tr><tr><td>3</td><td>3</td></tr><tr><td>4</td><td>4</td></tr><tr><td>5</td><td>5</td></tr><tr><td>6</td><td>6</td></tr><tr><td>7</td><td>7</td></tr><tr><td>8</td><td>8</td></tr><tr><td>9</td><td>9</td></tr><tr><td>10</td><td>10</td></tr></table>"
+      ]
+     },
+     "execution_count": 2,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "%%dataframe\n",
+    "df"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "You can specify the `--output` argument to change the output type."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "key,value\n",
+       "1,1\n",
+       "2,2\n",
+       "3,3\n",
+       "4,4\n",
+       "5,5\n",
+       "6,6\n",
+       "7,7\n",
+       "8,8\n",
+       "9,9\n",
+       "10,10"
+      ]
+     },
+     "execution_count": 3,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "%%dataframe --output=csv\n",
+    "df"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "There is also an option to limit the number of records returned."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {
+    "collapsed": false
+   },
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "<table><tr><th>key</th><th>value</th></tr><tr><td>1</td><td>1</td></tr><tr><td>2</td><td>2</td></tr><tr><td>3</td><td>3</td></tr></table>"
+      ]
+     },
+     "execution_count": 4,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "%%dataframe --limit=3\n",
+    "df"
+   ]
+  },
+  {
+   "cell_type": "markdown",
    "metadata": {
     "collapsed": true
    },
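
The cells above demonstrate the `html` and `csv` outputs. For completeness, the
`json` output has the following shape (an illustrative sketch derived from the
`convertToJson` implementation added below, not a recorded cell output; rows
truncated):

    %%dataframe --output=json
    df

    {"columns":["key","value"],"rows":[["1","1"],["2","2"]]}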

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/main/scala/org/apache/toree/magic/builtin/Dataframe.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/Dataframe.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/Dataframe.scala
new file mode 100644
index 0000000..3f00f82
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/magic/builtin/Dataframe.scala
@@ -0,0 +1,149 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package org.apache.toree.magic.builtin
+
+import java.io.{PrintStream, StringWriter}
+
+import org.apache.toree.interpreter.{ExecuteAborted, ExecuteError, ExecuteFailure, Results}
+import org.apache.toree.kernel.protocol.v5._
+import org.apache.toree.magic._
+import org.apache.toree.magic.dependencies.{IncludeKernelInterpreter, IncludeOutputStream}
+import org.apache.toree.plugins.annotations.{Event, Init}
+import org.apache.toree.utils.{ArgumentParsingSupport, DataFrameConverter, LogLike}
+
+import scala.util.Try
+
+
+class DFConversionException extends Exception
+
+
+object DataFrameResponses {
+  val MagicAborted = s"${classOf[DataFrame].getSimpleName} magic aborted!"
+
+  def ErrorMessage(outputType: String, error: String) = {
+    s"An error occurred converting DataFrame to ${outputType}.\n${error}"
+  }
+
+  def NoVariableFound(name: String) = {
+    s"No variable found with the name ${name}!"
+  }
+
+  val Incomplete = "DataFrame code was an incomplete code snippet"
+
+  val Usage  =
+    """%%dataframe [arguments]
+      |DATAFRAME_CODE
+      |
+      |DATAFRAME_CODE can be any number of lines of code, as long as the
+      |last line is a reference to a variable which is a DataFrame.
+    """.stripMargin
+}
+
+class DataFrame extends CellMagic with IncludeKernelInterpreter
+  with IncludeOutputStream with ArgumentParsingSupport with LogLike {
+  private var _dataFrameConverter: DataFrameConverter = _
+  private val outputTypeMap = Map[String, String](
+    "html" -> MIMEType.TextHtml,
+    "csv" -> MIMEType.PlainText,
+    "json" -> MIMEType.ApplicationJson
+  )
+
+  @Init def initMethod(dataFrameConverter: DataFrameConverter) = {
+    _dataFrameConverter = dataFrameConverter
+  }
+  private def printStream = new PrintStream(outputStream)
+  
+  private val _outputType = parser.accepts(
+    "output", "The type of the output: html, csv, json"
+  ).withRequiredArg().defaultsTo("html")
+
+  private val _limit = parser.accepts(
+    "limit", "The number of records to return"
+  ).withRequiredArg().defaultsTo("10")
+
+  private def outputType(): String = {
+    _outputType.getOrElse("html")
+  }
+  private def limit(): Int = {
+    _limit.getOrElse("10").toInt
+  }
+  
+  private def outputTypeToMimeType(): String = {
+    outputTypeMap.getOrElse(outputType, MIMEType.PlainText)
+  }
+  
+  private def convertToJson(rddCode: String): CellMagicOutput = {
+    val (result, message) = kernelInterpreter.interpret(rddCode)
+    result match {
+      case Results.Success =>
+        val rddVarName = kernelInterpreter.lastExecutionVariableName.get
+        kernelInterpreter.read(rddVarName).map(variableVal => {
+          _dataFrameConverter.convert(
+            variableVal.asInstanceOf[org.apache.spark.sql.DataFrame],
+            outputType,
+            limit
+          ).map(output =>
+            CellMagicOutput(outputTypeToMimeType -> output)
+          ).get
+        }).getOrElse(CellMagicOutput(MIMEType.PlainText -> DataFrameResponses.NoVariableFound(rddVarName)))
+      case Results.Aborted =>
+        logger.error(DataFrameResponses.ErrorMessage(outputType, DataFrameResponses.MagicAborted))
+        CellMagicOutput(
+          MIMEType.PlainText -> DataFrameResponses.ErrorMessage(outputType, DataFrameResponses.MagicAborted)
+        )
+      case Results.Error =>
+        val error = message.right.get.asInstanceOf[ExecuteError]
+        val errorMessage = DataFrameResponses.ErrorMessage(outputType, error.value)
+        logger.error(errorMessage)
+        CellMagicOutput(MIMEType.PlainText -> errorMessage)
+      case Results.Incomplete =>
+        logger.error(DataFrameResponses.Incomplete)
+        CellMagicOutput(MIMEType.PlainText -> DataFrameResponses.Incomplete)
+    }
+  }
+
+  private def helpToCellMagicOutput(optionalException: Option[Exception] = None): CellMagicOutput = {
+    val stringWriter = new StringWriter()
+    stringWriter.append(optionalException.map(e => {
+      s"ERROR: ${e.getMessage}\n"
+    }).getOrElse(""))
+    stringWriter.write(DataFrameResponses.Usage)
+    parser.printHelpOn(stringWriter)
+    CellMagicOutput(MIMEType.PlainText -> stringWriter.toString)
+  }
+  
+  @Event(name = "dataframe")
+  override def execute(code: String): CellMagicOutput = {
+    val lines = code.trim.split("\n")
+    Try({
+      val res: CellMagicOutput = if (lines.length == 1 && lines.head.length == 0){
+        helpToCellMagicOutput()
+      } else if (lines.length == 1) {
+        parseArgs("")
+        convertToJson(lines.head)
+      } else {
+        parseArgs(lines.head)
+        convertToJson(lines.drop(1).mkString("\n"))
+      }
+      res
+    }).recover({
+      case e: Exception =>
+        helpToCellMagicOutput(Some(e))
+    }).get
+  }
+}
\ No newline at end of file
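
Note on the cell format: `execute` above treats the first line of a multi-line cell
as the argument string and hands the remaining lines to the interpreter. A minimal
sketch of that split (illustrative only; `myDataFrame` is a hypothetical variable
name):

    val cell = "--output=json --limit=5\nmyDataFrame"
    val lines = cell.trim.split("\n")
    val argLine = lines.head                   // parsed for --output / --limit
    val dfCode  = lines.drop(1).mkString("\n") // interpreted; the last line must
                                               // evaluate to a DataFrame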

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala b/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala
deleted file mode 100644
index b165ac8..0000000
--- a/kernel/src/main/scala/org/apache/toree/magic/builtin/RDD.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License
- */
-
-package org.apache.toree.magic.builtin
-
-import org.apache.toree.interpreter.{ExecuteFailure, Results, ExecuteAborted, ExecuteError}
-import org.apache.toree.kernel.protocol.v5.MIMEType
-import org.apache.toree.magic._
-import org.apache.toree.magic.dependencies.{IncludeKernelInterpreter, IncludeInterpreter}
-import org.apache.toree.utils.LogLike
-import org.apache.toree.utils.json.RddToJson
-import org.apache.spark.sql.SchemaRDD
-import org.apache.toree.plugins.annotations.Event
-
-/**
- * Temporary magic to show an RDD as JSON
- */
-class RDD extends CellMagic with IncludeKernelInterpreter with LogLike {
-
-  private def convertToJson(code: String) = {
-    val (result, message) = kernelInterpreter.interpret(code)
-    result match {
-      case Results.Success =>
-        val rddVarName = kernelInterpreter.lastExecutionVariableName.getOrElse("")
-        kernelInterpreter.read(rddVarName).map(rddVal => {
-          try{
-            CellMagicOutput(MIMEType.ApplicationJson -> RddToJson.convert(rddVal.asInstanceOf[SchemaRDD]))
-          } catch {
-            case _: Throwable =>
-              CellMagicOutput(MIMEType.PlainText -> s"Could note convert RDD to JSON: ${rddVarName}->${rddVal}")
-          }
-        }).getOrElse(CellMagicOutput(MIMEType.PlainText -> "No RDD Value found!"))
-      case _ =>
-        val errorMessage = message.right.toOption match {
-          case Some(executeFailure) => executeFailure match {
-            case _: ExecuteAborted => throw new Exception("RDD magic aborted!")
-            case executeError: ExecuteError => throw new Exception(executeError.value)
-          }
-          case _ =>  "No error information available!"
-        }
-        logger.error(s"Error retrieving RDD value: ${errorMessage}")
-        CellMagicOutput(MIMEType.PlainText ->
-          (s"An error occurred converting RDD to JSON.\n${errorMessage}"))
-    }
-  }
-
-  @Event(name = "rdd")
-  override def execute(code: String): CellMagicOutput =
-    convertToJson(code)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/main/scala/org/apache/toree/utils/DataFrameConverter.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/utils/DataFrameConverter.scala b/kernel/src/main/scala/org/apache/toree/utils/DataFrameConverter.scala
new file mode 100644
index 0000000..b28879e
--- /dev/null
+++ b/kernel/src/main/scala/org/apache/toree/utils/DataFrameConverter.scala
@@ -0,0 +1,76 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package org.apache.toree.utils
+
+import org.apache.spark.sql.DataFrame
+import org.apache.toree.plugins.Plugin
+import play.api.libs.json.{JsObject, Json}
+
+import scala.util.{Failure, Try}
+import org.apache.toree.plugins.annotations.Init
+
+class DataFrameConverter extends Plugin with LogLike {
+  @Init def init() = {
+    register(this)
+  }
+
+  def convert(
+     df: DataFrame, outputType: String, limit: Int = 10
+  ): Try[String] = {
+    Try(
+      outputType.toLowerCase() match {
+        case "html" =>
+          convertToHtml(df = df, limit = limit)
+        case "json" =>
+          convertToJson(df = df, limit = limit)
+        case "csv" =>
+          convertToCsv(df = df, limit = limit)
+      }
+    )
+  }
+
+  private def convertToHtml(df: DataFrame, limit: Int = 10): String = {
+      val columnFields = df.schema.fieldNames.map(columnName => {
+        s"<th>${columnName}</th>"
+      }).reduce(_ + _)
+      val columns = s"<tr>${columnFields}</tr>"
+      val rows = df.map(row => {
+        val fieldValues = row.toSeq.map(field => {
+         s"<td>${field.toString}</td>"
+        }).reduce(_ + _)
+        s"<tr>${fieldValues}</tr>"
+      }).take(limit).reduce(_ + _)
+      s"<table>${columns}${rows}</table>"
+  }
+
+  private def convertToJson(df: DataFrame, limit: Int = 10): String = {
+    JsObject(Seq(
+      "columns" -> Json.toJson(df.schema.fieldNames),
+      "rows" -> Json.toJson(df.map(row =>
+        row.toSeq.map(_.toString).toArray).take(limit))
+    )).toString()
+  }
+
+  private def convertToCsv(df: DataFrame, limit: Int = 10): String = {
+      val headers = df.schema.fieldNames.reduce(_ + "," + _)
+      val rows = df.map(row => {
+        row.toSeq.map(field => field.toString).reduce(_ + "," + _)
+      }).take(limit).reduce(_ + "\n" + _)
+      s"${headers}\n${rows}"
+  }
+}
\ No newline at end of file
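
To exercise the converter outside the magic, a minimal sketch (assuming a Toree or
Spark 1.x session exposing `sc` and `sqlContext`, and reusing the `DFRecord` case
class from the tutorial notebook above):

    import scala.util.{Failure, Success}
    import org.apache.toree.utils.DataFrameConverter

    case class DFRecord(key: String, value: Int)
    val sqlc = sqlContext
    import sqlc.implicits._
    val df = sc.parallelize(1 to 5).map(x => DFRecord(x.toString, x)).toDF()

    val converter = new DataFrameConverter
    converter.convert(df, "csv", limit = 3) match {
      case Success(csv) => println(csv)  // header row plus up to three data rows
      case Failure(e)   => println(s"conversion failed: ${e.getMessage}")
    }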

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala b/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala
deleted file mode 100644
index d3c25fb..0000000
--- a/kernel/src/main/scala/org/apache/toree/utils/json/RddToJson.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License
- */
-
-package org.apache.toree.utils.json
-
-import org.apache.spark.sql.{DataFrame, SchemaRDD}
-import play.api.libs.json.{JsObject, JsString, Json}
-
-/**
- * Utility to convert RDD to JSON.
- */
-object RddToJson {
-
-  /**
-   * Converts a SchemaRDD to a JSON table format.
-   *
-   * @param rdd The schema rdd (now a dataframe) to convert
-   *
-   * @return The resulting string representing the JSON
-   */
-  def convert(rdd: DataFrame, limit: Int = 10): String =
-    JsObject(Seq(
-      "type" -> JsString("rdd/schema"),
-      "columns" -> Json.toJson(rdd.schema.fieldNames),
-      "rows" -> Json.toJson(rdd.map(row =>
-        row.toSeq.map(_.toString).toArray).take(limit))
-    )).toString()
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/test/scala/org/apache/toree/magic/builtin/DataFrameSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/magic/builtin/DataFrameSpec.scala b/kernel/src/test/scala/org/apache/toree/magic/builtin/DataFrameSpec.scala
new file mode 100644
index 0000000..e6c011f
--- /dev/null
+++ b/kernel/src/test/scala/org/apache/toree/magic/builtin/DataFrameSpec.scala
@@ -0,0 +1,196 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package org.apache.toree.magic.builtin
+
+import org.apache.toree.interpreter._
+import org.apache.toree.kernel.protocol.v5.MIMEType
+import org.apache.toree.magic.dependencies.IncludeKernelInterpreter
+import org.apache.toree.utils.DataFrameConverter
+import org.mockito.Matchers._
+import org.mockito.Matchers.{eq => mockEq}
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
+import scala.util.Success
+
+class DataFrameSpec extends FunSpec with Matchers with MockitoSugar with BeforeAndAfter {
+
+  def createMocks = {
+    val interpreter = mock[Interpreter]
+    val converter = mock[DataFrameConverter]
+    val magic = new DataFrame with IncludeKernelInterpreter {
+      override val kernelInterpreter: Interpreter = interpreter
+    }
+    magic.initMethod(converter)
+    (magic, interpreter, converter)
+  }
+
+  describe("DataFrame") {
+    describe("#execute") {
+      it("should return a plain text error message on aborted execution"){
+        val (magic, interpreter, _) = createMocks
+        val message: Either[ExecuteOutput, ExecuteFailure] = Right(mock[ExecuteAborted])
+        val code = "code"
+        doReturn((Results.Aborted,message)).when(interpreter).interpret(code, false)
+        val output = magic.execute(code)
+        output.contains(MIMEType.PlainText) should be(true)
+        output(MIMEType.PlainText) should be(DataFrameResponses.ErrorMessage(
+          "html",
+          DataFrameResponses.MagicAborted
+        ))
+      }
+
+      it("should return a plain text error message on execution errors"){
+        val (magic, interpreter, _) = createMocks
+        val mockExecuteError = mock[ExecuteError]
+        val mockError = "error"
+        doReturn(mockError).when(mockExecuteError).value
+        val message: Either[ExecuteOutput, ExecuteFailure] = Right(mockExecuteError)
+        val code = "code"
+        doReturn((Results.Error,message)).when(interpreter).interpret(code, false)
+        val output = magic.execute(code)
+        output.contains(MIMEType.PlainText) should be(true)
+        output(MIMEType.PlainText) should be(DataFrameResponses.ErrorMessage(
+          "html",
+          mockError
+        ))
+      }
+
+      it("should return a plain text message when there is no variable reference"){
+        val (magic, interpreter, _) = createMocks
+        val mockExecuteError = mock[ExecuteError]
+        val mockError = "error"
+        doReturn(mockError).when(mockExecuteError).value
+        val message: Either[ExecuteOutput, ExecuteFailure] = Right(mockExecuteError )
+        val code = "code"
+        doReturn((Results.Error,message)).when(interpreter).interpret(code, false)
+        val output = magic.execute(code)
+        output.contains(MIMEType.PlainText) should be(true)
+        output(MIMEType.PlainText) should be(DataFrameResponses.ErrorMessage(
+          "html",
+          mockError
+        ))
+      }
+
+      it("should return a plain text message with help when there are no args"){
+        val (magic, _, _) = createMocks
+        val code = ""
+        val output = magic.execute(code)
+        output.contains(MIMEType.PlainText) should be(true)
+        output(MIMEType.PlainText).contains(DataFrameResponses.Usage) should be(true)
+      }
+
+      it("should return a json message when json is the selected output"){
+        val (magic, interpreter, converter) = createMocks
+        val outputText = "test output"
+        val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText)
+        val mockDataFrame = mock[org.apache.spark.sql.DataFrame]
+        val variableName = "variable"
+        val executeCode =s"""--output=json
+                             |${variableName}
+                  """.stripMargin
+        doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false)
+        doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName
+        doReturn(Some(mockDataFrame)).when(interpreter).read(variableName)
+        doReturn(Success(outputText)).when(converter).convert(
+          mockDataFrame,"json", 10
+        )
+        val output = magic.execute(executeCode)
+        output.contains(MIMEType.ApplicationJson) should be(true)
+        output(MIMEType.ApplicationJson).contains(outputText) should be(true)
+      }
+
+      it("should return an html message when html is the selected output"){
+        val (magic, interpreter, converter) = createMocks
+        val outputText = "test output"
+        val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText)
+        val mockDataFrame = mock[org.apache.spark.sql.DataFrame]
+        val variableName = "variable"
+        val executeCode =s"""--output=html
+                             |${variableName}
+                  """.stripMargin
+        doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false)
+        doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName
+        doReturn(Some(mockDataFrame)).when(interpreter).read(variableName)
+        doReturn(Success(outputText)).when(converter).convert(
+          mockDataFrame,"html", 10
+        )
+        val output = magic.execute(executeCode)
+        output.contains(MIMEType.TextHtml) should be(true)
+        output(MIMEType.TextHtml).contains(outputText) should be(true)
+      }
+
+      it("should return a csv message when csv is the selected output"){
+        val (magic, interpreter, converter) = createMocks
+        val outputText = "test output"
+        val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText)
+        val mockDataFrame = mock[org.apache.spark.sql.DataFrame]
+        val variableName = "variable"
+        val executeCode =s"""--output=csv
+                             |${variableName}
+                  """.stripMargin
+        doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false)
+        doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName
+        doReturn(Some(mockDataFrame)).when(interpreter).read(variableName)
+        doReturn(Success(outputText)).when(converter).convert(
+          mockDataFrame,"csv", 10
+        )
+        val output = magic.execute(executeCode)
+        output.contains(MIMEType.PlainText) should be(true)
+        output(MIMEType.PlainText).contains(outputText) should be(true)
+      }
+
+      it("should pass the limit argument to the converter"){
+        val (magic, interpreter, converter) = createMocks
+        val outputText = "test output"
+        val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText)
+        val mockDataFrame = mock[org.apache.spark.sql.DataFrame]
+        val variableName = "variable"
+        val executeCode =s"""--output=html --limit=3
+                             |${variableName}
+                  """.stripMargin
+        doReturn((Results.Success,message)).when(interpreter).interpret(variableName, false)
+        doReturn(Some(variableName)).when(interpreter).lastExecutionVariableName
+        doReturn(Some(mockDataFrame)).when(interpreter).read(variableName)
+        doReturn(Success(outputText)).when(converter).convert(
+          mockDataFrame,"html", 3
+        )
+        magic.execute(executeCode)
+        verify(converter).convert(any(), anyString(), mockEq(3))
+      }
+
+      it("should return a plain text message with help when the converter throws an exception"){
+        val (magic, interpreter, converter) = createMocks
+        val outputText = "test output"
+        val message: Either[ExecuteOutput, ExecuteFailure] = Left(outputText)
+        val mockDataFrame = mock[org.apache.spark.sql.DataFrame]
+        val code = "variable"
+        doReturn((Results.Success,message)).when(interpreter).interpret(code, false)
+        doReturn(Some(code)).when(interpreter).lastExecutionVariableName
+        doReturn(Some(mockDataFrame)).when(interpreter).read(code)
+        doThrow(new RuntimeException()).when(converter).convert(
+          mockDataFrame,"html", 10
+        )
+        val output = magic.execute(code)
+        output.contains(MIMEType.PlainText) should be(true)
+        output(MIMEType.PlainText).contains(DataFrameResponses.Usage) should be(true)
+
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/test/scala/org/apache/toree/magic/builtin/RDDSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/magic/builtin/RDDSpec.scala b/kernel/src/test/scala/org/apache/toree/magic/builtin/RDDSpec.scala
deleted file mode 100644
index 3062036..0000000
--- a/kernel/src/test/scala/org/apache/toree/magic/builtin/RDDSpec.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License
- */
-
-package org.apache.toree.magic.builtin
-
-import org.apache.toree.interpreter.Results.Result
-import org.apache.toree.interpreter.{Results, ExecuteAborted, ExecuteError, Interpreter}
-import org.apache.toree.kernel.protocol.v5.MIMEType
-import org.apache.toree.magic.dependencies.{IncludeKernelInterpreter, IncludeInterpreter}
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.StructType
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
-import play.api.libs.json.Json
-
-class RDDSpec extends FunSpec with Matchers with MockitoSugar with BeforeAndAfter {
-
-  val resOutput = "res1: org.apache.spark.sql.SchemaRDD ="
-
-  val mockInterpreter = mock[Interpreter]
-  val mockDataFrame = mock[DataFrame]
-  val mockRdd = mock[org.apache.spark.rdd.RDD[Any]]
-  val mockStruct = mock[StructType]
-  val columns = Seq("foo", "bar").toArray
-  val rows = Array( Array("a", "b"), Array("c", "d") )
-
-  doReturn(mockStruct).when(mockDataFrame).schema
-  doReturn(columns).when(mockStruct).fieldNames
-  doReturn(mockRdd).when(mockDataFrame).map(any())(any())
-  doReturn(rows).when(mockRdd).take(anyInt())
-
-  val rddMagic = new RDD with IncludeKernelInterpreter {
-    override val kernelInterpreter: Interpreter = mockInterpreter
-  }
-
-  before {
-    doReturn(Some("someRDD")).when(mockInterpreter).lastExecutionVariableName
-    doReturn(Some(mockDataFrame)).when(mockInterpreter).read(anyString())
-    doReturn((Results.Success, Left(resOutput)))
-      .when(mockInterpreter).interpret(anyString(), anyBoolean())
-  }
-
-  describe("RDD") {
-    describe("#execute") {
-      it("should return valid JSON when the executed code evaluates to a " +
-         "SchemaRDD") {
-        val magicOutput = rddMagic.execute("schemaRDD")
-        magicOutput.contains(MIMEType.ApplicationJson) should be (true)
-        Json.parse(magicOutput(MIMEType.ApplicationJson))
-      }
-
-      it("should return normally when the executed code does not evaluate to " +
-         "a SchemaRDD") {
-        doReturn((mock[Result], Left("foo"))).when(mockInterpreter)
-          .interpret(anyString(), anyBoolean())
-        val magicOutput = rddMagic.execute("")
-        magicOutput.contains(MIMEType.PlainText) should be (true)
-      }
-
-      it("should return error message when the interpreter does not return " +
-         "SchemaRDD as expected") {
-        doReturn(Some("foo")).when(mockInterpreter).read(anyString())
-        val magicOutput = rddMagic.execute("")
-        magicOutput.contains(MIMEType.PlainText) should be (true)
-      }
-
-      it("should throw a Throwable if the interpreter returns an ExecuteError"){
-        val expected = "some error message"
-        val mockExecuteError = mock[ExecuteError]
-        doReturn(expected).when(mockExecuteError).value
-
-        doReturn((mock[Result], Right(mockExecuteError))).when(mockInterpreter)
-          .interpret(anyString(), anyBoolean())
-        val actual = {
-          val exception = intercept[Throwable] {
-            rddMagic.execute("")
-          }
-          exception.getLocalizedMessage
-        }
-
-        actual should be (expected)
-      }
-
-      it("should throw a Throwable if the interpreter returns an " +
-         "ExecuteAborted") {
-        val expected = "RDD magic aborted!"
-        val mockExecuteAborted = mock[ExecuteAborted]
-
-        doReturn((mock[Result], Right(mockExecuteAborted)))
-          .when(mockInterpreter).interpret(anyString(), anyBoolean())
-        val actual = {
-          val exception = intercept[Throwable] {
-            rddMagic.execute("")
-          }
-          exception.getLocalizedMessage
-        }
-
-        actual should be (expected)
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/test/scala/org/apache/toree/utils/DataFrameConverterSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/utils/DataFrameConverterSpec.scala b/kernel/src/test/scala/org/apache/toree/utils/DataFrameConverterSpec.scala
new file mode 100644
index 0000000..d45ecce
--- /dev/null
+++ b/kernel/src/test/scala/org/apache/toree/utils/DataFrameConverterSpec.scala
@@ -0,0 +1,79 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package org.apache.toree.utils
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.StructType
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSpec, Matchers}
+import play.api.libs.json.{JsArray, JsString, Json}
+
+class DataFrameConverterSpec extends FunSpec with MockitoSugar with Matchers {
+  val dataFrameConverter: DataFrameConverter = new DataFrameConverter
+  val mockDataFrame = mock[DataFrame]
+  val mockRdd = mock[RDD[Any]]
+  val mockStruct = mock[StructType]
+  val columns = Seq("foo", "bar").toArray
+  val rowsOfArrays = Array( Array("a", "b"), Array("c", "d") )
+  val rowsOfStrings = Array("test1","test2")
+  val rowsOfString = Array("test1")
+
+  doReturn(mockStruct).when(mockDataFrame).schema
+  doReturn(columns).when(mockStruct).fieldNames
+  doReturn(mockRdd).when(mockDataFrame).map(any())(any())
+  doReturn(rowsOfArrays).when(mockRdd).take(anyInt())
+
+  describe("DataFrameConverter") {
+    describe("#convert") {
+      it("should convert to a valid JSON object") {
+        val someJson = dataFrameConverter.convert(mockDataFrame, "json")
+        val jsValue = Json.parse(someJson.get)
+        jsValue \ "columns" should be (JsArray(Seq(JsString("foo"), JsString("bar"))))
+        jsValue \ "rows" should be (JsArray(Seq(
+          JsArray(Seq(JsString("a"), JsString("b"))),
+          JsArray(Seq(JsString("c"), JsString("d")))))
+        )
+      }
+      it("should convert to csv") {
+        doReturn(rowsOfStrings).when(mockRdd).take(anyInt())
+        val csv = dataFrameConverter.convert(mockDataFrame, "csv").get
+        val values = csv.split("\n").map(_.split(","))
+        values(0) should contain allOf ("foo","bar")
+      }
+      it("should convert to html") {
+        doReturn(rowsOfStrings).when(mockRdd).take(anyInt())
+        val html = dataFrameConverter.convert(mockDataFrame, "html").get
+        html.contains("<th>foo</th>") should be(true)
+        html.contains("<th>bar</th>") should be(true)
+      }
+      it("should convert limit the selection") {
+        doReturn(rowsOfString).when(mockRdd).take(1)
+        val someLimited = dataFrameConverter.convert(mockDataFrame, "csv", 1)
+        val limitedLines = someLimited.get.split("\n")
+        limitedLines.length should be(2)
+      }
+      it("should return a Failure for invalid types") {
+        val result = dataFrameConverter.convert(mockDataFrame, "Invalid Type")
+        result.isFailure should be(true)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5a2b79e2/kernel/src/test/scala/org/apache/toree/utils/json/RddToJsonSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/utils/json/RddToJsonSpec.scala b/kernel/src/test/scala/org/apache/toree/utils/json/RddToJsonSpec.scala
deleted file mode 100644
index e2b403e..0000000
--- a/kernel/src/test/scala/org/apache/toree/utils/json/RddToJsonSpec.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License
- */
-
-package org.apache.toree.utils.json
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.StructType
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSpec}
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import play.api.libs.json.{JsArray, JsString, Json}
-
-class RddToJsonSpec extends FunSpec with MockitoSugar with Matchers {
-
-  val mockDataFrame = mock[DataFrame]
-  val mockRdd = mock[RDD[Any]]
-  val mockStruct = mock[StructType]
-  val columns = Seq("foo", "bar").toArray
-  val rows = Array( Array("a", "b"), Array("c", "d") )
-
-  doReturn(mockStruct).when(mockDataFrame).schema
-  doReturn(columns).when(mockStruct).fieldNames
-  doReturn(mockRdd).when(mockDataFrame).map(any())(any())
-  doReturn(rows).when(mockRdd).take(anyInt())
-
-  describe("RddToJson") {
-    describe("#convert(SchemaRDD)") {
-      it("should convert to valid JSON object") {
-
-        val json = RddToJson.convert(mockDataFrame)
-        val jsValue = Json.parse(json)
-
-        jsValue \ "columns" should be (JsArray(Seq(JsString("foo"), JsString("bar"))))
-        jsValue \ "rows" should be (JsArray(Seq(
-          JsArray(Seq(JsString("a"), JsString("b"))),
-          JsArray(Seq(JsString("c"), JsString("d"))))))
-      }
-    }
-  }
-}



[2/2] incubator-toree git commit: [skip ci] Added dataframe to the TOC of the sample notebook

Posted by lb...@apache.org.
[skip ci] Added dataframe to the TOC of the sample notebook


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/4463bf46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/4463bf46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/4463bf46

Branch: refs/heads/master
Commit: 4463bf460fd5bb04e2fe818e9127b4ec79678564
Parents: 5a2b79e
Author: Gino Bustelo <lb...@apache.org>
Authored: Fri May 6 13:42:48 2016 -0500
Committer: Gino Bustelo <lb...@apache.org>
Committed: Fri May 6 13:42:48 2016 -0500

----------------------------------------------------------------------
 etc/examples/notebooks/magic-tutorial.ipynb | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/4463bf46/etc/examples/notebooks/magic-tutorial.ipynb
----------------------------------------------------------------------
diff --git a/etc/examples/notebooks/magic-tutorial.ipynb b/etc/examples/notebooks/magic-tutorial.ipynb
index 3d3ca28..e6d6a62 100644
--- a/etc/examples/notebooks/magic-tutorial.ipynb
+++ b/etc/examples/notebooks/magic-tutorial.ipynb
@@ -16,6 +16,7 @@
     "    1. [AddJar](#addjar)\n",
     "    1. [AddDeps](#adddeps)\n",
     "1. [Cell Magics](#cell-magics)\n",
+    "    1. [DataFrame](#dataframe)\n",
     "    1. [Html](#html)\n",
     "    1. [JavaScript](#javascript)\n",
     "    1. [PySpark](#pyspark)\n",
@@ -516,7 +517,7 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "### %%DataFrame\n",
+    "### %%DataFrame<a name=\"dataframe\"></a><span style=\"float: right; font-size: 0.5em\"><a href=\"#top\">Top</a></span>\n",
     "The `%%DataFrame` magic is used to convert a Spark SQL DataFrame into various formats. Currently, `json`, `html`, and `csv` are supported. The magic takes an expression, which evauluates to a dataframe, to perform the conversion. So, we first need to create a DataFrame object for reference."
    ]
   },