You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/05/06 03:11:23 UTC
[rocketmq-schema-registry] branch main updated: Storage support jdbc #78 (#81)
This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
The following commit(s) were added to refs/heads/main by this push:
new 707ec53 Storage support jdbc #78 (#81)
707ec53 is described below
commit 707ec53f0e4fa1cfecd7e752555038687abc4965
Author: Xiaojian Sun <su...@163.com>
AuthorDate: Sat May 6 11:11:18 2023 +0800
Storage support jdbc #78 (#81)
Co-authored-by: xiaoyi <xi...@alibaba-inc.com>
---
.gitignore | 2 +
.../client/CachedSchemaRegistryClient.java | 24 +-
.../schema/registry/common/model/StorageType.java | 4 +-
.../registry/common/storage/StorageService.java | 8 +-
core/pom.xml | 6 +
.../schema/registry/core/CoreApplication.java | 3 +-
core/src/main/resources/application.properties | 5 +
pom.xml | 1 +
storage-jdbc/embedded-hazelcast-on-kubernetes.md | 3 +
storage-jdbc/pom.xml | 57 +++
.../registry/storage/jdbc/JdbcStorageFactory.java | 50 +++
.../registry/storage/jdbc/JdbcStoragePlugin.java | 32 +-
.../registry/storage/jdbc/JdbcStorageService.java | 150 +++++++
.../registry/storage/jdbc/cache/LocalCache.java | 28 +-
.../storage/jdbc/cache/SubjectLocalCache.java | 62 +++
.../storage/jdbc/common/ExpressionBuilder.java | 274 +++++++++++++
.../storage/jdbc/common/IdentifierRules.java | 128 ++++++
.../registry/storage/jdbc/common/Operator.java | 30 +-
.../jdbc/configs/JdbcStorageConfigConstants.java | 37 ++
.../storage/jdbc/configs/ServiceConfig.java | 29 +-
.../storage/jdbc/dialect/ConnectionProvider.java | 37 +-
.../storage/jdbc/dialect/DatabaseDialect.java | 55 +++
.../jdbc/dialect/DatabaseDialectProvider.java | 21 +-
.../jdbc/dialect/DiscoverDialectFactory.java | 49 +++
.../jdbc/dialect/GenericDatabaseDialect.java | 445 +++++++++++++++++++++
.../registry/storage/jdbc/dialect/TableId.java | 52 +++
.../jdbc/dialect/mysql/MysqlDatabaseDialect.java | 80 ++++
.../registry/storage/jdbc/handler/IHandler.java | 109 +++++
.../storage/jdbc/handler/SchemaHandler.java | 286 +++++++++++++
.../registry/storage/jdbc/store/IMapStore.java | 20 +-
.../storage/jdbc/store/JdbcSchemaMapStore.java | 179 +++++++++
.../jdbc/store/JdbcSchemaMapStoreFactory.java | 24 +-
.../storage/jdbc/store/JdbcSubjectMapStore.java | 94 +++++
.../jdbc/store/JdbcSubjectMapStoreFactory.java | 23 +-
...mq.schema.registry.common.storage.StoragePlugin | 19 +
...ry.storage.jdbc.dialect.DatabaseDialectProvider | 18 +
storage-jdbc/src/main/resources/hazelcast.yaml | 52 +++
.../src/main/resources/mysql-storage-ddl.sql | 27 ++
38 files changed, 2373 insertions(+), 150 deletions(-)
diff --git a/.gitignore b/.gitignore
index e3e0e07..7fcd49f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -34,3 +34,5 @@ build/
### VS Code ###
.vscode/
+
+.DS_Store
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java
index d884492..f3fc316 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java
@@ -19,26 +19,26 @@ package org.apache.rocketmq.schema.registry.client;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import org.apache.rocketmq.schema.registry.client.rest.RestService;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
public class CachedSchemaRegistryClient implements SchemaRegistryClient {
private static final String DEFAULT_TENANT = "default";
@@ -131,11 +131,9 @@ public class CachedSchemaRegistryClient implements SchemaRegistryClient {
String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
schemaCacheBySubject.invalidate(subjectFullName);
schemaCacheBySubjectAndVersion.invalidate(new SubjectAndVersion(cluster, tenant, subject, version));
-
- if (subjectToSchema.get(subjectFullName) != null) {
+ if (subjectToVersion.containsKey(subjectFullName)) {
subjectToVersion.get(subjectFullName).remove(version);
}
-
return restService.deleteSchema(cluster, tenant, subject, version);
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
index 794b9ce..48033da 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
@@ -24,9 +24,9 @@ public enum StorageType {
*/
ROCKETMQ(1),
/**
- * Mysql type
+ * Jdbc type
*/
- MYSQL(2);
+ JDBC(2);
private final int value;
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
index 45fbcea..4fce948 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.schema.registry.common.storage;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.List;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
@@ -24,7 +26,7 @@ import org.apache.rocketmq.schema.registry.common.model.BaseInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
-public interface StorageService<T extends BaseInfo> {
+public interface StorageService<T extends BaseInfo> extends Closeable {
/**
* Error message for all default implementations.
@@ -95,4 +97,8 @@ public interface StorageService<T extends BaseInfo> {
default List<String> listTenants(StorageServiceContext storageService, QualifiedName name) {
throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
}
+
+ @Override
+ default void close() throws IOException {
+ }
}
diff --git a/core/pom.xml b/core/pom.xml
index 554de3b..65b3b14 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -41,6 +41,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>schema-registry-storage-jdbc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.eclipse.aether</groupId>
<artifactId>aether-api</artifactId>
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/CoreApplication.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/CoreApplication.java
index f4e47e7..8fba44e 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/CoreApplication.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/CoreApplication.java
@@ -18,13 +18,14 @@ package org.apache.rocketmq.schema.registry.core;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.hazelcast.HazelcastAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScan.Filter;
import org.springframework.context.annotation.FilterType;
import org.springframework.scheduling.annotation.EnableScheduling;
import springfox.documentation.oas.annotations.EnableOpenApi;
-@SpringBootApplication
+@SpringBootApplication(exclude = {HazelcastAutoConfiguration.class})
@EnableScheduling
@EnableOpenApi
@ComponentScan(excludeFilters = @Filter(type = FilterType.ASPECTJ, pattern = "org.apache.rocketmq.schema.registry.storage..*"))
diff --git a/core/src/main/resources/application.properties b/core/src/main/resources/application.properties
index 22c0ae0..f872d17 100644
--- a/core/src/main/resources/application.properties
+++ b/core/src/main/resources/application.properties
@@ -9,8 +9,13 @@ schema.dependency.upload-enabled=false
#schema.dependency.repository-url
#schema.dependency.username
#schema.dependency.password
+
schema.storage.type=rocketmq
schema.storage.config-path=storage-rocketmq/src/main/resources/rocketmq.properties
+
+#schema.storage.type=jdbc
+#schema.storage.config-path=storage-jdbc/src/main/resources/hazelcast.yaml
+
springfox.documentation.swagger-ui.enabled=true
management.health.db.enabled=true
server.error.include-stacktrace=on_param
diff --git a/pom.xml b/pom.xml
index d79978a..620450f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,7 @@
<module>war</module>
<module>client</module>
<module>example</module>
+ <module>storage-jdbc</module>
</modules>
<properties>
diff --git a/storage-jdbc/embedded-hazelcast-on-kubernetes.md b/storage-jdbc/embedded-hazelcast-on-kubernetes.md
new file mode 100644
index 0000000..f0214bb
--- /dev/null
+++ b/storage-jdbc/embedded-hazelcast-on-kubernetes.md
@@ -0,0 +1,3 @@
+embedded-hazelcast-on-kubernetes
+================
+https://hazelcast.com/blog/how-to-use-embedded-hazelcast-on-kubernetes/
\ No newline at end of file
diff --git a/storage-jdbc/pom.xml b/storage-jdbc/pom.xml
new file mode 100644
index 0000000..56aabdf
--- /dev/null
+++ b/storage-jdbc/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-schema-registry-all</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>0.1.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>schema-registry-storage-jdbc</artifactId>
+ <packaging>jar</packaging>
+
+ <properties>
+ <mysql.version>8.0.16</mysql.version>
+ <hazelcast.version>5.2.3</hazelcast.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>schema-registry-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ <version>${hazelcast.version}</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStorageFactory.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStorageFactory.java
new file mode 100644
index 0000000..689e318
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStorageFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc;
+
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.storage.SpringStorageFactory;
+import org.apache.rocketmq.schema.registry.common.storage.StorageService;
+import org.apache.rocketmq.schema.registry.storage.jdbc.configs.ServiceConfig;
+
+import java.io.IOException;
+
+public class JdbcStorageFactory
+ extends SpringStorageFactory {
+ public JdbcStorageFactory(StoragePluginContext context) {
+ super(context);
+ super.registerClazz(ServiceConfig.class);
+ super.refresh();
+ }
+
+ @Override
+ public StorageService<SchemaInfo> getStorageService() {
+ return this.ctx.getBean(JdbcStorageService.class);
+ }
+
+ @Override
+ public void stop() {
+ try {
+ getStorageService().close();
+ } catch (IOException e) {
+ // Ignore this error
+ }
+ super.stop();
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStoragePlugin.java
similarity index 55%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStoragePlugin.java
index 794b9ce..c480e4d 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStoragePlugin.java
@@ -15,23 +15,25 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc;
-public enum StorageType {
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.common.model.StorageType;
+import org.apache.rocketmq.schema.registry.common.storage.StorageFactory;
+import org.apache.rocketmq.schema.registry.common.storage.StoragePlugin;
- /**
- * Rocketmq type
- */
- ROCKETMQ(1),
- /**
- * Mysql type
- */
- MYSQL(2);
-
- private final int value;
-
- StorageType(final int value) {
- this.value = value;
+/**
+ * Mysql storage plugin
+ */
+public class JdbcStoragePlugin
+ implements StoragePlugin {
+ @Override
+ public StorageType getType() {
+ return StorageType.JDBC;
}
+ @Override
+ public StorageFactory load(StoragePluginContext context) {
+ return new JdbcStorageFactory(context);
+ }
}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStorageService.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStorageService.java
new file mode 100644
index 0000000..1d94c57
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStorageService.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc;
+
+import org.apache.avro.Schema;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaMetaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+import org.apache.rocketmq.schema.registry.common.storage.StorageService;
+import org.apache.rocketmq.schema.registry.storage.jdbc.handler.IHandler;
+import org.apache.rocketmq.schema.registry.storage.jdbc.handler.SchemaHandler;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * mysql storage service
+ */
+public class JdbcStorageService
+ implements StorageService<SchemaInfo> {
+ private final String hazelcastYamlConfigPath;
+ private final IHandler handler;
+
+ public JdbcStorageService(StoragePluginContext context) {
+ this.hazelcastYamlConfigPath = context.getConfig().getStorageConfigPath();
+ this.handler = new SchemaHandler(this.hazelcastYamlConfigPath);
+ }
+
+ @Override
+ public SchemaInfo register(StorageServiceContext context, SchemaInfo schemaInfo) {
+ try {
+ handler.register(schemaInfo);
+ return schemaInfo;
+ } catch (Exception e) {
+ throw new SchemaException("Registry schema failed", e);
+ }
+ }
+
+ @Override
+ public void delete(StorageServiceContext context, QualifiedName name) {
+ if (name.getVersion() == null) {
+ handler.deleteBySubject(name);
+ } else {
+ handler.deleteByVersion(name);
+ }
+ }
+
+ @Override
+ public SchemaInfo update(StorageServiceContext context, SchemaInfo schemaInfo) {
+ handler.updateSchema(schemaInfo);
+ return schemaInfo;
+ }
+
+ @Override
+ public SchemaInfo get(StorageServiceContext context, QualifiedName qualifiedName) {
+ return handler.getSchema(qualifiedName);
+ }
+
+ @Override
+ public SchemaRecordInfo getBySubject(StorageServiceContext context, QualifiedName qualifiedName) {
+ if (qualifiedName.getVersion() == null) {
+ SchemaRecordInfo result = handler.getBySubject(qualifiedName.subjectFullName());
+ return result;
+ }
+ // schema version is given
+ SchemaInfo schemaInfo = handler.getSchemaInfoBySubject(qualifiedName.subjectFullName());
+ if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
+ return null;
+ }
+ Map<Long, SchemaRecordInfo> versionSchemaMap = schemaInfo.getDetails().getSchemaRecords()
+ .stream().collect(Collectors.toMap(SchemaRecordInfo::getVersion, Function.identity()));
+ return versionSchemaMap.get(qualifiedName.getVersion());
+ }
+
+ @Override
+ public SchemaRecordInfo getTargetSchema(StorageServiceContext context, QualifiedName qualifiedName) {
+ // schema version is given
+ SchemaInfo schemaInfo = handler.getSchemaInfoBySubject(qualifiedName.subjectFullName());
+ if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
+ return null;
+ }
+ SchemaMetaInfo schemaMetaInfo = schemaInfo.getMeta();
+ if (schemaMetaInfo == null) {
+ return null;
+ }
+ if (schemaMetaInfo.getType() == SchemaType.AVRO) {
+ for (SchemaRecordInfo schemaRecordInfo : schemaInfo.getDetails().getSchemaRecords()) {
+ Schema store = new Schema.Parser().parse(schemaRecordInfo.getIdl());
+ Schema target = new Schema.Parser().parse(qualifiedName.getSchema());
+ if (Objects.equals(store, target)) {
+ return schemaRecordInfo;
+ }
+ }
+ } else {
+ //todo support other type
+ return null;
+ }
+ return null;
+ }
+
+ @Override
+ public List<SchemaRecordInfo> listBySubject(StorageServiceContext context, QualifiedName qualifiedName) {
+ SchemaInfo schemaInfo = handler.getSchemaInfoBySubject(qualifiedName.subjectFullName());
+ if (schemaInfo == null || schemaInfo.getDetails() == null) {
+ return null;
+ }
+ return schemaInfo.getDetails().getSchemaRecords();
+ }
+
+ @Override
+ public List<String> listSubjectsByTenant(StorageServiceContext context, QualifiedName qualifiedName) {
+ return handler.getSubjects(context, qualifiedName.getTenant());
+ }
+
+ @Override
+ public List<String> listTenants(StorageServiceContext storageService, QualifiedName qualifiedName) {
+ return handler.getTenants(qualifiedName.getCluster());
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.handler != null) {
+ this.handler.close();
+ }
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/cache/LocalCache.java
similarity index 71%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/cache/LocalCache.java
index 794b9ce..a5343b1 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/cache/LocalCache.java
@@ -15,23 +15,23 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.cache;
-public enum StorageType {
+import java.util.Map;
+import java.util.function.Consumer;
- /**
- * Rocketmq type
- */
- ROCKETMQ(1),
- /**
- * Mysql type
- */
- MYSQL(2);
+public interface LocalCache<K, V> {
- private final int value;
+ V put(K k, V v);
- StorageType(final int value) {
- this.value = value;
- }
+ V get(K k, Consumer<String> consumer);
+
+ V get(K k);
+
+ V remove(K k);
+
+ boolean containsKey(K k);
+
+ Map<K, V> getCache();
}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/cache/SubjectLocalCache.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/cache/SubjectLocalCache.java
new file mode 100644
index 0000000..44c8226
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/cache/SubjectLocalCache.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.cache;
+
+import com.google.common.collect.Maps;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+
+import java.util.Map;
+import java.util.function.Consumer;
+
+public class SubjectLocalCache implements LocalCache<String, SchemaRecordInfo> {
+
+ public static Map<String, SchemaRecordInfo> subjectCache = Maps.newConcurrentMap();
+
+ @Override
+ public SchemaRecordInfo put(String subjectFullName, SchemaRecordInfo schemaRecordInfo) {
+ return subjectCache.get(subjectFullName);
+ }
+
+ @Override
+ public SchemaRecordInfo get(String subjectFullName, Consumer<String> consumer) {
+ if (!containsKey(subjectFullName)) {
+ consumer.accept(subjectFullName);
+ }
+ return subjectCache.get(subjectFullName);
+ }
+
+ @Override
+ public SchemaRecordInfo get(String subjectFullName) {
+ return subjectCache.get(subjectFullName);
+ }
+
+ @Override
+ public SchemaRecordInfo remove(String s) {
+ return subjectCache.remove(s);
+ }
+
+ @Override
+ public boolean containsKey(String fullName) {
+ return subjectCache.containsKey(fullName);
+ }
+
+ @Override
+ public Map<String, SchemaRecordInfo> getCache() {
+ return subjectCache;
+ }
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/ExpressionBuilder.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/ExpressionBuilder.java
new file mode 100644
index 0000000..75d3820
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/ExpressionBuilder.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.storage.jdbc.common;
+
+/**
+ * expression builder
+ */
+public class ExpressionBuilder {
+ private final IdentifierRules rules;
+ private final StringBuilder sb = new StringBuilder();
+
+ public ExpressionBuilder(IdentifierRules rules) {
+ this.rules = rules != null ? rules : IdentifierRules.DEFAULT;
+ }
+
+ public static ExpressionBuilder create() {
+ return new ExpressionBuilder(IdentifierRules.DEFAULT);
+ }
+
+ public static ExpressionBuilder create(IdentifierRules identifierRules) {
+ return new ExpressionBuilder(identifierRules);
+ }
+
+ /**
+ * Get a {@link Transform} that will quote just the column names.
+ *
+ * @return the transform; never null
+ */
+ public static Transform<String> columnNames() {
+ return (builder, input) -> builder.appendColumnName(input);
+ }
+
+ public static Transform<String> columnNamesWith(final String appended) {
+ return (builder, input) -> {
+ builder.appendColumnName(input);
+ builder.append(appended);
+ };
+ }
+
+ /**
+ * Return a new ExpressionBuilder that escapes quotes with the specified prefix.
+ * This builder remains unaffected.
+ *
+ * @param prefix the prefix
+ * @return the new ExpressionBuilder, or this builder if the prefix is null or empty
+ */
+ public ExpressionBuilder escapeQuotesWith(String prefix) {
+ if (prefix == null || prefix.isEmpty()) {
+ return this;
+ }
+ return new ExpressionBuilder(this.rules.escapeQuotesWith(prefix));
+ }
+
+ /**
+ * Append to this builder's expression the delimiter defined by this builder's
+ * {@link IdentifierRules}.
+ *
+ * @return this builder to enable methods to be chained; never null
+ */
+ public ExpressionBuilder appendIdentifierDelimiter() {
+ sb.append(rules.identifierDelimiter());
+ return this;
+ }
+
+ /**
+ * Always append to this builder's expression the leading quote character(s) defined by this
+ * builder's {@link IdentifierRules}.
+ *
+ * @return this builder to enable methods to be chained; never null
+ */
+ public ExpressionBuilder appendLeadingQuote() {
+ sb.append(rules.leadingQuoteString());
+ return this;
+ }
+
+ protected ExpressionBuilder appendTrailingQuote() {
+ sb.append(rules.trailingQuoteString());
+ return this;
+ }
+
+ public ExpressionBuilder appendStringQuote() {
+ sb.append("'");
+ return this;
+ }
+
+ /**
+ * Append to this builder's expression a string surrounded by single quote characters ({@code '}).
+ *
+ * @param name the object whose string representation is to be appended
+ * @return this builder to enable methods to be chained; never null
+ */
+ public ExpressionBuilder appendStringQuoted(Object name) {
+ appendStringQuote();
+ sb.append(name);
+ appendStringQuote();
+ return this;
+ }
+
+ /**
+ * Append to this builder's expression the identifier.
+ *
+ * @param name the name to be appended
+ * @return this builder to enable methods to be chained; never null
+ */
+ public ExpressionBuilder appendIdentifier(
+ String name
+ ) {
+ appendLeadingQuote();
+ sb.append(name);
+ appendTrailingQuote();
+ return this;
+ }
+
+ /**
+ * append table name
+ *
+ * @param name
+ * @return
+ */
+ public ExpressionBuilder appendTableName(String name) {
+ appendLeadingQuote();
+ sb.append(name);
+ appendTrailingQuote();
+ return this;
+ }
+
+ /**
+ * append column name
+ *
+ * @param name
+ * @return
+ */
+ public ExpressionBuilder appendColumnName(String name) {
+ appendLeadingQuote();
+ sb.append(name);
+ appendTrailingQuote();
+ return this;
+ }
+
+ /**
+ * Append to this builder's expression the specified identifier, surrounded by the leading and
+ * trailing quotes.
+ *
+ * @param name the name to be appended
+ * @return this builder to enable methods to be chained; never null
+ */
+ public ExpressionBuilder appendIdentifierQuoted(String name) {
+ appendLeadingQuote();
+ sb.append(name);
+ appendTrailingQuote();
+ return this;
+ }
+
+ /**
+ * Append to this builder's expression a new line.
+ *
+ * @return this builder to enable methods to be chained; never null
+ */
+ public ExpressionBuilder appendNewLine() {
+ sb.append(System.lineSeparator());
+ return this;
+ }
+
+ public ExpressionBuilder append(Object obj) {
+ sb.append(obj);
+ return this;
+ }
+
+ public <T> ExpressionBuilder append(
+ T obj,
+ Transform<T> transform
+ ) {
+ if (transform != null) {
+ transform.apply(this, obj);
+ } else {
+ append(obj);
+ }
+ return this;
+ }
+
+ public ListBuilder<Object> appendList() {
+ return new BasicListBuilder<>();
+ }
+
+
+ public ExpressionBuilder appendMultiple(String delimiter, String expression, int times) {
+ for (int i = 0; i < times; i++) {
+ if (i > 0) {
+ append(delimiter);
+ }
+ append(expression);
+ }
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return sb.toString();
+ }
+
+
+ /**
+ * A functional interface for a transformation that an expression builder might use when
+ * appending one or more other objects.
+ *
+ * @param <T> the type of object to transform before appending.
+ */
+ @FunctionalInterface
+ public interface Transform<T> {
+ void apply(ExpressionBuilder builder, T input);
+ }
+
+
+ public interface ListBuilder<T> {
+
+ ListBuilder<T> delimitedBy(String delimiter);
+
+ <R> ListBuilder<R> transformedBy(Transform<R> transform);
+
+ ExpressionBuilder of(Iterable<? extends T> objects);
+ }
+
+ protected class BasicListBuilder<T> implements ListBuilder<T> {
+ private final String delimiter;
+ private final Transform<T> transform;
+ private boolean first = true;
+
+ BasicListBuilder() {
+ this(", ", null);
+ }
+
+ BasicListBuilder(String delimiter, Transform<T> transform) {
+ this.delimiter = delimiter;
+ this.transform = transform != null ? transform : ExpressionBuilder::append;
+ }
+
+ @Override
+ public ListBuilder<T> delimitedBy(String delimiter) {
+ return new BasicListBuilder<T>(delimiter, transform);
+ }
+
+ @Override
+ public <R> ListBuilder<R> transformedBy(Transform<R> transform) {
+ return new BasicListBuilder<>(delimiter, transform);
+ }
+
+ @Override
+ public ExpressionBuilder of(Iterable<? extends T> objects) {
+ for (T obj : objects) {
+ if (first) {
+ first = false;
+ } else {
+ append(delimiter);
+ }
+ append(obj, transform);
+ }
+ return ExpressionBuilder.this;
+ }
+ }
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/IdentifierRules.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/IdentifierRules.java
new file mode 100644
index 0000000..8f2af15
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/IdentifierRules.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.storage.jdbc.common;
+
+/**
+ * The rules for how identifiers are parsed and quoted.
+ */
+public class IdentifierRules {
+ public static final String DEFAULT_QUOTE = "\"";
+ public static final String DEFAULT_ID_DELIM = ".";
+
+ public static final IdentifierRules DEFAULT = new IdentifierRules(DEFAULT_ID_DELIM,
+ DEFAULT_QUOTE
+ );
+
+ private final String leadingQuoteString;
+ private final String trailingQuoteString;
+ private final String identifierDelimiter;
+
+ /**
+ * Create new identifier rules using the supplied quote string for both leading and trailing
+ * quotes, and the '{@link #DEFAULT_ID_DELIM}' character for identifier delimiters.
+ *
+ * @param quoteString the string used for leading and trailing quotes; may be null if {@link
+ * #DEFAULT_QUOTE} is to be used
+ */
+ public IdentifierRules(String quoteString) {
+ this(DEFAULT_ID_DELIM, quoteString, quoteString);
+ }
+
+ /**
+ * Create new identifier rules using the supplied parameters.
+ *
+ * @param delimiter the delimiter used within fully qualified names; may be null if {@link
+ * #DEFAULT_ID_DELIM} is to be used
+ * @param quoteString the string used for leading and trailing quotes; may be null if {@link
+ * #DEFAULT_QUOTE} is to be used
+ */
+ public IdentifierRules(
+ String delimiter,
+ String quoteString
+ ) {
+ this(delimiter, quoteString, quoteString);
+ }
+
+ /**
+ * Create new identifier rules using the supplied parameters.
+ *
+ * @param identifierDelimiter the delimiter used within fully qualified names; may be null if
+ * {@link #DEFAULT_ID_DELIM} is to be used
+ * @param leadingQuoteString the string used for leading quotes; may be null if {@link
+ * #DEFAULT_QUOTE} is to be used
+ * @param trailingQuoteString the string used for leading quotes; may be null if {@link
+ * #DEFAULT_QUOTE} is to be used
+ */
+ public IdentifierRules(
+ String identifierDelimiter,
+ String leadingQuoteString,
+ String trailingQuoteString
+ ) {
+ this.leadingQuoteString = leadingQuoteString != null ? leadingQuoteString : DEFAULT_QUOTE;
+ this.trailingQuoteString = trailingQuoteString != null ? trailingQuoteString : DEFAULT_QUOTE;
+ this.identifierDelimiter = identifierDelimiter != null ? identifierDelimiter : DEFAULT_ID_DELIM;
+ }
+
+ public ExpressionBuilder expressionBuilder() {
+ return new ExpressionBuilder(this);
+ }
+
+ /**
+ * Get the delimiter that is used to delineate segments within fully-qualified identifiers.
+ *
+ * @return the identifier delimiter; never null
+ */
+ public String identifierDelimiter() {
+ return identifierDelimiter;
+ }
+
+
+ /**
+ * Get the string used as a leading quote.
+ *
+ * @return the leading quote string; never null
+ */
+ public String leadingQuoteString() {
+ return leadingQuoteString;
+ }
+
+ /**
+ * Get the string used as a trailing quote.
+ *
+ * @return the trailing quote string; never null
+ */
+ public String trailingQuoteString() {
+ return trailingQuoteString;
+ }
+
+ /**
+ * Return a new IdentifierRules that escapes quotes with the specified prefix.
+ *
+ * @param prefix the prefix
+ * @return the new IdentifierRules, or this builder if the prefix is null or empty
+ */
+ public IdentifierRules escapeQuotesWith(String prefix) {
+ if (prefix == null || prefix.isEmpty()) {
+ return this;
+ }
+ return new IdentifierRules(
+ identifierDelimiter,
+ prefix + leadingQuoteString,
+ prefix + trailingQuoteString
+ );
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/Operator.java
similarity index 73%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/Operator.java
index 794b9ce..90468d5 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/Operator.java
@@ -15,23 +15,15 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.common;
-public enum StorageType {
-
- /**
- * Rocketmq type
- */
- ROCKETMQ(1),
- /**
- * Mysql type
- */
- MYSQL(2);
-
- private final int value;
-
- StorageType(final int value) {
- this.value = value;
- }
-
-}
+/**
+ * update mode
+ */
+public enum Operator {
+ INSERT,
+ DELETE,
+ UPSERT,
+ UPDATE,
+ SELECT
+}
\ No newline at end of file
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/configs/JdbcStorageConfigConstants.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/configs/JdbcStorageConfigConstants.java
new file mode 100644
index 0000000..a0f001e
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/configs/JdbcStorageConfigConstants.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.storage.jdbc.configs;
+
+public class JdbcStorageConfigConstants {
+
+ // jdbc storage config
+ public final static String STORAGE_JDBC_TYPE = "storage.jdbc.type";
+ public final static String STORAGE_JDBC_TYPE_MYSQL = "mysql";
+ public final static String STORAGE_JDBC_URL = "storage.jdbc.url";
+ public final static String STORAGE_JDBC_USER = "storage.jdbc.user";
+ public final static String STORAGE_JDBC_PASSWORD = "storage.jdbc.password";
+ public final static String STORAGE_JDBC_DATABASE_NAME = "storage.jdbc.database.name";
+ public final static String STORAGE_JDBC_SCHEMA_NAME = "storage.jdbc.schema.name";
+ public final static String STORAGE_JDBC_TABLE_NAME = "storage.jdbc.table.name";
+ public final static String MAX_CONNECTIONS_ATTEMPTS = "max.connections.attempts";
+ public final static String MAX_CONNECTIONS_ATTEMPTS_DEFAULT = "3";
+ public final static String CONNECTION_RETRY_BACKOFF = "connection.retry.backoff";
+ public final static String CONNECTION_RETRY_BACKOFF_DEFAULT = "1000";
+ public final static String HAZELCAST_YAML_PATH = "storage.jdbc.hazelcast.yaml.path";
+ public final static String DATABASE_DEFAULT = "schema_registry";
+ public final static String TABLE_NAME_DEFAULT = "schema_table";
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/configs/ServiceConfig.java
similarity index 58%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/configs/ServiceConfig.java
index 794b9ce..2f6bc9d 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/configs/ServiceConfig.java
@@ -15,23 +15,24 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.configs;
-public enum StorageType {
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.storage.jdbc.JdbcStorageService;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class ServiceConfig {
/**
- * Rocketmq type
- */
- ROCKETMQ(1),
- /**
- * Mysql type
+ * mysql storage service
+ *
+ * @param context
+ * @return
*/
- MYSQL(2);
-
- private final int value;
-
- StorageType(final int value) {
- this.value = value;
+ @Bean
+ public JdbcStorageService jdbcStorageService(StoragePluginContext context) {
+ return new JdbcStorageService(context);
}
-
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/ConnectionProvider.java
similarity index 53%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/ConnectionProvider.java
index 794b9ce..f17ee48 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/ConnectionProvider.java
@@ -15,23 +15,38 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect;
-public enum StorageType {
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * A provider of JDBC {@link Connection} instances.
+ */
+public interface ConnectionProvider extends AutoCloseable {
/**
- * Rocketmq type
+ * Create a connection.
+ *
+ * @return the connection; never null
+ * @throws SQLException if there is a problem getting the connection
*/
- ROCKETMQ(1),
+ Connection getConnection() throws SQLException;
+
/**
- * Mysql type
+ * connection valid
+ *
+ * @param connection
+ * @param timeout
+ * @return
+ * @throws SQLException
*/
- MYSQL(2);
+ boolean isConnectionValid(Connection connection, int timeout) throws SQLException;
- private final int value;
-
- StorageType(final int value) {
- this.value = value;
- }
+ /**
+ * Close this connection provider.
+ */
+ @Override
+ void close();
}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DatabaseDialect.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DatabaseDialect.java
new file mode 100644
index 0000000..42a4612
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DatabaseDialect.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect;
+
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.Operator;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * database dialect
+ */
+public interface DatabaseDialect extends ConnectionProvider {
+ String dbType();
+
+ TableId tableId();
+
+ List<String> fields();
+
+ PreparedStatement createPreparedStatement(Connection db, String sql) throws SQLException;
+
+ String buildSelectStatement();
+
+ String buildSelectOneStatement(String keyField);
+
+ String buildUpsertStatement(TableId tableId, Collection<String> fields);
+
+ String buildInsertStatement(TableId tableId, Collection<String> columns);
+
+ String buildUpdateStatement(TableId tableId, Collection<String> keyColumns, Collection<String> columns);
+
+ String buildDeleteStatement(TableId tableId, Collection<String> keyColumns);
+
+ void bindRecord(PreparedStatement statement, Collection<String> keyValues,
+ Collection<String> noKeyValues, Operator mode) throws SQLException;
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DatabaseDialectProvider.java
similarity index 73%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DatabaseDialectProvider.java
index 794b9ce..9c4a21c 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DatabaseDialectProvider.java
@@ -15,23 +15,14 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect;
-public enum StorageType {
+import java.io.IOException;
+import java.util.Properties;
- /**
- * Rocketmq type
- */
- ROCKETMQ(1),
- /**
- * Mysql type
- */
- MYSQL(2);
+public interface DatabaseDialectProvider {
- private final int value;
-
- StorageType(final int value) {
- this.value = value;
- }
+ DatabaseDialect createDialect(Properties config) throws IOException;
+ String databaseType();
}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DiscoverDialectFactory.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DiscoverDialectFactory.java
new file mode 100644
index 0000000..0d4c0f2
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DiscoverDialectFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect;
+
+import com.google.common.collect.Maps;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+/**
+ * Discover dialect factory
+ */
+public class DiscoverDialectFactory {
+
+ private static final Map<String, DatabaseDialectProvider> DIALECT_CACHE = Maps.newConcurrentMap();
+
+ static {
+ loadDialects();
+ }
+
+ private static void loadDialects() {
+ ServiceLoader<DatabaseDialectProvider> dialects = ServiceLoader.load(DatabaseDialectProvider.class);
+ Iterator<DatabaseDialectProvider> dialectIterator = dialects.iterator();
+ while (dialectIterator.hasNext()) {
+ DatabaseDialectProvider dialect = dialectIterator.next();
+ DIALECT_CACHE.put(dialect.databaseType(), dialect);
+ }
+ }
+
+ public static DatabaseDialectProvider getDialectProvider(String dbType) {
+ return DIALECT_CACHE.get(dbType);
+ }
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/GenericDatabaseDialect.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/GenericDatabaseDialect.java
new file mode 100644
index 0000000..08edb64
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/GenericDatabaseDialect.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.ExpressionBuilder;
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.IdentifierRules;
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.Operator;
+import org.apache.rocketmq.schema.registry.storage.jdbc.configs.JdbcStorageConfigConstants;
+import org.springframework.util.Assert;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@Slf4j
+public abstract class GenericDatabaseDialect implements DatabaseDialect {
+
+ protected static final List<String> FIELDS_DEFAULT =
+ Arrays.stream(new String[] {"schema_full_name", "schema_info"}).collect(Collectors.toList());
+ protected static final String DDL_FILE = "/%s-storage-ddl.sql";
+ private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+ private static final int VALIDITY_CHECK_TIMEOUT_S = 5;
+ private static int jdbcMajorVersion;
+ private final AtomicReference<IdentifierRules> identifierRules = new AtomicReference<>();
+ private final IdentifierRules defaultIdentifierRules = IdentifierRules.DEFAULT;
+ private String dbType;
+ private String jdbcUrl;
+ private String userName;
+ private String password;
+ private int maxConnectionAttempts;
+ private long connectionRetryBackoff;
+ private TableId tableId;
+ private int count = 0;
+ private Connection connection;
+
+ public GenericDatabaseDialect(Properties props, String dbType) throws IOException {
+ this.initConfig(props, dbType);
+ this.createStorageTables();
+ }
+
+
+ private void initConfig(Properties config, String dbType) {
+ // connection info
+ this.dbType = dbType;
+ this.jdbcUrl = config.getProperty(JdbcStorageConfigConstants.STORAGE_JDBC_URL, null);
+ this.userName = config.getProperty(JdbcStorageConfigConstants.STORAGE_JDBC_USER, null);
+ this.password = config.getProperty(JdbcStorageConfigConstants.STORAGE_JDBC_PASSWORD, null);
+ Assert.notNull(jdbcUrl, "Configuration jdbc url cannot be empty");
+ Assert.notNull(userName, "Configuration jdbc userName cannot be empty");
+ Assert.notNull(password, "Configuration jdbc password cannot be empty");
+
+ this.maxConnectionAttempts =
+ Integer.parseInt(config.getProperty(JdbcStorageConfigConstants.MAX_CONNECTIONS_ATTEMPTS,
+ JdbcStorageConfigConstants.MAX_CONNECTIONS_ATTEMPTS_DEFAULT));
+ this.connectionRetryBackoff = Long.parseLong(config.getProperty(JdbcStorageConfigConstants.CONNECTION_RETRY_BACKOFF,
+ JdbcStorageConfigConstants.CONNECTION_RETRY_BACKOFF_DEFAULT));
+
+ // Storage db and tables
+ String database = config.getProperty(JdbcStorageConfigConstants.STORAGE_JDBC_DATABASE_NAME,
+ JdbcStorageConfigConstants.DATABASE_DEFAULT);
+ String schemaName = config.getProperty(JdbcStorageConfigConstants.STORAGE_JDBC_SCHEMA_NAME,
+ null);
+ String tableName = config.getProperty(JdbcStorageConfigConstants.STORAGE_JDBC_TABLE_NAME,
+ JdbcStorageConfigConstants.TABLE_NAME_DEFAULT);
+
+ this.tableId = new TableId(tableName, database, schemaName);
+ }
+
+ @Override
+ public String dbType() {
+ return dbType;
+ }
+
+ /**
+ * create storage tables
+ */
+ protected void createStorageTables() throws IOException {
+ InputStream inputStream = GenericDatabaseDialect.class.getResourceAsStream(String.format(DDL_FILE, dbType()));
+ BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+ List<String> ddl = Lists.newArrayList();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ ddl.add(line);
+ }
+ String[] statements = ddl.stream()
+ .map(String::trim)
+ .filter(x -> !x.startsWith("--") && !x.isEmpty())
+ .map(
+ x -> {
+ final Matcher m =
+ COMMENT_PATTERN.matcher(x);
+ return m.matches() ? m.group(1) : x;
+ })
+ .collect(Collectors.joining("\n"))
+ .split(";");
+
+ // create db and tables
+ try (Statement st = getConnection().createStatement()) {
+ for (String statement : statements) {
+ if (tableId.getCatalogName() != null) {
+ statement = statement.replace(TableId.DB_NAME, tableId.getCatalogName().trim());
+ }
+ if (tableId.getSchemaName() != null) {
+ statement = statement.replace(TableId.SCHEMA_NAME, tableId.getSchemaName().trim());
+ }
+ statement = statement.replace(TableId.TABLE_NAME, tableId.getTableName().trim());
+ st.execute(statement.trim());
+ }
+ } catch (SQLException sqe) {
+ throw new SchemaException("Init database and table is failed", sqe);
+ }
+ }
+
+ protected ExpressionBuilder expressionBuilder() {
+ return identifierRules().expressionBuilder();
+ }
+
+ /**
+ * Load one data from schema table by schema full name
+ *
+ * @param keyField
+ * @return
+ */
+ @Override
+ public String buildSelectOneStatement(String keyField) {
+ ExpressionBuilder builder = expressionBuilder();
+ builder.append("SELECT ");
+ builder.appendList()
+ .delimitedBy(",")
+ .transformedBy(ExpressionBuilder.columnNames())
+ .of(fields());
+ builder.append(" from ");
+ builder.append(tableId);
+ // append where
+ builder.append(" WHERE ");
+ builder.append(keyField);
+ builder.append(" = ?");
+ return builder.toString();
+ }
+
+
+ /**
+ * Load all data from table
+ *
+ * @return
+ */
+ @Override
+ public String buildSelectStatement() {
+ ExpressionBuilder builder = expressionBuilder();
+ builder.append("SELECT ");
+ builder.appendList()
+ .delimitedBy(",")
+ .transformedBy(ExpressionBuilder.columnNames())
+ .of(FIELDS_DEFAULT);
+ builder.append(" from ");
+ builder.append(tableId);
+ return builder.toString();
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ try {
+ if (connection == null) {
+ newConnection();
+ } else if (!isConnectionValid(connection, VALIDITY_CHECK_TIMEOUT_S)) {
+ log.info("The database connection is invalid. Reconnecting...");
+ close();
+ newConnection();
+ }
+ } catch (SQLException sqle) {
+ throw new SchemaException("Get database is failed", sqle);
+ }
+ return connection;
+ }
+
+ private synchronized void newConnection() throws SQLException {
+ int attempts = 0;
+ while (attempts < maxConnectionAttempts) {
+ try {
+ ++count;
+ log.info("Attempting to open connection #{}", count);
+ connection = createConnection();
+ return;
+ } catch (SQLException sqle) {
+ attempts++;
+ if (attempts < maxConnectionAttempts) {
+ log.info("Unable to connect to database on attempt {}/{}. Will retry in {} ms.", attempts,
+ maxConnectionAttempts, connectionRetryBackoff, sqle
+ );
+ try {
+ Thread.sleep(connectionRetryBackoff);
+ } catch (InterruptedException e) {
+ // this is ok because just woke up early
+ }
+ } else {
+ throw sqle;
+ }
+ }
+ }
+ }
+
+ private Connection createConnection() throws SQLException {
+ Properties properties = new Properties();
+ if (userName != null) {
+ properties.setProperty("user", userName);
+ }
+ if (password != null) {
+ properties.setProperty("password", password);
+ }
+ DriverManager.setLoginTimeout(40);
+ Connection connection = DriverManager.getConnection(jdbcUrl, properties);
+ jdbcMajorVersion = connection.getMetaData().getJDBCMajorVersion();
+ return connection;
+ }
+
+ /**
+ * connection valid
+ *
+ * @param connection
+ * @param timeout
+ * @return
+ * @throws SQLException
+ */
+ @Override
+ public boolean isConnectionValid(Connection connection, int timeout) throws SQLException {
+ if (jdbcMajorVersion >= 4) {
+ return connection.isValid(timeout);
+ }
+ String query = checkConnectionQuery();
+ if (query != null) {
+ try (Statement statement = connection.createStatement()) {
+ if (statement.execute(query)) {
+ ResultSet rs = null;
+ try {
+ rs = statement.getResultSet();
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ protected String checkConnectionQuery() {
+ return "SELECT 1";
+ }
+
+ @Override
+ public PreparedStatement createPreparedStatement(Connection db, String query) throws SQLException {
+ log.trace("Creating a PreparedStatement '{}'", query);
+ PreparedStatement stmt = getConnection().prepareStatement(query);
+ return stmt;
+ }
+
+ /**
+ * build upsert sql
+ *
+ * @param tableId
+ * @param fields
+ * @return
+ */
+ public String buildUpsertStatement(TableId tableId, Collection<String> fields) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String buildInsertStatement(TableId tableId, Collection<String> columns) {
+ ExpressionBuilder builder = expressionBuilder();
+ builder.append("INSERT INTO ");
+ builder.append(tableId);
+ builder.append("(");
+ builder.appendList()
+ .delimitedBy(",")
+ .transformedBy(ExpressionBuilder.columnNames())
+ .of(columns);
+ builder.append(") VALUES(");
+ builder.appendMultiple(",", "?", columns.size());
+ builder.append(")");
+ return builder.toString();
+ }
+
+ @Override
+ public String buildUpdateStatement(
+ TableId tableId,
+ Collection<String> keyColumns,
+ Collection<String> columns
+ ) {
+ ExpressionBuilder builder = expressionBuilder();
+ builder.append("UPDATE ");
+ builder.append(tableId);
+ builder.append(" SET ");
+ builder.appendList()
+ .delimitedBy(", ")
+ .transformedBy(ExpressionBuilder.columnNamesWith(" = ?"))
+ .of(columns);
+ if (!keyColumns.isEmpty()) {
+ builder.append(" WHERE ");
+ builder.appendList()
+ .delimitedBy(" AND ")
+ .transformedBy(ExpressionBuilder.columnNamesWith(" = ?"))
+ .of(keyColumns);
+ }
+ return builder.toString();
+ }
+
+ @Override
+ public String buildDeleteStatement(
+ TableId tableId,
+ Collection<String> keyColumns
+ ) {
+ ExpressionBuilder builder = expressionBuilder();
+ builder.append("DELETE FROM ");
+ builder.append(tableId);
+ if (!keyColumns.isEmpty()) {
+ builder.append(" WHERE ");
+ builder.appendList()
+ .delimitedBy(" AND ")
+ .transformedBy(ExpressionBuilder.columnNamesWith(" = ?"))
+ .of(keyColumns);
+ }
+ return builder.toString();
+ }
+
+ private int bindNoKeyFields(PreparedStatement statement, Collection<String> noKeyValues, int index) throws SQLException {
+ for (String value : noKeyValues) {
+ statement.setString(index++, value);
+ }
+ return index;
+ }
+
+ private int bindKeyFields(PreparedStatement statement, Collection<String> keyValues, int index) throws SQLException {
+ for (String value : keyValues) {
+ statement.setString(index++, value);
+ }
+ return index;
+ }
+
+ @Override
+ public void bindRecord(PreparedStatement statement, Collection<String> keyValues,
+ Collection<String> noKeyValues, Operator mode) throws SQLException {
+ int index = 1;
+ switch (mode) {
+ case SELECT:
+ case INSERT:
+ case UPSERT:
+ index = bindKeyFields(statement, keyValues, index);
+ bindNoKeyFields(statement, noKeyValues, index);
+ break;
+ case UPDATE:
+ index = bindNoKeyFields(statement, noKeyValues, index);
+ bindKeyFields(statement, keyValues, index);
+ break;
+ case DELETE:
+ bindKeyFields(statement, keyValues, index);
+ break;
+ }
+ }
+
+ @Override
+ public TableId tableId() {
+ return tableId;
+ }
+
+ @Override
+ public List<String> fields() {
+ return FIELDS_DEFAULT;
+ }
+
+ private IdentifierRules identifierRules() {
+ if (identifierRules.get() == null) {
+ try (Connection connection = getConnection()) {
+ DatabaseMetaData metaData = connection.getMetaData();
+ String leadingQuoteStr = metaData.getIdentifierQuoteString();
+ String trailingQuoteStr = leadingQuoteStr; // JDBC does not distinguish
+ String separator = metaData.getCatalogSeparator();
+ if (StringUtils.isEmpty(leadingQuoteStr)) {
+ leadingQuoteStr = defaultIdentifierRules.leadingQuoteString();
+ trailingQuoteStr = defaultIdentifierRules.trailingQuoteString();
+ }
+ if (StringUtils.isEmpty(separator)) {
+ separator = defaultIdentifierRules.identifierDelimiter();
+ }
+ identifierRules.set(new IdentifierRules(separator, leadingQuoteStr, trailingQuoteStr));
+ } catch (SQLException e) {
+ if (defaultIdentifierRules != null) {
+ identifierRules.set(defaultIdentifierRules);
+ log.warn("Unable to get identifier metadata; using default rules", e);
+ } else {
+ throw new SchemaException("Unable to get identifier metadata", e);
+ }
+ }
+ }
+ return identifierRules.get();
+ }
+
+
+ @Override
+ public void close() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ // Ignore errors
+ }
+ }
+ }
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/TableId.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/TableId.java
new file mode 100644
index 0000000..b8e32ac
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/TableId.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.IdentifierRules;
+
+@Data
+@AllArgsConstructor
+@Builder
+public class TableId {
+ public static final String DB_NAME = "#DBNAME#";
+ public static final String SCHEMA_NAME = "#SCHEMANAME#";
+ public static final String TABLE_NAME = "#TABLENAME#";
+ private String tableName;
+ private String catalogName;
+ private String schemaName;
+
+ private void appendTo(StringBuilder builder, String name, String quote) {
+ builder.append(name.trim()).append(quote);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ if (catalogName != null) {
+ appendTo(builder, catalogName.trim(), IdentifierRules.DEFAULT_ID_DELIM);
+ }
+ if (schemaName != null) {
+ appendTo(builder, schemaName.trim(), IdentifierRules.DEFAULT_ID_DELIM);
+ }
+ builder.append(tableName.trim());
+ return builder.toString();
+ }
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/mysql/MysqlDatabaseDialect.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/mysql/MysqlDatabaseDialect.java
new file mode 100644
index 0000000..f3c1d8d
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/mysql/MysqlDatabaseDialect.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect.mysql;
+
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.ExpressionBuilder;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialect;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialectProvider;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.GenericDatabaseDialect;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.TableId;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Properties;
+
+
+/**
+ * Mysql dialect
+ */
+public class MysqlDatabaseDialect extends GenericDatabaseDialect {
+
+ public MysqlDatabaseDialect(Properties props, String dbType) throws IOException {
+ super(props, dbType);
+ }
+
+ @Override
+ public String buildUpsertStatement(TableId tableId, Collection<String> fields) {
+ final ExpressionBuilder.Transform<String> transform = (builder, col) -> {
+ builder.appendColumnName(col);
+ builder.append("=values(");
+ builder.appendColumnName(col);
+ builder.append(")");
+ };
+ ExpressionBuilder builder = expressionBuilder();
+ builder.append("insert into ");
+ builder.append(tableId);
+ builder.append("(");
+ builder.appendList()
+ .delimitedBy(",")
+ .transformedBy(ExpressionBuilder.columnNames())
+ .of(fields);
+ builder.append(") values(");
+ builder.appendMultiple(",", "?", fields.size());
+ builder.append(") on duplicate key update ");
+ builder.appendList()
+ .delimitedBy(",")
+ .transformedBy(transform)
+ .of(fields);
+ return builder.toString();
+ }
+
+
+ public static class Provider implements DatabaseDialectProvider {
+
+ @Override
+ public DatabaseDialect createDialect(Properties config) throws IOException {
+ return new MysqlDatabaseDialect(config, databaseType());
+ }
+
+ @Override
+ public String databaseType() {
+ return "mysql";
+ }
+ }
+
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/handler/IHandler.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/handler/IHandler.java
new file mode 100644
index 0000000..798d00e
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/handler/IHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.handler;
+
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+
+import java.io.Closeable;
+import java.util.List;
+
+public abstract class IHandler implements Closeable {
+ /**
+ * Register schema
+ *
+ * @param schema
+ */
+ public abstract void register(SchemaInfo schema);
+
+ /**
+ * update schema
+ *
+ * @param schema
+ */
+ public abstract void updateSchema(SchemaInfo schema);
+
+ /**
+ * delete schema
+ *
+ * @param qualifiedName
+ */
+ public abstract void deleteSchema(QualifiedName qualifiedName);
+
+ /**
+ * delete schema
+ *
+ * @param qualifiedName
+ */
+ public abstract void deleteBySubject(QualifiedName qualifiedName);
+
+ /**
+ * delete by version
+ *
+ * @param name
+ */
+ public abstract void deleteByVersion(QualifiedName name);
+
+ /**
+ * get schema
+ *
+ * @param qualifiedName
+ * @return
+ */
+ public abstract SchemaInfo getSchema(QualifiedName qualifiedName);
+
+ /**
+ * get by subject
+ *
+ * @param subjectFullName
+ * @return
+ */
+ public abstract SchemaRecordInfo getBySubject(String subjectFullName);
+
+ /**
+ * Get schema info by subject
+ *
+ * @param subjectFullName
+ * @return
+ */
+ public abstract SchemaInfo getSchemaInfoBySubject(String subjectFullName);
+
+ /**
+ * Get subjects
+ *
+ * @param context
+ * @param tenant
+ * @return
+ */
+ public abstract List<String> getSubjects(StorageServiceContext context, String tenant);
+
+ /**
+ * Get tenants
+ *
+ * @param cluster
+ * @return
+ */
+ public abstract List<String> getTenants(String cluster);
+
+
+ protected void changeNotify() {
+
+ }
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/handler/SchemaHandler.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/handler/SchemaHandler.java
new file mode 100644
index 0000000..4527437
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/handler/SchemaHandler.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.handler;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.config.Config;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.map.IMap;
+import com.hazelcast.map.listener.EntryAddedListener;
+import com.hazelcast.map.listener.EntryRemovedListener;
+import com.hazelcast.map.listener.EntryUpdatedListener;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
+import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
+import org.springframework.util.CollectionUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.rocketmq.schema.registry.storage.jdbc.store.JdbcSchemaMapStore.SCHEMAS;
+import static org.apache.rocketmq.schema.registry.storage.jdbc.store.JdbcSubjectMapStore.SUBJECTS;
+
+@Slf4j
+public class SchemaHandler extends IHandler {
+ private final IMap<String, SchemaInfo> schemas;
+ private final IMap<String, SchemaRecordInfo> subjects;
+ private final HazelcastInstance hazelcastInstance;
+
+ public SchemaHandler(String hazelcastYamlConfigPath) {
+ Config config;
+ try {
+ config = Config.loadFromFile(new File(hazelcastYamlConfigPath));
+ } catch (FileNotFoundException e) {
+ throw new SchemaException(String.format("File [%s] not found", hazelcastYamlConfigPath), e);
+ }
+
+ this.hazelcastInstance = Hazelcast.newHazelcastInstance(config);
+ this.subjects = this.hazelcastInstance.getMap(SUBJECTS);
+ this.subjects.loadAll(true);
+
+ this.schemas = this.hazelcastInstance.getMap(SCHEMAS);
+ this.schemas.loadAll(true);
+ this.schemas.addEntryListener(new SchemaChangeEntryListener(this.subjects), true);
+ loadAllSubject();
+ }
+
+ private void loadAllSubject() {
+ for (Map.Entry<String, SchemaInfo> schema : schemas.entrySet()) {
+ SchemaInfo schemaInfo = schema.getValue();
+ List<SchemaRecordInfo> allSchemaRecords = schemaInfo.getDetails().getSchemaRecords();
+ for (SchemaRecordInfo record : allSchemaRecords) {
+ List<String> recordSubjects =
+ record.getSubjects().stream().map(SubjectInfo::fullName).collect(Collectors.toList());
+ recordSubjects.forEach(subject -> {
+ subjects.put(subject, record);
+ });
+ }
+ }
+ }
+
+ @Override
+ public void register(SchemaInfo schema) {
+ if (schemas.containsKey(schema.schemaFullName())) {
+ log.error(String.format("Schema %s already exists, registration failed", schema.schemaFullName()));
+ return;
+ }
+ schemas.put(schema.schemaFullName(), schema);
+ }
+
+ /**
+ * update schema
+ *
+ * @param update
+ */
+ @Override
+ public void updateSchema(SchemaInfo update) {
+ if (!schemas.containsKey(update.schemaFullName())) {
+ log.warn(String.format("Schema %s does not exist, update failed", update.schemaFullName()));
+ return;
+ }
+ // Get lock
+ schemas.lock(update.schemaFullName());
+ try {
+ SchemaInfo current = schemas.get(update.schemaFullName());
+ boolean hasVersionDeleted = current.getRecordCount() > update.getRecordCount();
+ if (current.getLastModifiedTime() != null && update.getLastModifiedTime() != null &&
+ current.getLastModifiedTime().after(update.getLastModifiedTime())) {
+ log.info("Current Schema is later version, no need to update.");
+ return;
+ }
+ if (current.getLastRecordVersion() == update.getLastRecordVersion() && !hasVersionDeleted) {
+ log.info("Schema version is the same, no need to update.");
+ return;
+ }
+ if (current.getLastRecordVersion() > update.getLastRecordVersion() && !hasVersionDeleted) {
+ throw new SchemaException("Schema version is invalid, update: "
+ + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
+ }
+ schemas.put(update.schemaFullName(), update);
+ } finally {
+ // unlock
+ schemas.unlock(update.schemaFullName());
+ }
+ }
+
+ @Override
+ public void deleteSchema(QualifiedName qualifiedName) {
+ schemas.lock(qualifiedName.schemaFullName());
+ try {
+ if (!schemas.containsKey(qualifiedName.schemaFullName())) {
+ log.error(String.format("Schema %s does not exist, delete failed", qualifiedName.schemaFullName()));
+ return;
+ }
+ schemas.delete(qualifiedName);
+ } finally {
+ schemas.unlock(qualifiedName.schemaFullName());
+ }
+ }
+
+ @Override
+ public void deleteBySubject(QualifiedName qualifiedName) {
+ schemas.lock(qualifiedName.subjectFullName());
+ try {
+ SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName());
+ if (schemaInfo == null) {
+ log.error(String.format("Schema %s does not exist, delete failed",
+ qualifiedName.subjectFullName()));
+ return;
+ }
+ schemas.delete(schemaInfo.schemaFullName());
+ } finally {
+ schemas.unlock(qualifiedName.subjectFullName());
+ }
+ }
+
+ @Override
+ public void deleteByVersion(QualifiedName name) {
+ SchemaInfo schemaInfo = getSchemaInfoBySubject(name.subjectFullName());
+ if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
+ log.error(String.format("Schema %s does not exist, failed to delete according to version",
+ name.subjectFullName()));
+ return;
+ }
+ List<SubjectInfo> subjects = schemaInfo.getLastRecord().getSubjects();
+ List<SchemaRecordInfo> schemaRecords = schemaInfo.getDetails().getSchemaRecords();
+ schemaRecords.removeIf(record -> record.getVersion() == name.getVersion());
+ if (CollectionUtils.isEmpty(schemaRecords)) {
+ deleteBySubject(name);
+ }
+ if (schemaInfo.getLastRecord().getSubjects().isEmpty()) {
+ schemaInfo.getLastRecord().setSubjects(subjects);
+ }
+ updateSchema(schemaInfo);
+ }
+
+ @Override
+ public SchemaInfo getSchema(QualifiedName qualifiedName) {
+ return schemas.get(qualifiedName.schemaFullName());
+ }
+
+ @Override
+ public SchemaRecordInfo getBySubject(String subjectFullName) {
+ subjects.lock(subjectFullName);
+ try {
+ if (!subjects.containsKey(subjectFullName)) {
+ return null;
+ }
+ return subjects.get(subjectFullName);
+ } finally {
+ subjects.unlock(subjectFullName);
+ }
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfoBySubject(String subjectFullName) {
+ SchemaRecordInfo subjectRecordInfo = subjects.get(subjectFullName);
+ if (subjectRecordInfo == null) {
+ return null;
+ }
+ String schemaFullName = subjectRecordInfo.getSchema();
+ return schemas.get(schemaFullName);
+ }
+
+ @Override
+ public List<String> getSubjects(StorageServiceContext context, String tenant) {
+ List<String> allSubjects = new ArrayList<>();
+ for (Map.Entry<String, SchemaRecordInfo> schemaRecordEntry : subjects.entrySet()) {
+ String subjectFullName = schemaRecordEntry.getKey();
+ String[] subjectFromCache = subjectFullName.split(String.valueOf(SchemaConstants.SUBJECT_SEPARATOR));
+ String subjectFromKey = subjectFromCache[2];
+ // Check permission
+ allSubjects.add(subjectFromKey);
+ }
+ return allSubjects;
+ }
+
+ @Override
+ public List<String> getTenants(String cluster) {
+ List<String> tenants = Lists.newArrayList();
+ for (Map.Entry<String, SchemaRecordInfo> schemaRecordEntry : subjects.entrySet()) {
+ String subjectFullName = schemaRecordEntry.getKey();
+ String tenant = subjectFullName.split(String.valueOf(SchemaConstants.SUBJECT_SEPARATOR))[1];
+ tenants.add(tenant);
+ }
+ return tenants;
+ }
+
+ @Override
+ public void close() throws IOException {
+ Hazelcast.shutdownAll();
+ }
+
+
+ /**
+ * Schema change entry listener
+ */
+ static class SchemaChangeEntryListener implements EntryAddedListener<String, SchemaInfo>, EntryRemovedListener<String, SchemaInfo>, EntryUpdatedListener<String, SchemaInfo> {
+
+ private final IMap<String, SchemaRecordInfo> subjectCache;
+
+ SchemaChangeEntryListener(IMap<String, SchemaRecordInfo> localCache) {
+ this.subjectCache = localCache;
+ }
+
+ @Override
+ public void entryAdded(EntryEvent<String, SchemaInfo> current) {
+ SchemaInfo schemaInfo = current.getValue();
+ subjectCache.put(schemaInfo.subjectFullName(), schemaInfo.getLastRecord());
+ }
+
+ @Override
+ public void entryRemoved(EntryEvent<String, SchemaInfo> current) {
+ if (current == null || current.getValue() == null || current.getValue().getDetails() == null) {
+ return;
+ }
+ // Delete subjects bind to any version
+ List<SchemaRecordInfo> allSchemaRecords = current.getValue().getDetails().getSchemaRecords();
+ List<String> allSubjects = allSchemaRecords.parallelStream()
+ .flatMap(record -> record.getSubjects().stream().map(SubjectInfo::fullName))
+ .collect(Collectors.toList());
+
+ allSubjects.forEach(subject -> {
+ subjectCache.remove(subject);
+ });
+ }
+
+ @Override
+ public void entryUpdated(EntryEvent<String, SchemaInfo> update) {
+ if (update == null || update.getValue() == null || update.getValue().getLastRecord() == null) {
+ return;
+ }
+ SchemaRecordInfo lastSchemaRecordInfo = update.getValue().getLastRecord();
+ lastSchemaRecordInfo.getSubjects().forEach(subject -> {
+ subjectCache.put(subject.fullName(), update.getValue().getLastRecord());
+ });
+ }
+ }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/IMapStore.java
similarity index 73%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/IMapStore.java
index 794b9ce..6360341 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/IMapStore.java
@@ -15,23 +15,11 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.store;
-public enum StorageType {
+import com.hazelcast.map.MapLoaderLifecycleSupport;
+import com.hazelcast.map.MapStore;
- /**
- * Rocketmq type
- */
- ROCKETMQ(1),
- /**
- * Mysql type
- */
- MYSQL(2);
-
- private final int value;
-
- StorageType(final int value) {
- this.value = value;
- }
+public interface IMapStore<K, V> extends MapStore<K, V>, MapLoaderLifecycleSupport {
}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSchemaMapStore.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSchemaMapStore.java
new file mode 100644
index 0000000..86033d8
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSchemaMapStore.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.store;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
+import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.Operator;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialect;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialectProvider;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DiscoverDialectFactory;
+import org.springframework.util.Assert;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.rocketmq.schema.registry.storage.jdbc.configs.JdbcStorageConfigConstants.STORAGE_JDBC_TYPE;
+import static org.apache.rocketmq.schema.registry.storage.jdbc.configs.JdbcStorageConfigConstants.STORAGE_JDBC_TYPE_MYSQL;
+
+/**
+ * Jdbc map store
+ */
+@Slf4j
+public class JdbcSchemaMapStore implements IMapStore<String, SchemaInfo> {
+
+ public final static String SCHEMAS = "schemas";
+ private DatabaseDialect dialect;
+ private JsonConverter converter;
+
+ @Override
+ public void init(HazelcastInstance hazelcastInstance, Properties config, String mapKey) {
+ Assert.isTrue(mapKey.equals(SCHEMAS), "Schema map key should be [schemas]");
+ String type = config.getProperty(STORAGE_JDBC_TYPE, STORAGE_JDBC_TYPE_MYSQL);
+ DatabaseDialectProvider provider = DiscoverDialectFactory.getDialectProvider(type);
+ try {
+ this.dialect = provider.createDialect(config);
+ } catch (IOException e) {
+ throw new SchemaException("Create jdbc storage instance failed.", e);
+ }
+ this.converter = new JsonConverterImpl();
+ }
+
+ @Override
+ public void store(String s, SchemaInfo schemaInfo) {
+ try (PreparedStatement ps = dialect.createPreparedStatement(
+ dialect.getConnection(),
+ dialect.buildUpsertStatement(dialect.tableId(), dialect.fields()))
+ ) {
+ List<String> keyFieldValues = Lists.newArrayList(schemaInfo.schemaFullName());
+ List<String> noKeyFieldValues = Lists.newArrayList(converter.toString(schemaInfo));
+ dialect.bindRecord(ps, keyFieldValues, noKeyFieldValues, Operator.UPSERT);
+ ps.executeUpdate();
+ } catch (SQLException sqe) {
+ throw new SchemaException("Registry schema handler", sqe);
+ }
+ }
+
+ @Override
+ public void storeAll(Map<String, SchemaInfo> map) {
+ Set<Map.Entry<String, SchemaInfo>> schemaInfos = map.entrySet();
+ for (Map.Entry<String, SchemaInfo> schemaInfoEntry : schemaInfos) {
+ store(schemaInfoEntry.getKey(), schemaInfoEntry.getValue());
+ }
+ }
+
+ @Override
+ public void delete(String key) {
+ try (PreparedStatement statement = dialect.createPreparedStatement(
+ dialect.getConnection(),
+ dialect.buildDeleteStatement(dialect.tableId(), Lists.newArrayList(dialect.fields().get(0))))) {
+ dialect.bindRecord(statement, Lists.newArrayList(key), null, Operator.DELETE);
+ statement.executeUpdate();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void deleteAll(Collection<String> collection) {
+ Iterator<String> iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ delete(iterator.next());
+ }
+ }
+
+ @Override
+ public SchemaInfo load(String key) {
+ try (PreparedStatement preparedStatement =
+ dialect.createPreparedStatement(
+ dialect.getConnection(),
+ dialect.buildSelectOneStatement(dialect.fields().stream().findFirst().get()))
+ ) {
+ dialect.bindRecord(preparedStatement, Lists.newArrayList(), Lists.newArrayList(key), Operator.SELECT);
+ Map<String, SchemaInfo> schemaInfoMap = refresh(preparedStatement);
+ if (schemaInfoMap.isEmpty()) {
+ return null;
+ }
+ return schemaInfoMap.entrySet().stream().findFirst().get().getValue();
+ } catch (SQLException | JsonProcessingException sqe) {
+ throw new SchemaException("", sqe);
+ }
+ }
+
+ @Override
+ public Map<String, SchemaInfo> loadAll(Collection<String> collection) {
+ Map<String, SchemaInfo> schemaInfos = Maps.newConcurrentMap();
+ Iterator<String> iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ String schemaKey = iterator.next();
+ schemaInfos.put(schemaKey, load(schemaKey));
+ }
+ return schemaInfos;
+ }
+
+ @Override
+ public Iterable<String> loadAllKeys() {
+ try (PreparedStatement preparedStatement =
+ dialect.createPreparedStatement(
+ dialect.getConnection(),
+ dialect.buildSelectStatement()
+ )
+ ) {
+ Map<String, SchemaInfo> schemaInfoMap = refresh(preparedStatement);
+ return schemaInfoMap.keySet();
+ } catch (SQLException | JsonProcessingException sqe) {
+ log.error("Load all schema thought jdbc is failed", sqe);
+ throw new SchemaException("Load all schema key is failed", sqe);
+ }
+ }
+
+ private Map<String, SchemaInfo> refresh(PreparedStatement preparedStatement) throws SQLException, JsonProcessingException {
+ Map<String, SchemaInfo> schemaInfos = Maps.newConcurrentMap();
+ ResultSet result = preparedStatement.executeQuery();
+ while (result.next()) {
+ String schemaInfo = result.getString(2);
+ schemaInfos.put(
+ result.getString(1),
+ converter.fromJson(schemaInfo, SchemaInfo.class)
+ );
+ }
+ return schemaInfos;
+ }
+
+ @Override
+ public void destroy() {
+ if (this.dialect != null) {
+ this.dialect.close();
+ }
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSchemaMapStoreFactory.java
similarity index 63%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSchemaMapStoreFactory.java
index 794b9ce..f8c339a 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSchemaMapStoreFactory.java
@@ -15,23 +15,17 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.store;
-public enum StorageType {
+import com.hazelcast.map.MapLoader;
+import com.hazelcast.map.MapStoreFactory;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
- /**
- * Rocketmq type
- */
- ROCKETMQ(1),
- /**
- * Mysql type
- */
- MYSQL(2);
+import java.util.Properties;
- private final int value;
-
- StorageType(final int value) {
- this.value = value;
+public class JdbcSchemaMapStoreFactory implements MapStoreFactory<String, SchemaInfo> {
+ @Override
+ public MapLoader<String, SchemaInfo> newMapStore(String s, Properties properties) {
+ return new JdbcSchemaMapStore();
}
-
}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSubjectMapStore.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSubjectMapStore.java
new file mode 100644
index 0000000..7076255
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSubjectMapStore.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.store;
+
+import com.google.common.collect.Maps;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.storage.jdbc.cache.SubjectLocalCache;
+import org.springframework.util.Assert;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Jdbc subject map storage
+ */
+public class JdbcSubjectMapStore implements IMapStore<String, SchemaRecordInfo> {
+ public final static String SUBJECTS = "subjects";
+ private SubjectLocalCache subjectLocalCache;
+
+ @Override
+ public void init(HazelcastInstance hazelcastInstance, Properties properties, String mapKey) {
+ Assert.isTrue(mapKey.equals(SUBJECTS), "Subject map key should be [subjects]");
+ this.subjectLocalCache = new SubjectLocalCache();
+ }
+
+ @Override
+ public void store(String subjectFullName, SchemaRecordInfo schemaRecordInfo) {
+ subjectLocalCache.put(subjectFullName, schemaRecordInfo);
+ }
+
+ @Override
+ public void storeAll(Map<String, SchemaRecordInfo> records) {
+ records.forEach((subjectFullName, record) -> {
+ store(subjectFullName, record);
+ });
+ }
+
+ @Override
+ public void delete(String subjectFullName) {
+ subjectLocalCache.remove(subjectFullName);
+ }
+
+ @Override
+ public void deleteAll(Collection<String> records) {
+ Iterator<String> subjects = records.iterator();
+ while (subjects.hasNext()) {
+ delete(subjects.next());
+ }
+ }
+
+ @Override
+ public SchemaRecordInfo load(String subjectFullName) {
+ return subjectLocalCache.get(subjectFullName);
+ }
+
+ @Override
+ public Map<String, SchemaRecordInfo> loadAll(Collection<String> records) {
+ Map<String, SchemaRecordInfo> schemaRecordInfoMap = Maps.newConcurrentMap();
+ Iterator<String> subjects = records.iterator();
+ while (subjects.hasNext()) {
+ String subject = subjects.next();
+ schemaRecordInfoMap.put(subject, load(subject));
+ }
+ return schemaRecordInfoMap;
+ }
+
+ @Override
+ public Iterable<String> loadAllKeys() {
+ return subjectLocalCache.getCache().keySet();
+ }
+
+ @Override
+ public void destroy() {
+ this.subjectLocalCache = null;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSubjectMapStoreFactory.java
similarity index 62%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSubjectMapStoreFactory.java
index 794b9ce..f137542 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSubjectMapStoreFactory.java
@@ -15,23 +15,18 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.store;
-public enum StorageType {
+import com.hazelcast.map.MapLoader;
+import com.hazelcast.map.MapStoreFactory;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
- /**
- * Rocketmq type
- */
- ROCKETMQ(1),
- /**
- * Mysql type
- */
- MYSQL(2);
+import java.util.Properties;
- private final int value;
+public class JdbcSubjectMapStoreFactory implements MapStoreFactory<String, SchemaRecordInfo> {
- StorageType(final int value) {
- this.value = value;
+ @Override
+ public MapLoader<String, SchemaRecordInfo> newMapStore(String s, Properties properties) {
+ return new JdbcSubjectMapStore();
}
-
}
diff --git a/storage-jdbc/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.common.storage.StoragePlugin b/storage-jdbc/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.common.storage.StoragePlugin
new file mode 100644
index 0000000..777a908
--- /dev/null
+++ b/storage-jdbc/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.common.storage.StoragePlugin
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.rocketmq.schema.registry.storage.jdbc.JdbcStoragePlugin
+
\ No newline at end of file
diff --git a/storage-jdbc/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialectProvider b/storage-jdbc/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialectProvider
new file mode 100644
index 0000000..1af3e3b
--- /dev/null
+++ b/storage-jdbc/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialectProvider
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.rocketmq.schema.registry.storage.jdbc.dialect.mysql.MysqlDatabaseDialect$Provider
\ No newline at end of file
diff --git a/storage-jdbc/src/main/resources/hazelcast.yaml b/storage-jdbc/src/main/resources/hazelcast.yaml
new file mode 100644
index 0000000..8f8d712
--- /dev/null
+++ b/storage-jdbc/src/main/resources/hazelcast.yaml
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:
+ cluster-name: schema-registry
+ instance-name: schema-registry
+ network:
+ port:
+ auto-increment: false
+ port: 5801
+ map:
+ schemas:
+ map-store:
+ write-delay-seconds: 0
+ enabled: true
+ initial-mode: EAGER
+ factory-class-name: org.apache.rocketmq.schema.registry.storage.jdbc.store.JdbcSchemaMapStoreFactory
+ properties:
+ storage.jdbc.type: mysql
+ storage.jdbc.url: jdbc:mysql://localhost:3306
+ storage.jdbc.user: root
+ storage.jdbc.password: root
+ storage.jdbc.database.name: schema_registry
+ storage.jdbc.table.name: schema_table
+ subjects:
+ map-store:
+ write-delay-seconds: 0
+ enabled: true
+ initial-mode: EAGER
+ factory-class-name: org.apache.rocketmq.schema.registry.storage.jdbc.store.JdbcSubjectMapStoreFactory
+ properties:
+ cache.size: 1000
+ properties:
+ hazelcast.invocation.max.retry.count: 20
+ hazelcast.tcp.join.port.try.count: 30
+ hazelcast.logging.type: log4j2
+ hazelcast.operation.generic.thread.count: 100
+
diff --git a/storage-jdbc/src/main/resources/mysql-storage-ddl.sql b/storage-jdbc/src/main/resources/mysql-storage-ddl.sql
new file mode 100644
index 0000000..c02b5c4
--- /dev/null
+++ b/storage-jdbc/src/main/resources/mysql-storage-ddl.sql
@@ -0,0 +1,27 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: mysql storage database
+-- ----------------------------------------------------------------------------------------------------------------
+create database IF NOT EXISTS #DBNAME#;
+-- ----------------------------------------------------------------------------------------------------------------
+CREATE TABLE IF NOT EXISTS `#DBNAME#`.`#TABLENAME#`(
+ `schema_full_name` VARCHAR(255) NOT NULL PRIMARY KEY,
+ `schema_info` TEXT NOT NULL COMMENT 'schema info',
+ `modified_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modified time',
+ `description` VARCHAR(512)
+)