You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by bo...@apache.org on 2023/10/23 14:21:29 UTC

[kyuubi] branch master updated: [KYUUBI #5382][JDBC] Duplication cleanup improvement in JdbcDialect and schema helpers

This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 81964803c [KYUUBI #5382][JDBC] Duplication cleanup improvement in JdbcDialect and schema helpers
81964803c is described below

commit 81964803c9ea0c9037d7f18422f875f8bb088ac9
Author: Fantasy-Jay <13...@163.com>
AuthorDate: Mon Oct 23 22:21:17 2023 +0800

    [KYUUBI #5382][JDBC] Duplication cleanup improvement in JdbcDialect and schema helpers
    
    ### _Why are the changes needed?_
    
    To close #5382.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No
    
    Closes #5490 from zhuyaogai/issue-5382.
    
    Closes #5382
    
    4757445e7 [Fantasy-Jay] Remove unrelated comment.
    f68c7aa6c [Fantasy-Jay] Refactor JDBC engine to reduce to code duplication.
    4ad6b3c53 [Fantasy-Jay] Refactor JDBC engine to reduce to code duplication.
    
    Authored-by: Fantasy-Jay <13...@163.com>
    Signed-off-by: liangbowen <li...@gf.com.cn>
---
 .../kyuubi/engine/jdbc/dialect/DorisDialect.scala  |  35 +---
 .../kyuubi/engine/jdbc/dialect/JdbcDialect.scala   |  40 +++-
 .../engine/jdbc/dialect/PhoenixDialect.scala       |  39 ----
 .../engine/jdbc/doris/DorisRowSetHelper.scala      | 122 +----------
 .../engine/jdbc/doris/DorisSchemaHelper.scala      |  35 +---
 .../engine/jdbc/phoenix/PhoenixRowSetHelper.scala  | 142 +------------
 .../engine/jdbc/phoenix/PhoenixSchemaHelper.scala  |  46 +---
 .../kyuubi/engine/jdbc/schema/RowSetHelper.scala   | 231 ++++++++++++++++++++-
 .../kyuubi/engine/jdbc/schema/SchemaHelper.scala   |  94 ++++++++-
 9 files changed, 366 insertions(+), 418 deletions(-)

diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala
index c2ae29953..f7c1ace64 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 package org.apache.kyuubi.engine.jdbc.dialect
-import java.sql.{Connection, ResultSet, Statement}
+import java.sql.{Connection, Statement}
 import java.util
 
 import scala.collection.JavaConverters._
@@ -23,34 +23,19 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.engine.jdbc.doris.{DorisRowSetHelper, DorisSchemaHelper}
 import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
-import org.apache.kyuubi.operation.Operation
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.session.Session
 
 class DorisDialect extends JdbcDialect {
 
   override def createStatement(connection: Connection, fetchSize: Int): Statement = {
-    val statement =
-      connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
+    val statement = super.createStatement(connection, fetchSize)
     statement.setFetchSize(Integer.MIN_VALUE)
     statement
   }
 
-  override def getTypeInfoOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
-  override def getCatalogsOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
-  override def getSchemasOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
   override def getTablesQuery(
       catalog: String,
       schema: String,
@@ -96,10 +81,6 @@ class DorisDialect extends JdbcDialect {
     query.toString()
   }
 
-  override def getTableTypesOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
   override def getColumnsQuery(
       session: Session,
       catalogName: String,
@@ -139,18 +120,6 @@ class DorisDialect extends JdbcDialect {
     query.toString()
   }
 
-  override def getFunctionsOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
-  override def getPrimaryKeysOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
-  override def getCrossReferenceOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
   override def getRowSetHelper(): RowSetHelper = {
     new DorisRowSetHelper
   }
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
index e08b22758..62e20a1d2 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
@@ -16,10 +16,10 @@
  */
 package org.apache.kyuubi.engine.jdbc.dialect
 
-import java.sql.{Connection, Statement}
+import java.sql.{Connection, ResultSet, Statement}
 import java.util
 
-import org.apache.kyuubi.{KyuubiException, Logging}
+import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_SHORT_NAME}
 import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
@@ -30,13 +30,24 @@ import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 abstract class JdbcDialect extends SupportServiceLoader with Logging {
 
-  def createStatement(connection: Connection, fetchSize: Int = 1000): Statement
+  def createStatement(connection: Connection, fetchSize: Int = 1000): Statement = {
+    val statement =
+      connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
+    statement.setFetchSize(fetchSize)
+    statement
+  }
 
-  def getTypeInfoOperation(session: Session): Operation
+  def getTypeInfoOperation(session: Session): Operation = {
+    throw KyuubiSQLException.featureNotSupported()
+  }
 
-  def getCatalogsOperation(session: Session): Operation
+  def getCatalogsOperation(session: Session): Operation = {
+    throw KyuubiSQLException.featureNotSupported()
+  }
 
-  def getSchemasOperation(session: Session): Operation
+  def getSchemasOperation(session: Session): Operation = {
+    throw KyuubiSQLException.featureNotSupported()
+  }
 
   def getTablesQuery(
       catalog: String,
@@ -44,7 +55,9 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {
       tableName: String,
       tableTypes: util.List[String]): String
 
-  def getTableTypesOperation(session: Session): Operation
+  def getTableTypesOperation(session: Session): Operation = {
+    throw KyuubiSQLException.featureNotSupported()
+  }
 
   def getColumnsQuery(
       session: Session,
@@ -53,16 +66,21 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {
       tableName: String,
       columnName: String): String
 
-  def getFunctionsOperation(session: Session): Operation
+  def getFunctionsOperation(session: Session): Operation = {
+    throw KyuubiSQLException.featureNotSupported()
+  }
 
-  def getPrimaryKeysOperation(session: Session): Operation
+  def getPrimaryKeysOperation(session: Session): Operation = {
+    throw KyuubiSQLException.featureNotSupported()
+  }
 
-  def getCrossReferenceOperation(session: Session): Operation
+  def getCrossReferenceOperation(session: Session): Operation = {
+    throw KyuubiSQLException.featureNotSupported()
+  }
 
   def getRowSetHelper(): RowSetHelper
 
   def getSchemaHelper(): SchemaHelper
-
 }
 
 object JdbcDialects extends Logging {
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
index 0cce14b42..4c8e8f265 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
@@ -15,8 +15,6 @@
  * limitations under the License.
  */
 package org.apache.kyuubi.engine.jdbc.dialect
-
-import java.sql.{Connection, ResultSet, Statement}
 import java.util
 
 import scala.collection.JavaConverters._
@@ -24,34 +22,13 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.engine.jdbc.phoenix.{PhoenixRowSetHelper, PhoenixSchemaHelper}
 import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
-import org.apache.kyuubi.operation.Operation
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.session.Session
 
 class PhoenixDialect extends JdbcDialect {
 
-  override def createStatement(connection: Connection, fetchSize: Int): Statement = {
-    val statement =
-      connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
-    statement.setFetchSize(fetchSize)
-    statement
-  }
-
-  override def getTypeInfoOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
-  override def getCatalogsOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
-  override def getSchemasOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
   override def getTablesQuery(
       catalog: String,
       schema: String,
@@ -91,10 +68,6 @@ class PhoenixDialect extends JdbcDialect {
     query.toString()
   }
 
-  override def getTableTypesOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
   override def getColumnsQuery(
       session: Session,
       catalogName: String,
@@ -127,18 +100,6 @@ class PhoenixDialect extends JdbcDialect {
     query.toString()
   }
 
-  override def getFunctionsOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
-  override def getPrimaryKeysOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
-  override def getCrossReferenceOperation(session: Session): Operation = {
-    throw KyuubiSQLException.featureNotSupported()
-  }
-
   override def getRowSetHelper(): RowSetHelper = {
     new PhoenixRowSetHelper
   }
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala
index 1ce43c7a4..a92942cec 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala
@@ -16,125 +16,21 @@
  */
 package org.apache.kyuubi.engine.jdbc.doris
 
-import java.sql.{Date, Types}
-import java.time.LocalDateTime
-
-import scala.collection.JavaConverters._
-
 import org.apache.hive.service.rpc.thrift._
 
-import org.apache.kyuubi.engine.jdbc.schema.{Column, RowSetHelper}
-import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime}
+import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper
 
 class DorisRowSetHelper extends RowSetHelper {
 
-  protected def toTColumn(
-      rows: Seq[Seq[Any]],
-      ordinal: Int,
-      sqlType: Int): TColumn = {
-    val nulls = new java.util.BitSet()
-    sqlType match {
-      case Types.BIT =>
-        val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
-        TColumn.boolVal(new TBoolColumn(values, nulls))
-
-      case Types.TINYINT | Types.SMALLINT | Types.INTEGER =>
-        val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
-        TColumn.i32Val(new TI32Column(values, nulls))
-
-      case Types.BIGINT =>
-        val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
-        TColumn.i64Val(new TI64Column(values, nulls))
-
-      case Types.REAL =>
-        val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat)
-          .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
-        TColumn.doubleVal(new TDoubleColumn(values, nulls))
-
-      case Types.DOUBLE =>
-        val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble)
-        TColumn.doubleVal(new TDoubleColumn(values, nulls))
-
-      case Types.CHAR | Types.VARCHAR =>
-        val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
-        TColumn.stringVal(new TStringColumn(values, nulls))
-
-      case _ =>
-        val rowSize = rows.length
-        val values = new java.util.ArrayList[String](rowSize)
-        var i = 0
-        while (i < rowSize) {
-          val row = rows(i)
-          nulls.set(i, row(ordinal) == null)
-          val value =
-            if (row(ordinal) == null) {
-              ""
-            } else {
-              toHiveString(row(ordinal), sqlType)
-            }
-          values.add(value)
-          i += 1
-        }
-        TColumn.stringVal(new TStringColumn(values, nulls))
-    }
-  }
-
-  protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = {
-    types(ordinal).sqlType match {
-      case Types.BIT =>
-        val boolValue = new TBoolValue
-        if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean])
-        TColumnValue.boolVal(boolValue)
-
-      case Types.TINYINT | Types.SMALLINT | Types.INTEGER =>
-        val tI32Value = new TI32Value
-        if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int])
-        TColumnValue.i32Val(tI32Value)
-
-      case Types.BIGINT =>
-        val tI64Value = new TI64Value
-        if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long])
-        TColumnValue.i64Val(tI64Value)
-
-      case Types.REAL =>
-        val tDoubleValue = new TDoubleValue
-        if (row(ordinal) != null) {
-          val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString)
-          tDoubleValue.setValue(doubleValue)
-        }
-        TColumnValue.doubleVal(tDoubleValue)
-
-      case Types.DOUBLE =>
-        val tDoubleValue = new TDoubleValue
-        if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double])
-        TColumnValue.doubleVal(tDoubleValue)
+  override def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
+    toIntegerTColumn(rows, ordinal)
 
-      case Types.CHAR | Types.VARCHAR =>
-        val tStringValue = new TStringValue
-        if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String])
-        TColumnValue.stringVal(tStringValue)
+  override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
+    toIntegerTColumn(rows, ordinal)
 
