You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2020/03/26 18:01:57 UTC

[flink] branch master updated: [FLINK-16702] develop JDBCCatalogFactory, descriptor, and validator for service discovery

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ce52f3b  [FLINK-16702] develop JDBCCatalogFactory, descriptor, and validator for service discovery
ce52f3b is described below

commit ce52f3bcddc032cc4b3cb54c33eb1376df42c887
Author: bowen.li <bo...@gmail.com>
AuthorDate: Fri Mar 20 17:39:09 2020 -0700

    [FLINK-16702] develop JDBCCatalogFactory, descriptor, and validator for service discovery
    
    closes #16702
---
 flink-connectors/flink-jdbc/pom.xml                |  8 ++
 .../java/io/jdbc/catalog/AbstractJDBCCatalog.java  | 15 ++++
 .../jdbc/catalog/factory/JDBCCatalogFactory.java   | 93 ++++++++++++++++++++++
 .../table/descriptors/JDBCCatalogDescriptor.java   | 60 ++++++++++++++
 .../table/descriptors/JDBCCatalogValidator.java    | 43 ++++++++++
 .../org.apache.flink.table.factories.TableFactory  |  1 +
 .../catalog/factory/JDBCCatalogFactoryTest.java    | 85 ++++++++++++++++++++
 .../descriptors/JDBCCatalogDescriptorTest.java     | 63 +++++++++++++++
 8 files changed, 368 insertions(+)

diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
index cb7afab..0af5588 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -76,6 +76,14 @@ under the License.
 			<scope>test</scope>
         </dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
index 523de83..6e7dd02 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/AbstractJDBCCatalog.java
@@ -102,6 +102,21 @@ public abstract class AbstractJDBCCatalog extends AbstractCatalog {
 		LOG.info("Catalog {} closing", getName());
 	}
 
