Posted to issues@flink.apache.org by "FangYongs (via GitHub)" <gi...@apache.org> on 2023/03/29 02:50:49 UTC

[GitHub] [flink] FangYongs opened a new pull request, #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

FangYongs opened a new pull request, #22289:
URL: https://github.com/apache/flink/pull/22289

   ## What is the purpose of the change
   
   This PR aims to create the `Executor` in `FlinkConnection` and support catalog/schema-related operations.
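   
   As a rough usage sketch (not part of this PR's diff; it mirrors the setup in `FlinkConnectionTest`), the catalog/schema operations would be exercised roughly as follows. The gateway host/port are placeholders, and the `USE <database>` behaviour of `setSchema` is assumed by analogy with `setCatalog`:
   
   ```java
   // Illustrative sketch only (mirrors FlinkConnectionTest); the gateway host/port are
   // placeholders and DriverUri/FlinkConnection come from org.apache.flink.table.jdbc.
   void connectToGateway() throws Exception {
       DriverUri driverUri =
               DriverUri.create("jdbc:flink://localhost:8083", new java.util.Properties());
       try (FlinkConnection connection = new FlinkConnection(driverUri)) {
           connection.setCatalog("default_catalog");   // issues "USE CATALOG default_catalog"
           connection.setSchema("default_database");   // assumed to issue "USE default_database"
           System.out.println(connection.getCatalog() + "." + connection.getSchema());
       }
   }
   ```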
   
   ## Brief change log
     - Create `Executor` in `FlinkConnection`
     - Support catalog/schema-related operations
     - Support client info in `FlinkConnection`
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
     - Added `testCatalogSchema` and `testClientInfo` in `FlinkConnectionTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no) no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no
     - The serializers: (yes / no / don't know) no
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no
     - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no) no
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] libenchao commented on pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

Posted by "libenchao (via GitHub)" <gi...@apache.org>.
libenchao commented on PR #22289:
URL: https://github.com/apache/flink/pull/22289#issuecomment-1493829442

   CI now fails due to an API compatibility problem; could you fix that?




[GitHub] [flink] libenchao closed pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

Posted by "libenchao (via GitHub)" <gi...@apache.org>.
libenchao closed pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection
URL: https://github.com/apache/flink/pull/22289




[GitHub] [flink] libenchao commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

Posted by "libenchao (via GitHub)" <gi...@apache.org>.
libenchao commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1155415681


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java:
##########
@@ -18,34 +18,67 @@
 
 package org.apache.flink.table.jdbc;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.jdbc.utils.DriverUtils;
+
 import java.sql.DatabaseMetaData;
 import java.sql.SQLClientInfoException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.Properties;
+import java.util.UUID;
 
 /** Connection to flink sql gateway for jdbc driver. */
 public class FlinkConnection extends BaseConnection {
-    private final DriverUri driverUri;
+    private final Executor executor;
+    private volatile boolean closed = false;
 
     public FlinkConnection(DriverUri driverUri) {
-        this.driverUri = driverUri;
+        // TODO Support default context from map to get rid of flink core for jdbc driver

Review Comment:
   It would be great if you also added the JIRA link in the comment.
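
   For example (illustrative only; FLINK-31687 is the issue referenced elsewhere in this thread for getting rid of flink-core), the TODO could read:

   ```java
   // TODO Support default context from a map to get rid of flink-core for the jdbc driver.
   //  See https://issues.apache.org/jira/browse/FLINK-31687
   ```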





[GitHub] [flink] FangYongs commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1155231101


##########
flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.SingleSessionManager;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkConnection}. */
+public class FlinkConnectionTest {
+    @RegisterExtension
+    @Order(1)
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(4)
+                            .build());
+
+    @RegisterExtension
+    @Order(2)
+    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(
+                    MINI_CLUSTER_RESOURCE::getClientConfiguration, SingleSessionManager::new);
+
+    @RegisterExtension
+    @Order(3)
+    private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
+            new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+    @Test
+    public void testCatalogSchema() throws Exception {
+        DriverUri driverUri =
+                DriverUri.create(
+                        String.format(
+                                "jdbc:flink://%s:%s",
+                                SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
+                                SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort()),
+                        new Properties());
+        try (FlinkConnection connection = new FlinkConnection(driverUri)) {
+            assertEquals("default_catalog", connection.getCatalog());
+            assertEquals("default_database", connection.getSchema());
+
+            assertThrowsExactly(
+                    SQLException.class,
+                    () -> connection.setCatalog("invalid_catalog"),
+                    "Set catalog[invalid_catalog] fail");
+            assertThrowsExactly(
+                    SQLException.class,
+                    () -> connection.setSchema("invalid_database"),
+                    "Set schema[invalid_database] fail");
+            assertEquals("default_catalog", connection.getCatalog());
+            assertEquals("default_database", connection.getSchema());

Review Comment:
   These two lines check that the catalog and database are not changed by the invalid catalog and database set above.




[GitHub] [flink] FangYongs commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1155231475


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java:
##########
@@ -55,41 +85,75 @@ public DatabaseMetaData getMetaData() throws SQLException {
 
     @Override
     public void setCatalog(String catalog) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        try {
+            setSessionCatalog(catalog);
+        } catch (Exception e) {
+            throw new SQLException(String.format("Set catalog[%s] fail", catalog), e);
+        }
+    }
+
+    private void setSessionCatalog(String catalog) {
+        executor.configureSession(String.format("USE CATALOG %s;", catalog));
     }
 
     @Override
     public String getCatalog() throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        try (StatementResult result = executor.executeStatement("SHOW CURRENT CATALOG;")) {
+            if (result.hasNext()) {
+                return result.next().getString(0).toString();
+            } else {
+                throw new SQLException("No catalog");
+            }
+        }
     }
 
     @Override
     public void setClientInfo(String name, String value) throws SQLClientInfoException {
-        throw new SQLClientInfoException();
+        executor.configureSession(String.format("SET '%s'='%s';", name, value));
     }
 
     @Override
     public void setClientInfo(Properties properties) throws SQLClientInfoException {
-        throw new SQLClientInfoException();
+        for (Object key : properties.keySet()) {
+            setClientInfo(key.toString(), properties.getProperty(key.toString()));
+        }
     }
 
     @Override
     public String getClientInfo(String name) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        Configuration configuration = (Configuration) executor.getSessionConfig();

Review Comment:
   I agree with you, and I think `executor.getSessionConfig()` should return the config without other dependencies, for example as a `Map<String, String>`. I will add a `TODO` here and fix it in https://issues.apache.org/jira/browse/FLINK-31687. What do you think?
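
   For illustration, a rough sketch of that direction, assuming a hypothetical `Executor#getSessionConfigMap()` that returns a plain `Map<String, String>` (the method name is made up and stands in for whatever FLINK-31687 introduces):

   ```java
   // Hypothetical sketch only: getSessionConfigMap() does not exist yet; the point is to
   // avoid casting the session config to the concrete Configuration class.
   @Override
   public String getClientInfo(String name) throws SQLException {
       Map<String, String> sessionConfig = executor.getSessionConfigMap();
       return sessionConfig.get(name);
   }
   ```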





[GitHub] [flink] libenchao commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

Posted by "libenchao (via GitHub)" <gi...@apache.org>.
libenchao commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1154400863


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java:
##########
@@ -18,34 +18,64 @@
 
 package org.apache.flink.table.jdbc;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.jdbc.utils.DriverUtils;
+
 import java.sql.DatabaseMetaData;
 import java.sql.SQLClientInfoException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.Properties;
+import java.util.UUID;
 
 /** Connection to flink sql gateway for jdbc driver. */
 public class FlinkConnection extends BaseConnection {
-    private final DriverUri driverUri;
+    private final Executor executor;
+    private volatile boolean closed = false;
 
     public FlinkConnection(DriverUri driverUri) {
-        this.driverUri = driverUri;
+        this.executor =
+                Executor.create(
+                        new DefaultContext(
+                                Configuration.fromMap(
+                                        DriverUtils.fromProperties(driverUri.getProperties())),
+                                Collections.emptyList()),
+                        driverUri.getAddress(),
+                        UUID.randomUUID().toString());
+        driverUri.getCatalog().ifPresent(this::setSessionCatalog);
+        driverUri.getDatabase().ifPresent(this::setSessionSchema);
     }
 
     @Override
     public Statement createStatement() throws SQLException {
         throw new SQLFeatureNotSupportedException();
     }
 
+    Executor getExecutor() {

Review Comment:
   Add a `VisibleForTesting` annotation?
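
   For illustration, the getter would then presumably look roughly like this:

   ```java
   @VisibleForTesting
   Executor getExecutor() {
       return executor;
   }
   ```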



##########
flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkConnectionTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.SingleSessionManager;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkConnection}. */
+public class FlinkConnectionTest {
+    @RegisterExtension
+    @Order(1)
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(4)
+                            .build());
+
+    @RegisterExtension
+    @Order(2)
+    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(
+                    MINI_CLUSTER_RESOURCE::getClientConfiguration, SingleSessionManager::new);
+
+    @RegisterExtension
+    @Order(3)
+    private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
+            new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+    @Test
+    public void testCatalogSchema() throws Exception {
+        DriverUri driverUri =
+                DriverUri.create(
+                        String.format(
+                                "jdbc:flink://%s:%s",
+                                SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
+                                SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort()),
+                        new Properties());
+        try (FlinkConnection connection = new FlinkConnection(driverUri)) {
+            assertEquals("default_catalog", connection.getCatalog());
+            assertEquals("default_database", connection.getSchema());
+
+            assertThrowsExactly(
+                    SQLException.class,
+                    () -> connection.setCatalog("invalid_catalog"),
+                    "Set catalog[invalid_catalog] fail");
+            assertThrowsExactly(
+                    SQLException.class,
+                    () -> connection.setSchema("invalid_database"),
+                    "Set schema[invalid_database] fail");
+            assertEquals("default_catalog", connection.getCatalog());
+            assertEquals("default_database", connection.getSchema());

Review Comment:
   Are these two lines duplicates of lines 72-73?



##########
flink-table/flink-sql-jdbc-driver-bundle/pom.xml:
##########
@@ -77,7 +89,9 @@
 									<include>org.apache.flink:flink-sql-jdbc-driver</include>
 									<include>org.apache.flink:flink-sql-client</include>
 									<include>org.apache.flink:flink-sql-gateway-api</include>
+									<include>org.apache.flink:flink-sql-gateway</include>
 									<include>org.apache.flink:flink-table-common</include>
+									<include>org.apache.flink:flink-core</include>

Review Comment:
   Finally, we include the 'flink-core' module 😁



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java:
##########
@@ -55,41 +85,75 @@ public DatabaseMetaData getMetaData() throws SQLException {
 
     @Override
     public void setCatalog(String catalog) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        try {
+            setSessionCatalog(catalog);
+        } catch (Exception e) {
+            throw new SQLException(String.format("Set catalog[%s] fail", catalog), e);
+        }
+    }
+
+    private void setSessionCatalog(String catalog) {
+        executor.configureSession(String.format("USE CATALOG %s;", catalog));
     }
 
     @Override
     public String getCatalog() throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        try (StatementResult result = executor.executeStatement("SHOW CURRENT CATALOG;")) {
+            if (result.hasNext()) {
+                return result.next().getString(0).toString();
+            } else {
+                throw new SQLException("No catalog");
+            }
+        }
     }
 
     @Override
     public void setClientInfo(String name, String value) throws SQLClientInfoException {
-        throw new SQLClientInfoException();
+        executor.configureSession(String.format("SET '%s'='%s';", name, value));
     }
 
     @Override
     public void setClientInfo(Properties properties) throws SQLClientInfoException {
-        throw new SQLClientInfoException();
+        for (Object key : properties.keySet()) {
+            setClientInfo(key.toString(), properties.getProperty(key.toString()));
+        }
     }
 
     @Override
     public String getClientInfo(String name) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        Configuration configuration = (Configuration) executor.getSessionConfig();

Review Comment:
   Maybe the assumption will not always hold in the future. (I'm a big fan of coding against interfaces instead of implementations.) Maybe we should improve the `ReadableConfig` interface to support reading configs dynamically.
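
   As one possible direction (a sketch only, not what this PR does), the lookup could stay on the `ReadableConfig` interface by building a `ConfigOption` for the requested key, assuming `getSessionConfig()` is typed as `ReadableConfig` (the cast in the diff suggests the concrete type is `Configuration`):

   ```java
   // Sketch: read an arbitrary client-info key through ReadableConfig instead of casting
   // to Configuration; ReadableConfig and ConfigOptions are in org.apache.flink.configuration.
   ReadableConfig sessionConfig = executor.getSessionConfig();
   String value =
           sessionConfig
                   .getOptional(ConfigOptions.key(name).stringType().noDefaultValue())
                   .orElse(null);
   ```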





[GitHub] [flink] FangYongs commented on pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on PR #22289:
URL: https://github.com/apache/flink/pull/22289#issuecomment-1493319117

   Thanks @libenchao, I have rebased onto master and updated this PR.




[GitHub] [flink] flinkbot commented on pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22289:
URL: https://github.com/apache/flink/pull/22289#issuecomment-1487875564

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "351a2da4884f5e57cf0e71bf2c35905bf03a3d7b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "351a2da4884f5e57cf0e71bf2c35905bf03a3d7b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 351a2da4884f5e57cf0e71bf2c35905bf03a3d7b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] FangYongs commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1155230946


##########
flink-table/flink-sql-jdbc-driver-bundle/pom.xml:
##########
@@ -77,7 +89,9 @@
 									<include>org.apache.flink:flink-sql-jdbc-driver</include>
 									<include>org.apache.flink:flink-sql-client</include>
 									<include>org.apache.flink:flink-sql-gateway-api</include>
+									<include>org.apache.flink:flink-sql-gateway</include>
 									<include>org.apache.flink:flink-table-common</include>
+									<include>org.apache.flink:flink-core</include>

Review Comment:
   I have created an issue (https://issues.apache.org/jira/browse/FLINK-31687) to get rid of flink-core after we complete the main functions in the jdbc driver.





[GitHub] [flink] FangYongs commented on a diff in pull request #22289: [FLINK-31545][jdbc-driver] Create executor in flink connection

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on code in PR #22289:
URL: https://github.com/apache/flink/pull/22289#discussion_r1155438551


##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java:
##########
@@ -18,34 +18,67 @@
 
 package org.apache.flink.table.jdbc;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.jdbc.utils.DriverUtils;
+
 import java.sql.DatabaseMetaData;
 import java.sql.SQLClientInfoException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.Properties;
+import java.util.UUID;
 
 /** Connection to flink sql gateway for jdbc driver. */
 public class FlinkConnection extends BaseConnection {
-    private final DriverUri driverUri;
+    private final Executor executor;
+    private volatile boolean closed = false;
 
     public FlinkConnection(DriverUri driverUri) {
-        this.driverUri = driverUri;
+        // TODO Support default context from map to get rid of flink core for jdbc driver

Review Comment:
   Thanks @libenchao, added the JIRA link.