-      case _ =>
-        val tStrValue = new TStringValue
-        if (row(ordinal) != null) {
-          tStrValue.setValue(
-            toHiveString(row(ordinal), types(ordinal).sqlType))
-        }
-        TColumnValue.stringVal(tStrValue)
-    }
-  }
+  override def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
+    toIntegerTColumnValue(row, ordinal)
 
-  protected def toHiveString(data: Any, sqlType: Int): String = {
-    (data, sqlType) match {
-      case (date: Date, Types.DATE) =>
-        formatDate(date)
-      case (dateTime: LocalDateTime, Types.TIMESTAMP) =>
-        formatLocalDateTime(dateTime)
-      case (decimal: java.math.BigDecimal, Types.DECIMAL) =>
-        decimal.toPlainString
-      // TODO support bitmap and hll
-      case (other, _) =>
-        other.toString
-    }
-  }
+  override def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
+    toIntegerTColumnValue(row, ordinal)
 }
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisSchemaHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisSchemaHelper.scala
index ca8bb6ec3..b323d3731 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisSchemaHelper.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisSchemaHelper.scala
@@ -16,44 +16,13 @@
  */
 package org.apache.kyuubi.engine.jdbc.doris
 
-import java.sql.Types
-
 import org.apache.hive.service.rpc.thrift._
 
 import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper
 
 class DorisSchemaHelper extends SchemaHelper {
 
-  override def toTTypeId(sqlType: Int): TTypeId = sqlType match {
-    case Types.BIT =>
-      TTypeId.BOOLEAN_TYPE
-
-    case Types.TINYINT | Types.SMALLINT | Types.INTEGER =>
-      TTypeId.INT_TYPE
-
-    case Types.BIGINT =>
-      TTypeId.BIGINT_TYPE
-
-    case Types.REAL =>
-      TTypeId.FLOAT_TYPE
-
-    case Types.DOUBLE =>
-      TTypeId.DOUBLE_TYPE
-
-    case Types.CHAR | Types.VARCHAR =>
-      TTypeId.STRING_TYPE
-
-    case Types.DATE =>
-      TTypeId.DATE_TYPE
-
-    case Types.TIMESTAMP =>
-      TTypeId.TIMESTAMP_TYPE
-
-    case Types.DECIMAL =>
-      TTypeId.DECIMAL_TYPE
+  override def tinyIntToTTypeId: TTypeId = TTypeId.INT_TYPE
 
-    // TODO add more type support
-    case _ =>
-      TTypeId.STRING_TYPE
-  }
+  override def smallIntToTTypeId: TTypeId = TTypeId.INT_TYPE
 }
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala
index a1f6d4ac2..67d9d09e5 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala
@@ -16,144 +16,6 @@
  */
 package org.apache.kyuubi.engine.jdbc.phoenix
 
