You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/11/11 02:08:16 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3790] Avoid using SchemaResolver directly in GetColumns operation
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new a58816609 [KYUUBI #3790] Avoid using SchemaResolver directly in GetColumns operation
a58816609 is described below
commit a58816609b4b4ed9a7af9ce968587f6def227572
Author: df_liu <df...@trip.com>
AuthorDate: Fri Nov 11 10:08:05 2022 +0800
[KYUUBI #3790] Avoid using SchemaResolver directly in GetColumns operation
### _Why are the changes needed?_
close #3790
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #3793 from df-Liu/flink_table.
Closes #3790
317434d2 [df_liu] flink table from
Authored-by: df_liu <df...@trip.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../apache/kyuubi/engine/flink/operation/GetColumns.scala | 14 +++-----------
1 file changed, 3 insertions(+), 11 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
index 63f604976..6ce2a6ac7 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
import org.apache.commons.lang3.StringUtils
import org.apache.flink.table.api.{DataTypes, ResultKind}
-import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
import org.apache.flink.table.catalog.Column
import org.apache.flink.table.types.logical._
import org.apache.flink.types.Row
@@ -42,14 +41,6 @@ class GetColumns(
override protected def runInternal(): Unit = {
try {
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
- val resolver = tableEnv match {
- case impl: StreamTableEnvironmentImpl =>
- impl.getCatalogManager.getSchemaResolver
- case _ =>
- throw new UnsupportedOperationException(
- "Unsupported Operation type GetColumns. You can execute " +
- "DESCRIBE statement instead to get column infos.")
- }
val catalogName =
if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
@@ -68,8 +59,9 @@ class GetColumns(
schemaName,
tableNameRegex)
.filter { _._2.isDefined }
- .flatMap { case (tableName, flinkTable) =>
- val resolvedSchema = flinkTable.get.getUnresolvedSchema.resolve(resolver)
+ .flatMap { case (tableName, _) =>
+ val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+ val resolvedSchema = flinkTable.getResolvedSchema
resolvedSchema.getColumns.asScala.toArray.zipWithIndex
.filter { case (column, _) =>
columnNameRegex.pattern.matcher(column.getName).matches()