Posted to commits@livy.apache.org by mg...@apache.org on 2019/09/05 12:26:18 UTC

[incubator-livy] branch master updated: [LIVY-640] Add tests for ThriftServer

This is an automated email from the ASF dual-hosted git repository.

mgaido pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cdb7b2  [LIVY-640] Add tests for ThriftServer
3cdb7b2 is described below

commit 3cdb7b2584dc97bc17e01721b6be38c3b137a2e1
Author: micahzhao <mi...@tencent.com>
AuthorDate: Thu Sep 5 14:26:02 2019 +0200

    [LIVY-640] Add tests for ThriftServer
    
    ## What changes were proposed in this pull request?
    
    1. Added some tests to BinaryThriftServerSuite that the ThriftServer is currently missing (these tests were ported from the Spark ThriftServer).
    2. Set ENABLE_HIVE_CONTEXT=true in BinaryThriftServerSuite so that the tests can create Hive tables and UDFs (see the sketch after this list).
    3. Upgraded Spark from 2.2.0 to 2.2.3 to resolve an issue where sessions could not be created during the Travis tests when ENABLE_HIVE_CONTEXT is set to true.
    4. Split the Travis ITs into two parts to avoid test timeout problems.
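    
    Regarding point 2, here is a minimal sketch of the kind of test a Hive-enabled session makes possible, assuming the suite's existing `withJdbcStatement` helper; the table name, schema, and data below are hypothetical and not part of this change:
    
    ```scala
    // Hypothetical example: with ENABLE_HIVE_CONTEXT=true, a test can create and
    // query a Hive-format (STORED AS) table over JDBC; without Hive support,
    // Spark SQL rejects Hive serde tables.
    withJdbcStatement { statement =>
      try {
        statement.execute(
          "CREATE TABLE test_hive_table (id INT, name STRING) STORED AS PARQUET")
        statement.execute("INSERT INTO test_hive_table VALUES (1, 'a')")
        val rs = statement.executeQuery("SELECT id, name FROM test_hive_table")
        rs.next()
        assert(rs.getInt(1) == 1)
        assert(rs.getString(2) == "a")
      } finally {
        statement.execute("DROP TABLE IF EXISTS test_hive_table")
      }
    }
    ```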
    
    ## How was this patch tested?
    
    Tested with Travis; see the results below:
    ![image](https://user-images.githubusercontent.com/13825159/63577482-10a32c80-c5c1-11e9-868a-0aab4b1af743.png)
    
    Author: micahzhao <mi...@tencent.com>
    
    Closes #209 from captainzmc/add-thrift-test.
---
 pom.xml                                            |   6 +-
 .../livy/thriftserver/ThriftServerSuites.scala     | 209 ++++++++++++++-------
 2 files changed, 143 insertions(+), 72 deletions(-)

diff --git a/pom.xml b/pom.xml
index f98071c..fa1600d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,7 +80,7 @@
   <properties>
     <hadoop.version>2.7.3</hadoop.version>
     <hadoop.scope>compile</hadoop.scope>
-    <spark.scala-2.11.version>2.2.0</spark.scala-2.11.version>
+    <spark.scala-2.11.version>2.2.3</spark.scala-2.11.version>
     <spark.version>${spark.scala-2.11.version}</spark.version>
     <hive.version>3.0.0</hive.version>
     <commons-codec.version>1.9</commons-codec.version>
@@ -109,9 +109,9 @@
     <execution.root>${user.dir}</execution.root>
     <spark.home>${execution.root}/dev/spark</spark.home>
     <spark.bin.download.url>
-      https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz
+      https://archive.apache.org/dist/spark/spark-2.2.3/spark-2.2.3-bin-hadoop2.7.tgz
     </spark.bin.download.url>
-    <spark.bin.name>spark-2.2.0-bin-hadoop2.7</spark.bin.name>
+    <spark.bin.name>spark-2.2.3-bin-hadoop2.7</spark.bin.name>
     <!--  used for testing, NCSARequestLog use it for access log  -->
     <livy.log.dir>${basedir}/target</livy.log.dir>
 
diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
index 48750da..1939a56 100644
--- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
+++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
@@ -19,6 +19,9 @@ package org.apache.livy.thriftserver
 
 import java.sql.{Connection, Date, SQLException, Statement, Types}
 
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+
 import org.apache.hive.jdbc.HiveStatement
 
 import org.apache.livy.LivyConf
@@ -31,7 +34,8 @@ trait CommonThriftTests {
 
   def dataTypesTest(statement: Statement, mapSupported: Boolean): Unit = {
     val resultSet = statement.executeQuery(
-      "select 1, 'a', cast(null as int), 1.2345, CAST('2018-08-06' as date)")
+      "select 1, 'a', cast(null as int), 1.2345, CAST('2018-08-06' as date), " +
+        "CAST('123' as BINARY)")
     resultSet.next()
     assert(resultSet.getInt(1) == 1)
     assert(resultSet.getString(2) == "a")
@@ -39,6 +43,9 @@ trait CommonThriftTests {
     assert(resultSet.wasNull())
     assert(resultSet.getDouble(4) == 1.2345)
     assert(resultSet.getDate(5) == Date.valueOf("2018-08-06"))
+    val resultBytes = Source.fromInputStream(resultSet.getBinaryStream(6))
+      .map(_.toByte).toArray
+    assert("123".getBytes.sameElements(resultBytes))
     assert(!resultSet.next())
 
     val resultSetWithNulls = statement.executeQuery("select cast(null as string), " +
@@ -103,81 +110,90 @@ trait CommonThriftTests {
 
   def getTablesTest(connection: Connection): Unit = {
     val statement = connection.createStatement()
-    statement.execute("CREATE TABLE test_get_tables (id integer, desc string) USING json")
-    statement.close()
-
-    val metadata = connection.getMetaData
-    val tablesResultSet = metadata.getTables("", "default", "*", Array("TABLE"))
-    assert(tablesResultSet.getMetaData.getColumnCount == 5)
-    assert(tablesResultSet.getMetaData.getColumnName(1) == "TABLE_CAT")
-    assert(tablesResultSet.getMetaData.getColumnName(2) == "TABLE_SCHEM")
-    assert(tablesResultSet.getMetaData.getColumnName(3) == "TABLE_NAME")
-    assert(tablesResultSet.getMetaData.getColumnName(4) == "TABLE_TYPE")
-    assert(tablesResultSet.getMetaData.getColumnName(5) == "REMARKS")
+    try {
+      statement.execute("CREATE TABLE test_get_tables (id integer, desc string) USING json")
+      val metadata = connection.getMetaData
+      val tablesResultSet = metadata.getTables("", "default", "*", Array("TABLE"))
+      assert(tablesResultSet.getMetaData.getColumnCount == 5)
+      assert(tablesResultSet.getMetaData.getColumnName(1) == "TABLE_CAT")
+      assert(tablesResultSet.getMetaData.getColumnName(2) == "TABLE_SCHEM")
+      assert(tablesResultSet.getMetaData.getColumnName(3) == "TABLE_NAME")
+      assert(tablesResultSet.getMetaData.getColumnName(4) == "TABLE_TYPE")
+      assert(tablesResultSet.getMetaData.getColumnName(5) == "REMARKS")
+
+      tablesResultSet.next()
+      assert(tablesResultSet.getString(3) == "test_get_tables")
+      assert(tablesResultSet.getString(4) == "TABLE")
+      assert(!tablesResultSet.next())
+    } finally {
+      statement.execute("DROP TABLE IF EXISTS test_get_tables")
+      statement.close()
+    }
 
-    tablesResultSet.next()
-    assert(tablesResultSet.getString(3) == "test_get_tables")
-    assert(tablesResultSet.getString(4) == "TABLE")
-    assert(!tablesResultSet.next())
   }
 
   def getColumnsTest(connection: Connection): Unit = {
     val metadata = connection.getMetaData
     val statement = connection.createStatement()
-    statement.execute("CREATE TABLE test_get_columns (id integer, desc string) USING json")
-    statement.close()
+    try {
+      statement.execute("CREATE TABLE test_get_columns (id integer, desc string) USING json")
+
+      val columnsResultSet = metadata.getColumns("", "default", "test_get_columns", ".*")
+      assert(columnsResultSet.getMetaData.getColumnCount == 23)
+      columnsResultSet.next()
+      assert(columnsResultSet.getString(1) == "")
+      assert(columnsResultSet.getString(2) == "default")
+      assert(columnsResultSet.getString(3) == "test_get_columns")
+      assert(columnsResultSet.getString(4) == "id")
+      assert(columnsResultSet.getInt(5) == 4)
+      assert(columnsResultSet.getString(6) == "integer")
+      assert(columnsResultSet.getInt(7) == 10)
+      assert(columnsResultSet.getString(8) == null)
+      assert(columnsResultSet.getInt(9) == 0)
+      assert(columnsResultSet.getInt(10) == 10)
+      assert(columnsResultSet.getInt(11) == 1)
+      assert(columnsResultSet.getString(12) == "")
+      assert(columnsResultSet.getString(13) == null)
+      assert(columnsResultSet.getString(14) == null)
+      assert(columnsResultSet.getString(15) == null)
+      assert(columnsResultSet.getString(15) == null)
+      assert(columnsResultSet.getInt(17) == 0)
+      assert(columnsResultSet.getString(18) == "YES")
+      assert(columnsResultSet.getString(19) == null)
+      assert(columnsResultSet.getString(20) == null)
+      assert(columnsResultSet.getString(21) == null)
+      assert(columnsResultSet.getString(22) == null)
+      assert(columnsResultSet.getString(23) == "NO")
+      columnsResultSet.next()
+      assert(columnsResultSet.getString(1) == "")
+      assert(columnsResultSet.getString(2) == "default")
+      assert(columnsResultSet.getString(3) == "test_get_columns")
+      assert(columnsResultSet.getString(4) == "desc")
+      assert(columnsResultSet.getInt(5) == 12)
+      assert(columnsResultSet.getString(6) == "string")
+      assert(columnsResultSet.getInt(7) == Integer.MAX_VALUE)
+      assert(columnsResultSet.getString(8) == null)
+      assert(columnsResultSet.getString(9) == null)
+      assert(columnsResultSet.getString(10) == null)
+      assert(columnsResultSet.getInt(11) == 1)
+      assert(columnsResultSet.getString(12) == "")
+      assert(columnsResultSet.getString(13) == null)
+      assert(columnsResultSet.getString(14) == null)
+      assert(columnsResultSet.getString(15) == null)
+      assert(columnsResultSet.getString(16) == null)
+      assert(columnsResultSet.getInt(17) == 1)
+      assert(columnsResultSet.getString(18) == "YES")
+      assert(columnsResultSet.getString(19) == null)
+      assert(columnsResultSet.getString(20) == null)
+      assert(columnsResultSet.getString(21) == null)
+      assert(columnsResultSet.getString(22) == null)
+      assert(columnsResultSet.getString(23) == "NO")
+      assert(!columnsResultSet.next())
+    } finally {
+      statement.execute("DROP TABLE IF EXISTS test_get_columns")
+      statement.close()
+    }
 
-    val columnsResultSet = metadata.getColumns("", "default", "test_get_columns", ".*")
-    assert(columnsResultSet.getMetaData.getColumnCount == 23)
-    columnsResultSet.next()
-    assert(columnsResultSet.getString(1) == "")
-    assert(columnsResultSet.getString(2) == "default")
-    assert(columnsResultSet.getString(3) == "test_get_columns")
-    assert(columnsResultSet.getString(4) == "id")
-    assert(columnsResultSet.getInt(5) == 4)
-    assert(columnsResultSet.getString(6) == "integer")
-    assert(columnsResultSet.getInt(7) == 10)
-    assert(columnsResultSet.getString(8) == null)
-    assert(columnsResultSet.getInt(9) == 0)
-    assert(columnsResultSet.getInt(10) == 10)
-    assert(columnsResultSet.getInt(11) == 1)
-    assert(columnsResultSet.getString(12) == "")
-    assert(columnsResultSet.getString(13) == null)
-    assert(columnsResultSet.getString(14) == null)
-    assert(columnsResultSet.getString(15) == null)
-    assert(columnsResultSet.getString(15) == null)
-    assert(columnsResultSet.getInt(17) == 0)
-    assert(columnsResultSet.getString(18) == "YES")
-    assert(columnsResultSet.getString(19) == null)
-    assert(columnsResultSet.getString(20) == null)
-    assert(columnsResultSet.getString(21) == null)
-    assert(columnsResultSet.getString(22) == null)
-    assert(columnsResultSet.getString(23) == "NO")
-    columnsResultSet.next()
-    assert(columnsResultSet.getString(1) == "")
-    assert(columnsResultSet.getString(2) == "default")
-    assert(columnsResultSet.getString(3) == "test_get_columns")
-    assert(columnsResultSet.getString(4) == "desc")
-    assert(columnsResultSet.getInt(5) == 12)
-    assert(columnsResultSet.getString(6) == "string")
-    assert(columnsResultSet.getInt(7) == Integer.MAX_VALUE)
-    assert(columnsResultSet.getString(8) == null)
-    assert(columnsResultSet.getString(9) == null)
-    assert(columnsResultSet.getString(10) == null)
-    assert(columnsResultSet.getInt(11) == 1)
-    assert(columnsResultSet.getString(12) == "")
-    assert(columnsResultSet.getString(13) == null)
-    assert(columnsResultSet.getString(14) == null)
-    assert(columnsResultSet.getString(15) == null)
-    assert(columnsResultSet.getString(16) == null)
-    assert(columnsResultSet.getInt(17) == 1)
-    assert(columnsResultSet.getString(18) == "YES")
-    assert(columnsResultSet.getString(19) == null)
-    assert(columnsResultSet.getString(20) == null)
-    assert(columnsResultSet.getString(21) == null)
-    assert(columnsResultSet.getString(22) == null)
-    assert(columnsResultSet.getString(23) == "NO")
-    assert(!columnsResultSet.next())
   }
 
   def operationLogRetrievalTest(statement: Statement): Unit = {
@@ -260,6 +276,61 @@ class BinaryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTest
   override def mode: ServerMode.Value = ServerMode.binary
   override def port: Int = 20000
 
+  test("test multiple session") {
+    var defaultV1: String = null
+    var defaultV2: String = null
+    var data: ArrayBuffer[Int] = null
+
+    // first session, we get the default value of the session status
+    withJdbcStatement { statement =>
+      val rs1 = statement.executeQuery("SET spark.sql.shuffle.partitions")
+      rs1.next()
+      assert("spark.sql.shuffle.partitions" === rs1.getString(1))
+      defaultV1 = rs1.getString(2)
+      rs1.close()
+      val rs2 = statement.executeQuery("SET hive.cli.print.header")
+      rs2.next()
+      assert("hive.cli.print.header" === rs2.getString(1))
+      defaultV2 = rs2.getString(2)
+      rs2.close()
+    }
+
+    // second session, we update the session status
+    withJdbcStatement { statement =>
+      val queries = Seq(
+        "SET spark.sql.shuffle.partitions=291",
+        "SET hive.cli.print.header=true"
+      )
+      queries.map(statement.execute)
+      val rs1 = statement.executeQuery("SET spark.sql.shuffle.partitions")
+      rs1.next()
+      assert("spark.sql.shuffle.partitions" === rs1.getString(1))
+      assert("291" === rs1.getString(2))
+      rs1.close()
+      val rs2 = statement.executeQuery("SET hive.cli.print.header")
+      rs2.next()
+      assert("hive.cli.print.header" === rs2.getString(1))
+      assert("true" === rs2.getString(2))
+      rs2.close()
+    }
+
+    // third session, we get the latest session status, supposed to be the
+    // default value
+    withJdbcStatement { statement =>
+      val rs1 = statement.executeQuery("SET spark.sql.shuffle.partitions")
+      rs1.next()
+      assert("spark.sql.shuffle.partitions" === rs1.getString(1))
+      assert(defaultV1 === rs1.getString(2))
+      rs1.close()
+      val rs2 = statement.executeQuery("SET hive.cli.print.header")
+      rs2.next()
+      assert("hive.cli.print.header" === rs2.getString(1))
+      assert(defaultV2 === rs2.getString(2))
+      rs2.close()
+    }
+
+  }
+
   test("Reuse existing session") {
     withJdbcConnection { _ =>
       val sessionManager = LivyThriftServer.getInstance.get.getSessionManager