-import java.sql.{Date, Types}
-import java.time.LocalDateTime
+import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper
 
-import scala.collection.JavaConverters._
-
-import org.apache.hive.service.rpc.thrift._
-
-import org.apache.kyuubi.engine.jdbc.schema.{Column, RowSetHelper}
-import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime}
-
-class PhoenixRowSetHelper extends RowSetHelper {
-
-  protected def toTColumn(
-      rows: Seq[Seq[Any]],
-      ordinal: Int,
-      sqlType: Int): TColumn = {
-    val nulls = new java.util.BitSet()
-    sqlType match {
-
-      case Types.BIT =>
-        val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
-        TColumn.boolVal(new TBoolColumn(values, nulls))
-
-      case Types.TINYINT =>
-        val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte)
-        TColumn.byteVal(new TByteColumn(values, nulls))
-
-      case Types.SMALLINT =>
-        val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort)
-        TColumn.i16Val(new TI16Column(values, nulls))
-
-      case Types.INTEGER =>
-        val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
-        TColumn.i32Val(new TI32Column(values, nulls))
-
-      case Types.BIGINT =>
-        val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
-        TColumn.i64Val(new TI64Column(values, nulls))
-
-      case Types.REAL =>
-        val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat)
-          .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
-        TColumn.doubleVal(new TDoubleColumn(values, nulls))
-
-      case Types.DOUBLE =>
-        val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble)
-        TColumn.doubleVal(new TDoubleColumn(values, nulls))
-
-      case Types.CHAR | Types.VARCHAR =>
-        val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
-        TColumn.stringVal(new TStringColumn(values, nulls))
-
-      case _ =>
-        val rowSize = rows.length
-        val values = new java.util.ArrayList[String](rowSize)
-        var i = 0
-        while (i < rowSize) {
-          val row = rows(i)
-          nulls.set(i, row(ordinal) == null)
-          val value =
-            if (row(ordinal) == null) {
-              ""
-            } else {
-              toHiveString(row(ordinal), sqlType)
-            }
-          values.add(value)
-          i += 1
-        }
-        TColumn.stringVal(new TStringColumn(values, nulls))
-    }
-  }
-
-  protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = {
-    types(ordinal).sqlType match {
-      case Types.BIT =>
-        val boolValue = new TBoolValue
-        if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean])
-        TColumnValue.boolVal(boolValue)
-
-      case Types.TINYINT =>
-        val byteValue = new TByteValue()
-        if (row(ordinal) != null) byteValue.setValue(row(ordinal).asInstanceOf[Byte])
-        TColumnValue.byteVal(byteValue)
-
-      case Types.SMALLINT =>
-        val tI16Value = new TI16Value()
-        if (row(ordinal) != null) tI16Value.setValue(row(ordinal).asInstanceOf[Short])
-        TColumnValue.i16Val(tI16Value)
-
-      case Types.INTEGER =>
-        val tI32Value = new TI32Value
-        if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int])
-        TColumnValue.i32Val(tI32Value)
-
-      case Types.BIGINT =>
-        val tI64Value = new TI64Value
-        if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long])
-        TColumnValue.i64Val(tI64Value)
-
-      case Types.REAL =>
-        val tDoubleValue = new TDoubleValue
-        if (row(ordinal) != null) {
-          val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString)
-          tDoubleValue.setValue(doubleValue)
-        }
-        TColumnValue.doubleVal(tDoubleValue)
-
-      case Types.DOUBLE =>
-        val tDoubleValue = new TDoubleValue
-        if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double])
-        TColumnValue.doubleVal(tDoubleValue)
-
-      case Types.CHAR | Types.VARCHAR =>
-        val tStringValue = new TStringValue
-        if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String])
-        TColumnValue.stringVal(tStringValue)
-
-      case _ =>
-        val tStrValue = new TStringValue
-        if (row(ordinal) != null) {
-          tStrValue.setValue(
-            toHiveString(row(ordinal), types(ordinal).sqlType))
-        }
-        TColumnValue.stringVal(tStrValue)
-    }
-  }
-
-  protected def toHiveString(data: Any, sqlType: Int): String = {
-    (data, sqlType) match {
-      case (date: Date, Types.DATE) =>
-        formatDate(date)
-      case (dateTime: LocalDateTime, Types.TIMESTAMP) =>
-        formatLocalDateTime(dateTime)
-      case (decimal: java.math.BigDecimal, Types.DECIMAL) =>
-        decimal.toPlainString
-      // TODO support bitmap and hll
-      case (other, _) =>
-        other.toString
-    }
-  }
-}
+class PhoenixRowSetHelper extends RowSetHelper {}
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala
index f5e04f7ca..938956cdc 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala
@@ -16,50 +16,6 @@
  */
 package org.apache.kyuubi.engine.jdbc.phoenix
 
