You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by bo...@apache.org on 2023/10/23 14:21:29 UTC
[kyuubi] branch master updated: [KYUUBI #5382][JDBC] Duplication cleanup improvement in JdbcDialect and schema helpers
This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 81964803c [KYUUBI #5382][JDBC] Duplication cleanup improvement in JdbcDialect and schema helpers
81964803c is described below
commit 81964803c9ea0c9037d7f18422f875f8bb088ac9
Author: Fantasy-Jay <13...@163.com>
AuthorDate: Mon Oct 23 22:21:17 2023 +0800
[KYUUBI #5382][JDBC] Duplication cleanup improvement in JdbcDialect and schema helpers
### _Why are the changes needed?_
To close #5382.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No
Closes #5490 from zhuyaogai/issue-5382.
Closes #5382
4757445e7 [Fantasy-Jay] Remove unrelated comment.
f68c7aa6c [Fantasy-Jay] Refactor JDBC engine to reduce to code duplication.
4ad6b3c53 [Fantasy-Jay] Refactor JDBC engine to reduce to code duplication.
Authored-by: Fantasy-Jay <13...@163.com>
Signed-off-by: liangbowen <li...@gf.com.cn>
---
.../kyuubi/engine/jdbc/dialect/DorisDialect.scala | 35 +---
.../kyuubi/engine/jdbc/dialect/JdbcDialect.scala | 40 +++-
.../engine/jdbc/dialect/PhoenixDialect.scala | 39 ----
.../engine/jdbc/doris/DorisRowSetHelper.scala | 122 +----------
.../engine/jdbc/doris/DorisSchemaHelper.scala | 35 +---
.../engine/jdbc/phoenix/PhoenixRowSetHelper.scala | 142 +------------
.../engine/jdbc/phoenix/PhoenixSchemaHelper.scala | 46 +---
.../kyuubi/engine/jdbc/schema/RowSetHelper.scala | 231 ++++++++++++++++++++-
.../kyuubi/engine/jdbc/schema/SchemaHelper.scala | 94 ++++++++-
9 files changed, 366 insertions(+), 418 deletions(-)
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala
index c2ae29953..f7c1ace64 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect
-import java.sql.{Connection, ResultSet, Statement}
+import java.sql.{Connection, Statement}
import java.util
import scala.collection.JavaConverters._
@@ -23,34 +23,19 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
-import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.doris.{DorisRowSetHelper, DorisSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
-import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class DorisDialect extends JdbcDialect {
override def createStatement(connection: Connection, fetchSize: Int): Statement = {
- val statement =
- connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
+ val statement = super.createStatement(connection, fetchSize)
statement.setFetchSize(Integer.MIN_VALUE)
statement
}
- override def getTypeInfoOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
- override def getCatalogsOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
- override def getSchemasOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
override def getTablesQuery(
catalog: String,
schema: String,
@@ -96,10 +81,6 @@ class DorisDialect extends JdbcDialect {
query.toString()
}
- override def getTableTypesOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
override def getColumnsQuery(
session: Session,
catalogName: String,
@@ -139,18 +120,6 @@ class DorisDialect extends JdbcDialect {
query.toString()
}
- override def getFunctionsOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
- override def getPrimaryKeysOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
- override def getCrossReferenceOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
override def getRowSetHelper(): RowSetHelper = {
new DorisRowSetHelper
}
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
index e08b22758..62e20a1d2 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
@@ -16,10 +16,10 @@
*/
package org.apache.kyuubi.engine.jdbc.dialect
-import java.sql.{Connection, Statement}
+import java.sql.{Connection, ResultSet, Statement}
import java.util
-import org.apache.kyuubi.{KyuubiException, Logging}
+import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_SHORT_NAME}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
@@ -30,13 +30,24 @@ import org.apache.kyuubi.util.reflect.ReflectUtils._
abstract class JdbcDialect extends SupportServiceLoader with Logging {
- def createStatement(connection: Connection, fetchSize: Int = 1000): Statement
+ def createStatement(connection: Connection, fetchSize: Int = 1000): Statement = {
+ val statement =
+ connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
+ statement.setFetchSize(fetchSize)
+ statement
+ }
- def getTypeInfoOperation(session: Session): Operation
+ def getTypeInfoOperation(session: Session): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
- def getCatalogsOperation(session: Session): Operation
+ def getCatalogsOperation(session: Session): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
- def getSchemasOperation(session: Session): Operation
+ def getSchemasOperation(session: Session): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
def getTablesQuery(
catalog: String,
@@ -44,7 +55,9 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {
tableName: String,
tableTypes: util.List[String]): String
- def getTableTypesOperation(session: Session): Operation
+ def getTableTypesOperation(session: Session): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
def getColumnsQuery(
session: Session,
@@ -53,16 +66,21 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {
tableName: String,
columnName: String): String
- def getFunctionsOperation(session: Session): Operation
+ def getFunctionsOperation(session: Session): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
- def getPrimaryKeysOperation(session: Session): Operation
+ def getPrimaryKeysOperation(session: Session): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
- def getCrossReferenceOperation(session: Session): Operation
+ def getCrossReferenceOperation(session: Session): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
def getRowSetHelper(): RowSetHelper
def getSchemaHelper(): SchemaHelper
-
}
object JdbcDialects extends Logging {
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
index 0cce14b42..4c8e8f265 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
@@ -15,8 +15,6 @@
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect
-
-import java.sql.{Connection, ResultSet, Statement}
import java.util
import scala.collection.JavaConverters._
@@ -24,34 +22,13 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
-import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.phoenix.{PhoenixRowSetHelper, PhoenixSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
-import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class PhoenixDialect extends JdbcDialect {
- override def createStatement(connection: Connection, fetchSize: Int): Statement = {
- val statement =
- connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
- statement.setFetchSize(fetchSize)
- statement
- }
-
- override def getTypeInfoOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
- override def getCatalogsOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
- override def getSchemasOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
override def getTablesQuery(
catalog: String,
schema: String,
@@ -91,10 +68,6 @@ class PhoenixDialect extends JdbcDialect {
query.toString()
}
- override def getTableTypesOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
override def getColumnsQuery(
session: Session,
catalogName: String,
@@ -127,18 +100,6 @@ class PhoenixDialect extends JdbcDialect {
query.toString()
}
- override def getFunctionsOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
- override def getPrimaryKeysOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
- override def getCrossReferenceOperation(session: Session): Operation = {
- throw KyuubiSQLException.featureNotSupported()
- }
-
override def getRowSetHelper(): RowSetHelper = {
new PhoenixRowSetHelper
}
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala
index 1ce43c7a4..a92942cec 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala
@@ -16,125 +16,21 @@
*/
package org.apache.kyuubi.engine.jdbc.doris
-import java.sql.{Date, Types}
-import java.time.LocalDateTime
-
-import scala.collection.JavaConverters._
-
import org.apache.hive.service.rpc.thrift._
-import org.apache.kyuubi.engine.jdbc.schema.{Column, RowSetHelper}
-import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime}
+import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper
class DorisRowSetHelper extends RowSetHelper {
- protected def toTColumn(
- rows: Seq[Seq[Any]],
- ordinal: Int,
- sqlType: Int): TColumn = {
- val nulls = new java.util.BitSet()
- sqlType match {
- case Types.BIT =>
- val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
- TColumn.boolVal(new TBoolColumn(values, nulls))
-
- case Types.TINYINT | Types.SMALLINT | Types.INTEGER =>
- val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
- TColumn.i32Val(new TI32Column(values, nulls))
-
- case Types.BIGINT =>
- val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
- TColumn.i64Val(new TI64Column(values, nulls))
-
- case Types.REAL =>
- val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat)
- .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
- TColumn.doubleVal(new TDoubleColumn(values, nulls))
-
- case Types.DOUBLE =>
- val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble)
- TColumn.doubleVal(new TDoubleColumn(values, nulls))
-
- case Types.CHAR | Types.VARCHAR =>
- val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
- TColumn.stringVal(new TStringColumn(values, nulls))
-
- case _ =>
- val rowSize = rows.length
- val values = new java.util.ArrayList[String](rowSize)
- var i = 0
- while (i < rowSize) {
- val row = rows(i)
- nulls.set(i, row(ordinal) == null)
- val value =
- if (row(ordinal) == null) {
- ""
- } else {
- toHiveString(row(ordinal), sqlType)
- }
- values.add(value)
- i += 1
- }
- TColumn.stringVal(new TStringColumn(values, nulls))
- }
- }
-
- protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = {
- types(ordinal).sqlType match {
- case Types.BIT =>
- val boolValue = new TBoolValue
- if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean])
- TColumnValue.boolVal(boolValue)
-
- case Types.TINYINT | Types.SMALLINT | Types.INTEGER =>
- val tI32Value = new TI32Value
- if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int])
- TColumnValue.i32Val(tI32Value)
-
- case Types.BIGINT =>
- val tI64Value = new TI64Value
- if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long])
- TColumnValue.i64Val(tI64Value)
-
- case Types.REAL =>
- val tDoubleValue = new TDoubleValue
- if (row(ordinal) != null) {
- val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString)
- tDoubleValue.setValue(doubleValue)
- }
- TColumnValue.doubleVal(tDoubleValue)
-
- case Types.DOUBLE =>
- val tDoubleValue = new TDoubleValue
- if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double])
- TColumnValue.doubleVal(tDoubleValue)
+ override def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
+ toIntegerTColumn(rows, ordinal)
- case Types.CHAR | Types.VARCHAR =>
- val tStringValue = new TStringValue
- if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String])
- TColumnValue.stringVal(tStringValue)
+ override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
+ toIntegerTColumn(rows, ordinal)
- case _ =>
- val tStrValue = new TStringValue
- if (row(ordinal) != null) {
- tStrValue.setValue(
- toHiveString(row(ordinal), types(ordinal).sqlType))
- }
- TColumnValue.stringVal(tStrValue)
- }
- }
+ override def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
+ toIntegerTColumnValue(row, ordinal)
- protected def toHiveString(data: Any, sqlType: Int): String = {
- (data, sqlType) match {
- case (date: Date, Types.DATE) =>
- formatDate(date)
- case (dateTime: LocalDateTime, Types.TIMESTAMP) =>
- formatLocalDateTime(dateTime)
- case (decimal: java.math.BigDecimal, Types.DECIMAL) =>
- decimal.toPlainString
- // TODO support bitmap and hll
- case (other, _) =>
- other.toString
- }
- }
+ override def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
+ toIntegerTColumnValue(row, ordinal)
}
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisSchemaHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisSchemaHelper.scala
index ca8bb6ec3..b323d3731 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisSchemaHelper.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisSchemaHelper.scala
@@ -16,44 +16,13 @@
*/
package org.apache.kyuubi.engine.jdbc.doris
-import java.sql.Types
-
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper
class DorisSchemaHelper extends SchemaHelper {
- override def toTTypeId(sqlType: Int): TTypeId = sqlType match {
- case Types.BIT =>
- TTypeId.BOOLEAN_TYPE
-
- case Types.TINYINT | Types.SMALLINT | Types.INTEGER =>
- TTypeId.INT_TYPE
-
- case Types.BIGINT =>
- TTypeId.BIGINT_TYPE
-
- case Types.REAL =>
- TTypeId.FLOAT_TYPE
-
- case Types.DOUBLE =>
- TTypeId.DOUBLE_TYPE
-
- case Types.CHAR | Types.VARCHAR =>
- TTypeId.STRING_TYPE
-
- case Types.DATE =>
- TTypeId.DATE_TYPE
-
- case Types.TIMESTAMP =>
- TTypeId.TIMESTAMP_TYPE
-
- case Types.DECIMAL =>
- TTypeId.DECIMAL_TYPE
+ override def tinyIntToTTypeId: TTypeId = TTypeId.INT_TYPE
- // TODO add more type support
- case _ =>
- TTypeId.STRING_TYPE
- }
+ override def smallIntToTTypeId: TTypeId = TTypeId.INT_TYPE
}
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala
index a1f6d4ac2..67d9d09e5 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala
@@ -16,144 +16,6 @@
*/
package org.apache.kyuubi.engine.jdbc.phoenix
-import java.sql.{Date, Types}
-import java.time.LocalDateTime
+import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper
-import scala.collection.JavaConverters._
-
-import org.apache.hive.service.rpc.thrift._
-
-import org.apache.kyuubi.engine.jdbc.schema.{Column, RowSetHelper}
-import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime}
-
-class PhoenixRowSetHelper extends RowSetHelper {
-
- protected def toTColumn(
- rows: Seq[Seq[Any]],
- ordinal: Int,
- sqlType: Int): TColumn = {
- val nulls = new java.util.BitSet()
- sqlType match {
-
- case Types.BIT =>
- val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
- TColumn.boolVal(new TBoolColumn(values, nulls))
-
- case Types.TINYINT =>
- val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte)
- TColumn.byteVal(new TByteColumn(values, nulls))
-
- case Types.SMALLINT =>
- val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort)
- TColumn.i16Val(new TI16Column(values, nulls))
-
- case Types.INTEGER =>
- val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
- TColumn.i32Val(new TI32Column(values, nulls))
-
- case Types.BIGINT =>
- val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
- TColumn.i64Val(new TI64Column(values, nulls))
-
- case Types.REAL =>
- val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat)
- .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
- TColumn.doubleVal(new TDoubleColumn(values, nulls))
-
- case Types.DOUBLE =>
- val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble)
- TColumn.doubleVal(new TDoubleColumn(values, nulls))
-
- case Types.CHAR | Types.VARCHAR =>
- val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
- TColumn.stringVal(new TStringColumn(values, nulls))
-
- case _ =>
- val rowSize = rows.length
- val values = new java.util.ArrayList[String](rowSize)
- var i = 0
- while (i < rowSize) {
- val row = rows(i)
- nulls.set(i, row(ordinal) == null)
- val value =
- if (row(ordinal) == null) {
- ""
- } else {
- toHiveString(row(ordinal), sqlType)
- }
- values.add(value)
- i += 1
- }
- TColumn.stringVal(new TStringColumn(values, nulls))
- }
- }
-
- protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = {
- types(ordinal).sqlType match {
- case Types.BIT =>
- val boolValue = new TBoolValue
- if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean])
- TColumnValue.boolVal(boolValue)
-
- case Types.TINYINT =>
- val byteValue = new TByteValue()
- if (row(ordinal) != null) byteValue.setValue(row(ordinal).asInstanceOf[Byte])
- TColumnValue.byteVal(byteValue)
-
- case Types.SMALLINT =>
- val tI16Value = new TI16Value()
- if (row(ordinal) != null) tI16Value.setValue(row(ordinal).asInstanceOf[Short])
- TColumnValue.i16Val(tI16Value)
-
- case Types.INTEGER =>
- val tI32Value = new TI32Value
- if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int])
- TColumnValue.i32Val(tI32Value)
-
- case Types.BIGINT =>
- val tI64Value = new TI64Value
- if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long])
- TColumnValue.i64Val(tI64Value)
-
- case Types.REAL =>
- val tDoubleValue = new TDoubleValue
- if (row(ordinal) != null) {
- val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString)
- tDoubleValue.setValue(doubleValue)
- }
- TColumnValue.doubleVal(tDoubleValue)
-
- case Types.DOUBLE =>
- val tDoubleValue = new TDoubleValue
- if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double])
- TColumnValue.doubleVal(tDoubleValue)
-
- case Types.CHAR | Types.VARCHAR =>
- val tStringValue = new TStringValue
- if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String])
- TColumnValue.stringVal(tStringValue)
-
- case _ =>
- val tStrValue = new TStringValue
- if (row(ordinal) != null) {
- tStrValue.setValue(
- toHiveString(row(ordinal), types(ordinal).sqlType))
- }
- TColumnValue.stringVal(tStrValue)
- }
- }
-
- protected def toHiveString(data: Any, sqlType: Int): String = {
- (data, sqlType) match {
- case (date: Date, Types.DATE) =>
- formatDate(date)
- case (dateTime: LocalDateTime, Types.TIMESTAMP) =>
- formatLocalDateTime(dateTime)
- case (decimal: java.math.BigDecimal, Types.DECIMAL) =>
- decimal.toPlainString
- // TODO support bitmap and hll
- case (other, _) =>
- other.toString
- }
- }
-}
+class PhoenixRowSetHelper extends RowSetHelper {}
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala
index f5e04f7ca..938956cdc 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala
@@ -16,50 +16,6 @@
*/
package org.apache.kyuubi.engine.jdbc.phoenix
-import java.sql.Types
-
-import org.apache.hive.service.rpc.thrift._
-
import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper
-class PhoenixSchemaHelper extends SchemaHelper {
-
- override def toTTypeId(sqlType: Int): TTypeId = sqlType match {
- case Types.BIT =>
- TTypeId.BOOLEAN_TYPE
-
- case Types.TINYINT =>
- TTypeId.TINYINT_TYPE
-
- case Types.SMALLINT =>
- TTypeId.SMALLINT_TYPE
-
- case Types.INTEGER =>
- TTypeId.INT_TYPE
-
- case Types.BIGINT =>
- TTypeId.BIGINT_TYPE
-
- case Types.REAL =>
- TTypeId.FLOAT_TYPE
-
- case Types.DOUBLE =>
- TTypeId.DOUBLE_TYPE
-
- case Types.CHAR | Types.VARCHAR =>
- TTypeId.STRING_TYPE
-
- case Types.DATE =>
- TTypeId.DATE_TYPE
-
- case Types.TIMESTAMP =>
- TTypeId.TIMESTAMP_TYPE
-
- case Types.DECIMAL =>
- TTypeId.DECIMAL_TYPE
-
- // TODO add more type support
- case _ =>
- TTypeId.STRING_TYPE
- }
-}
+class PhoenixSchemaHelper extends SchemaHelper {}
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala
index d489ed8a2..74b4cec10 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala
@@ -16,10 +16,16 @@
*/
package org.apache.kyuubi.engine.jdbc.schema
-import java.util
+import java.{lang, util}
+import java.sql.{Date, Types}
+import java.time.LocalDateTime
+
+import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift._
+import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime}
+
abstract class RowSetHelper {
def toTRowSet(
@@ -70,9 +76,73 @@ abstract class RowSetHelper {
protected def toTColumn(
rows: Seq[Seq[Any]],
ordinal: Int,
- sqlType: Int): TColumn
+ sqlType: Int): TColumn = {
+ sqlType match {
+ case Types.BIT =>
+ toBitTColumn(rows, ordinal)
+
+ case Types.TINYINT =>
+ toTinyIntTColumn(rows, ordinal)
+
+ case Types.SMALLINT =>
+ toSmallIntTColumn(rows, ordinal)
+
+ case Types.INTEGER =>
+ toIntegerTColumn(rows, ordinal)
+
+ case Types.BIGINT =>
+ toBigIntTColumn(rows, ordinal)
+
+ case Types.REAL =>
+ toRealTColumn(rows, ordinal)
+
+ case Types.DOUBLE =>
+ toDoubleTColumn(rows, ordinal)
+
+ case Types.CHAR =>
+ toCharTColumn(rows, ordinal)
+
+ case Types.VARCHAR =>
+ toVarcharTColumn(rows, ordinal)
+
+ case _ =>
+ toDefaultTColumn(rows, ordinal, sqlType)
+ }
+ }
+
+ protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = {
+ types(ordinal).sqlType match {
+ case Types.BIT =>
+ toBitTColumnValue(row, ordinal)
+
+ case Types.TINYINT =>
+ toTinyIntTColumnValue(row, ordinal)
+
+ case Types.SMALLINT =>
+ toSmallIntTColumnValue(row, ordinal)
+
+ case Types.INTEGER =>
+ toIntegerTColumnValue(row, ordinal)
+
+ case Types.BIGINT =>
+ toBigIntTColumnValue(row, ordinal)
+
+ case Types.REAL =>
+ toRealTColumnValue(row, ordinal)
+
+ case Types.DOUBLE =>
+ toDoubleTColumnValue(row, ordinal)
- protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue
+ case Types.CHAR =>
+ toCharTColumnValue(row, ordinal)
+
+ case Types.VARCHAR =>
+ toVarcharTColumnValue(row, ordinal)
+
+ case _ =>
+ toDefaultTColumnValue(row, ordinal, types)
+ }
+ }
protected def getOrSetAsNull[T](
rows: Seq[Seq[Any]],
@@ -95,4 +165,159 @@ abstract class RowSetHelper {
}
ret
}
+
+ protected def toDefaultTColumn(rows: Seq[Seq[Any]], ordinal: Int, sqlType: Int): TColumn = {
+ val nulls = new java.util.BitSet()
+ val rowSize = rows.length
+ val values = new util.ArrayList[String](rowSize)
+ var i = 0
+ while (i < rowSize) {
+ val row = rows(i)
+ nulls.set(i, row(ordinal) == null)
+ val value =
+ if (row(ordinal) == null) {
+ ""
+ } else {
+ toHiveString(row(ordinal), sqlType)
+ }
+ values.add(value)
+ i += 1
+ }
+ TColumn.stringVal(new TStringColumn(values, nulls))
+ }
+
+ protected def toBitTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+ val nulls = new java.util.BitSet()
+ val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
+ TColumn.boolVal(new TBoolColumn(values, nulls))
+ }
+
+ protected def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+ val nulls = new java.util.BitSet()
+ val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte)
+ TColumn.byteVal(new TByteColumn(values, nulls))
+ }
+
+ protected def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+ val nulls = new java.util.BitSet()
+ val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort)
+ TColumn.i16Val(new TI16Column(values, nulls))
+ }
+
+ protected def toIntegerTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+ val nulls = new java.util.BitSet()
+ val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
+ TColumn.i32Val(new TI32Column(values, nulls))
+ }
+
+ protected def toBigIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+ val nulls = new java.util.BitSet()
+ val values = getOrSetAsNull[lang.Long](rows, ordinal, nulls, 0L)
+ TColumn.i64Val(new TI64Column(values, nulls))
+ }
+
+ protected def toRealTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+ val nulls = new java.util.BitSet()
+ val values = getOrSetAsNull[lang.Float](rows, ordinal, nulls, 0.toFloat)
+ .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
+ TColumn.doubleVal(new TDoubleColumn(values, nulls))
+ }
+
+ protected def toDoubleTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+ val nulls = new java.util.BitSet()
+ val values = getOrSetAsNull[lang.Double](rows, ordinal, nulls, 0.toDouble)
+ TColumn.doubleVal(new TDoubleColumn(values, nulls))
+ }
+
+ protected def toCharTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+ toVarcharTColumn(rows, ordinal)
+ }
+
+ protected def toVarcharTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
+ val nulls = new java.util.BitSet()
+ val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
+ TColumn.stringVal(new TStringColumn(values, nulls))
+ }
+
+ // ==========================================================
+
+ protected def toBitTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+ val boolValue = new TBoolValue
+ if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean])
+ TColumnValue.boolVal(boolValue)
+ }
+
+ protected def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+ val byteValue = new TByteValue
+ if (row(ordinal) != null) byteValue.setValue(row(ordinal).asInstanceOf[Byte])
+ TColumnValue.byteVal(byteValue)
+ }
+
+ protected def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+ val tI16Value = new TI16Value
+ if (row(ordinal) != null) tI16Value.setValue(row(ordinal).asInstanceOf[Short])
+ TColumnValue.i16Val(tI16Value)
+ }
+
+ protected def toIntegerTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+ val tI32Value = new TI32Value
+ if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int])
+ TColumnValue.i32Val(tI32Value)
+ }
+
+ protected def toBigIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+ val tI64Value = new TI64Value
+ if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long])
+ TColumnValue.i64Val(tI64Value)
+ }
+
+ protected def toRealTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+ val tDoubleValue = new TDoubleValue
+ if (row(ordinal) != null) {
+ val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString)
+ tDoubleValue.setValue(doubleValue)
+ }
+ TColumnValue.doubleVal(tDoubleValue)
+ }
+
+ protected def toDoubleTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+ val tDoubleValue = new TDoubleValue
+ if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double])
+ TColumnValue.doubleVal(tDoubleValue)
+ }
+
+ protected def toCharTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+ toVarcharTColumnValue(row, ordinal)
+ }
+
+ protected def toVarcharTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
+ val tStringValue = new TStringValue
+ if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String])
+ TColumnValue.stringVal(tStringValue)
+ }
+
+ protected def toDefaultTColumnValue(
+ row: List[Any],
+ ordinal: Int,
+ types: List[Column]): TColumnValue = {
+ val tStrValue = new TStringValue
+ if (row(ordinal) != null) {
+ tStrValue.setValue(
+ toHiveString(row(ordinal), types(ordinal).sqlType))
+ }
+ TColumnValue.stringVal(tStrValue)
+ }
+
+ protected def toHiveString(data: Any, sqlType: Int): String = {
+ (data, sqlType) match {
+ case (date: Date, Types.DATE) =>
+ formatDate(date)
+ case (dateTime: LocalDateTime, Types.TIMESTAMP) =>
+ formatLocalDateTime(dateTime)
+ case (decimal: java.math.BigDecimal, Types.DECIMAL) =>
+ decimal.toPlainString
+ case (other, _) =>
+ other.toString
+ }
+ }
}
diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/SchemaHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/SchemaHelper.scala
index 3be3c7d42..455eb2a92 100644
--- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/SchemaHelper.scala
+++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/SchemaHelper.scala
@@ -16,6 +16,7 @@
*/
package org.apache.kyuubi.engine.jdbc.schema
+import java.sql.Types
import java.util.Collections
import scala.collection.JavaConverters._
@@ -62,6 +63,97 @@ abstract class SchemaHelper {
ret
}
- protected def toTTypeId(sqlType: Int): TTypeId
+ protected def toTTypeId(sqlType: Int): TTypeId = sqlType match {
+ case Types.BIT =>
+ bitToTTypeId
+ case Types.TINYINT =>
+ tinyIntToTTypeId
+
+ case Types.SMALLINT =>
+ smallIntToTTypeId
+
+ case Types.INTEGER =>
+ integerToTTypeId
+
+ case Types.BIGINT =>
+ bigintToTTypeId
+
+ case Types.REAL =>
+ realToTTypeId
+
+ case Types.DOUBLE =>
+ doubleToTTypeId
+
+ case Types.CHAR =>
+ charToTTypeId
+
+ case Types.VARCHAR =>
+ varcharToTTypeId
+
+ case Types.DATE =>
+ dateToTTypeId
+
+ case Types.TIMESTAMP =>
+ timestampToTTypeId
+
+ case Types.DECIMAL =>
+ decimalToTTypeId
+
+ // TODO add more type support
+ case _ =>
+ defaultToTTypeId
+ }
+
+ protected def bitToTTypeId: TTypeId = {
+ TTypeId.BOOLEAN_TYPE
+ }
+
+ protected def tinyIntToTTypeId: TTypeId = {
+ TTypeId.TINYINT_TYPE
+ }
+
+ protected def smallIntToTTypeId: TTypeId = {
+ TTypeId.SMALLINT_TYPE
+ }
+
+ protected def integerToTTypeId: TTypeId = {
+ TTypeId.INT_TYPE
+ }
+
+ protected def bigintToTTypeId: TTypeId = {
+ TTypeId.BIGINT_TYPE
+ }
+
+ protected def realToTTypeId: TTypeId = {
+ TTypeId.FLOAT_TYPE
+ }
+
+ protected def doubleToTTypeId: TTypeId = {
+ TTypeId.DOUBLE_TYPE
+ }
+
+ protected def charToTTypeId: TTypeId = {
+ TTypeId.STRING_TYPE
+ }
+
+ protected def varcharToTTypeId: TTypeId = {
+ TTypeId.STRING_TYPE
+ }
+
+ protected def dateToTTypeId: TTypeId = {
+ TTypeId.DATE_TYPE
+ }
+
+ protected def timestampToTTypeId: TTypeId = {
+ TTypeId.TIMESTAMP_TYPE
+ }
+
+ protected def decimalToTTypeId: TTypeId = {
+ TTypeId.DECIMAL_TYPE
+ }
+
+ protected def defaultToTTypeId: TTypeId = {
+ TTypeId.STRING_TYPE
+ }
}