You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2020/03/26 18:01:57 UTC
[flink] branch master updated: [FLINK-16702] develop
JDBCCatalogFactory, descriptor, and validator for service discovery
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ce52f3b [FLINK-16702] develop JDBCCatalogFactory, descriptor, and validator for service discovery
ce52f3b is described below
commit ce52f3bcddc032cc4b3cb54c33eb1376df42c887
Author: bowen.li <bo...@gmail.com>
AuthorDate: Fri Mar 20 17:39:09 2020 -0700
[FLINK-16702] develop JDBCCatalogFactory, descriptor, and validator for service discovery
closes #16702
---
flink-connectors/flink-jdbc/pom.xml | 8 ++
.../java/io/jdbc/catalog/AbstractJDBCCatalog.java | 15 ++++
.../jdbc/catalog/factory/JDBCCatalogFactory.java | 93 ++++++++++++++++++++++
.../table/descriptors/JDBCCatalogDescriptor.java | 60 ++++++++++++++
.../table/descriptors/JDBCCatalogValidator.java | 43 ++++++++++
.../org.apache.flink.table.factories.TableFactory | 1 +
.../catalog/factory/JDBCCatalogFactoryTest.java | 85 ++++++++++++++++++++
.../descriptors/JDBCCatalogDescriptorTest.java | 63 +++++++++++++++
8 files changed, 368 insertions(+)
diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
index cb7afab..0af5588 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -76,6 +76,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
index 523de83..6e7dd02 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
@@ -102,6 +102,21 @@ public abstract class AbstractJDBCCatalog extends AbstractCatalog {
LOG.info("Catalog {} closing", getName());
}
+ // ----- getters ------
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return pwd;
+ }
+
+ public String getBaseUrl() {
+ return baseUrl;
+ }
+
+
// ------ table factory ------
public Optional<TableFactory> getTableFactory() {
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactory.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactory.java
new file mode 100644
index 0000000..05f08c7
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog.factory;
+
+import org.apache.flink.api.java.io.jdbc.catalog.JDBCCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.JDBCCatalogValidator;
+import org.apache.flink.table.factories.CatalogFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_JDBC_BASE_URL;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_JDBC_PASSWORD;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_JDBC_USERNAME;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_TYPE_VALUE_JDBC;
+
+/**
+ * Factory for {@link org.apache.flink.api.java.io.jdbc.catalog.JDBCCatalog}.
+ */
+public class JDBCCatalogFactory implements CatalogFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCCatalogFactory.class);
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new HashMap<>();
+ context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_JDBC); // jdbc
+ context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility
+ return context;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ List<String> properties = new ArrayList<>();
+
+ // default database
+ properties.add(CATALOG_DEFAULT_DATABASE);
+
+ properties.add(CATALOG_JDBC_BASE_URL);
+ properties.add(CATALOG_JDBC_USERNAME);
+ properties.add(CATALOG_JDBC_PASSWORD);
+
+ return properties;
+ }
+
+ @Override
+ public Catalog createCatalog(String name, Map<String, String> properties) {
+ final DescriptorProperties prop = getValidatedProperties(properties);
+
+ return new JDBCCatalog(
+ name,
+ prop.getString(CATALOG_DEFAULT_DATABASE),
+ prop.getString(CATALOG_JDBC_USERNAME),
+ prop.getString(CATALOG_JDBC_PASSWORD),
+ prop.getString(CATALOG_JDBC_BASE_URL));
+ }
+
+ private static DescriptorProperties getValidatedProperties(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ descriptorProperties.putProperties(properties);
+
+ new JDBCCatalogValidator().validate(descriptorProperties);
+
+ return descriptorProperties;
+ }
+
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCCatalogDescriptor.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCCatalogDescriptor.java
new file mode 100644
index 0000000..dc03ae2
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCCatalogDescriptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_JDBC_BASE_URL;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_JDBC_PASSWORD;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_JDBC_USERNAME;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_TYPE_VALUE_JDBC;
+
+/**
+ * Descriptor for {@link org.apache.flink.api.java.io.jdbc.catalog.JDBCCatalog}.
+ */
+public class JDBCCatalogDescriptor extends CatalogDescriptor {
+
+ private final String defaultDatabase;
+ private final String username;
+ private final String pwd;
+ private final String baseUrl;
+
+ public JDBCCatalogDescriptor(String defaultDatabase, String username, String pwd, String baseUrl) {
+
+ super(CATALOG_TYPE_VALUE_JDBC, 1);
+
+ this.defaultDatabase = defaultDatabase;
+ this.username = username;
+ this.pwd = pwd;
+ this.baseUrl = baseUrl;
+ }
+
+ @Override
+ protected Map<String, String> toCatalogProperties() {
+ final DescriptorProperties properties = new DescriptorProperties();
+
+ properties.putString(CATALOG_DEFAULT_DATABASE, defaultDatabase);
+ properties.putString(CATALOG_JDBC_USERNAME, username);
+ properties.putString(CATALOG_JDBC_PASSWORD, pwd);
+ properties.putString(CATALOG_JDBC_BASE_URL, baseUrl);
+
+ return properties.asMap();
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCCatalogValidator.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCCatalogValidator.java
new file mode 100644
index 0000000..14730b4
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCCatalogValidator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.api.java.io.jdbc.catalog.JDBCCatalog;
+
+/**
+ * Validator for {@link JDBCCatalog}.
+ */
+public class JDBCCatalogValidator extends CatalogDescriptorValidator {
+
+ public static final String CATALOG_TYPE_VALUE_JDBC = "jdbc";
+
+ public static final String CATALOG_JDBC_USERNAME = "username";
+ public static final String CATALOG_JDBC_PASSWORD = "password";
+ public static final String CATALOG_JDBC_BASE_URL = "base-url";
+
+ @Override
+ public void validate(DescriptorProperties properties) {
+ super.validate(properties);
+ properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_JDBC, false);
+ properties.validateString(CATALOG_JDBC_BASE_URL, false, 1);
+ properties.validateString(CATALOG_JDBC_USERNAME, false, 1);
+ properties.validateString(CATALOG_JDBC_PASSWORD, false, 1);
+ properties.validateString(CATALOG_DEFAULT_DATABASE, false, 1);
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index dbd648d..fa0b4e6 100644
--- a/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
+org.apache.flink.api.java.io.jdbc.catalog.factory.JDBCCatalogFactory
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactoryTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactoryTest.java
new file mode 100644
index 0000000..271617d
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactoryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog.factory;
+
+import org.apache.flink.api.java.io.jdbc.catalog.JDBCCatalog;
+import org.apache.flink.api.java.io.jdbc.catalog.PostgresCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.descriptors.CatalogDescriptor;
+import org.apache.flink.table.descriptors.JDBCCatalogDescriptor;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+
+import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
+import com.opentable.db.postgres.junit.SingleInstancePostgresRule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link JDBCCatalogFactory}.
+ */
+public class JDBCCatalogFactoryTest {
+ @ClassRule
+ public static SingleInstancePostgresRule pg = EmbeddedPostgresRules.singleInstance();
+
+ protected static String baseUrl;
+ protected static JDBCCatalog catalog;
+
+ protected static final String TEST_CATALOG_NAME = "mypg";
+ protected static final String TEST_USERNAME = "postgres";
+ protected static final String TEST_PWD = "postgres";
+
+ @BeforeClass
+ public static void setup() throws SQLException {
+ // jdbc:postgresql://localhost:50807/postgres?user=postgres
+ String embeddedJdbcUrl = pg.getEmbeddedPostgres().getJdbcUrl(TEST_USERNAME, TEST_PWD);
+ // jdbc:postgresql://localhost:50807/
+ baseUrl = embeddedJdbcUrl.substring(0, embeddedJdbcUrl.lastIndexOf("/") + 1);
+
+ catalog = new JDBCCatalog(
+ TEST_CATALOG_NAME, PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, TEST_PWD, baseUrl);
+ }
+
+ @Test
+ public void test() {
+ final CatalogDescriptor catalogDescriptor =
+ new JDBCCatalogDescriptor(PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, TEST_PWD, baseUrl);
+
+ final Map<String, String> properties = catalogDescriptor.toProperties();
+
+ final Catalog actualCatalog = TableFactoryService.find(CatalogFactory.class, properties)
+ .createCatalog(TEST_CATALOG_NAME, properties);
+
+ checkEquals(catalog, (JDBCCatalog) actualCatalog);
+ }
+
+ private static void checkEquals(JDBCCatalog c1, JDBCCatalog c2) {
+ assertEquals(c1.getName(), c2.getName());
+ assertEquals(c1.getDefaultDatabase(), c2.getDefaultDatabase());
+ assertEquals(c1.getUsername(), c2.getUsername());
+ assertEquals(c1.getPassword(), c2.getPassword());
+ assertEquals(c1.getBaseUrl(), c2.getBaseUrl());
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/table/descriptors/JDBCCatalogDescriptorTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/table/descriptors/JDBCCatalogDescriptorTest.java
new file mode 100644
index 0000000..d8151a6
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/table/descriptors/JDBCCatalogDescriptorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_TYPE_VALUE_JDBC;
+
+/**
+ * Test for {@link JDBCCatalogDescriptor}.
+ */
+public class JDBCCatalogDescriptorTest extends DescriptorTestBase {
+
+ private static final String TEST_DB = "db";
+ private static final String TEST_USERNAME = "user";
+ private static final String TEST_PWD = "pwd";
+ private static final String TEST_BASE_URL = "xxx";
+
+ @Override
+ protected List<Descriptor> descriptors() {
+ final Descriptor descriptor = new JDBCCatalogDescriptor(
+ TEST_DB, TEST_USERNAME, TEST_PWD, TEST_BASE_URL);
+
+ return Arrays.asList(descriptor);
+ }
+
+ @Override
+ protected List<Map<String, String>> properties() {
+ final Map<String, String> props = new HashMap<>();
+ props.put("type", CATALOG_TYPE_VALUE_JDBC);
+ props.put("property-version", "1");
+ props.put("default-database", TEST_DB);
+ props.put("username", TEST_USERNAME);
+ props.put("password", TEST_PWD);
+ props.put("base-url", TEST_BASE_URL);
+
+ return Arrays.asList(props);
+ }
+
+ @Override
+ protected DescriptorValidator validator() {
+ return new JDBCCatalogValidator();
+ }
+}