+	// ----- getters ------
+
+	public String getUsername() {
+		return username;
+	}
+
+	public String getPassword() {
+		return pwd;
+	}
+
+	public String getBaseUrl() {
+		return baseUrl;
+	}
+
+
 	// ------ table factory ------
 
 	public Optional<TableFactory> getTableFactory() {
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactory.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactory.java
new file mode 100644
index 0000000..05f08c7
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog.factory;
+
+import org.apache.flink.api.java.io.jdbc.catalog.JDBCCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.JDBCCatalogValidator;
+import org.apache.flink.table.factories.CatalogFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_JDBC_BASE_URL;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_JDBC_PASSWORD;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_JDBC_USERNAME;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_TYPE_VALUE_JDBC;
+
+/**
+ * Factory for {@link org.apache.flink.api.java.io.jdbc.catalog.JDBCCatalog}.
+ */
+public class JDBCCatalogFactory implements CatalogFactory {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JDBCCatalogFactory.class);
+
+	@Override
+	public Map<String, String> requiredContext() {
+		Map<String, String> context = new HashMap<>();
+		context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_JDBC); // jdbc
+		context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility
+		return context;
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		List<String> properties = new ArrayList<>();
+
+		// default database
+		properties.add(CATALOG_DEFAULT_DATABASE);
+
+		properties.add(CATALOG_JDBC_BASE_URL);
+		properties.add(CATALOG_JDBC_USERNAME);
+		properties.add(CATALOG_JDBC_PASSWORD);
+
+		return properties;
+	}
+
+	@Override
+	public Catalog createCatalog(String name, Map<String, String> properties) {
+		final DescriptorProperties prop = getValidatedProperties(properties);
+
+		return new JDBCCatalog(
+			name,
+			prop.getString(CATALOG_DEFAULT_DATABASE),
+			prop.getString(CATALOG_JDBC_USERNAME),
+			prop.getString(CATALOG_JDBC_PASSWORD),
+			prop.getString(CATALOG_JDBC_BASE_URL));
+	}
+
+	private static DescriptorProperties getValidatedProperties(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		descriptorProperties.putProperties(properties);
+
+		new JDBCCatalogValidator().validate(descriptorProperties);
+
+		return descriptorProperties;
+	}
+
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCCatalogDescriptor.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCCatalogDescriptor.java
new file mode 100644
index 0000000..dc03ae2
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCCatalogDescriptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_JDBC_BASE_URL;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_JDBC_PASSWORD;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_JDBC_USERNAME;
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_TYPE_VALUE_JDBC;
+
+/**
+ * Descriptor for {@link org.apache.flink.api.java.io.jdbc.catalog.JDBCCatalog}.
+ */
+public class JDBCCatalogDescriptor extends CatalogDescriptor {
+
+	private final String defaultDatabase;
+	private final String username;
+	private final String pwd;
+	private final String baseUrl;
+
+	public JDBCCatalogDescriptor(String defaultDatabase, String username, String pwd, String baseUrl) {
+
+		super(CATALOG_TYPE_VALUE_JDBC, 1);
+
+		this.defaultDatabase = defaultDatabase;
+		this.username = username;
+		this.pwd = pwd;
+		this.baseUrl = baseUrl;
+	}
+
+	@Override
+	protected Map<String, String> toCatalogProperties() {
+		final DescriptorProperties properties = new DescriptorProperties();
+
+		properties.putString(CATALOG_DEFAULT_DATABASE, defaultDatabase);
+		properties.putString(CATALOG_JDBC_USERNAME, username);
+		properties.putString(CATALOG_JDBC_PASSWORD, pwd);
+		properties.putString(CATALOG_JDBC_BASE_URL, baseUrl);
+
+		return properties.asMap();
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCCatalogValidator.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCCatalogValidator.java
new file mode 100644
index 0000000..14730b4
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCCatalogValidator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.api.java.io.jdbc.catalog.JDBCCatalog;
+
+/**
+ * Validator for {@link JDBCCatalog}.
+ */
+public class JDBCCatalogValidator extends CatalogDescriptorValidator {
+
+	public static final String CATALOG_TYPE_VALUE_JDBC = "jdbc";
+
+	public static final String CATALOG_JDBC_USERNAME = "username";
+	public static final String CATALOG_JDBC_PASSWORD = "password";
+	public static final String CATALOG_JDBC_BASE_URL = "base-url";
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		super.validate(properties);
+		properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_JDBC, false);
+		properties.validateString(CATALOG_JDBC_BASE_URL, false, 1);
+		properties.validateString(CATALOG_JDBC_USERNAME, false, 1);
+		properties.validateString(CATALOG_JDBC_PASSWORD, false, 1);
+		properties.validateString(CATALOG_DEFAULT_DATABASE, false, 1);
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index dbd648d..fa0b4e6 100644
--- a/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
+org.apache.flink.api.java.io.jdbc.catalog.factory.JDBCCatalogFactory
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactoryTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactoryTest.java
new file mode 100644
index 0000000..271617d
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/factory/JDBCCatalogFactoryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.catalog.factory;
+
+import org.apache.flink.api.java.io.jdbc.catalog.JDBCCatalog;
+import org.apache.flink.api.java.io.jdbc.catalog.PostgresCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.descriptors.CatalogDescriptor;
+import org.apache.flink.table.descriptors.JDBCCatalogDescriptor;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+
+import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
+import com.opentable.db.postgres.junit.SingleInstancePostgresRule;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link JDBCCatalogFactory}.
+ */
+public class JDBCCatalogFactoryTest {
+	@ClassRule
+	public static SingleInstancePostgresRule pg = EmbeddedPostgresRules.singleInstance();
+
+	protected static String baseUrl;
+	protected static JDBCCatalog catalog;
+
+	protected static final String TEST_CATALOG_NAME = "mypg";
+	protected static final String TEST_USERNAME = "postgres";
+	protected static final String TEST_PWD = "postgres";
+
+	@BeforeClass
+	public static void setup() throws SQLException {
+		// jdbc:postgresql://localhost:50807/postgres?user=postgres
+		String embeddedJdbcUrl = pg.getEmbeddedPostgres().getJdbcUrl(TEST_USERNAME, TEST_PWD);
+		// jdbc:postgresql://localhost:50807/
+		baseUrl = embeddedJdbcUrl.substring(0, embeddedJdbcUrl.lastIndexOf("/") + 1);
+
+		catalog = new JDBCCatalog(
+			TEST_CATALOG_NAME, PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, TEST_PWD, baseUrl);
+	}
+
+	@Test
+	public void test() {
+		final CatalogDescriptor catalogDescriptor =
+			new JDBCCatalogDescriptor(PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, TEST_PWD, baseUrl);
+
+		final Map<String, String> properties = catalogDescriptor.toProperties();
+
+		final Catalog actualCatalog = TableFactoryService.find(CatalogFactory.class, properties)
+			.createCatalog(TEST_CATALOG_NAME, properties);
+
+		checkEquals(catalog, (JDBCCatalog) actualCatalog);
+	}
+
+	private static void checkEquals(JDBCCatalog c1, JDBCCatalog c2) {
+		assertEquals(c1.getName(), c2.getName());
+		assertEquals(c1.getDefaultDatabase(), c2.getDefaultDatabase());
+		assertEquals(c1.getUsername(), c2.getUsername());
+		assertEquals(c1.getPassword(), c2.getPassword());
+		assertEquals(c1.getBaseUrl(), c2.getBaseUrl());
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/table/descriptors/JDBCCatalogDescriptorTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/table/descriptors/JDBCCatalogDescriptorTest.java
new file mode 100644
index 0000000..d8151a6
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/table/descriptors/JDBCCatalogDescriptorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.JDBCCatalogValidator.CATALOG_TYPE_VALUE_JDBC;
+
+/**
+ * Test for {@link JDBCCatalogDescriptor}.
+ */
+public class JDBCCatalogDescriptorTest extends DescriptorTestBase {
+
+	private static final String TEST_DB = "db";
+	private static final String TEST_USERNAME = "user";
+	private static final String TEST_PWD = "pwd";
+	private static final String TEST_BASE_URL = "xxx";
+
+	@Override
+	protected List<Descriptor> descriptors() {
+		final Descriptor descriptor = new JDBCCatalogDescriptor(
+			TEST_DB, TEST_USERNAME, TEST_PWD, TEST_BASE_URL);
+
+		return Arrays.asList(descriptor);
+	}
+
+	@Override
+	protected List<Map<String, String>> properties() {
+		final Map<String, String> props = new HashMap<>();
+		props.put("type", CATALOG_TYPE_VALUE_JDBC);
+		props.put("property-version", "1");
+		props.put("default-database", TEST_DB);
+		props.put("username", TEST_USERNAME);
+		props.put("password", TEST_PWD);
+		props.put("base-url", TEST_BASE_URL);
+
+		return Arrays.asList(props);
+	}
+
+	@Override
+	protected DescriptorValidator validator() {
+		return new JDBCCatalogValidator();
+	}
+}