-import java.sql.Types
-
-import org.apache.hive.service.rpc.thrift._
-
 import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper
 
-class PhoenixSchemaHelper extends SchemaHelper {
-
-  override def toTTypeId(sqlType: Int): TTypeId = sqlType match {
-    case Types.BIT =>
-      TTypeId.BOOLEAN_TYPE
-
-    case Types.TINYINT =>
-      TTypeId.TINYINT_TYPE
-
-    case Types.SMALLINT =>
-      TTypeId.SMALLINT_TYPE
-
-    case Types.INTEGER =>
-      TTypeId.INT_TYPE
-
-    case Types.BIGINT =>
-      TTypeId.BIGINT_TYPE
-
-    case Types.REAL =>
-      TTypeId.FLOAT_TYPE
-
-    case Types.DOUBLE =>
-      TTypeId.DOUBLE_TYPE
-
-    case Types.CHAR | Types.VARCHAR =>
-      TTypeId.STRING_TYPE
-
-    case Types.DATE =>
-      TTypeId.DATE_TYPE
-
-    case Types.TIMESTAMP =>
-      TTypeId.TIMESTAMP_TYPE
-
-    case Types.DECIMAL =>
-      TTypeId.DECIMAL_TYPE
-
-    // TODO add more type support
-    case _ =>
-      TTypeId.STRING_TYPE
-  }
-}
+class PhoenixSchemaHelper extends SchemaHelper {}
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala
index d489ed8a2..74b4cec10 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala
@@ -16,10 +16,16 @@
  */
 package org.apache.kyuubi.engine.jdbc.schema
 
