Posted to notifications@kyuubi.apache.org by GitBox <gi...@apache.org> on 2022/10/28 02:41:51 UTC

[GitHub] [incubator-kyuubi] waywtdcc opened a new pull request, #3718: [KYUUBI #3717] Support flink engine see primary keys

waywtdcc opened a new pull request, #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718

   <!--
   Thanks for sending a pull request!
   
   Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/CONTRIBUTING.html
     2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
   -->
   
   ### _Why are the changes needed?_
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you add a feature, you can talk about the use case of it.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   *Support retrieving primary keys through the Flink engine (the new GetPrimaryKeys operation).*
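   
   For context, "see primary keys" means exposing JDBC's `DatabaseMetaData.getPrimaryKeys` through the Kyuubi Flink SQL engine. A minimal client-side sketch of the intended usage (the JDBC URL and table name below are placeholders, not part of this PR):
   
   ```scala
   import java.sql.DriverManager
   
   // Placeholder connection string; point it at your Kyuubi server.
   val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/default")
   try {
     // Catalog and schema may be empty strings, in which case the engine falls back
     // to the current catalog/database; the table name is an exact name, not a pattern.
     val rs = conn.getMetaData.getPrimaryKeys("", "", "my_table")
     while (rs.next()) {
       // Result columns follow the JDBC contract:
       // TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, KEY_SEQ (1-based), PK_NAME
       println(s"${rs.getString("COLUMN_NAME")} seq=${rs.getInt("KEY_SEQ")} pk=${rs.getString("PK_NAME")}")
     }
   } finally conn.close()
   ```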
   
   ### _How was this patch tested?_
   - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
   
   - [ ] Add screenshots for manual tests if appropriate
   
   - [Y] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
   




[GitHub] [incubator-kyuubi] pan3793 commented on pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
pan3793 commented on PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#issuecomment-1295794445

   Yeah, the schema and table name should not be patterns but the exact names.
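
   To illustrate the distinction: in JDBC, `getTables`/`getColumns` take search patterns where `%` and `_` are wildcards, while `getPrimaryKeys` takes exact identifiers. A hedged sketch (the connection and the names are examples, not taken from this PR):

   ```scala
   import java.sql.Connection

   def showMetadataCalls(connection: Connection): Unit = {
     val md = connection.getMetaData
     // Pattern-based metadata call: '%' and '_' act as wildcards in these arguments.
     val tables = md.getTables(null, "default_database", "flink_%", Array("TABLE"))
     // getPrimaryKeys expects the exact catalog, schema and table names;
     // a wildcard here is treated literally and simply matches nothing.
     val pks = md.getPrimaryKeys("default_catalog", "default_database", "flink_orders")
   }
   ```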




[GitHub] [incubator-kyuubi] waywtdcc commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
waywtdcc commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1009062230


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
+import org.apache.flink.table.catalog.{Column, UniqueConstraint}
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+      val catalog = tableEnv.getCatalog(catalogName).get()
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) catalog.getDefaultDatabase
+        else schemaNameOrEmpty
+
+      val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+
+      val resolvedSchema = flinkTable.getResolvedSchema
+      val uniqueConstraint = resolvedSchema.getPrimaryKey.orElse(
+        UniqueConstraint.primaryKey("null_pri", Collections.emptyList()))

Review Comment:
   Thanks for your reviews. I have modified it





[GitHub] [incubator-kyuubi] waywtdcc commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
waywtdcc commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1012527763


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) tableEnv.getCurrentDatabase
+        else schemaNameOrEmpty

Review Comment:
         val schemaName =
           if (StringUtils.isEmpty(schemaNameOrEmpty)) {
             if (tableEnv.getCurrentDatabase != null) {
               tableEnv.getCurrentDatabase
             } else {
               tableEnv.getCatalog(catalogName).get().getDefaultDatabase
             }
           } else schemaNameOrEmpty
   
     Is that OK?





[GitHub] [incubator-kyuubi] pan3793 commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
pan3793 commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1014603598


##########
externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala:
##########
@@ -163,6 +163,78 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     }
   }
 
+  test("get primary keys") {
+    val tableName1 = "flink_get_primary_keys_operation1"
+    val tableName2 = "flink_get_primary_keys_operation2"
+    val tableName3 = "flink_get_primary_keys_operation3"
+
+    withJdbcStatement(tableName1, tableName2, tableName3) { statement =>
+      statement.execute(
+        s"""
+           | create table $tableName1 (
+           |  id1 int,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      statement.execute(
+        s"""
+           | create table $tableName2 (
+           |  id1 int,
+           |  id2 int,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer,
+           |  CONSTRAINT pk_con primary key(id1,id2) NOT ENFORCED
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      statement.execute(
+        s"""
+           | create table $tableName3 (
+           |  id1 int,
+           |  id2 int,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      val metaData = statement.getConnection.getMetaData
+
+      Seq(tableName1, tableName2, tableName3) foreach { tableName =>

Review Comment:
   I think the original idea here was to define a single pk for `tableName1`:
   
   ```
   CONSTRAINT pk_con primary key(id1) NOT ENFORCED
   ```





[GitHub] [incubator-kyuubi] waywtdcc commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
waywtdcc commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1012538443


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) tableEnv.getCurrentDatabase
+        else schemaNameOrEmpty
+
+      val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+
+      val resolvedSchema = flinkTable.getResolvedSchema
+      val primaryKeySchema = resolvedSchema.getPrimaryKey
+      val columns = primaryKeySchema.asScala.map { uniqueConstraint =>
+        uniqueConstraint
+          .getColumns.asScala.toArray.zipWithIndex
+          .map { case (column, pos) =>
+            toColumnResult(
+              catalogName,
+              schemaName,
+              tableName,
+              uniqueConstraint.getName,
+              column,
+              pos)
+          }
+      }.getOrElse(Array.empty)
+
+      resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(
+          Column.physical(TABLE_CAT, DataTypes.STRING),
+          Column.physical(TABLE_SCHEM, DataTypes.STRING),
+          Column.physical(TABLE_NAME, DataTypes.STRING),
+          Column.physical(COLUMN_NAME, DataTypes.STRING),
+          Column.physical(KEY_SEQ, DataTypes.INT),
+          Column.physical(PK_NAME, DataTypes.STRING))
+        .data(columns)
+        .build
+
+    } catch onError()
+  }
+
+  private def toColumnResult(
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      pkName: String,
+      columnName: String,
+      pos: Int): Row = {

Review Comment:
   It is a list type.





[GitHub] [incubator-kyuubi] pan3793 commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
pan3793 commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1009056336


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
+import org.apache.flink.table.catalog.{Column, UniqueConstraint}
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+      val catalog = tableEnv.getCatalog(catalogName).get()
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) catalog.getDefaultDatabase
+        else schemaNameOrEmpty
+
+      val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+
+      val resolvedSchema = flinkTable.getResolvedSchema
+      val uniqueConstraint = resolvedSchema.getPrimaryKey.orElse(
+        UniqueConstraint.primaryKey("null_pri", Collections.emptyList()))

Review Comment:
   Why should we use this default value?
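
   For reference, the sentinel can be avoided by mapping over the `Optional` and falling back to an empty result, which is roughly the shape the later revisions in this thread settle on (a sketch reusing the identifiers and imports of the diff above, not the final merged code):

   ```scala
   val columns = resolvedSchema.getPrimaryKey.asScala.map { uniqueConstraint =>
     uniqueConstraint.getColumns.asScala.toArray.zipWithIndex.map { case (column, pos) =>
       // toColumnResult is the helper defined further down in this file.
       toColumnResult(catalogName, schemaName, tableName, uniqueConstraint.getName, column, pos)
     }
   }.getOrElse(Array.empty)
   ```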





[GitHub] [incubator-kyuubi] jiaoqingbo commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
jiaoqingbo commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1010041295


##########
externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala:
##########
@@ -163,6 +163,78 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     }
   }
 
+  test("get primary keys") {
+    val tableName1 = "flink_get_primary_keys_operation1"
+    val tableName2 = "flink_get_primary_keys_operation2"
+    val tableName3 = "flink_get_primary_keys_operation3"
+
+    withJdbcStatement(tableName1, tableName2, tableName3) { statement =>
+      statement.execute(
+        s"""
+           | create table $tableName1 (
+           |  id1 int,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      statement.execute(
+        s"""
+           | create table $tableName2 (
+           |  id1 int,
+           |  id2 int,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer,
+           |  CONSTRAINT pk_con primary key(id1,id2) NOT ENFORCED
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      statement.execute(
+        s"""
+           | create table $tableName3 (
+           |  id1 int,
+           |  id2 int,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      val metaData = statement.getConnection.getMetaData
+
+      Seq(tableName1, tableName2, tableName3) foreach { tableName =>
+        val rowSet = metaData.getPrimaryKeys("", "", tableName)
+
+        if (tableName.equals(tableName3)) {
+          assert(rowSet.next() == false)

Review Comment:
    assert(!rowSet.next())



##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+      val catalog = tableEnv.getCatalog(catalogName).get()
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) catalog.getDefaultDatabase
+        else schemaNameOrEmpty

Review Comment:
   If catalogNameOrEmpty is empty, tableEnv.getCurrentCatalog will be selected. In this case, if schemaNameOrEmpty is also empty, schemaName should use tableEnv.getCurrentDatabase instead of catalog.getDefaultDatabase. WDYT @pan3793
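
   A sketch of that resolution order, equivalent to the null check @waywtdcc proposes elsewhere in this thread (identifiers are those of the surrounding diff):

   ```scala
   val catalogName =
     if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
     else catalogNameOrEmpty
   val schemaName =
     if (StringUtils.isEmpty(schemaNameOrEmpty)) {
       // Prefer the session's current database; fall back to the catalog's
       // default database only when no current database is set.
       Option(tableEnv.getCurrentDatabase)
         .getOrElse(tableEnv.getCatalog(catalogName).get().getDefaultDatabase)
     } else schemaNameOrEmpty
   ```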





[GitHub] [incubator-kyuubi] waywtdcc commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
waywtdcc commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1012537020


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) tableEnv.getCurrentDatabase
+        else schemaNameOrEmpty

Review Comment:
   Thank you for your reviews. I think it's good. I have revised the relevant content.





[GitHub] [incubator-kyuubi] yanghua commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
yanghua commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1013532469


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) tableEnv.getCurrentDatabase
+        else schemaNameOrEmpty

Review Comment:
   Can we add a test for this logic?
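
   For example, something along these lines could exercise the defaulting, following the pattern of the existing "get primary keys" test in `FlinkOperationSuite` (a sketch; the table name is made up, and `default_catalog`/`default_database` are assumed to be the engine's current catalog and database):

   ```scala
   test("get primary keys with empty and explicit catalog/schema") {
     val tableName = "flink_get_primary_keys_default_db"
     withJdbcStatement(tableName) { statement =>
       statement.execute(
         s"""
            | create table $tableName (
            |  id1 int,
            |  CONSTRAINT pk_con primary key(id1) NOT ENFORCED
            | )
            | with (
            |   'connector' = 'filesystem'
            | )
       """.stripMargin)
       val metaData = statement.getConnection.getMetaData
       // Empty catalog/schema should resolve to the current catalog and database...
       val defaulted = metaData.getPrimaryKeys("", "", tableName)
       assert(defaulted.next())
       assert(defaulted.getString("COLUMN_NAME") == "id1")
       // ...and behave the same as passing the names explicitly.
       val explicit = metaData.getPrimaryKeys("default_catalog", "default_database", tableName)
       assert(explicit.next())
       assert(explicit.getString("PK_NAME") == "pk_con")
     }
   }
   ```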





[GitHub] [incubator-kyuubi] pan3793 commented on pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
pan3793 commented on PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#issuecomment-1297877896

   Please rebase onto master to resolve the conflict, and it would be good if you could test with DBeaver to verify this functionality.




[GitHub] [incubator-kyuubi] pan3793 commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
pan3793 commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1010048589


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+      val catalog = tableEnv.getCatalog(catalogName).get()
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) catalog.getDefaultDatabase
+        else schemaNameOrEmpty

Review Comment:
   Good catch, SGTM. @waywtdcc would you please change it as @jiaoqingbo suggested?





[GitHub] [incubator-kyuubi] waywtdcc commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
waywtdcc commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1010081792


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+      val catalog = tableEnv.getCatalog(catalogName).get()
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) catalog.getDefaultDatabase
+        else schemaNameOrEmpty

Review Comment:
   OK, thank you for your review. I have fixed it





[GitHub] [incubator-kyuubi] waywtdcc commented on pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
waywtdcc commented on PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#issuecomment-1294678164

   This may not require regular expressions. I will modify it later.




[GitHub] [incubator-kyuubi] yanghua commented on pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
yanghua commented on PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#issuecomment-1296434846

   Will review it later.




[GitHub] [incubator-kyuubi] yanghua commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
yanghua commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1013838108


##########
externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala:
##########
@@ -163,6 +163,78 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     }
   }
 
+  test("get primary keys") {
+    val tableName1 = "flink_get_primary_keys_operation1"
+    val tableName2 = "flink_get_primary_keys_operation2"
+    val tableName3 = "flink_get_primary_keys_operation3"
+
+    withJdbcStatement(tableName1, tableName2, tableName3) { statement =>
+      statement.execute(
+        s"""
+           | create table $tableName1 (
+           |  id1 int,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      statement.execute(
+        s"""
+           | create table $tableName2 (
+           |  id1 int,
+           |  id2 int,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer,
+           |  CONSTRAINT pk_con primary key(id1,id2) NOT ENFORCED
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      statement.execute(
+        s"""
+           | create table $tableName3 (
+           |  id1 int,
+           |  id2 int,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      val metaData = statement.getConnection.getMetaData
+
+      Seq(tableName1, tableName2, tableName3) foreach { tableName =>

Review Comment:
   I found that `tableName3` seems redundant. The definitions of `tableName1` and `tableName3` are almost the same (no primary keys). If we want to verify whether we can fetch primary keys, we can just use `tableName1`. WDYT?





[GitHub] [incubator-kyuubi] yanghua commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
yanghua commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1010100143


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) tableEnv.getCurrentDatabase
+        else schemaNameOrEmpty
+
+      val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+
+      val resolvedSchema = flinkTable.getResolvedSchema
+      val primaryKeySchema = resolvedSchema.getPrimaryKey
+      val columns = primaryKeySchema.asScala.map { uniqueConstraint =>
+        uniqueConstraint
+          .getColumns.asScala.toArray.zipWithIndex
+          .map { case (column, pos) =>
+            toColumnResult(
+              catalogName,
+              schemaName,
+              tableName,
+              uniqueConstraint.getName,
+              column,
+              pos)
+          }
+      }.getOrElse(Array.empty)
+
+      resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(
+          Column.physical(TABLE_CAT, DataTypes.STRING),
+          Column.physical(TABLE_SCHEM, DataTypes.STRING),
+          Column.physical(TABLE_NAME, DataTypes.STRING),
+          Column.physical(COLUMN_NAME, DataTypes.STRING),
+          Column.physical(KEY_SEQ, DataTypes.INT),
+          Column.physical(PK_NAME, DataTypes.STRING))
+        .data(columns)
+        .build
+
+    } catch onError()
+  }
+
+  private def toColumnResult(
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      pkName: String,
+      columnName: String,
+      pos: Int): Row = {

Review Comment:
   Minor comment: making the order of the args match `Row.of(x, y, z)` and `.columns(...)` would look more readable.
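
   A sketch of what that reordering could look like (hypothetical, not the merged code; `pos` stays 0-based and is emitted as `pos + 1`):

   ```scala
   // Args ordered to mirror Row.of(...) and the .columns(...) declaration above.
   private def toColumnResult(
       catalogName: String, // TABLE_CAT
       schemaName: String,  // TABLE_SCHEM
       tableName: String,   // TABLE_NAME
       columnName: String,  // COLUMN_NAME
       pos: Int,            // KEY_SEQ
       pkName: String       // PK_NAME
     ): Row = {
     Row.of(catalogName, schemaName, tableName, columnName, Integer.valueOf(pos + 1), pkName)
   }
   ```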





[GitHub] [incubator-kyuubi] cxzl25 commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
cxzl25 commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1009134389


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+      val catalog = tableEnv.getCatalog(catalogName).get()
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) catalog.getDefaultDatabase
+        else schemaNameOrEmpty
+
+      val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+
+      val resolvedSchema = flinkTable.getResolvedSchema
+      val primaryKeySchema = resolvedSchema.getPrimaryKey
+      var columns = ArrayBuffer.empty[Row].toArray;
+      if (primaryKeySchema.isPresent) {
+        val uniqueConstraint = primaryKeySchema.get()
+        columns = uniqueConstraint
+          .getColumns.asScala.toArray.zipWithIndex
+          .map { case (column, pos) =>
+            toColumnResult(
+              catalogName,
+              schemaName,
+              tableName,
+              uniqueConstraint.getName,
+              column,
+              pos)
+          }
+      }
+      resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(
+          Column.physical(TABLE_CAT, DataTypes.STRING),
+          Column.physical(TABLE_SCHEM, DataTypes.STRING),
+          Column.physical(TABLE_NAME, DataTypes.STRING),
+          Column.physical(COLUMN_NAME, DataTypes.STRING),
+          Column.physical(KEY_SEQ, DataTypes.INT),
+          Column.physical(PK_NAME, DataTypes.STRING))
+        .data(columns)
+        .build
+
+    } catch onError()
+  }
+
+  private def toColumnResult(
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      pkName: String,
+      columnName: String,
+      pos: Int): Row = {
+    // format: off
+    Row.of(
+      catalogName, // TABLE_CAT
+      schemaName, // TABLE_SCHEM
+      tableName, // TABLE_NAME
+      columnName, // COLUMN_NAME
+      Integer.valueOf(pos + 1), // KEY_SEQ
+      pkName // PK_NAME
+    )

Review Comment:
   ```suggestion
       Row.of(
         catalogName,              // TABLE_CAT
         schemaName,               // TABLE_SCHEM
         tableName,                // TABLE_NAME
         columnName,               // COLUMN_NAME
         Integer.valueOf(pos + 1), // KEY_SEQ
         pkName                    // PK_NAME
       )
   ```



##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+      val catalog = tableEnv.getCatalog(catalogName).get()
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) catalog.getDefaultDatabase
+        else schemaNameOrEmpty
+
+      val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+
+      val resolvedSchema = flinkTable.getResolvedSchema
+      val primaryKeySchema = resolvedSchema.getPrimaryKey
+      var columns = ArrayBuffer.empty[Row].toArray;
+      if (primaryKeySchema.isPresent) {
+        val uniqueConstraint = primaryKeySchema.get()
+        columns = uniqueConstraint
+          .getColumns.asScala.toArray.zipWithIndex
+          .map { case (column, pos) =>
+            toColumnResult(
+              catalogName,
+              schemaName,
+              tableName,
+              uniqueConstraint.getName,
+              column,
+              pos)
+          }
+      }

Review Comment:
   ```suggestion
         val columns = primaryKeySchema.asScala.map { uniqueConstraint =>
           uniqueConstraint
             .getColumns.asScala.toArray.zipWithIndex
             .map { case (column, pos) =>
               toColumnResult(
                 catalogName,
                 schemaName,
                 tableName,
                 uniqueConstraint.getName,
                 column,
                 pos)
             }
         }.getOrElse(Array.empty)
   ```





[GitHub] [incubator-kyuubi] codecov-commenter commented on pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#issuecomment-1294417976

   # [Codecov](https://codecov.io/gh/apache/incubator-kyuubi/pull/3718?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3718](https://codecov.io/gh/apache/incubator-kyuubi/pull/3718?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ca19da6) into [master](https://codecov.io/gh/apache/incubator-kyuubi/commit/010cfdfcd54fec6711e480d4fef17f7678546b00?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (010cfdf) will **decrease** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   > :exclamation: Current head ca19da6 differs from pull request most recent head 562e8eb. Consider uploading reports for the commit 562e8eb to get more accurate results
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3718      +/-   ##
   ============================================
   - Coverage     52.47%   52.45%   -0.03%     
     Complexity       13       13              
   ============================================
     Files           490      490              
     Lines         27649    27649              
     Branches       3824     3824              
   ============================================
   - Hits          14508    14502       -6     
   + Misses        11760    11759       -1     
   - Partials       1381     1388       +7     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-kyuubi/pull/3718?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...yuubi/operation/meta/ResultSetSchemaConstant.scala](https://codecov.io/gh/apache/incubator-kyuubi/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a3l1dWJpLWNvbW1vbi9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9vcGVyYXRpb24vbWV0YS9SZXN1bHRTZXRTY2hlbWFDb25zdGFudC5zY2FsYQ==) | `0.00% <ø> (ø)` | |
   | [...rg/apache/kyuubi/ctl/cmd/log/LogBatchCommand.scala](https://codecov.io/gh/apache/incubator-kyuubi/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a3l1dWJpLWN0bC9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9jdGwvY21kL2xvZy9Mb2dCYXRjaENvbW1hbmQuc2NhbGE=) | `78.00% <0.00%> (-2.00%)` | :arrow_down: |
   | [...ain/scala/org/apache/kyuubi/engine/EngineRef.scala](https://codecov.io/gh/apache/incubator-kyuubi/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a3l1dWJpLXNlcnZlci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9lbmdpbmUvRW5naW5lUmVmLnNjYWxh) | `73.84% <0.00%> (-0.77%)` | :arrow_down: |
   | [...yuubi/server/metadata/jdbc/JDBCMetadataStore.scala](https://codecov.io/gh/apache/incubator-kyuubi/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a3l1dWJpLXNlcnZlci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9zZXJ2ZXIvbWV0YWRhdGEvamRiYy9KREJDTWV0YWRhdGFTdG9yZS5zY2FsYQ==) | `89.27% <0.00%> (-0.70%)` | :arrow_down: |
   | [...he/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala](https://codecov.io/gh/apache/incubator-kyuubi/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a3l1dWJpLWhhL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUva3l1dWJpL2hhL2NsaWVudC9ldGNkL0V0Y2REaXNjb3ZlcnlDbGllbnQuc2NhbGE=) | `67.64% <0.00%> (-0.59%)` | :arrow_down: |
   | [...in/scala/org/apache/kyuubi/config/KyuubiConf.scala](https://codecov.io/gh/apache/incubator-kyuubi/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a3l1dWJpLWNvbW1vbi9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9jb25maWcvS3l1dWJpQ29uZi5zY2FsYQ==) | `97.46% <0.00%> (-0.08%)` | :arrow_down: |
   | [...g/apache/kyuubi/operation/BatchJobSubmission.scala](https://codecov.io/gh/apache/incubator-kyuubi/pull/3718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a3l1dWJpLXNlcnZlci9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2t5dXViaS9vcGVyYXRpb24vQmF0Y2hKb2JTdWJtaXNzaW9uLnNjYWxh) | `74.64% <0.00%> (ø)` | |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   




[GitHub] [incubator-kyuubi] yanghua commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
yanghua commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1013520495


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) tableEnv.getCurrentDatabase
+        else schemaNameOrEmpty
+
+      val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+
+      val resolvedSchema = flinkTable.getResolvedSchema
+      val primaryKeySchema = resolvedSchema.getPrimaryKey
+      val columns = primaryKeySchema.asScala.map { uniqueConstraint =>
+        uniqueConstraint
+          .getColumns.asScala.toArray.zipWithIndex
+          .map { case (column, pos) =>
+            toColumnResult(
+              catalogName,
+              schemaName,
+              tableName,
+              uniqueConstraint.getName,
+              column,
+              pos)
+          }
+      }.getOrElse(Array.empty)
+
+      resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(
+          Column.physical(TABLE_CAT, DataTypes.STRING),
+          Column.physical(TABLE_SCHEM, DataTypes.STRING),
+          Column.physical(TABLE_NAME, DataTypes.STRING),
+          Column.physical(COLUMN_NAME, DataTypes.STRING),
+          Column.physical(KEY_SEQ, DataTypes.INT),
+          Column.physical(PK_NAME, DataTypes.STRING))
+        .data(columns)
+        .build
+
+    } catch onError()
+  }
+
+  private def toColumnResult(
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      pkName: String,
+      columnName: String,
+      pos: Int): Row = {

Review Comment:
   > it is list type.
   
   What do you mean by that?
   
   Actually, my suggestion is:
   
   ```
   private def toColumnResult(
         catalogName: String,
         schemaName: String,
         tableName: String,
         columnName: String,
         pos: Int,
         pkName: String): Row = {
   ```
   
   It's the same order in which the values are assembled into the Row; after all, this method exists only to do that assembly.
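   
   A minimal sketch of the reordered helper, assuming Flink's `Row.of` factory; the wrapping object, the method body, and the `pos + 1` shift to 1-based JDBC `KEY_SEQ` numbering are illustrative assumptions, not code taken from the PR.
   
   ```scala
   import org.apache.flink.types.Row
   
   object GetPrimaryKeysRowSketch {
     // Parameters follow the same order as the result columns:
     // TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, KEY_SEQ, PK_NAME.
     def toColumnResult(
         catalogName: String,
         schemaName: String,
         tableName: String,
         columnName: String,
         pos: Int,
         pkName: String): Row =
       // KEY_SEQ is 1-based in JDBC, hence the + 1 on the zipWithIndex position (assumption).
       Row.of(catalogName, schemaName, tableName, columnName, Integer.valueOf(pos + 1), pkName)
   }
   ```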





[GitHub] [incubator-kyuubi] pan3793 commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
pan3793 commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1014603717


##########
externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala:
##########
@@ -163,6 +163,78 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     }
   }
 
+  test("get primary keys") {
+    val tableName1 = "flink_get_primary_keys_operation1"
+    val tableName2 = "flink_get_primary_keys_operation2"
+    val tableName3 = "flink_get_primary_keys_operation3"
+
+    withJdbcStatement(tableName1, tableName2, tableName3) { statement =>
+      statement.execute(
+        s"""
+           | create table $tableName1 (
+           |  id1 int,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      statement.execute(
+        s"""
+           | create table $tableName2 (
+           |  id1 int,
+           |  id2 int,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer,
+           |  CONSTRAINT pk_con primary key(id1,id2) NOT ENFORCED
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      statement.execute(
+        s"""
+           | create table $tableName3 (
+           |  id1 int,
+           |  id2 int,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      val metaData = statement.getConnection.getMetaData
+
+      Seq(tableName1, tableName2, tableName3) foreach { tableName =>
+        val rowSet = metaData.getPrimaryKeys("", "", tableName)
+
+        if (tableName.equals(tableName3)) {
+          assert(!rowSet.next())
+        } else {
+          var pos = 1;
+          while (rowSet.next()) {
+            assert(rowSet.getString(TABLE_CAT) === "default_catalog")
+            assert(rowSet.getString(TABLE_SCHEM) === "default_database")
+            assert(rowSet.getString(TABLE_NAME) === tableName)
+            assert(rowSet.getString(COLUMN_NAME) === s"id$pos")
+            assert(rowSet.getInt(KEY_SEQ) === pos)
+            assert(rowSet.getString(PK_NAME) === "pk_con")
+            pos += 1
+          }
+        }
+

Review Comment:
   `assert pos > 1`?
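   
   One way to read the suggestion: a table that declares a primary key should produce at least one row from `getPrimaryKeys`, which could be checked with something like the hypothetical helper below (names are illustrative, not from the PR).
   
   ```scala
   import java.sql.ResultSet
   
   object PrimaryKeyAssertionSketch {
     // Drain a getPrimaryKeys result set and fail if a table that declares a
     // primary key returned no rows; returns the number of key columns seen.
     def assertHasPrimaryKeyRows(rowSet: ResultSet, tableName: String): Int = {
       var pos = 1
       while (rowSet.next()) {
         pos += 1
       }
       assert(pos > 1, s"expected at least one primary key row for $tableName")
       pos - 1
     }
   }
   ```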





[GitHub] [incubator-kyuubi] pan3793 closed pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
pan3793 closed pull request #3718: [KYUUBI #3717] Support flink engine see primary keys
URL: https://github.com/apache/incubator-kyuubi/pull/3718




[GitHub] [incubator-kyuubi] yaooqinn commented on pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
yaooqinn commented on PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#issuecomment-1294499087

   cc @yanghua @SteNicholas 




[GitHub] [incubator-kyuubi] waywtdcc commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
waywtdcc commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1013550372


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) tableEnv.getCurrentDatabase
+        else schemaNameOrEmpty
+
+      val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+
+      val resolvedSchema = flinkTable.getResolvedSchema
+      val primaryKeySchema = resolvedSchema.getPrimaryKey
+      val columns = primaryKeySchema.asScala.map { uniqueConstraint =>
+        uniqueConstraint
+          .getColumns.asScala.toArray.zipWithIndex
+          .map { case (column, pos) =>
+            toColumnResult(
+              catalogName,
+              schemaName,
+              tableName,
+              uniqueConstraint.getName,
+              column,
+              pos)
+          }
+      }.getOrElse(Array.empty)
+
+      resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(
+          Column.physical(TABLE_CAT, DataTypes.STRING),
+          Column.physical(TABLE_SCHEM, DataTypes.STRING),
+          Column.physical(TABLE_NAME, DataTypes.STRING),
+          Column.physical(COLUMN_NAME, DataTypes.STRING),
+          Column.physical(KEY_SEQ, DataTypes.INT),
+          Column.physical(PK_NAME, DataTypes.STRING))
+        .data(columns)
+        .build
+
+    } catch onError()
+  }
+
+  private def toColumnResult(
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      pkName: String,
+      columnName: String,
+      pos: Int): Row = {

Review Comment:
   OK, thanks for the review. I have updated the parameter order; please take a look.





[GitHub] [incubator-kyuubi] waywtdcc commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
waywtdcc commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1012527763


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) tableEnv.getCurrentDatabase
+        else schemaNameOrEmpty

Review Comment:
   ```
   val schemaName =
     if (StringUtils.isEmpty(schemaNameOrEmpty)) {
       if (tableEnv.getCurrentDatabase != null) {
         tableEnv.getCurrentDatabase
       } else {
         tableEnv.getCatalog(catalogName).get().getDefaultDatabase
       }
     } else schemaNameOrEmpty
   ```
   
   Is that OK?



##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) tableEnv.getCurrentDatabase
+        else schemaNameOrEmpty

Review Comment:
   ```
   val schemaName =
     if (StringUtils.isEmpty(schemaNameOrEmpty)) {
       if (tableEnv.getCurrentDatabase != null) {
         tableEnv.getCurrentDatabase
       } else {
         tableEnv.getCatalog(catalogName).get().getDefaultDatabase
       }
     } else schemaNameOrEmpty
   ```
   
   Is that OK?





[GitHub] [incubator-kyuubi] jiaoqingbo commented on a diff in pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
jiaoqingbo commented on code in PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#discussion_r1010129716


##########
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetPrimaryKeys(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNameOrEmpty: String,
+    tableName: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+
+      val schemaName =
+        if (StringUtils.isEmpty(schemaNameOrEmpty)) tableEnv.getCurrentDatabase
+        else schemaNameOrEmpty

Review Comment:
   It seems that there is still a problem here. When catalogName is not equal to tableEnv.getCurrentCatalog, the schemaName we obtain is still the current database of the current catalog rather than of the requested catalog. The fix should look something like the following:
   
   
   ```
   val schemaName =
     if (StringUtils.isEmpty(schemaNameOrEmpty)) {
       if (catalogName != tableEnv.getCurrentCatalog) {
         catalog.getDefaultDatabase
       } else {
         tableEnv.getCurrentDatabase
       }
     } else schemaNameOrEmpty
   ```
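   
   Putting the two suggestions in this thread together, the resolution being discussed could be factored out roughly as below. `resolveSchemaName` and the wrapping object are hypothetical names; the `TableEnvironment` calls (`getCurrentCatalog`, `getCurrentDatabase`, `getCatalog(...).get().getDefaultDatabase`) are the ones already appearing in the review comments.
   
   ```scala
   import org.apache.commons.lang3.StringUtils
   import org.apache.flink.table.api.TableEnvironment
   
   object SchemaNameResolutionSketch {
     // Resolve the effective schema (database) name for a metadata call:
     // prefer the explicit argument, then the current database when the target
     // catalog is the current one, otherwise that catalog's default database.
     def resolveSchemaName(
         tableEnv: TableEnvironment,
         catalogName: String,
         schemaNameOrEmpty: String): String =
       if (StringUtils.isNotEmpty(schemaNameOrEmpty)) {
         schemaNameOrEmpty
       } else if (catalogName == tableEnv.getCurrentCatalog) {
         tableEnv.getCurrentDatabase
       } else {
         tableEnv.getCatalog(catalogName).get().getDefaultDatabase
       }
   }
   ```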





[GitHub] [incubator-kyuubi] pan3793 commented on pull request #3718: [KYUUBI #3717] Support flink engine see primary keys

Posted by GitBox <gi...@apache.org>.
pan3793 commented on PR #3718:
URL: https://github.com/apache/incubator-kyuubi/pull/3718#issuecomment-1305203503

   Thanks, merging to master
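   
   With the change merged, a JDBC client connected to a Kyuubi-managed Flink SQL engine can read primary-key metadata through the standard `DatabaseMetaData.getPrimaryKeys` call, along the lines of the sketch below; the JDBC URL and table name are placeholders, not values from this thread.
   
   ```scala
   import java.sql.DriverManager
   
   object GetPrimaryKeysClientSketch {
     def main(args: Array[String]): Unit = {
       // Placeholder URL: point it at a Kyuubi server fronting a Flink SQL engine.
       val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/default")
       try {
         val rs = conn.getMetaData.getPrimaryKeys("default_catalog", "default_database", "my_table")
         while (rs.next()) {
           // Columns as defined by the operation: TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, KEY_SEQ, PK_NAME.
           println(Seq(
             rs.getString("TABLE_CAT"),
             rs.getString("TABLE_SCHEM"),
             rs.getString("TABLE_NAME"),
             rs.getString("COLUMN_NAME"),
             rs.getInt("KEY_SEQ"),
             rs.getString("PK_NAME")).mkString("\t"))
         }
       } finally conn.close()
     }
   }
   ```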

