You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/12/14 12:52:31 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1515] Add RowSet for trino engine

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c1a0ce8  [KYUUBI #1515] Add RowSet for trino engine
c1a0ce8 is described below

commit c1a0ce8e0174cd442cd8f0edc464d595c281648a
Author: hongdongdong <ho...@cmss.chinamobile.com>
AuthorDate: Tue Dec 14 20:52:20 2021 +0800

    [KYUUBI #1515] Add RowSet for trino engine
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    Add a RowSet for the Trino engine that converts Trino query results into Hive Thrift TRowSet objects.
    
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [X] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1549 from hddong/add-rowset-for-trino.
    
    Closes #1515
    
    cc885130 [hongdongdong] remove timeZone
    2fb10934 [hongdongdong] fix toHiveString
    9066caff [hongdongdong] rm optionalStart
    491b7682 [hongdongdong] Reset
    27895ebe [hongdongdong] fix RowSetUtils
    7ce4edeb [hongdongdong] fix
    34dd01b8 [hongdongdong] move functions to Utils
    79e0621a [hongdongdong] fix
    2dcd91e5 [hongdongdong] fix
    71bdcfd2 [hongdongdong] [KYUUBI #1515] Add RowSet for trino engine
    
    Authored-by: hongdongdong <ho...@cmss.chinamobile.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 externals/kyuubi-trino-engine/pom.xml              |  19 ++
 .../apache/kyuubi/engine/trino/schema/RowSet.scala | 282 +++++++++++++++++
 .../kyuubi/schema/engine/trino/RowSetSuite.scala   | 332 +++++++++++++++++++++
 .../scala/org/apache/kyuubi/util/RowSetUtils.scala |  29 ++
 pom.xml                                            |   7 +
 5 files changed, 669 insertions(+)

diff --git a/externals/kyuubi-trino-engine/pom.xml b/externals/kyuubi-trino-engine/pom.xml
index 7130855..788abc9 100644
--- a/externals/kyuubi-trino-engine/pom.xml
+++ b/externals/kyuubi-trino-engine/pom.xml
@@ -45,6 +45,25 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/RowSet.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/RowSet.scala
new file mode 100644
index 0000000..b890cdf
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/RowSet.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.trino.schema
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import io.trino.client.ClientStandardTypes._
+import io.trino.client.Column
+import io.trino.client.Row
+import org.apache.hive.service.rpc.thrift.TBinaryColumn
+import org.apache.hive.service.rpc.thrift.TBoolColumn
+import org.apache.hive.service.rpc.thrift.TBoolValue
+import org.apache.hive.service.rpc.thrift.TByteColumn
+import org.apache.hive.service.rpc.thrift.TByteValue
+import org.apache.hive.service.rpc.thrift.TColumn
+import org.apache.hive.service.rpc.thrift.TColumnValue
+import org.apache.hive.service.rpc.thrift.TDoubleColumn
+import org.apache.hive.service.rpc.thrift.TDoubleValue
+import org.apache.hive.service.rpc.thrift.TI16Column
+import org.apache.hive.service.rpc.thrift.TI16Value
+import org.apache.hive.service.rpc.thrift.TI32Column
+import org.apache.hive.service.rpc.thrift.TI32Value
+import org.apache.hive.service.rpc.thrift.TI64Column
+import org.apache.hive.service.rpc.thrift.TI64Value
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.rpc.thrift.TRow
+import org.apache.hive.service.rpc.thrift.TRowSet
+import org.apache.hive.service.rpc.thrift.TStringColumn
+import org.apache.hive.service.rpc.thrift.TStringValue
+
+import org.apache.kyuubi.util.RowSetUtils.bitSetToBuffer
+
+object RowSet {
+
+  /**
+   * Converts Trino result rows into a Hive Thrift [[TRowSet]].
+   *
+   * Clients whose protocol is older than `HIVE_CLI_SERVICE_PROTOCOL_V6` receive a row-based
+   * set; all newer clients receive a column-based set.
+   *
+   * @param rows            result rows; cell `i` of every row is described by `schema(i)`
+   * @param schema          Trino column descriptors whose `getType` selects the encoding
+   * @param protocolVersion Thrift protocol version of the client
+   * @return the encoded row set
+   */
+  def toTRowSet(
+      rows: Seq[List[_]],
+      schema: List[Column],
+      protocolVersion: TProtocolVersion): TRowSet = {
+    if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
+      toRowBasedSet(rows, schema)
+    } else {
+      toColumnBasedSet(rows, schema)
+    }
+  }
+
+  /**
+   * Encodes `rows` as a row-based [[TRowSet]]: one [[TRow]] per input row, holding one
+   * [[TColumnValue]] per cell. Each row is encoded up to its own length, and every cell
+   * visited must have a column at the same position in `schema`.
+   */
+  def toRowBasedSet(rows: Seq[List[_]], schema: List[Column]): TRowSet = {
+    // Indexed once up front so the per-cell type lookup does not walk the schema List.
+    val types = schema.map(_.getType).toIndexedSeq
+    val tRows = rows.map { row =>
+      val tRow = new TRow()
+      // Iterate the row instead of indexing it: row(i) is a linear walk on a List.
+      row.iterator.zipWithIndex.foreach { case (value, i) =>
+        tRow.addToColVals(toTColumnValue(value, types(i)))
+      }
+      tRow
+    }.asJava
+    new TRowSet(0, tRows)
+  }
+
+  /**
+   * Encodes `rows` as a column-based [[TRowSet]], with one [[TColumn]] per entry of `schema`.
+   * The row list stays empty; it is passed only because the [[TRowSet]] constructor takes one.
+   */
+  def toColumnBasedSet(rows: Seq[List[_]], schema: List[Column]): TRowSet = {
+    val tRowSet = new TRowSet(0, new java.util.ArrayList[TRow]())
+    schema.zipWithIndex.foreach { case (field, i) =>
+      tRowSet.addToColumns(toTColumn(rows, i, field.getType))
+    }
+    tRowSet
+  }
+
+  /**
+   * Builds the Thrift column for position `ordinal` of `rows`, encoded according to the
+   * Trino type name `typ`.
+   *
+   * Null cells are flagged in the column's null bitmap and filled with a type-specific
+   * placeholder (`true`, zero, `""` or an empty byte buffer). Types without a dedicated
+   * Thrift column are rendered as strings through [[toHiveString]].
+   */
+  private def toTColumn(
+      rows: Seq[Seq[Any]],
+      ordinal: Int,
+      typ: String): TColumn = {
+    // Converted to a ByteBuffer (via bitSetToBuffer) when each Thrift column is constructed,
+    // so it must be fully populated before that point.
+    val nulls = new java.util.BitSet()
+    typ match {
+      case BOOLEAN =>
+        val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
+        TColumn.boolVal(new TBoolColumn(values, nulls))
+
+      case TINYINT =>
+        val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte)
+        TColumn.byteVal(new TByteColumn(values, nulls))
+
+      case SMALLINT =>
+        val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort)
+        TColumn.i16Val(new TI16Column(values, nulls))
+
+      case INTEGER =>
+        val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
+        TColumn.i32Val(new TI32Column(values, nulls))
+
+      case BIGINT =>
+        val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
+        TColumn.i64Val(new TI64Column(values, nulls))
+
+      case REAL =>
+        // REAL values are widened to double and sent in a TDoubleColumn.
+        val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat)
+          .asScala.map(n => java.lang.Double.valueOf(n.toDouble)).asJava
+        TColumn.doubleVal(new TDoubleColumn(values, nulls))
+
+      case DOUBLE =>
+        val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble)
+        TColumn.doubleVal(new TDoubleColumn(values, nulls))
+
+      case VARCHAR =>
+        val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
+        TColumn.stringVal(new TStringColumn(values, nulls))
+
+      case VARBINARY =>
+        val values = getOrSetAsNull[Array[Byte]](rows, ordinal, nulls, Array())
+          .asScala
+          .map(ByteBuffer.wrap)
+          .asJava
+        TColumn.binaryVal(new TBinaryColumn(values, nulls))
+
+      case _ =>
+        val values = collectColumn[String](rows, ordinal, nulls, "") { value =>
+          toHiveString((value, typ))
+        }
+        TColumn.stringVal(new TStringColumn(values, nulls))
+    }
+  }
+
+  /**
+   * Extracts column `ordinal` of `rows` as a Java list of `T`. Null cells are recorded in
+   * `nulls` and replaced by `defaultVal`; non-null cells are cast (unchecked) to `T`.
+   */
+  private def getOrSetAsNull[T](
+      rows: Seq[Seq[Any]],
+      ordinal: Int,
+      nulls: java.util.BitSet,
+      defaultVal: T): java.util.List[T] = {
+    collectColumn[T](rows, ordinal, nulls, defaultVal)(_.asInstanceOf[T])
+  }
+
+  /**
+   * Collects column `ordinal` of `rows` into a Java list in a single, eager pass.
+   *
+   * Null cells set their row index in `nulls` and contribute `defaultVal`; every other cell
+   * is passed through `convert`. The rows are iterated rather than indexed, so a linear
+   * `Seq` such as `List` costs O(n) instead of O(n^2). The pass is eager so `nulls` is
+   * complete before the caller builds the Thrift column from it.
+   */
+  private def collectColumn[T](
+      rows: Seq[Seq[Any]],
+      ordinal: Int,
+      nulls: java.util.BitSet,
+      defaultVal: T)(convert: Any => T): java.util.List[T] = {
+    val ret = new java.util.ArrayList[T](rows.length)
+    rows.iterator.zipWithIndex.foreach { case (row, idx) =>
+      val value = row(ordinal)
+      if (value == null) {
+        nulls.set(idx)
+        ret.add(defaultVal)
+      } else {
+        ret.add(convert(value))
+      }
+    }
+    ret
+  }
+
+  /**
+   * Encodes a single cell of Trino type `typ` for a row-based set.
+   *
+   * A null `value` yields a Thrift value whose field is left unset. Types without a
+   * dedicated Thrift value (including VARBINARY here) are rendered as strings through
+   * [[toHiveString]].
+   */
+  private def toTColumnValue(value: Any, typ: String): TColumnValue = {
+    val present = value != null
+    typ match {
+      case BOOLEAN =>
+        val boolValue = new TBoolValue
+        if (present) boolValue.setValue(value.asInstanceOf[Boolean])
+        TColumnValue.boolVal(boolValue)
+
+      case TINYINT =>
+        val byteValue = new TByteValue
+        if (present) byteValue.setValue(value.asInstanceOf[Byte])
+        TColumnValue.byteVal(byteValue)
+
+      case SMALLINT =>
+        val tI16Value = new TI16Value
+        if (present) tI16Value.setValue(value.asInstanceOf[Short])
+        TColumnValue.i16Val(tI16Value)
+
+      case INTEGER =>
+        val tI32Value = new TI32Value
+        if (present) tI32Value.setValue(value.asInstanceOf[Int])
+        TColumnValue.i32Val(tI32Value)
+
+      case BIGINT =>
+        val tI64Value = new TI64Value
+        if (present) tI64Value.setValue(value.asInstanceOf[Long])
+        TColumnValue.i64Val(tI64Value)
+
+      case REAL =>
+        // Widened from float to double, matching the column-based encoding.
+        val tDoubleValue = new TDoubleValue
+        if (present) tDoubleValue.setValue(value.asInstanceOf[Float])
+        TColumnValue.doubleVal(tDoubleValue)
+
+      case DOUBLE =>
+        val tDoubleValue = new TDoubleValue
+        if (present) tDoubleValue.setValue(value.asInstanceOf[Double])
+        TColumnValue.doubleVal(tDoubleValue)
+
+      case VARCHAR =>
+        val tStringValue = new TStringValue
+        if (present) tStringValue.setValue(value.asInstanceOf[String])
+        TColumnValue.stringVal(tStringValue)
+
+      case _ =>
+        val tStrValue = new TStringValue
+        if (present) tStrValue.setValue(toHiveString((value, typ)))
+        TColumnValue.stringVal(tStrValue)
+    }
+  }
+
+  /**
+   * Renders a value of Trino type `dataWithType._2` as the string sent to Hive clients.
+   *
+   * The encoders in this object never pass null or VARCHAR values here, because they handle
+   * those themselves. The `"null"` and double-quoted VARCHAR cases therefore only matter to
+   * direct callers. VARBINARY is decoded as UTF-8. ARRAY, MAP and ROW values are flattened
+   * with [[formatValue]]. Everything else uses `toString`.
+   */
+  def toHiveString(dataWithType: (Any, String)): String = {
+    dataWithType match {
+      case (null, _) =>
+        "null"
+
+      case (bin: Array[Byte], VARBINARY) =>
+        new String(bin, StandardCharsets.UTF_8)
+
+      case (s: String, VARCHAR) =>
+        "\"" + s + "\""
+
+      // for Array Map and Row, temporarily convert to string
+      // TODO further analysis of type
+      case (list: java.util.List[_], _) =>
+        formatValue(list)
+
+      case (m: java.util.Map[_, _], _) =>
+        formatValue(m)
+
+      case (row: Row, _) =>
+        formatValue(row)
+
+      case (other, _) =>
+        other.toString
+    }
+  }
+
+  /**
+   * Recursively formats a (possibly nested) value as follows:
+   *  - `null` becomes `null`;
+   *  - maps become `{k:v,...}`, with entries sorted by their rendered text;
+   *  - lists become `[a,b,...]`;
+   *  - rows become `{name=value,...}`, omitting `name=` for unnamed fields;
+   *  - anything else uses `toString`.
+   */
+  def formatValue(o: Any): String = {
+    o match {
+      case null =>
+        "null"
+
+      case m: java.util.Map[_, _] =>
+        m.asScala.map { case (key, value) =>
+          formatValue(key) + ":" + formatValue(value)
+        }.toSeq.sorted.mkString("{", ",", "}")
+
+      case l: java.util.List[_] =>
+        l.asScala.map(formatValue).mkString("[", ",", "]")
+
+      case row: Row =>
+        row.getFields.asScala.map { r =>
+          val formattedValue = formatValue(r.getValue())
+          if (r.getName.isPresent) {
+            r.getName.get() + "=" + formattedValue
+          } else {
+            formattedValue
+          }
+        }.mkString("{", ",", "}")
+
+      case _ => o.toString
+    }
+  }
+}
diff --git a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/schema/engine/trino/RowSetSuite.scala b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/schema/engine/trino/RowSetSuite.scala
new file mode 100644
index 0000000..2f580c5
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/schema/engine/trino/RowSetSuite.scala
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.schema.engine.trino
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.sql.Date
+import java.sql.Time
+
+import scala.collection.JavaConverters._
+
+import io.trino.client.ClientStandardTypes._
+import io.trino.client.ClientTypeSignature
+import io.trino.client.Column
+import io.trino.client.Row
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.engine.trino.schema.RowSet
+import org.apache.kyuubi.engine.trino.schema.RowSet.toHiveString
+
+/**
+ * Tests for [[RowSet]], which encodes Trino result rows as Hive Thrift row sets.
+ *
+ * The fixture holds eleven generated rows (values 0 to 10), one cell per Trino type in
+ * `schema`. They are followed by a row in which every cell is null (index 11).
+ */
+class RowSetSuite extends KyuubiFunSuite {
+
+  final private val UUID_PREFIX = "486bb66f-1206-49e3-993f-0db68f3cd8"
+
+  /**
+   * Builds one fixture row whose cells are all derived from `value`, in `schema` order.
+   * The BOOLEAN cell is null whenever `value % 3 == 2`.
+   */
+  def genRow(value: Int): List[_] = {
+    val boolVal = value % 3 match {
+      case 0 => true
+      case 1 => false
+      case _ => null
+    }
+    val byteVal = value.toByte
+    val shortVal = value.toShort
+    val longVal = value.toLong
+    val charVal = String.format("%10s", value.toString)
+    val floatVal = java.lang.Float.valueOf(s"$value.$value")
+    val doubleVal = java.lang.Double.valueOf(s"$value.$value")
+    val stringVal = value.toString * value
+    val decimalVal = new java.math.BigDecimal(s"$value.$value").toPlainString
+    val dayOrTime = java.lang.String.format("%02d", java.lang.Integer.valueOf(value + 1))
+    val dateVal = s"2018-11-$dayOrTime"
+    val timeVal = s"13:33:$dayOrTime"
+    val timestampVal = s"2018-11-17 13:33:33.$value"
+    val timestampWithZoneVal = s"2018-11-17 13:33:33.$value Asia/Shanghai"
+    val binaryVal = Array.fill[Byte](value)(value.toByte)
+    val arrVal = Array.fill(value)(doubleVal).toList.asJava
+    val mapVal = Map(value -> doubleVal).asJava
+    val jsonVal = s"""{"$value": $value}"""
+    val rowVal = Row.builder().addField(value.toString, value).build()
+    val ipVal = s"${value}.${value}.${value}.${value}"
+    val uuidVal = java.util.UUID.fromString(
+      s"$UUID_PREFIX${uuidSuffix(value)}")
+
+    List(
+      longVal,
+      value,
+      shortVal,
+      byteVal,
+      boolVal,
+      dateVal,
+      decimalVal,
+      floatVal,
+      doubleVal,
+      timestampVal,
+      timestampWithZoneVal,
+      timeVal,
+      binaryVal,
+      stringVal,
+      charVal,
+      rowVal,
+      arrVal,
+      mapVal,
+      jsonVal,
+      ipVal,
+      uuidVal)
+  }
+
+  val schema: List[Column] = List(
+    column("a", BIGINT),
+    column("b", INTEGER),
+    column("c", SMALLINT),
+    column("d", TINYINT),
+    column("e", BOOLEAN),
+    column("f", DATE),
+    column("g", DECIMAL),
+    column("h", REAL),
+    column("i", DOUBLE),
+    column("j", TIMESTAMP),
+    column("k", TIMESTAMP_WITH_TIME_ZONE),
+    column("l", TIME),
+    column("m", VARBINARY),
+    column("n", VARCHAR),
+    column("o", CHAR),
+    column("p", ROW),
+    column("q", ARRAY),
+    column("r", MAP),
+    column("s", JSON),
+    column("t", IPADDRESS),
+    column("u", UUID))
+
+  // Generated rows 0..10 followed by an all-null row at index 11.
+  private val rows: Seq[List[_]] = (0 to 10).map(genRow) ++ Seq(List.fill(21)(null))
+
+  def column(name: String, tp: String): Column = new Column(name, tp, new ClientTypeSignature(tp))
+
+  def uuidSuffix(value: Int): String = if (value > 9) value.toString else s"f$value"
+
+  /** Returns the indices of the set bits in a Thrift null bitmap. */
+  private def nullBits(bytes: Array[Byte]): Set[Int] =
+    java.util.BitSet.valueOf(bytes).stream().toArray.toSet
+
+  test("column based set") {
+    val tRowSet = RowSet.toColumnBasedSet(rows, schema)
+    assert(tRowSet.getColumns.size() === schema.size)
+    assert(tRowSet.getRowsSize === 0)
+
+    val cols = tRowSet.getColumns.iterator()
+
+    val longCol = cols.next().getI64Val
+    longCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b === 0)
+      case (b, i) => assert(b === i)
+    }
+
+    val intCol = cols.next().getI32Val
+    intCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b === 0)
+      case (b, i) => assert(b === i)
+    }
+
+    val shortCol = cols.next().getI16Val
+    shortCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b === 0)
+      case (b, i) => assert(b === i)
+    }
+
+    val byteCol = cols.next().getByteVal
+    byteCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b === 0)
+      case (b, i) => assert(b === i)
+    }
+
+    val boolCol = cols.next().getBoolVal
+    assert(boolCol.getValuesSize === rows.size)
+    boolCol.getValues.asScala.zipWithIndex.foreach { case (b, i) =>
+      i % 3 match {
+        case 0 => assert(b)
+        case 1 => assert(!b)
+        case _ => assert(b)
+      }
+    }
+
+    val dateCol = cols.next().getStringVal
+    dateCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b.isEmpty)
+      case (b, i) =>
+        assert(b === toHiveString((Date.valueOf(s"2018-11-${i + 1}"), DATE)))
+    }
+
+    val decCol = cols.next().getStringVal
+    decCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b.isEmpty)
+      case (b, i) => assert(b === s"$i.$i")
+    }
+
+    val floatCol = cols.next().getDoubleVal
+    floatCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b === 0)
+      case (b, i) => assert(b === java.lang.Float.valueOf(s"$i.$i"))
+    }
+
+    val doubleCol = cols.next().getDoubleVal
+    doubleCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b === 0)
+      case (b, i) => assert(b === java.lang.Double.valueOf(s"$i.$i"))
+    }
+
+    val timestampCol = cols.next().getStringVal
+    timestampCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b.isEmpty)
+      case (b, i) => assert(b === s"2018-11-17 13:33:33.$i")
+    }
+
+    val timestampWithZoneCol = cols.next().getStringVal
+    timestampWithZoneCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b.isEmpty)
+      case (b, i) => assert(b === s"2018-11-17 13:33:33.$i Asia/Shanghai")
+    }
+
+    val timeCol = cols.next().getStringVal
+    timeCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b.isEmpty)
+      case (b, i) => assert(b === toHiveString((Time.valueOf(s"13:33:${i + 1}"), TIME)))
+    }
+
+    val binCol = cols.next().getBinaryVal
+    binCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b === ByteBuffer.allocate(0))
+      case (b, i) => assert(b === ByteBuffer.wrap(Array.fill[Byte](i)(i.toByte)))
+    }
+
+    val strCol = cols.next().getStringVal
+    strCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b.isEmpty)
+      case (b, i) => assert(b === i.toString * i)
+    }
+
+    val charCol = cols.next().getStringVal
+    charCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b.isEmpty)
+      case (b, i) => assert(b === String.format("%10s", i.toString))
+    }
+
+    val rowCol = cols.next().getStringVal
+    rowCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b.isEmpty)
+      case (b, i) => assert(b ===
+          toHiveString((Row.builder().addField(i.toString, i).build(), ROW)))
+    }
+
+    val arrCol = cols.next().getStringVal
+    arrCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b === "")
+      case (b, i) => assert(b === toHiveString(
+          (Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toList.asJava, ARRAY)))
+    }
+
+    val mapCol = cols.next().getStringVal
+    mapCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b === "")
+      case (b, i) => assert(b === toHiveString(
+          (Map(i -> java.lang.Double.valueOf(s"$i.$i")).asJava, MAP)))
+    }
+
+    val jsonCol = cols.next().getStringVal
+    jsonCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b === "")
+      case (b, i) => assert(b ===
+          toHiveString((s"""{"$i": $i}""", JSON)))
+    }
+
+    val ipCol = cols.next().getStringVal
+    ipCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b === "")
+      case (b, i) => assert(b ===
+          toHiveString((s"${i}.${i}.${i}.${i}", IPADDRESS)))
+    }
+
+    val uuidCol = cols.next().getStringVal
+    uuidCol.getValues.asScala.zipWithIndex.foreach {
+      case (b, 11) => assert(b === "")
+      case (b, i) => assert(b ===
+          toHiveString((s"$UUID_PREFIX${uuidSuffix(i)}", UUID)))
+    }
+  }
+
+  test("column based set records null cells in the null bitmap") {
+    val cols = RowSet.toColumnBasedSet(rows, schema).getColumns
+    val nullRow = rows.size - 1
+    assert(nullBits(cols.get(0).getI64Val.getNulls) === Set(nullRow))
+    // BOOLEAN cells are null for value % 3 == 2, and the trailing all-null row is index 11.
+    assert(nullBits(cols.get(4).getBoolVal.getNulls) === rows.indices.filter(_ % 3 == 2).toSet)
+    assert(nullBits(cols.get(12).getBinaryVal.getNulls) === Set(nullRow))
+    assert(nullBits(cols.get(13).getStringVal.getNulls) === Set(nullRow))
+    // ARRAY goes through the generic string-rendering path.
+    assert(nullBits(cols.get(16).getStringVal.getNulls) === Set(nullRow))
+  }
+
+  test("row sets do not depend on the Seq implementation of rows") {
+    val listRows = rows.toList
+    assert(RowSet.toColumnBasedSet(listRows, schema) === RowSet.toColumnBasedSet(rows, schema))
+    assert(RowSet.toRowBasedSet(listRows, schema) === RowSet.toRowBasedSet(rows, schema))
+  }
+
+  test("toHiveString and formatValue render nested values") {
+    assert(toHiveString((null, ARRAY)) === "null")
+    assert(toHiveString(("a", VARCHAR)) === "\"a\"")
+    assert(toHiveString(("a", JSON)) === "a")
+    assert(RowSet.formatValue(List[Any](1, null, "x").asJava) === "[1,null,x]")
+    assert(RowSet.formatValue(Map[Any, Any](2 -> "b", 1 -> null).asJava) === "{1:null,2:b}")
+  }
+
+  test("row based set") {
+    val tRowSet = RowSet.toRowBasedSet(rows, schema)
+    assert(tRowSet.getColumnCount === 0)
+    assert(tRowSet.getRowsSize === rows.size)
+    val iter = tRowSet.getRowsIterator
+
+    val r1 = iter.next().getColVals
+    assert(r1.get(0).getI64Val.getValue === 0)
+    assert(r1.get(4).getBoolVal.isValue)
+
+    val r2 = iter.next().getColVals
+    assert(r2.get(1).getI32Val.getValue === 1)
+    assert(!r2.get(4).getBoolVal.isValue)
+
+    val r3 = iter.next().getColVals
+    assert(r3.get(2).getI16Val.getValue === 2)
+    assert(!r3.get(4).getBoolVal.isValue)
+
+    val r4 = iter.next().getColVals
+    assert(r4.get(3).getByteVal.getValue === 3)
+
+    val r5 = iter.next().getColVals
+    assert(r5.get(5).getStringVal.getValue === "2018-11-05")
+    assert(r5.get(6).getStringVal.getValue === "4.4")
+
+    val r6 = iter.next().getColVals
+    assert(r6.get(7).getDoubleVal.getValue === 5.5)
+    assert(r6.get(8).getDoubleVal.getValue === 5.5)
+
+    val r7 = iter.next().getColVals
+    assert(r7.get(9).getStringVal.getValue === "2018-11-17 13:33:33.6")
+    assert(r7.get(10).getStringVal.getValue === "2018-11-17 13:33:33.6 Asia/Shanghai")
+
+    val r8 = iter.next().getColVals
+    assert(r8.get(11).getStringVal.getValue === "13:33:08")
+
+    val r9 = iter.next().getColVals
+    assert(r9.get(12).getStringVal.getValue === new String(
+      Array.fill[Byte](8)(8.toByte),
+      StandardCharsets.UTF_8))
+    assert(r9.get(13).getStringVal.getValue === "8" * 8)
+    assert(r9.get(14).getStringVal.getValue === String.format(s"%10s", 8.toString))
+
+    val r10 = iter.next().getColVals
+    assert(r10.get(15).getStringVal.getValue ===
+      toHiveString((Row.builder().addField(9.toString, 9).build(), ROW)))
+    assert(r10.get(16).getStringVal.getValue === Array.fill(9)(9.9d).mkString("[", ",", "]"))
+    assert(r10.get(17).getStringVal.getValue === toHiveString((Map(9 -> 9.9d).asJava, MAP)))
+    assert(r10.get(18).getStringVal.getValue === "{\"9\": 9}")
+    assert(r10.get(19).getStringVal.getValue === "9.9.9.9")
+    assert(r10.get(20).getStringVal.getValue === s"$UUID_PREFIX${uuidSuffix(9)}")
+  }
+
+  test("to row set") {
+    TProtocolVersion.values().foreach { proto =>
+      val set = RowSet.toTRowSet(rows, schema, proto)
+      if (proto.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
+        assert(!set.isSetColumns, proto.toString)
+        assert(set.isSetRows, proto.toString)
+      } else {
+        assert(set.isSetColumns, proto.toString)
+        assert(set.isSetRows, proto.toString)
+      }
+    }
+  }
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
new file mode 100644
index 0000000..5cb421c
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.util
+
+import java.nio.ByteBuffer
+
+import scala.language.implicitConversions
+
+object RowSetUtils {
+
+  /**
+   * Implicitly turns a [[java.util.BitSet]] null mask into the [[ByteBuffer]] form taken by
+   * the Thrift column constructors.
+   *
+   * The bits are copied when the conversion runs, so later changes to `bitSet` are not
+   * reflected in the returned buffer.
+   */
+  implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
+    val packedBits: Array[Byte] = bitSet.toByteArray
+    ByteBuffer.wrap(packedBits)
+  }
+}
diff --git a/pom.xml b/pom.xml
index f0b36ba..65dbda9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,6 +136,7 @@
         <swagger.version>2.1.11</swagger.version>
         <swagger.scala.module.version>2.5.2</swagger.scala.module.version>
         <swagger-ui.version>4.1.0</swagger-ui.version>
+        <trino.client.version>363</trino.client.version>
         <zookeeper.version>3.4.14</zookeeper.version>
 
         <!-- apply to kyuubi-hive-jdbc/kyuubi-hive-beeline module -->
@@ -445,6 +446,12 @@
             </dependency>
 
             <dependency>
+                <groupId>io.trino</groupId>
+                <artifactId>trino-client</artifactId>
+                <version>${trino.client.version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>io.fabric8</groupId>
                 <artifactId>kubernetes-client</artifactId>
                 <version>${kubernetes-client.version}</version>