-import java.util
+import java.{lang, util}
+import java.sql.{Date, Types}
+import java.time.LocalDateTime
+
+import scala.collection.JavaConverters._
 
 import org.apache.hive.service.rpc.thrift._
 
+import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime}
+
 abstract class RowSetHelper {
 
   def toTRowSet(
@@ -70,9 +76,73 @@ abstract class RowSetHelper {
   protected def toTColumn(
       rows: Seq[Seq[Any]],
       ordinal: Int,
-      sqlType: Int): TColumn
+      sqlType: Int): TColumn = {
+    sqlType match {
+      case Types.BIT =>
+        toBitTColumn(rows, ordinal)
+
+      case Types.TINYINT =>
+        toTinyIntTColumn(rows, ordinal)
+
+      case Types.SMALLINT =>
+        toSmallIntTColumn(rows, ordinal)
+
+      case Types.INTEGER =>
+        toIntegerTColumn(rows, ordinal)
+
+      case Types.BIGINT =>
+        toBigIntTColumn(rows, ordinal)
+
+      case Types.REAL =>
+        toRealTColumn(rows, ordinal)
+
+      case Types.DOUBLE =>
+        toDoubleTColumn(rows, ordinal)
+
+      case Types.CHAR =>
+        toCharTColumn(rows, ordinal)
+
+      case Types.VARCHAR =>
+        toVarcharTColumn(rows, ordinal)
+
+      case _ =>
+        toDefaultTColumn(rows, ordinal, sqlType)
+    }
+  }
+
+  protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = {
+    types(ordinal).sqlType match {
+      case Types.BIT =>
+        toBitTColumnValue(row, ordinal)
+
+      case Types.TINYINT =>
+        toTinyIntTColumnValue(row, ordinal)
+
+      case Types.SMALLINT =>
+        toSmallIntTColumnValue(row, ordinal)
+
+      case Types.INTEGER =>
+        toIntegerTColumnValue(row, ordinal)
+
+      case Types.BIGINT =>
+        toBigIntTColumnValue(row, ordinal)
+
+      case Types.REAL =>
+        toRealTColumnValue(row, ordinal)
+
+      case Types.DOUBLE =>
+        toDoubleTColumnValue(row, ordinal)
 
-  protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue
+      case Types.CHAR =>
+        toCharTColumnValue(row, ordinal)
+
+      case Types.VARCHAR =>
+        toVarcharTColumnValue(row, ordinal)
+
+      case _ =>
+        toDefaultTColumnValue(row, ordinal, types)
+    }
+  }
 
   protected def getOrSetAsNull[T](
       rows: Seq[Seq[Any]],
@@ -95,4 +165,159 @@ abstract class RowSetHelper {
     }
     ret
   }
+
+  protected def toDefaultTColumn(rows: Seq[Seq[Any]], ordinal: Int, sqlType: Int): TColumn = {
+    val nulls = new java.util.BitSet()
+    val rowSize = rows.length
+    val values = new util.ArrayList[String](rowSize)
+    var i = 0
+    while (i < rowSize) {
+      val row = rows(i)
+      nulls.set(i, row(ordinal) == null)
+      val value =
+        if (row(ordinal) == null) {
+          ""
+        } else {
+          toHiveString(row(ordinal), sqlType)
+        }
+      values.add(value)
+      i += 1
+    }
+    TColumn.stringVal(new TStringColumn(values, nulls))
+  }
+
+  protected def toBitTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+    val nulls = new java.util.BitSet()
+    val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
+    TColumn.boolVal(new TBoolColumn(values, nulls))
+  }
+
+  protected def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+    val nulls = new java.util.BitSet()
+    val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte)
+    TColumn.byteVal(new TByteColumn(values, nulls))
+  }
+
+  protected def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+    val nulls = new java.util.BitSet()
+    val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort)
+    TColumn.i16Val(new TI16Column(values, nulls))
+  }
+
+  protected def toIntegerTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+    val nulls = new java.util.BitSet()
+    val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
+    TColumn.i32Val(new TI32Column(values, nulls))
+  }
+
+  protected def toBigIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+    val nulls = new java.util.BitSet()
+    val values = getOrSetAsNull[lang.Long](rows, ordinal, nulls, 0L)
+    TColumn.i64Val(new TI64Column(values, nulls))
+  }
+
+  protected def toRealTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+    val nulls = new java.util.BitSet()
+    val values = getOrSetAsNull[lang.Float](rows, ordinal, nulls, 0.toFloat)
+      .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
+    TColumn.doubleVal(new TDoubleColumn(values, nulls))
+  }
+
+  protected def toDoubleTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+    val nulls = new java.util.BitSet()
+    val values = getOrSetAsNull[lang.Double](rows, ordinal, nulls, 0.toDouble)
+    TColumn.doubleVal(new TDoubleColumn(values, nulls))
+  }
+
+  protected def toCharTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+    toVarcharTColumn(rows, ordinal)
+  }
+
+  protected def toVarcharTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+    val nulls = new java.util.BitSet()
+    val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
+    TColumn.stringVal(new TStringColumn(values, nulls))
+  }
+
+  // ==========================================================
+
+  protected def toBitTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+    val boolValue = new TBoolValue
+    if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean])
+    TColumnValue.boolVal(boolValue)
+  }
+
+  protected def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+    val byteValue = new TByteValue
+    if (row(ordinal) != null) byteValue.setValue(row(ordinal).asInstanceOf[Byte])
+    TColumnValue.byteVal(byteValue)
+  }
+
+  protected def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+    val tI16Value = new TI16Value
+    if (row(ordinal) != null) tI16Value.setValue(row(ordinal).asInstanceOf[Short])
+    TColumnValue.i16Val(tI16Value)
+  }
+
+  protected def toIntegerTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+    val tI32Value = new TI32Value
+    if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int])
+    TColumnValue.i32Val(tI32Value)
+  }
+
+  protected def toBigIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+    val tI64Value = new TI64Value
+    if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long])
+    TColumnValue.i64Val(tI64Value)
+  }
+
+  protected def toRealTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+    val tDoubleValue = new TDoubleValue
+    if (row(ordinal) != null) {
+      val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString)
+      tDoubleValue.setValue(doubleValue)
+    }
+    TColumnValue.doubleVal(tDoubleValue)
+  }
+
+  protected def toDoubleTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+    val tDoubleValue = new TDoubleValue
+    if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double])
+    TColumnValue.doubleVal(tDoubleValue)
+  }
+
+  protected def toCharTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+    toVarcharTColumnValue(row, ordinal)
+  }
+
+  protected def toVarcharTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+    val tStringValue = new TStringValue
+    if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String])
+    TColumnValue.stringVal(tStringValue)
+  }
+
+  protected def toDefaultTColumnValue(
+      row: List[Any],
+      ordinal: Int,
+      types: List[Column]): TColumnValue = {
+    val tStrValue = new TStringValue
+    if (row(ordinal) != null) {
+      tStrValue.setValue(
+        toHiveString(row(ordinal), types(ordinal).sqlType))
+    }
+    TColumnValue.stringVal(tStrValue)
+  }
+
+  protected def toHiveString(data: Any, sqlType: Int): String = {
+    (data, sqlType) match {
+      case (date: Date, Types.DATE) =>
+        formatDate(date)
+      case (dateTime: LocalDateTime, Types.TIMESTAMP) =>
+        formatLocalDateTime(dateTime)
+      case (decimal: java.math.BigDecimal, Types.DECIMAL) =>
+        decimal.toPlainString
+      case (other, _) =>
+        other.toString
+    }
+  }
 }
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/SchemaHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/SchemaHelper.scala
index 3be3c7d42..455eb2a92 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/SchemaHelper.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/SchemaHelper.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.kyuubi.engine.jdbc.schema
 
