Posted to issues@flink.apache.org by "FangYongs (via GitHub)" <gi...@apache.org> on 2023/03/21 11:49:15 UTC

[GitHub] [flink] FangYongs opened a new pull request, #22233: [FLINK-31538][jdbc-driver] Supports parse catalog/database and properties for uri

FangYongs opened a new pull request, #22233:
URL: https://github.com/apache/flink/pull/22233

   
   ## What is the purpose of the change
   
   This PR parses the catalog, database, and connection properties from the URI, and reads `project.version` as the JDBC driver version.
   
   ## Brief change log
     - Added `DriverInfo` to read the project version
     - Added `DriverUri` to parse the catalog, database, and properties from the URI
     - Added `FlinkConnection` to implement `java.sql.Connection`
   
   ## Verifying this change
   This change adds tests and can be verified as follows:
   
     - Added `FlinkDriverTest.testDriverInfo` to test the driver version
     - Added `FlinkDriverTest.testDriverUri` to test parsing the catalog, database, and properties from the URI
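For context, the URL format parsed here follows the pattern `jdbc:flink://host:port/catalog/database?key=val`. A minimal sketch of the decomposition (using plain `java.net.URI` after stripping the `jdbc:` prefix; this is an illustration, not the actual `DriverUri` code):

```java
import java.net.URI;

/** Illustration of how a Flink JDBC URL can be decomposed; not the actual DriverUri code. */
public class UriSketch {
    /** Strips the "jdbc:" prefix so java.net.URI can parse the remainder. */
    public static URI toUri(String url) {
        return URI.create(url.substring("jdbc:".length()));
    }

    public static void main(String[] args) {
        URI uri = toUri("jdbc:flink://localhost:8888/catalog_name/database_name?sessionId=123&key1=val1");
        System.out.println(uri.getHost());   // localhost
        System.out.println(uri.getPort());   // 8888
        // The path carries catalog and database; the query string carries the properties.
        System.out.println(uri.getPath());   // /catalog_name/database_name
        System.out.println(uri.getQuery());  // sessionId=123&key1=val1
    }
}
```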
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no) no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no
     - The serializers: (yes / no / don't know) no
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no
     - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no) no
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] libenchao closed pull request #22233: [FLINK-31538][jdbc-driver] Supports parse catalog/database and properties for uri

Posted by "libenchao (via GitHub)" <gi...@apache.org>.
libenchao closed pull request #22233: [FLINK-31538][jdbc-driver] Supports parse catalog/database and properties for uri
URL: https://github.com/apache/flink/pull/22233




[GitHub] [flink] FangYongs commented on pull request #22233: [FLINK-31538][jdbc-driver] Supports parse catalog/database and properties for uri

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on PR #22233:
URL: https://github.com/apache/flink/pull/22233#issuecomment-1482115252

   Hi @libenchao, please help review this PR when you are free. Thanks!




[GitHub] [flink] FangYongs commented on pull request #22233: [FLINK-31538][jdbc-driver] Supports parse catalog/database and properties for uri

Posted by "FangYongs (via GitHub)" <gi...@apache.org>.
FangYongs commented on PR #22233:
URL: https://github.com/apache/flink/pull/22233#issuecomment-1486041523

   Thanks @libenchao. I have introduced utils for the JDBC driver and added invalid URI tests; please review again when you're free.




[GitHub] [flink] flinkbot commented on pull request #22233: [FLINK-31538][jdbc-driver] Supports parse catalog/database and properties for uri

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22233:
URL: https://github.com/apache/flink/pull/22233#issuecomment-1477709337

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "77454a27d82738447ffb8b16b41afcbaeedc087b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "77454a27d82738447ffb8b16b41afcbaeedc087b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 77454a27d82738447ffb8b16b41afcbaeedc087b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] libenchao commented on a diff in pull request #22233: [FLINK-31538][jdbc-driver] Supports parse catalog/database and properties for uri

Posted by "libenchao (via GitHub)" <gi...@apache.org>.
libenchao commented on code in PR #22233:
URL: https://github.com/apache/flink/pull/22233#discussion_r1148345880


##########
flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDriverTest.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static org.apache.flink.table.jdbc.DriverInfo.DRIVER_NAME;
+import static org.apache.flink.table.jdbc.DriverInfo.DRIVER_VERSION_MAJOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkDriver}. */
+public class FlinkDriverTest {
+    @Test
+    public void testDriverInfo() {
+        assertEquals(DRIVER_VERSION_MAJOR, 1);
+        assertEquals(DRIVER_NAME, "Flink JDBC Driver");
+    }
+
+    @Test
+    public void testDriverUri() throws Exception {
+        String uri =
+                "jdbc:flink://localhost:8888/catalog_name/database_name?sessionId=123&key1=val1&key2=val2";

Review Comment:
   Could you add more tests for the invalid URI exception messages?
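A sketch of what such tests might cover (hypothetical; it uses a minimal stand-in validator, since the real checks live in `DriverUri`):

```java
import java.sql.SQLException;

/** Hypothetical stand-in for DriverUri validation, to illustrate invalid-URL test cases. */
public class InvalidUriTestSketch {
    /** Minimal validator: checks the prefix and the number of path segments. */
    static void parse(String url) throws SQLException {
        if (!url.startsWith("jdbc:flink:")) {
            throw new SQLException("Invalid Flink JDBC URL: " + url);
        }
        String path = java.net.URI.create(url.substring("jdbc:".length())).getPath();
        if (path != null && path.split("/", -1).length - 1 > 2) {
            throw new SQLException("Invalid path segments in URL: " + url);
        }
    }

    /** Returns true if the validator rejects the URL. */
    public static boolean rejects(String url) {
        try {
            parse(url);
            return false;
        } catch (SQLException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A wrong scheme and too many path segments should both be rejected.
        System.out.println(rejects("jdbc:mysql://localhost:3306/db"));
        System.out.println(rejects("jdbc:flink://localhost:8888/a/b/c"));
        System.out.println(rejects("jdbc:flink://localhost:8888/cat/db"));
    }
}
```

In a JUnit test this would typically be written with `assertThrows(SQLException.class, ...)` and an assertion on the exception message.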



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java:
##########
@@ -0,0 +1,208 @@
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Splitter;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Parse catalog, table and connection properties from uri for {@link FlinkDriver}. */
+public class DriverUri {
+    private static final String URL_PREFIX = "jdbc:";
+    private static final String URL_START = URL_PREFIX + "flink:";
+
+    private static final Splitter URL_ARG_SPLITTER = Splitter.on('&').omitEmptyStrings();
+    private static final Splitter ARG_VALUE_SPLITTER = Splitter.on('=').limit(2);
+    private final String host;
+    private final int port;
+    private final URI uri;
+
+    private final Properties properties;
+
+    private Optional<String> catalog = Optional.empty();
+    private Optional<String> database = Optional.empty();
+
+    private DriverUri(String url, Properties driverProperties) throws SQLException {
+        this(parseDriverUrl(url), driverProperties);
+    }
+
+    private DriverUri(URI uri, Properties driverProperties) throws SQLException {
+        this.uri = checkNotNull(uri, "uri is null");
+        this.host = uri.getHost();
+        this.port = uri.getPort();
+        this.properties = mergeDynamicProperties(uri, driverProperties);
+
+        initCatalogAndSchema();
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public Optional<String> getCatalog() {
+        return catalog;
+    }
+
+    public Optional<String> getDatabase() {
+        return database;
+    }
+
+    private void initCatalogAndSchema() throws SQLException {
+        String path = uri.getPath();
+        if (StringUtils.isNullOrWhitespaceOnly(uri.getPath()) || path.equals("/")) {
+            return;
+        }
+
+        // remove first slash
+        if (!path.startsWith("/")) {
+            throw new SQLException("Path in uri does not start with a slash: " + uri);
+        }
+        path = path.substring(1);
+
+        List<String> parts = Splitter.on("/").splitToList(path);
+        // remove last item due to a trailing slash
+        if (parts.get(parts.size() - 1).isEmpty()) {
+            parts = parts.subList(0, parts.size() - 1);
+        }
+
+        if (parts.size() > 2) {
+            throw new SQLException("Invalid path segments in URL: " + uri);
+        }
+
+        if (parts.get(0).isEmpty()) {
+            throw new SQLException("Catalog name in URL is empty: " + uri);
+        }
+
+        catalog = Optional.ofNullable(parts.get(0));
+
+        if (parts.size() > 1) {
+            if (parts.get(1).isEmpty()) {
+                throw new SQLException("Database name in URL is empty: " + uri);
+            }
+
+            database = Optional.ofNullable(parts.get(1));
+        }
+    }
+
+    private static Properties mergeDynamicProperties(URI uri, Properties driverProperties)
+            throws SQLException {
+        Map<String, String> urlProperties = parseUriParameters(uri.getQuery());
+        Map<String, String> suppliedProperties = Maps.fromProperties(driverProperties);
+
+        for (String key : urlProperties.keySet()) {
+            if (suppliedProperties.containsKey(key)) {
+                throw new SQLException(
+                        format("Connection property '%s' is both in the URL and an argument", key));
+            }
+        }
+
+        Properties result = new Properties();
+        setMapToProperties(result, urlProperties);
+        setMapToProperties(result, suppliedProperties);
+        return result;
+    }
+
+    private static void setMapToProperties(Properties properties, Map<String, String> values) {
+        for (Map.Entry<String, String> entry : values.entrySet()) {
+            properties.setProperty(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private static Map<String, String> parseUriParameters(String query) throws SQLException {
+        Map<String, String> result = new HashMap<>();
+
+        if (query != null) {
+            Iterable<String> queryArgs = URL_ARG_SPLITTER.split(query);
+            for (String queryArg : queryArgs) {
+                List<String> parts = ARG_VALUE_SPLITTER.splitToList(queryArg);
+                if (parts.size() != 2) {
+                    throw new SQLException(
+                            format(
+                                    "Connection property in uri must be key=val format: '%s'",
+                                    queryArg));
+                }
+                if (result.put(parts.get(0), parts.get(1)) != null) {
+                    throw new SQLException(
+                            format(
+                                    "Connection property '%s' is in URL multiple times",
+                                    parts.get(0)));
+                }
+            }
+        }
+
+        return result;
+    }
+
+    private static URI parseDriverUrl(String url) throws SQLException {
+        if (!url.startsWith(URL_START)) {
+            throw new SQLException("Invalid Flink JDBC URL: " + url);

Review Comment:
   Should we add more detail to the message to indicate why it's invalid? The same applies to the exception messages below.
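One possible shape for a more descriptive message, stating the expected prefix rather than just "invalid" (a hypothetical sketch; the final wording is up to the PR):

```java
import java.sql.SQLException;

/** Hypothetical sketch of a more descriptive URL check; the wording is illustrative only. */
public class UrlCheckSketch {
    static final String URL_START = "jdbc:flink:";

    /** Throws with a message that states the expected prefix, not just "invalid". */
    public static void validate(String url) throws SQLException {
        if (!url.startsWith(URL_START)) {
            throw new SQLException(String.format(
                    "Invalid Flink JDBC URL '%s': the URL must start with '%s'", url, URL_START));
        }
    }

    /** Demonstration helper: returns the error message, or null if the URL is accepted. */
    public static String messageFor(String url) {
        try {
            validate(url);
            return null;
        } catch (SQLException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(messageFor("jdbc:mysql://localhost:3306/db"));
    }
}
```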



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java:
##########
@@ -0,0 +1,208 @@
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Splitter;

Review Comment:
   Then we'd better bundle the shaded guava into `flink-sql-jdbc-driver`?



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java:
##########
@@ -0,0 +1,208 @@
+        Properties result = new Properties();
+        setMapToProperties(result, urlProperties);
+        setMapToProperties(result, suppliedProperties);
+        return result;
+    }
+
+    private static void setMapToProperties(Properties properties, Map<String, String> values) {

Review Comment:
   How about naming it `addMapToProperties`?



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverInfo.java:
##########
@@ -0,0 +1,88 @@
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.URL;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.lang.Integer.parseInt;
+import static org.apache.flink.util.Preconditions.checkNotNull;

Review Comment:
   If you are going to import a `flink-core` dependency, then we will eventually need to bundle `flink-core` into `flink-sql-jdbc-driver`. IMO, we'd better keep the dependencies as simple as possible; what do you think?


