You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/12/14 12:52:31 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1515] Add RowSet for trino engine
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c1a0ce8 [KYUUBI #1515] Add RowSet for trino engine
c1a0ce8 is described below
commit c1a0ce8e0174cd442cd8f0edc464d595c281648a
Author: hongdongdong <ho...@cmss.chinamobile.com>
AuthorDate: Tue Dec 14 20:52:20 2021 +0800
[KYUUBI #1515] Add RowSet for trino engine
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
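Add a RowSet utility for the Trino engine. It converts Trino query results (rows plus their column schema) into Thrift TRowSets: row-based for protocol versions below HIVE_CLI_SERVICE_PROTOCOL_V6, column-based otherwise.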
### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [X] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #1549 from hddong/add-rowset-for-trino.
Closes #1515
cc885130 [hongdongdong] remove timeZone
2fb10934 [hongdongdong] fix toHiveString
9066caff [hongdongdong] rm optionalStart
491b7682 [hongdongdong] Reset
27895ebe [hongdongdong] fix RowSetUtils
7ce4edeb [hongdongdong] fix
34dd01b8 [hongdongdong] move functions to Utils
79e0621a [hongdongdong] fix
2dcd91e5 [hongdongdong] fix
71bdcfd2 [hongdongdong] [KYUUBI #1515] Add RowSet for trino engine
Authored-by: hongdongdong <ho...@cmss.chinamobile.com>
Signed-off-by: Kent Yao <ya...@apache.org>
---
externals/kyuubi-trino-engine/pom.xml | 19 ++
.../apache/kyuubi/engine/trino/schema/RowSet.scala | 282 +++++++++++++++++
.../kyuubi/schema/engine/trino/RowSetSuite.scala | 332 +++++++++++++++++++++
.../scala/org/apache/kyuubi/util/RowSetUtils.scala | 29 ++
pom.xml | 7 +
5 files changed, 669 insertions(+)
diff --git a/externals/kyuubi-trino-engine/pom.xml b/externals/kyuubi-trino-engine/pom.xml
index 7130855..788abc9 100644
--- a/externals/kyuubi-trino-engine/pom.xml
+++ b/externals/kyuubi-trino-engine/pom.xml
@@ -45,6 +45,25 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.trino</groupId>
+ <artifactId>trino-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/RowSet.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/RowSet.scala
new file mode 100644
index 0000000..b890cdf
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/RowSet.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.trino.schema
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import io.trino.client.ClientStandardTypes._
+import io.trino.client.Column
+import io.trino.client.Row
+import org.apache.hive.service.rpc.thrift.TBinaryColumn
+import org.apache.hive.service.rpc.thrift.TBoolColumn
+import org.apache.hive.service.rpc.thrift.TBoolValue
+import org.apache.hive.service.rpc.thrift.TByteColumn
+import org.apache.hive.service.rpc.thrift.TByteValue
+import org.apache.hive.service.rpc.thrift.TColumn
+import org.apache.hive.service.rpc.thrift.TColumnValue
+import org.apache.hive.service.rpc.thrift.TDoubleColumn
+import org.apache.hive.service.rpc.thrift.TDoubleValue
+import org.apache.hive.service.rpc.thrift.TI16Column
+import org.apache.hive.service.rpc.thrift.TI16Value
+import org.apache.hive.service.rpc.thrift.TI32Column
+import org.apache.hive.service.rpc.thrift.TI32Value
+import org.apache.hive.service.rpc.thrift.TI64Column
+import org.apache.hive.service.rpc.thrift.TI64Value
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.rpc.thrift.TRow
+import org.apache.hive.service.rpc.thrift.TRowSet
+import org.apache.hive.service.rpc.thrift.TStringColumn
+import org.apache.hive.service.rpc.thrift.TStringValue
+
+import org.apache.kyuubi.util.RowSetUtils.bitSetToBuffer
+
+object RowSet {
+
+  /**
+   * Converts `rows` into a Thrift row set using the layout that matches the client's
+   * protocol version: row-based for versions below HIVE_CLI_SERVICE_PROTOCOL_V6,
+   * column-based for V6 and later.
+   *
+   * @param rows row-major data, one list of cell values per row
+   * @param schema the Trino columns describing each cell position
+   * @param protocolVersion the Thrift protocol version negotiated with the client
+   */
+  def toTRowSet(
+      rows: Seq[List[_]],
+      schema: List[Column],
+      protocolVersion: TProtocolVersion): TRowSet = {
+    val columnar =
+      protocolVersion.getValue >= TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue
+    if (columnar) toColumnBasedSet(rows, schema) else toRowBasedSet(rows, schema)
+  }
+
+  /**
+   * Builds a row-based TRowSet: one TRow per input row, holding one TColumnValue per cell.
+   * Cells are paired with `schema` by position.
+   */
+  def toRowBasedSet(rows: Seq[List[_]], schema: List[Column]): TRowSet = {
+    val tRows = rows.map { row =>
+      val tRow = new TRow()
+      for (ordinal <- row.indices) {
+        tRow.addToColVals(toTColumnValue(ordinal, row, schema))
+      }
+      tRow
+    }
+    new TRowSet(0, tRows.asJava)
+  }
+
+  /**
+   * Builds a column-based TRowSet: the row list is left empty and one TColumn is appended
+   * per schema column, in schema order.
+   */
+  def toColumnBasedSet(rows: Seq[List[_]], schema: List[Column]): TRowSet = {
+    val rowCount = rows.size
+    val tRowSet = new TRowSet(0, new java.util.ArrayList[TRow](rowCount))
+    schema.zipWithIndex.foreach { case (field, ordinal) =>
+      tRowSet.addToColumns(toTColumn(rows, ordinal, field.getType))
+    }
+    tRowSet
+  }
+
+ /**
+ * Builds the Thrift TColumn for the column at position `ordinal`, reading that cell from
+ * every row. The Trino type name `typ` selects the Thrift column kind; types without a
+ * dedicated case are sent as a string column.
+ *
+ * Null cells are recorded in the `nulls` bit set (bit index = row index) and replaced in
+ * the value list by a placeholder (true, 0, "" or an empty array).
+ */
+ private def toTColumn(
+ rows: Seq[Seq[Any]],
+ ordinal: Int,
+ typ: String): TColumn = {
+ // `nulls` is handed directly to the Thrift column constructors below; this relies on the
+ // imported implicit `bitSetToBuffer` (assuming those constructors take a ByteBuffer - confirm).
+ val nulls = new java.util.BitSet()
+ typ match {
+ case BOOLEAN =>
+ val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
+ TColumn.boolVal(new TBoolColumn(values, nulls))
+
+ case TINYINT =>
+ val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte)
+ TColumn.byteVal(new TByteColumn(values, nulls))
+
+ case SMALLINT =>
+ val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort)
+ TColumn.i16Val(new TI16Column(values, nulls))
+
+ case INTEGER =>
+ val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
+ TColumn.i32Val(new TI32Column(values, nulls))
+
+ case BIGINT =>
+ val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
+ TColumn.i64Val(new TI64Column(values, nulls))
+
+ case REAL =>
+ // REAL has no dedicated column kind here, so each Float is widened into a double column.
+ // NOTE(review): widening a Float with toDouble can expose extra binary digits
+ // (e.g. 1.1f becomes 1.100000023841858) - confirm this is acceptable for clients.
+ val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat)
+ .asScala.map(n => java.lang.Double.valueOf(n.toDouble)).asJava
+ TColumn.doubleVal(new TDoubleColumn(values, nulls))
+
+ case DOUBLE =>
+ val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble)
+ TColumn.doubleVal(new TDoubleColumn(values, nulls))
+
+ case VARCHAR =>
+ val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
+ TColumn.stringVal(new TStringColumn(values, nulls))
+
+ case VARBINARY =>
+ // Each byte array is wrapped (not copied) into a ByteBuffer for the binary column.
+ val values = getOrSetAsNull[Array[Byte]](rows, ordinal, nulls, Array())
+ .asScala
+ .map(ByteBuffer.wrap)
+ .asJava
+ TColumn.binaryVal(new TBinaryColumn(values, nulls))
+
+ case _ =>
+ // Every other type is rendered with toHiveString; a null cell becomes "" and has its
+ // bit set in `nulls`.
+ val values = rows.zipWithIndex.map { case (row, i) =>
+ nulls.set(i, row(ordinal) == null)
+ if (row(ordinal) == null) {
+ ""
+ } else {
+ toHiveString((row(ordinal), typ))
+ }
+ }.asJava
+ TColumn.stringVal(new TStringColumn(values, nulls))
+ }
+ }
+
+  /**
+   * Extracts the column at position `ordinal` from row-major `rows` as a Java list.
+   *
+   * For every row whose cell is null, the bit at that row's index is set in `nulls` and
+   * `defaultVal` is stored in the result instead, so the returned list always has exactly
+   * one entry per row.
+   *
+   * The rows are walked with a single iterator pass rather than `rows(idx)`, because indexed
+   * access on a linear `Seq` such as `List` is O(idx) and would make the whole extraction
+   * quadratic. Each cell is also read only once.
+   *
+   * @param rows row-major data
+   * @param ordinal position of the cell to read in each row
+   * @param nulls bit set that receives a set bit for each row whose cell is null
+   * @param defaultVal placeholder stored for null cells
+   * @tparam T element type of the result; the cast is unchecked because of erasure, so a
+   *           cell of the wrong type is not detected here
+   * @return the column values, in row order
+   */
+  private def getOrSetAsNull[T](
+      rows: Seq[Seq[Any]],
+      ordinal: Int,
+      nulls: java.util.BitSet,
+      defaultVal: T): java.util.List[T] = {
+    val ret = new java.util.ArrayList[T](rows.length)
+    val rowIter = rows.iterator
+    var idx = 0
+    while (rowIter.hasNext) {
+      val cell = rowIter.next()(ordinal)
+      if (cell == null) {
+        nulls.set(idx)
+        ret.add(defaultVal)
+      } else {
+        ret.add(cell.asInstanceOf[T])
+      }
+      idx += 1
+    }
+    ret
+  }
+
+  /**
+   * Converts the cell at position `ordinal` of `row` into a Thrift TColumnValue for the
+   * row-based layout. The Trino type name of the matching column selects the Thrift value
+   * holder; a null cell leaves the holder's value unset. REAL cells are stored in a double
+   * holder, and any type without a dedicated case is rendered as a string through
+   * `toHiveString`.
+   */
+  private def toTColumnValue(
+      ordinal: Int,
+      row: List[Any],
+      types: List[Column]): TColumnValue = {
+    // Look the type and the cell up once; both are positional lookups on a List.
+    val typ = types(ordinal).getType
+    val cell = row(ordinal)
+    val isPresent = cell != null
+
+    typ match {
+      case BOOLEAN =>
+        val holder = new TBoolValue
+        if (isPresent) holder.setValue(cell.asInstanceOf[Boolean])
+        TColumnValue.boolVal(holder)
+
+      case TINYINT =>
+        val holder = new TByteValue
+        if (isPresent) holder.setValue(cell.asInstanceOf[Byte])
+        TColumnValue.byteVal(holder)
+
+      case SMALLINT =>
+        val holder = new TI16Value
+        if (isPresent) holder.setValue(cell.asInstanceOf[Short])
+        TColumnValue.i16Val(holder)
+
+      case INTEGER =>
+        val holder = new TI32Value
+        if (isPresent) holder.setValue(cell.asInstanceOf[Int])
+        TColumnValue.i32Val(holder)
+
+      case BIGINT =>
+        val holder = new TI64Value
+        if (isPresent) holder.setValue(cell.asInstanceOf[Long])
+        TColumnValue.i64Val(holder)
+
+      case REAL =>
+        val holder = new TDoubleValue
+        if (isPresent) holder.setValue(cell.asInstanceOf[Float])
+        TColumnValue.doubleVal(holder)
+
+      case DOUBLE =>
+        val holder = new TDoubleValue
+        if (isPresent) holder.setValue(cell.asInstanceOf[Double])
+        TColumnValue.doubleVal(holder)
+
+      case VARCHAR =>
+        val holder = new TStringValue
+        if (isPresent) holder.setValue(cell.asInstanceOf[String])
+        TColumnValue.stringVal(holder)
+
+      case _ =>
+        val holder = new TStringValue
+        if (isPresent) {
+          holder.setValue(toHiveString((cell, typ)))
+        }
+        TColumnValue.stringVal(holder)
+    }
+  }
+
+  /**
+   * A simpler impl of Trino's toHiveString.
+   *
+   * Renders a `(value, Trino type name)` pair as text. The cases are tried in order:
+   * null, VARBINARY byte arrays (decoded as UTF-8), VARCHAR strings (wrapped in double
+   * quotes), then Java lists, maps and Trino rows via `formatValue`, and finally
+   * `toString` for everything else.
+   */
+  def toHiveString(dataWithType: (Any, String)): String = {
+    val (data, typ) = dataWithType
+    data match {
+      // Only match nulls in nested type values
+      case null => "null"
+
+      case bin: Array[Byte] if VARBINARY == typ =>
+        new String(bin, StandardCharsets.UTF_8)
+
+      // Only match string in nested type values
+      case s: String if VARCHAR == typ =>
+        "\"" + s + "\""
+
+      // for Array Map and Row, temporarily convert to string
+      // TODO further analysis of type
+      case list: java.util.List[_] => formatValue(list)
+      case m: java.util.Map[_, _] => formatValue(m)
+      case row: Row => formatValue(row)
+
+      case other => other.toString
+    }
+  }
+
+  /**
+   * Recursively renders a nested value as text:
+   *  - null as `null`
+   *  - a Java map as `{k:v,...}` with the rendered entries sorted as strings
+   *  - a Java list as `[a,b,...]`
+   *  - a Trino row as `{name=value,...}`, omitting `name=` for unnamed fields
+   *  - anything else via `toString`
+   */
+  def formatValue(o: Any): String = o match {
+    case null => "null"
+
+    case m: java.util.Map[_, _] =>
+      val entries = m.asScala.map { case (k, v) =>
+        s"${formatValue(k)}:${formatValue(v)}"
+      }
+      entries.toSeq.sorted.mkString("{", ",", "}")
+
+    case l: java.util.List[_] =>
+      l.asScala.map(elem => formatValue(elem)).mkString("[", ",", "]")
+
+    case row: Row =>
+      val fields = row.getFields.asScala.map { field =>
+        val rendered = formatValue(field.getValue())
+        if (field.getName.isPresent) field.getName.get() + "=" + rendered else rendered
+      }
+      fields.mkString("{", ",", "}")
+
+    case _ => o.toString
+  }
+}
diff --git a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/schema/engine/trino/RowSetSuite.scala b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/schema/engine/trino/RowSetSuite.scala
new file mode 100644
index 0000000..2f580c5
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/schema/engine/trino/RowSetSuite.scala
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.schema.engine.trino
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.sql.Date
+import java.sql.Time
+
+import scala.collection.JavaConverters._
+
+import io.trino.client.ClientStandardTypes._
+import io.trino.client.ClientTypeSignature
+import io.trino.client.Column
+import io.trino.client.Row
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.engine.trino.schema.RowSet
+import org.apache.kyuubi.engine.trino.schema.RowSet.toHiveString
+
+class RowSetSuite extends KyuubiFunSuite {
+
+ final private val UUID_PREFIX = "486bb66f-1206-49e3-993f-0db68f3cd8"
+
+ /**
+ * Generates one fixture row derived from `value`, with one cell per column of `schema`.
+ * The cells are returned in the same order as the columns declared in `schema`, because
+ * RowSet pairs cells with columns by position.
+ */
+ def genRow(value: Int): List[_] = {
+ // Cycles true / false / null so the boolean column also exercises null handling.
+ val boolVal = value % 3 match {
+ case 0 => true
+ case 1 => false
+ case _ => null
+ }
+ val byteVal = value.toByte
+ val shortVal = value.toShort
+ val longVal = value.toLong
+ // Right-aligned in a width-10 field, standing in for a fixed-length CHAR value.
+ val charVal = String.format("%10s", value.toString)
+ val floatVal = java.lang.Float.valueOf(s"$value.$value")
+ val doubleVal = java.lang.Double.valueOf(s"$value.$value")
+ // The digit string repeated `value` times (empty for 0).
+ val stringVal = value.toString * value
+ val decimalVal = new java.math.BigDecimal(s"$value.$value").toPlainString
+ // Two-digit, 1-based day-of-month / seconds component shared by the date and time cells.
+ val dayOrTime = java.lang.String.format("%02d", java.lang.Integer.valueOf(value + 1))
+ val dateVal = s"2018-11-$dayOrTime"
+ val timeVal = s"13:33:$dayOrTime"
+ val timestampVal = s"2018-11-17 13:33:33.$value"
+ val timestampWithZoneVal = s"2018-11-17 13:33:33.$value Asia/Shanghai"
+ val binaryVal = Array.fill[Byte](value)(value.toByte)
+ val arrVal = Array.fill(value)(doubleVal).toList.asJava
+ val mapVal = Map(value -> doubleVal).asJava
+ val jsonVal = s"""{"$value": $value}"""
+ val rowVal = Row.builder().addField(value.toString, value).build()
+ val ipVal = s"${value}.${value}.${value}.${value}"
+ val uuidVal = java.util.UUID.fromString(
+ s"$UUID_PREFIX${uuidSuffix(value)}")
+
+ // Order must match `schema` (columns a..u).
+ List(
+ longVal,
+ value,
+ shortVal,
+ byteVal,
+ boolVal,
+ dateVal,
+ decimalVal,
+ floatVal,
+ doubleVal,
+ timestampVal,
+ timestampWithZoneVal,
+ timeVal,
+ binaryVal,
+ stringVal,
+ charVal,
+ rowVal,
+ arrVal,
+ mapVal,
+ jsonVal,
+ ipVal,
+ uuidVal)
+ }
+
+ // Fixture schema: one column per Trino type under test. The column order must match the
+ // cell order returned by genRow, since cells are matched to columns by position.
+ val schema: List[Column] = List(
+ column("a", BIGINT),
+ column("b", INTEGER),
+ column("c", SMALLINT),
+ column("d", TINYINT),
+ column("e", BOOLEAN),
+ column("f", DATE),
+ column("g", DECIMAL),
+ column("h", REAL),
+ column("i", DOUBLE),
+ column("j", TIMESTAMP),
+ column("k", TIMESTAMP_WITH_TIME_ZONE),
+ column("l", TIME),
+ column("m", VARBINARY),
+ column("n", VARCHAR),
+ column("o", CHAR),
+ column("p", ROW),
+ column("q", ARRAY),
+ column("r", MAP),
+ column("s", JSON),
+ column("t", IPADDRESS),
+ column("u", UUID))
+
+  // Fixture data: one generated row per value 0..10, followed by a row (index 11) that is
+  // null in every column. The null row is sized from `schema` instead of a hard-coded 21,
+  // so it stays aligned when columns are added or removed.
+  private val rows: Seq[List[_]] = (0 to 10).map(genRow) ++ Seq(List.fill(schema.size)(null))
+
+  /** Builds a Trino client Column named `name` whose type and type signature both use `tp`. */
+  def column(name: String, tp: String): Column = {
+    val signature = new ClientTypeSignature(tp)
+    new Column(name, tp, signature)
+  }
+
+  /**
+   * Returns the two-character tail appended to UUID_PREFIX: single-digit values are
+   * prefixed with `f`, larger values are used as-is.
+   */
+  def uuidSuffix(value: Int): String = {
+    if (value <= 9) s"f$value" else value.toString
+  }
+
+ // Verifies the columnar layout column by column, walking the columns in schema order.
+ // Within each column, index 11 is the all-null fixture row, so it is expected to hold the
+ // placeholder value (0, "", empty buffer); indices 0..10 hold the value generated by
+ // genRow(i). The `nulls` bitmaps themselves are not asserted here.
+ test("column based set") {
+ val tRowSet = RowSet.toColumnBasedSet(rows, schema)
+ assert(tRowSet.getColumns.size() === schema.size)
+ assert(tRowSet.getRowsSize === 0)
+
+ val cols = tRowSet.getColumns.iterator()
+
+ val longCol = cols.next().getI64Val
+ longCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b === 0)
+ case (b, i) => assert(b === i)
+ }
+
+ val intCol = cols.next().getI32Val
+ intCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b === 0)
+ case (b, i) => assert(b === i)
+ }
+
+ val shortCol = cols.next().getI16Val
+ shortCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b === 0)
+ case (b, i) => assert(b === i)
+ }
+
+ val byteCol = cols.next().getByteVal
+ byteCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b === 0)
+ case (b, i) => assert(b === i)
+ }
+
+ // genRow yields true / false / null for i % 3 == 0 / 1 / 2; nulls (including row 11,
+ // where 11 % 3 == 2) come back as the placeholder `true`.
+ val boolCol = cols.next().getBoolVal
+ assert(boolCol.getValuesSize === rows.size)
+ boolCol.getValues.asScala.zipWithIndex.foreach { case (b, i) =>
+ i % 3 match {
+ case 0 => assert(b)
+ case 1 => assert(!b)
+ case _ => assert(b)
+ }
+ }
+
+ val dateCol = cols.next().getStringVal
+ dateCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b.isEmpty)
+ case (b, i) =>
+ assert(b === toHiveString((Date.valueOf(s"2018-11-${i + 1}"), DATE)))
+ }
+
+ val decCol = cols.next().getStringVal
+ decCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b.isEmpty)
+ case (b, i) => assert(b === s"$i.$i")
+ }
+
+ // REAL values are returned in a double column; the expectation is a Float, so the
+ // comparison is against the float-widened value.
+ val floatCol = cols.next().getDoubleVal
+ floatCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b === 0)
+ case (b, i) => assert(b === java.lang.Float.valueOf(s"$i.$i"))
+ }
+
+ val doubleCol = cols.next().getDoubleVal
+ doubleCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b === 0)
+ case (b, i) => assert(b === java.lang.Double.valueOf(s"$i.$i"))
+ }
+
+ val timestampCol = cols.next().getStringVal
+ timestampCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b.isEmpty)
+ case (b, i) => assert(b === s"2018-11-17 13:33:33.$i")
+ }
+
+ val timestampWithZoneCol = cols.next().getStringVal
+ timestampWithZoneCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b.isEmpty)
+ case (b, i) => assert(b === s"2018-11-17 13:33:33.$i Asia/Shanghai")
+ }
+
+ val timeCol = cols.next().getStringVal
+ timeCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b.isEmpty)
+ case (b, i) => assert(b === toHiveString((Time.valueOf(s"13:33:${i + 1}"), TIME)))
+ }
+
+ val binCol = cols.next().getBinaryVal
+ binCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b === ByteBuffer.allocate(0))
+ case (b, i) => assert(b === ByteBuffer.wrap(Array.fill[Byte](i)(i.toByte)))
+ }
+
+ val strCol = cols.next().getStringVal
+ strCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b.isEmpty)
+ case (b, i) => assert(b === i.toString * i)
+ }
+
+ val charCol = cols.next().getStringVal
+ charCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b.isEmpty)
+ case (b, i) => assert(b === String.format("%10s", i.toString))
+ }
+
+ // The remaining columns have no dedicated Thrift kind and are compared against the
+ // string produced by toHiveString for an equivalent value.
+ val rowCol = cols.next().getStringVal
+ rowCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b.isEmpty)
+ case (b, i) => assert(b ===
+ toHiveString((Row.builder().addField(i.toString, i).build(), ROW)))
+ }
+
+ val arrCol = cols.next().getStringVal
+ arrCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b === "")
+ case (b, i) => assert(b === toHiveString(
+ (Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toList.asJava, ARRAY)))
+ }
+
+ val mapCol = cols.next().getStringVal
+ mapCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b === "")
+ case (b, i) => assert(b === toHiveString(
+ (Map(i -> java.lang.Double.valueOf(s"$i.$i")).asJava, MAP)))
+ }
+
+ val jsonCol = cols.next().getStringVal
+ jsonCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b === "")
+ case (b, i) => assert(b ===
+ toHiveString((s"""{"$i": $i}""", JSON)))
+ }
+
+ val ipCol = cols.next().getStringVal
+ ipCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b === "")
+ case (b, i) => assert(b ===
+ toHiveString((s"${i}.${i}.${i}.${i}", IPADDRESS)))
+ }
+
+ val uuidCol = cols.next().getStringVal
+ uuidCol.getValues.asScala.zipWithIndex.foreach {
+ case (b, 11) => assert(b === "")
+ case (b, i) => assert(b ===
+ toHiveString((s"$UUID_PREFIX${uuidSuffix(i)}", UUID)))
+ }
+ }
+
+ // Verifies the row-based layout by spot-checking a few cells in each of the first ten
+ // rows. Rows are consumed in order, so rN below is the row generated by genRow(N - 1),
+ // and `get(k)` addresses the k-th column of `schema`.
+ test("row based set") {
+ val tRowSet = RowSet.toRowBasedSet(rows, schema)
+ assert(tRowSet.getColumnCount === 0)
+ assert(tRowSet.getRowsSize === rows.size)
+ val iter = tRowSet.getRowsIterator
+
+ val r1 = iter.next().getColVals
+ assert(r1.get(0).getI64Val.getValue === 0)
+ assert(r1.get(4).getBoolVal.isValue)
+
+ val r2 = iter.next().getColVals
+ assert(r2.get(1).getI32Val.getValue === 1)
+ assert(!r2.get(4).getBoolVal.isValue)
+
+ // For value 2 the boolean cell is null, so the Thrift value is never set; the assertion
+ // below expects isValue to report false in that case.
+ val r3 = iter.next().getColVals
+ assert(r3.get(2).getI16Val.getValue == 2)
+ assert(!r3.get(4).getBoolVal.isValue)
+
+ val r4 = iter.next().getColVals
+ assert(r4.get(3).getByteVal.getValue == 3)
+
+ val r5 = iter.next().getColVals
+ assert(r5.get(5).getStringVal.getValue === "2018-11-05")
+ assert(r5.get(6).getStringVal.getValue === "4.4")
+
+ val r6 = iter.next().getColVals
+ assert(r6.get(7).getDoubleVal.getValue === 5.5)
+ assert(r6.get(8).getDoubleVal.getValue === 5.5)
+
+ val r7 = iter.next().getColVals
+ assert(r7.get(9).getStringVal.getValue === "2018-11-17 13:33:33.6")
+ assert(r7.get(10).getStringVal.getValue === "2018-11-17 13:33:33.6 Asia/Shanghai")
+
+ val r8 = iter.next().getColVals
+ assert(r8.get(11).getStringVal.getValue === "13:33:08")
+
+ // In the row-based layout VARBINARY is sent as a string decoded from the bytes as UTF-8.
+ val r9 = iter.next().getColVals
+ assert(r9.get(12).getStringVal.getValue === new String(
+ Array.fill[Byte](8)(8.toByte),
+ StandardCharsets.UTF_8))
+ assert(r9.get(13).getStringVal.getValue === "8" * 8)
+ assert(r9.get(14).getStringVal.getValue === String.format(s"%10s", 8.toString))
+
+ val r10 = iter.next().getColVals
+ assert(r10.get(15).getStringVal.getValue ===
+ toHiveString((Row.builder().addField(9.toString, 9).build(), ROW)))
+ assert(r10.get(16).getStringVal.getValue === Array.fill(9)(9.9d).mkString("[", ",", "]"))
+ assert(r10.get(17).getStringVal.getValue === toHiveString((Map(9 -> 9.9d).asJava, MAP)))
+ assert(r10.get(18).getStringVal.getValue === "{\"9\": 9}")
+ assert(r10.get(19).getStringVal.getValue === "9.9.9.9")
+ assert(r10.get(20).getStringVal.getValue === s"$UUID_PREFIX${uuidSuffix(9)}")
+ }
+
+  // For every known protocol version, checks that toTRowSet picks the layout by version:
+  // columns are populated only from HIVE_CLI_SERVICE_PROTOCOL_V6 onwards, while the rows
+  // field is set for both layouts.
+  test("to row set") {
+    val columnarSince = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue
+    for (proto <- TProtocolVersion.values()) {
+      val set = RowSet.toTRowSet(rows, schema, proto)
+      if (proto.getValue >= columnarSince) {
+        assert(set.isSetColumns, proto.toString)
+      } else {
+        assert(!set.isSetColumns, proto.toString)
+      }
+      assert(set.isSetRows, proto.toString)
+    }
+  }
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
new file mode 100644
index 0000000..5cb421c
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.util
+
+import java.nio.ByteBuffer
+
+import scala.language.implicitConversions
+
+object RowSetUtils {
+
+  /**
+   * Implicit view from a `java.util.BitSet` to a `ByteBuffer` that wraps the bytes returned
+   * by `BitSet.toByteArray`, so a bit set can be passed where a byte buffer is expected.
+   */
+  implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
+    val bytes = bitSet.toByteArray
+    ByteBuffer.wrap(bytes)
+  }
+}
diff --git a/pom.xml b/pom.xml
index f0b36ba..65dbda9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,6 +136,7 @@
<swagger.version>2.1.11</swagger.version>
<swagger.scala.module.version>2.5.2</swagger.scala.module.version>
<swagger-ui.version>4.1.0</swagger-ui.version>
+ <trino.client.version>363</trino.client.version>
<zookeeper.version>3.4.14</zookeeper.version>
<!-- apply to kyuubi-hive-jdbc/kyuubi-hive-beeline module -->
@@ -445,6 +446,12 @@
</dependency>
<dependency>
+ <groupId>io.trino</groupId>
+ <artifactId>trino-client</artifactId>
+ <version>${trino.client.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes-client.version}</version>