+import java.sql.Types
 import java.util.Collections
 
 import scala.collection.JavaConverters._
@@ -62,6 +63,97 @@ abstract class SchemaHelper {
     ret
   }
 
-  protected def toTTypeId(sqlType: Int): TTypeId
+  /**
+   * Maps a JDBC type code to a Thrift TTypeId.
+   *
+   * Every recognized code is routed to its own protected hook so that a subclass can
+   * change a single mapping without re-implementing this dispatch. Codes with a hook:
+   * BIT, TINYINT, SMALLINT, INTEGER, BIGINT, REAL, DOUBLE, CHAR, VARCHAR, DATE,
+   * TIMESTAMP and DECIMAL. Any other code is routed to defaultToTTypeId.
+   *
+   * @param sqlType type code, compared against java.sql.Types constants
+   * @return the TTypeId produced by the hook selected for sqlType
+   */
+  protected def toTTypeId(sqlType: Int): TTypeId = sqlType match {
+    // boolean
+    case Types.BIT => bitToTTypeId
+
+    // integer types
+    case Types.TINYINT => tinyIntToTTypeId
+    case Types.SMALLINT => smallIntToTTypeId
+    case Types.INTEGER => integerToTTypeId
+    case Types.BIGINT => bigintToTTypeId
 
+    // floating point types
+    case Types.REAL => realToTTypeId
+    case Types.DOUBLE => doubleToTTypeId
+
+    // character types
+    case Types.CHAR => charToTTypeId
+    case Types.VARCHAR => varcharToTTypeId
+
+    // date and time types
+    case Types.DATE => dateToTTypeId
+    case Types.TIMESTAMP => timestampToTTypeId
+
+    // fixed point types
+    case Types.DECIMAL => decimalToTTypeId
+
+    // TODO add more type support
+    // Any code not listed above shares the default hook.
+    case _ =>
+      defaultToTTypeId
+  }
+
+  // Hook used by toTTypeId for Types.BIT; defaults to the Thrift BOOLEAN type.
+  // Protected so that a subclass can substitute a different mapping.
+  protected def bitToTTypeId: TTypeId = TTypeId.BOOLEAN_TYPE
+
+  // Hook used by toTTypeId for Types.TINYINT; defaults to the Thrift TINYINT type.
+  // Protected so that a subclass can substitute a different mapping.
+  protected def tinyIntToTTypeId: TTypeId = TTypeId.TINYINT_TYPE
+
+  // Hook used by toTTypeId for Types.SMALLINT; defaults to the Thrift SMALLINT type.
+  // Protected so that a subclass can substitute a different mapping.
+  protected def smallIntToTTypeId: TTypeId = TTypeId.SMALLINT_TYPE
+
+  // Hook used by toTTypeId for Types.INTEGER; defaults to the Thrift INT type.
+  // Protected so that a subclass can substitute a different mapping.
+  protected def integerToTTypeId: TTypeId = TTypeId.INT_TYPE
+
+  // Hook used by toTTypeId for Types.BIGINT; defaults to the Thrift BIGINT type.
+  // Protected so that a subclass can substitute a different mapping.
+  protected def bigintToTTypeId: TTypeId = TTypeId.BIGINT_TYPE
+
+  // Hook used by toTTypeId for Types.REAL; defaults to the Thrift FLOAT type.
+  // Protected so that a subclass can substitute a different mapping.
+  protected def realToTTypeId: TTypeId = TTypeId.FLOAT_TYPE
+
+  // Hook used by toTTypeId for Types.DOUBLE; defaults to the Thrift DOUBLE type.
+  // Protected so that a subclass can substitute a different mapping.
+  protected def doubleToTTypeId: TTypeId = TTypeId.DOUBLE_TYPE
+
+  // Hook used by toTTypeId for Types.CHAR; defaults to the Thrift STRING type.
+  // Protected so that a subclass can substitute a different mapping.
+  protected def charToTTypeId: TTypeId = TTypeId.STRING_TYPE
+
+  // Hook used by toTTypeId for Types.VARCHAR; defaults to the Thrift STRING type.
+  // Protected so that a subclass can substitute a different mapping.
+  protected def varcharToTTypeId: TTypeId = TTypeId.STRING_TYPE
+
+  // Hook used by toTTypeId for Types.DATE; defaults to the Thrift DATE type.
+  // Protected so that a subclass can substitute a different mapping.
+  protected def dateToTTypeId: TTypeId = TTypeId.DATE_TYPE
+
+  // Hook used by toTTypeId for Types.TIMESTAMP; defaults to the Thrift TIMESTAMP type.
+  // Protected so that a subclass can substitute a different mapping.
+  protected def timestampToTTypeId: TTypeId = TTypeId.TIMESTAMP_TYPE
+
+  // Hook used by toTTypeId for Types.DECIMAL; defaults to the Thrift DECIMAL type.
+  // Protected so that a subclass can substitute a different mapping.
+  protected def decimalToTTypeId: TTypeId = TTypeId.DECIMAL_TYPE
+
+  // Hook used by toTTypeId for every code without a dedicated case; defaults to the
+  // Thrift STRING type. Protected so that a subclass can substitute a different mapping.
+  protected def defaultToTTypeId: TTypeId = TTypeId.STRING_TYPE
 }