You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/31 08:39:03 UTC
[rocketmq-schema-registry] 03/37: Basic feature
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit 691d78029fe13806b65f567007069ff7b0dd9395
Author: wangfan <18...@qq.com>
AuthorDate: Sun Jul 10 22:02:23 2022 +0800
Basic feature
---
.gitignore | 36 +++
README.md | 11 +-
bump-version.sh | 9 +
common/pom.xml | 92 ++++++
.../schema/registry/common/QualifiedName.java | 84 +++++
.../registry/common/auth/AccessControlService.java | 33 ++
.../auth/DefaultAccessControlServiceImpl.java | 62 ++++
.../registry/common/context/RequestContext.java | 80 +++++
.../common/context/RequestContextManager.java | 43 +++
.../common/context/StoragePluginContext.java | 37 +++
.../common/context/StorageServiceContext.java | 31 ++
.../schema/registry/common/dto/AuditDto.java | 51 +++
.../schema/registry/common/dto/BaseDto.java | 69 ++++
.../schema/registry/common/dto/FieldDto.java | 67 ++++
.../registry/common/dto/SchemaDetailDto.java | 41 +++
.../schema/registry/common/dto/SchemaDto.java | 61 ++++
.../schema/registry/common/dto/SchemaMetaDto.java | 58 ++++
.../registry/common/dto/SchemaRecordDto.java | 53 +++
.../registry/common/dto/SchemaStorageDto.java | 49 +++
.../schema/registry/common/dto/SubjectDto.java | 40 +++
.../common/exception/RequestExceptionHandler.java | 57 ++++
.../exception/SchemaAuthorizedException.java | 34 ++
.../exception/SchemaCompatibilityException.java | 36 +++
.../registry/common/exception/SchemaException.java | 45 +++
.../common/exception/SchemaExistException.java | 36 +++
.../common/exception/SchemaNotFoundException.java | 36 +++
.../registry/common/filter/RequestFilter.java | 91 ++++++
.../schema/registry/common/json/JsonConverter.java | 97 ++++++
.../registry/common/json/JsonConverterImpl.java | 99 ++++++
.../schema/registry/common/model/AuditInfo.java | 42 +++
.../schema/registry/common/model/BaseInfo.java | 43 +++
.../registry/common/model/Compatibility.java | 34 ++
.../schema/registry/common/model/Dependency.java | 49 +++
.../schema/registry/common/model/FieldInfo.java | 42 +++
.../registry/common/model/PluginLoadState.java | 26 ++
.../registry/common/model/SchemaDefination.java | 21 ++
.../registry/common/model/SchemaDetailInfo.java | 45 +++
.../schema/registry/common/model/SchemaEntity.java | 39 +++
.../schema/registry/common/model/SchemaInfo.java | 84 +++++
.../registry/common/model/SchemaMetaInfo.java | 43 +++
.../registry/common/model/SchemaOperation.java | 38 +++
.../registry/common/model/SchemaRecordInfo.java | 59 ++++
.../registry/common/model/SchemaStorageInfo.java | 42 +++
.../schema/registry/common/model/SchemaType.java | 53 +++
.../schema/registry/common/model/StorageType.java | 37 +++
.../schema/registry/common/model/SubjectInfo.java | 52 +++
.../registry/common/properties/AclProperties.java | 63 ++++
.../common/properties/CacheProperties.java | 26 ++
.../common/properties/DependencyProperties.java | 32 ++
.../registry/common/properties/GlobalConfig.java | 131 ++++++++
.../common/properties/GlobalConfigImpl.java | 110 +++++++
.../common/properties/SchemaProperties.java | 40 +++
.../common/properties/ServiceProperties.java | 27 ++
.../common/properties/StorageProperties.java | 36 +++
.../common/storage/DefaultStorageServiceImpl.java | 24 ++
.../common/storage/SpringStorageFactory.java | 70 ++++
.../registry/common/storage/StorageFactory.java | 51 +++
.../registry/common/storage/StorageManager.java | 126 ++++++++
.../registry/common/storage/StoragePlugin.java | 39 +++
.../common/storage/StoragePluginManager.java | 43 +++
.../registry/common/storage/StorageService.java | 81 +++++
.../common/storage/StorageServiceProxy.java | 121 +++++++
.../schema/registry/common/utils/CommonUtil.java | 247 ++++++++++++++
.../schema/registry/common/utils/IdGenerator.java | 27 ++
.../common/utils/MemoryJavaFileManager.java | 126 ++++++++
.../common/utils/SnowFlakeIdGenerator.java | 104 ++++++
.../schema/registry/common/utils/StorageUtil.java | 104 ++++++
core/.gitignore | 33 ++
core/.mvn/wrapper/maven-wrapper.jar | Bin 0 -> 58727 bytes
core/.mvn/wrapper/maven-wrapper.properties | 2 +
core/mvnw | 316 ++++++++++++++++++
core/mvnw.cmd | 188 +++++++++++
core/pom.xml | 61 ++++
.../schema/registry/core/CoreApplication.java | 25 ++
.../schema/registry/core/api/RequestProcessor.java | 81 +++++
.../registry/core/api/v1/SchemaController.java | 355 +++++++++++++++++++++
.../registry/core/config/SchemaManagerConfig.java | 51 +++
.../core/config/SchemaPropertiesConfig.java | 52 +++
.../registry/core/config/SchemaServiceConfig.java | 110 +++++++
.../registry/core/config/SchemaUtilsConfig.java | 36 +++
.../schema/registry/core/config/SwaggerConfig.java | 112 +++++++
.../ArtifactoryDependencyServiceImpl.java | 79 +++++
.../registry/core/dependency/DependencyHelper.java | 111 +++++++
.../core/dependency/DependencyService.java | 36 +++
.../core/dependency/DynamicCompileProvider.java | 139 ++++++++
.../core/dependency/DynamicJarsProvider.java | 48 +++
.../schema/registry/core/package-info.java | 26 ++
.../core/service/SchemaInitializationService.java | 90 ++++++
.../registry/core/service/SchemaService.java | 70 ++++
.../registry/core/service/SchemaServiceImpl.java | 257 +++++++++++++++
.../schema/registry/core/service/Service.java | 25 ++
core/src/main/resources/application.properties | 29 ++
core/src/main/resources/template.pom | 11 +
.../schema/registry/core/CoreApplicationTests.java | 11 +
.../core/controller/HelloControllerTest.java | 34 ++
pom.xml | 171 ++++++++++
schema-storage-rocketmq/pom.xml | 50 +++
.../registry/storage/rocketmq/RocketmqClient.java | 344 ++++++++++++++++++++
.../storage/rocketmq/RocketmqStorageClient.java | 75 +++++
.../rocketmq/RocketmqStorageClientImpl.java | 95 ++++++
.../storage/rocketmq/RocketmqStorageFactory.java | 42 +++
.../storage/rocketmq/RocketmqStoragePlugin.java | 43 +++
.../storage/rocketmq/RocketmqStorageService.java | 80 +++++
.../storage/rocketmq/RocketmqStorageUtils.java | 38 +++
.../storage/rocketmq/configs/ClientConfig.java | 39 +++
.../rocketmq/configs/RocketmqConfigConstants.java | 50 +++
.../storage/rocketmq/configs/ServiceConfig.java | 39 +++
.../registry/storage/rocketmq/package-info.java | 26 ++
...mq.schema.registry.common.storage.StoragePlugin | 19 ++
.../src/main/resources/rocketmq.properties | 19 ++
storage-war/pom.xml | 57 ++++
.../rocketmq/schema/registry/StorageWar.java | 38 +++
112 files changed, 7527 insertions(+), 1 deletion(-)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..fc92691
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,36 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/test/**/target/
+*.log
+log/
+log/*
+tools/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
diff --git a/README.md b/README.md
index c3202e5..0de599e 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,10 @@
-# rocketmq-schema-registry
+#rocketmq-schema-registry
+
+## What is rocketmq-schema-registry
+
+
+## How rocketmq-schema-registry working
+
+
+## API Reference
+
diff --git a/bump-version.sh b/bump-version.sh
new file mode 100644
index 0000000..aeea11f
--- /dev/null
+++ b/bump-version.sh
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+if [[ $1 == "" ]]; then
+ echo "Usage: bump-version.sh version"
+ exit -1
+fi
+
+version=$1
+mvn versions:set -DnewVersion=$version
+find . -name *.versionsBackup -exec rm {} \;
diff --git a/common/pom.xml b/common/pom.xml
new file mode 100644
index 0000000..51d43f1
--- /dev/null
+++ b/common/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-schema-registry-all</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>0.0.2-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <packaging>jar</packaging>
+ <artifactId>common</artifactId>
+ <name>rocketmq-schema-registry-common ${project.version}</name>
+ <description>rocketmq-schema-registry-core</description>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <aether.version>1.1.0</aether.version>
+ <maven.version>3.2.5</maven.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.24</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.aether</groupId>
+ <artifactId>aether-api</artifactId>
+ <version>${aether.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.aether</groupId>
+ <artifactId>aether-util</artifactId>
+ <version>${aether.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.aether</groupId>
+ <artifactId>aether-impl</artifactId>
+ <version>${aether.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.aether</groupId>
+ <artifactId>aether-connector-basic</artifactId>
+ <version>${aether.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.aether</groupId>
+ <artifactId>aether-transport-file</artifactId>
+ <version>${aether.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.aether</groupId>
+ <artifactId>aether-transport-http</artifactId>
+ <version>${aether.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-aether-provider</artifactId>
+ <version>${maven.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
new file mode 100644
index 0000000..39c812f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common;
+
+import java.beans.Transient;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
+import javax.security.auth.Subject;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
+
+@Data
+@Builder
+@NoArgsConstructor
+public class QualifiedName implements Serializable {
+ private static final long serialVersionUID = 2266514833942841209L;
+
+ private String cluster;
+ private String tenant;
+ private String subject;
+ private String schema;
+
+ public QualifiedName(
+ @Nullable final String cluster,
+ @Nullable final String tenant,
+ @Nullable final String subject,
+ @Nullable final String schema
+ ) {
+ this.cluster= cluster;
+ this.tenant= tenant;
+ this.subject= subject;
+ this.schema = schema;
+ }
+
+ public SubjectInfo subjectInfo() {
+ return new SubjectInfo(cluster, subject);
+ }
+
+ public String fullName() {
+ return cluster + '/' + tenant + '/' + subject + '/' + schema;
+ }
+
+ public String schemaFullName() {
+ return tenant + '/' + schema;
+ }
+
+ public String subjectFullName() {
+ return cluster + '/' + subject;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("{");
+ sb.append("\"cluster\":\"")
+ .append(cluster).append('\"');
+ sb.append("\"tenant\":\"")
+ .append(tenant).append('\"');
+ sb.append(",\"subject\":\"")
+ .append(subject).append('\"');
+ sb.append(",\"name\":\"")
+ .append(schema).append('\"');
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/auth/AccessControlService.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/auth/AccessControlService.java
new file mode 100644
index 0000000..086b1d9
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/auth/AccessControlService.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.auth;
+
+import org.apache.rocketmq.schema.registry.common.model.SchemaOperation;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaAuthorizedException;
+
+public interface AccessControlService {
+
+ default void checkPermission(
+ final String role,
+ final String resource,
+ final SchemaOperation operation
+ ) throws SchemaAuthorizedException {
+ // default do nothing
+
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/auth/DefaultAccessControlServiceImpl.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/auth/DefaultAccessControlServiceImpl.java
new file mode 100644
index 0000000..14f9df5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/auth/DefaultAccessControlServiceImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.auth;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.schema.registry.common.model.SchemaOperation;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaAuthorizedException;
+import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig;
+
+public class DefaultAccessControlServiceImpl implements AccessControlService {
+
+ private final GlobalConfig config;
+
+ public DefaultAccessControlServiceImpl(final GlobalConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public void checkPermission(
+ String role,
+ String resource,
+ SchemaOperation operation
+ ) throws SchemaAuthorizedException {
+ if (config.isAclEnabled()) {
+ // TODO
+ checkPermission(config.getAcl(), role, resource, operation);
+ }
+ }
+
+ /**
+ * Check at database level.
+ */
+ private void checkPermission(
+ final Map<QualifiedName, Set<String>> accessACL,
+ final String userName,
+ final String name,
+ final SchemaOperation operation
+ ) {
+ final Set<String> users = null;
+// accessACL.get(QualifiedName.ofDatabase(name.getCatalogName(), name.getDatabaseName()));
+ if ((users != null) && !users.isEmpty() && !users.contains(userName)) {
+ throw new SchemaAuthorizedException(String.format("%s is not permitted for %s %s", userName, operation.name(), name));
+ }
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/context/RequestContext.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/context/RequestContext.java
new file mode 100644
index 0000000..88df328
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/context/RequestContext.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.context;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.UUID;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class RequestContext implements Serializable {
+ private static final long serialVersionUID = 1772214628830653791L;
+
+ public static final String UNKNOWN = "UNKNOWN";
+ private final long timestamp = System.currentTimeMillis();
+ private final String id = UUID.randomUUID().toString();
+ private String userName;
+ private final String clientAppName;
+ private final String clientId;
+ private final String apiUri;
+ private final String scheme;
+ private final String token;
+
+ public RequestContext() {
+ this.userName = null;
+ this.clientAppName = null;
+ this.clientId = null;
+ this.apiUri = UNKNOWN;
+ this.scheme = UNKNOWN;
+ this.token = null;
+ }
+
+ protected RequestContext(
+ @Nullable final String userName,
+ @Nullable final String clientAppName,
+ @Nullable final String clientId,
+ final String apiUri,
+ final String scheme,
+ final String token
+ ) {
+ this.userName = userName;
+ this.clientAppName = clientAppName;
+ this.clientId = clientId;
+ this.apiUri = apiUri;
+ this.scheme = scheme;
+ this.token = token;
+ }
+
+ @Override
+ public String toString() {
+ return "RequestContext{" +
+ "timestamp=" + timestamp +
+ ", id='" + id + '\'' +
+ ", userName='" + userName + '\'' +
+ ", clientAppName='" + clientAppName + '\'' +
+ ", clientId='" + clientId + '\'' +
+ ", apiUri='" + apiUri + '\'' +
+ ", scheme='" + scheme + '\'' +
+ ", metaAccount='" + token + '\'' +
+ '}';
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/context/RequestContextManager.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/context/RequestContextManager.java
new file mode 100644
index 0000000..76dafbb
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/context/RequestContextManager.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.context;
+
+public class RequestContextManager {
+
+ private static final ThreadLocal<RequestContext> contexts = new ThreadLocal<>();
+
+ private RequestContextManager() {
+ }
+
+ public static void removeContext() {
+ contexts.remove();
+ }
+
+ public static RequestContext getContext() {
+ RequestContext result = contexts.get();
+ if (result == null) {
+ result = new RequestContext();
+ putContext(result);
+ }
+ return result;
+ }
+
+ public static void putContext(final RequestContext context) {
+ contexts.set(context);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/context/StoragePluginContext.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/context/StoragePluginContext.java
new file mode 100644
index 0000000..28b0679
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/context/StoragePluginContext.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.context;
+
+import java.util.Properties;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.model.StorageType;
+import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig;
+import org.springframework.context.ApplicationContext;
+
+@Data
+@AllArgsConstructor
+public class StoragePluginContext {
+
+ /**
+ * global config
+ */
+ private final GlobalConfig config;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/context/StorageServiceContext.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/context/StorageServiceContext.java
new file mode 100644
index 0000000..c417e0d
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/context/StorageServiceContext.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.context;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.model.StorageType;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class StorageServiceContext {
+ private long timestamp;
+ private String userName;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/AuditDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/AuditDto.java
new file mode 100644
index 0000000..0f558ae
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/AuditDto.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.dto;
+
+import java.util.Date;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class AuditDto extends BaseDto {
+ private static final long serialVersionUID = -2306105985602090836L;
+
+ @ApiModelProperty(value = "Description of this resource")
+ private String desc;
+
+ @ApiModelProperty(value = "The user who creates the resource")
+ private String createdBy;
+
+ @ApiModelProperty(value = "The time of this resource was created")
+ private Date createdTime;
+
+ @ApiModelProperty(value = "The user who updates the resource")
+ private String lastModifiedBy;
+
+ @ApiModelProperty(value = "The time of this resource was updated")
+ private Date lastModifiedTime;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/BaseDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/BaseDto.java
new file mode 100644
index 0000000..e7445ac
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/BaseDto.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.common.dto;
+
+import java.io.Serializable;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
+import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
+
+
+/**
+ * Base class for all DTOs, and all DTOs should be READ-ONLY.
+ */
+public abstract class BaseDto implements Serializable {
+ protected static final JsonConverter JSON_CONVERTER = new JsonConverterImpl();
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ return JSON_CONVERTER.toString(this);
+ }
+//
+// /**
+// * Deserialize data from the input stream.
+// *
+// * @param inputStream input stream
+// * @return Json ObjectNode
+// * @throws IOException exception deserializing the stream
+// */
+// @Nullable
+// public static ObjectNode deserializeObjectNode(
+// @Nonnull @NonNull final ObjectInputStream inputStream
+// ) throws IOException {
+// return JSON_CONVERTER.deserializeObjectNode(inputStream);
+// }
+//
+// /**
+// * Serialize data in the output stream.
+// *
+// * @param outputStream output stream
+// * @param jsonObject jsonObject
+// * @throws IOException exception serializing the json
+// */
+// public static void serializeObjectNode(
+// @Nonnull @NonNull final ObjectOutputStream outputStream,
+// @Nullable final ObjectNode jsonObject
+// ) throws IOException {
+// JSON_CONVERTER.serializeObjectNode(outputStream, jsonObject);
+// }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/FieldDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/FieldDto.java
new file mode 100644
index 0000000..dd4be4c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/FieldDto.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.dto;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@ApiModel(description = "Schema field/column information")
+@SuppressWarnings("unused")
+@Data
+@EqualsAndHashCode(callSuper = false)
+@NoArgsConstructor
+@Builder
+@AllArgsConstructor
+public class FieldDto extends BaseDto {
+ private static final long serialVersionUID = -8336499483006254487L;
+
+ @ApiModelProperty(value = "Position of the field")
+ private Integer pos;
+
+ @ApiModelProperty(value = "Name of the field", required = true)
+ private String name;
+
+ @ApiModelProperty(value = "Type of the field", required = true)
+ private String type;
+
+ @ApiModelProperty(value = "Comment of the field")
+ private String comment;
+
+ @ApiModelProperty(value = "Can the field be null, default is true")
+ private Boolean isNullable = true;
+
+ @ApiModelProperty(value = "Size of the field")
+ private Integer size;
+
+ @ApiModelProperty(value = "Default value of the field")
+ private String defaultValue;
+
+ @ApiModelProperty(value = "Is a sorted field, default is false")
+ private Boolean isSortable = false;
+
+ @ApiModelProperty(value = "This filed sorted type, likeļ¼ascending, descending, ignore")
+ private String sortType;
+
+ @ApiModelProperty(value = "Extra info of the field")
+ private String extra;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaDetailDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaDetailDto.java
new file mode 100644
index 0000000..914e6cf
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaDetailDto.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.dto;
+
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import java.util.Optional;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SchemaDetailDto extends BaseDto {
+ private static final long serialVersionUID = -2397649152515693952L;
+
+ @ApiModelProperty(value = "Schema record with different version", required = true)
+ private List<SchemaRecordDto> schemaRecords;
+// private Map<String, String> props = ;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaDto.java
new file mode 100644
index 0000000..08dcdf7
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaDto.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.dto;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+
+@ApiModel(description = "Schema detail information")
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SchemaDto extends BaseDto {
+ private static final long serialVersionUID = -441512542075118183L;
+
+ @ApiModelProperty(value = "The qualified name of this entity")
+ private QualifiedName qualifiedName;
+
+ @ApiModelProperty(value = "Information about schema changes")
+ private AuditDto audit;
+
+ @ApiModelProperty(value = "Information about schema meta", required = true)
+ private SchemaMetaDto meta;
+
+ @ApiModelProperty(value = "Information about schema details", required = true)
+ private SchemaDetailDto details;
+
+ @ApiModelProperty(value = "Information about schema persistence")
+ private SchemaStorageDto storage;
+
+ @ApiModelProperty(value = "Extra schema parameters")
+ private Map<String, String> extras;
+
+ public SchemaDto setQualifiedName(QualifiedName qualifiedName) {
+ this.qualifiedName = qualifiedName;
+ return this;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaMetaDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaMetaDto.java
new file mode 100644
index 0000000..4c4a5b4
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaMetaDto.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.dto;
+
+import org.apache.rocketmq.schema.registry.common.model.Compatibility;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SchemaMetaDto extends BaseDto {
+ private static final long serialVersionUID = -4377140936300258473L;
+
+ @ApiModelProperty(value = "The type of the schema")
+ private SchemaType type;
+
+ @ApiModelProperty(value = "The tenant of the schema")
+ private String tenant;
+
+ @ApiModelProperty(value = "The namespace of the schema")
+ private String namespace;
+
+ @ApiModelProperty(value = "The struct name of the schema")
+ private String schemaName;
+
+ @ApiModelProperty(value = "Compatibility of the schema")
+ private Compatibility compatibility;
+
+ @ApiModelProperty(value = "Owner of the schema")
+ private String owner;
+
+ @ApiModelProperty(value = "The unique id of the schema")
+ private long uniqueId;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java
new file mode 100644
index 0000000..80c6357
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.dto;
+
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.model.Dependency;
+
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SchemaRecordDto {
+
+ @ApiModelProperty(value = "Schema name", required = true)
+ private String schema;
+
+ @ApiModelProperty(value = "Schema unique id", required = true)
+ private long schemaId;
+
+ @ApiModelProperty(value = "Version of this schema record")
+ private long version;
+
+ @ApiModelProperty(value = "IDL of this schema record", required = true)
+ private String idl;
+
+ @ApiModelProperty(value = "Dependency of this schema record")
+ private Dependency dependency;
+
+ @ApiModelProperty(value = "Subjects of this record binding")
+ private List<SubjectDto> subjects;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaStorageDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaStorageDto.java
new file mode 100644
index 0000000..5db3cac
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaStorageDto.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.dto;
+
+import java.util.Map;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.model.Dependency;
+
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SchemaStorageDto extends BaseDto {
+ private static final long serialVersionUID = -3298771958844258686L;
+
+ @ApiModelProperty(value = "Protocol of the schema serializer / deserializer")
+ private String serdeProtocol;
+
+ @ApiModelProperty(value = "Uploaded dependency library of the schema")
+ private Dependency dependency;
+
+ @ApiModelProperty(value = "Extra storage parameters")
+ private Map<String, String> serdeInfo;
+
+ @ApiModelProperty(value = "URI of the schema")
+ private String uri;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java
new file mode 100644
index 0000000..2ff3ad2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.dto;
+
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SubjectDto {
+
+ @ApiModelProperty(value = "Cluster of this subject", required = true)
+ private String cluster;
+
+ @ApiModelProperty(value = "Name of this subject", required = true)
+ private String subject;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/RequestExceptionHandler.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/RequestExceptionHandler.java
new file mode 100644
index 0000000..0c0cb29
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/RequestExceptionHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.exception;
+
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.RestControllerAdvice;
+
+import lombok.extern.slf4j.Slf4j;
+
+@RestControllerAdvice
+@Slf4j
+public class RequestExceptionHandler {
+
+ /**
+ * Handle Schema service Exceptions.
+ *
+ * @param response The HTTP response
+ * @param e The inner exception to handle
+ * @throws IOException on error in sending error
+ */
+ @ExceptionHandler({SchemaException.class})
+ public void handleException(
+ final HttpServletResponse response,
+ final SchemaException e
+ ) throws IOException {
+ final int status;
+
+ if (e instanceof SchemaNotFoundException) {
+ status = HttpStatus.NOT_FOUND.value();
+ } else {
+ status = HttpStatus.INTERNAL_SERVER_ERROR.value();
+ }
+
+ log.error("Global handle SchemaException: " + e.getMessage(), e);
+ response.sendError(status, e.getMessage());
+ }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java
new file mode 100644
index 0000000..b190017
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.exception;
+
+public class SchemaAuthorizedException extends SchemaException {
+ private static final long serialVersionUID = 204882338833006991L;
+
+ public SchemaAuthorizedException(final String tenant, final String schemaName) {
+ this(String.format("Schema: %s/%s not found, please check your configuration.", tenant, schemaName));
+ }
+
+ public SchemaAuthorizedException(final String msg) {
+ super(msg);
+ }
+
+ public SchemaAuthorizedException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java
new file mode 100644
index 0000000..c694cb0
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.exception;
+
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+
+public class SchemaCompatibilityException extends SchemaException {
+ private static final long serialVersionUID = 2602020608319903212L;
+
+ public SchemaCompatibilityException(final QualifiedName qualifiedName) {
+ this(String.format("Schema: %s validate failed.", qualifiedName));
+ }
+
+ public SchemaCompatibilityException(final String msg) {
+ super(msg);
+ }
+
+ public SchemaCompatibilityException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java
new file mode 100644
index 0000000..524b54c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.exception;
+
+public class SchemaException extends RuntimeException {
+
+ /** Constructor. */
+ public SchemaException() {
+ super();
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param msg The error message
+ */
+ public SchemaException(final String msg) {
+ super(msg);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param msg The error message
+ * @param cause The cause of the error
+ */
+ public SchemaException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
new file mode 100644
index 0000000..bcea88c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.exception;
+
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+
+public class SchemaExistException extends SchemaException {
+ private static final long serialVersionUID = -9177284523006645052L;
+
+ public SchemaExistException(final QualifiedName qualifiedName) {
+ this(String.format("Schema: %s is exist, please check your configuration.", qualifiedName));
+ }
+
+ public SchemaExistException(final String msg) {
+ super(msg);
+ }
+
+ public SchemaExistException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java
new file mode 100644
index 0000000..12bcedb
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.exception;
+
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+
+public class SchemaNotFoundException extends SchemaException {
+ private static final long serialVersionUID = 554251224980156176L;
+
+ public SchemaNotFoundException(final QualifiedName qualifiedName) {
+ this(String.format("Schema: %s not found, please check your configuration.", qualifiedName));
+ }
+
+ public SchemaNotFoundException(final String msg) {
+ super(msg);
+ }
+
+ public SchemaNotFoundException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/filter/RequestFilter.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/filter/RequestFilter.java
new file mode 100644
index 0000000..05b566a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/filter/RequestFilter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.filter;
+
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+
+import org.apache.rocketmq.schema.registry.common.context.RequestContextManager;
+import org.apache.rocketmq.schema.registry.common.context.RequestContext;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class RequestFilter implements Filter {
+
+
+ public RequestFilter() {
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void init(FilterConfig filterConfig) throws ServletException {
+ Filter.super.init(filterConfig);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void doFilter(
+ ServletRequest request,
+ ServletResponse response,
+ FilterChain chain
+ ) throws IOException, ServletException {
+ // Pre-processing
+ if (!(request instanceof HttpServletRequest)) {
+ throw new ServletException("Expected an HttpServletRequest but didn't get one");
+ }
+ final HttpServletRequest httpServletRequest = (HttpServletRequest) request;
+ final String method = httpServletRequest.getMethod();
+
+ // TODO: get request Authorization from http header
+// final String requestAuth =
+// httpServletRequest.getHeader(RequestContext.MICOMPUTE_REQUEST_HEADER_AUTHORIZATION);
+// final String metaAccount = StringUtils.isNotBlank(requestAuth)
+// ? requestAuth.replaceAll("@<", "\\{").replaceAll("@>", "\\}")
+// : requestAuth;
+ final RequestContext context = RequestContext.builder().build();
+ RequestContextManager.putContext(context);
+ log.info("filter " + context.toString());
+
+ // Do the rest of the chain
+ chain.doFilter(request, response);
+
+ // Post processing
+ RequestContextManager.removeContext();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void destroy() {
+ Filter.super.destroy();
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/json/JsonConverter.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/json/JsonConverter.java
new file mode 100644
index 0000000..4094c3f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/json/JsonConverter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.json;
+
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Json <-> Object
+ */
+public interface JsonConverter {
+
+ /**
+ * Parses the given string as json and returns a json ObjectNode representing the json.
+ *
+ * @param s a string representing a json object
+ * @return an object node representation of the string
+ * @throws SchemaException if unable to convert the string to json or the json isn't a json object.
+ */
+ ObjectNode fromJson(String s) throws SchemaException;
+
+ /**
+ * Parses the given JSON value.
+ *
+ * @param s json string
+ * @param clazz class
+ * @param <T> type of the class
+ * @return object
+ */
+ <T> T fromJson(String s, Class<T> clazz);
+
+ /**
+ * Parses the given JSON value.
+ *
+ * @param s json byte array
+ * @param clazz class
+ * @param <T> type of the class
+ * @return object
+ */
+ <T> T fromJson(byte[] s, Class<T> clazz);
+
+ /**
+ * Converts JSON as bytes.
+ *
+ * @param o object
+ * @return byte array
+ */
+ byte[] toJsonAsBytes(Object o);
+
+ /**
+ * Converts an object to JSON.
+ *
+ * @param o object
+ * @return JSON node
+ */
+ ObjectNode toJsonAsObjectNode(Object o);
+
+ /**
+ * Converts an object to JSON string.
+ *
+ * @param o object
+ * @return JSON string
+ */
+ String toJson(Object o);
+
+ /**
+ * Converts an object to JSON string.
+ *
+ * @param o object
+ * @return JSON string
+ */
+ String toString(Object o);
+
+ /**
+ * Converts String to bytes.
+ *
+ * @param s string array
+ * @return byte array
+ */
+ byte[] toBytes(String s);
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/json/JsonConverterImpl.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/json/JsonConverterImpl.java
new file mode 100644
index 0000000..03b1f1f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/json/JsonConverterImpl.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.json;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import java.nio.charset.StandardCharsets;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@NoArgsConstructor
+@Getter
+@Slf4j
+public class JsonConverterImpl implements JsonConverter {
+
+ private final ObjectMapper mapper = new ObjectMapper();
+ private final Gson gson = new Gson();
+
+ @Override
+ public ObjectNode fromJson(String s) throws SchemaException {
+ return null;
+ }
+
+ @Override
+ public <T> T fromJson(String s, Class<T> clazz) {
+ return gson.fromJson(s, clazz);
+// T dst;
+// try {
+// dst = mapper.readValue(s, clazz);
+// } catch (Exception e){
+// throw new SchemaException("Parse data failed", e);
+// }
+// return dst;
+ }
+
+ @Override
+ public <T> T fromJson(byte[] s, Class<T> clazz) {
+ return gson.fromJson(new String(s), clazz);
+// T dst;
+// try {
+// dst = mapper.readValue(s, clazz);
+// } catch (Exception e){
+// throw new SchemaException("Parse data failed", e);
+// }
+// return dst;
+ }
+
+ @Override
+ public byte[] toJsonAsBytes(Object o) {
+ return gson.toJson(o).getBytes(StandardCharsets.UTF_8);
+// byte[] dst;
+// try {
+// dst = mapper.writeValueAsBytes(o);
+// } catch (Exception e){
+// throw new SchemaException("Parse data failed", e);
+// }
+// return dst;
+ }
+
+ @Override
+ public ObjectNode toJsonAsObjectNode(Object o) {
+ return null;
+ }
+
+ @Override
+ public String toJson(Object o) {
+ return gson.toJson(o);
+ }
+
+ @Override
+ public String toString(Object o) {
+ return null;
+ }
+
+ @Override
+ public byte[] toBytes(String s) {
+ return s.getBytes(StandardCharsets.UTF_8);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java
new file mode 100644
index 0000000..66cbd5e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+import io.swagger.annotations.ApiModelProperty;
+import java.io.Serializable;
+import java.util.Date;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class AuditInfo implements Serializable {
+ private static final long serialVersionUID = 2258089775496856662L;
+
+ private String desc;
+ private String createdBy;
+ private Date createdTime;
+ private String lastModifiedBy;
+ private Date lastModifiedTime;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/BaseInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/BaseInfo.java
new file mode 100644
index 0000000..7f4ef5e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/BaseInfo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class BaseInfo implements Serializable {
+ private static final long serialVersionUID = 3058601529800238252L;
+
+ private QualifiedName qualifiedName;
+
+ private AuditInfo audit;
+
+ public String schemaFullName() {
+ return qualifiedName.schemaFullName();
+ }
+
+ public String subjectFullName() {
+ return qualifiedName.subjectFullName();
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/Compatibility.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/Compatibility.java
new file mode 100644
index 0000000..34277fd
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/Compatibility.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+public enum Compatibility {
+ NONE(1),
+ BACKWARD(2),
+ BACKWARD_TRANSITIVE(3),
+ FORWARD(4),
+ FORWARD_TRANSITIVE(5),
+ FULL(6),
+ FULL_TRANSITIVE(7);
+
+ private final int value;
+
+ Compatibility(int value) {
+ this.value = value;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/Dependency.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/Dependency.java
new file mode 100644
index 0000000..3484e48
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/Dependency.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class Dependency implements Serializable {
+ private static final long serialVersionUID = -5947555406026133852L;
+
+ private String groupId;
+ private String artifactId;
+ private String version;
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("{");
+ sb.append("\"groupId\":\"")
+ .append(groupId).append('\"');
+ sb.append(",\"artifactId\":\"")
+ .append(artifactId).append('\"');
+ sb.append(",\"version\":\"")
+ .append(version).append('\"');
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/FieldInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/FieldInfo.java
new file mode 100644
index 0000000..3d11a7d
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/FieldInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+import io.swagger.annotations.ApiModelProperty;
+import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class FieldInfo implements Serializable {
+ private static final long serialVersionUID = -7630383008028496477L;
+
+ private Integer pos;
+ private String name;
+ private String type;
+ private String comment;
+ private Boolean isNullable;
+ private Integer size;
+ private String defaultValue;
+ private Boolean isSortable;
+ private String sortType;
+ private String extra;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/PluginLoadState.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/PluginLoadState.java
new file mode 100644
index 0000000..353c564
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/PluginLoadState.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+public enum PluginLoadState {
+ INIT,
+ LOADING,
+ LOADED,
+ STARTING,
+ STARTED
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefination.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefination.java
new file mode 100644
index 0000000..2c3e873
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefination.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+public class SchemaDefination {
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDetailInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDetailInfo.java
new file mode 100644
index 0000000..aa93dbf
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDetailInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class SchemaDetailInfo implements Serializable {
+ private static final long serialVersionUID = 3113021009662503334L;
+
+ private List<SchemaRecordInfo> schemaRecords;
+
+ public SchemaRecordInfo lastRecord() {
+ if (schemaRecords == null || schemaRecords.isEmpty()) {
+ throw new SchemaException("Schema record could not been empty");
+ }
+ return schemaRecords.get(schemaRecords.size() - 1);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaEntity.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaEntity.java
new file mode 100644
index 0000000..4e81637
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaEntity.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@Deprecated
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SchemaEntity implements Serializable {
+ private static final long serialVersionUID = 3371325175580091718L;
+
+ private SchemaMetaInfo meta;
+ private SchemaRecordInfo record;
+ private SchemaStorageInfo storage;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
new file mode 100644
index 0000000..56f1953
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@EqualsAndHashCode(callSuper = true)
+@AllArgsConstructor
+@NoArgsConstructor
+public class SchemaInfo extends BaseInfo {
+ private static final long serialVersionUID = -5143258312429353896L;
+
+ private SchemaMetaInfo meta;
+
+ private SchemaDetailInfo details;
+
+ private SchemaStorageInfo storage;
+
+ private Map<String, String> extras;
+
+ public String getSchemaName() {
+ return getQualifiedName().getSchema();
+ }
+
+ public String getNamespace() {
+ return getMeta().getNamespace();
+ }
+
+ public SchemaType getSchemaType() {
+ return getMeta().getType();
+ }
+
+ public SchemaRecordInfo getLastRecord() {
+ return getDetails().lastRecord();
+ }
+
+ public String getLastRecordIdl() {
+ return getLastRecord().getIdl();
+ }
+
+ public long getUniqueId() {
+ return getMeta().getUniqueId();
+ }
+
+ public void setUniqueId(long uniqueId) {
+ getMeta().setUniqueId(uniqueId);
+ getLastRecord().setSchemaId(uniqueId);
+ }
+
+ public void setLastRecordDependency(Dependency dependency) {
+ getLastRecord().setDependency(dependency);
+ }
+
+ public long getLastRecordVersion() {
+ return getLastRecord().getVersion();
+ }
+
+ public void setLastRecordVersion(long version) {
+ getLastRecord().setVersion(version);
+ }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaMetaInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaMetaInfo.java
new file mode 100644
index 0000000..135e636
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaMetaInfo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SchemaMetaInfo implements Serializable {
+ private static final long serialVersionUID = -7872862344708837525L;
+
+ private SchemaType type;
+ private String tenant;
+ private String namespace;
+ private String schemaName;
+ private Compatibility compatibility;
+ private String owner;
+ private long uniqueId;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaOperation.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaOperation.java
new file mode 100644
index 0000000..5a92cf1
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaOperation.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+public enum SchemaOperation {
+
+ /**
+ * Operation type: register.
+ */
+ REGISTER,
+ /**
+ * Operation type: delete.
+ */
+ DELETE,
+ /**
+ * Operation type: update.
+ */
+ UPDATE,
+ /**
+ * Operation type: get.
+ */
+ GET
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java
new file mode 100644
index 0000000..9f9f159
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class SchemaRecordInfo implements Serializable {
+ private static final long serialVersionUID = 6215296681034788729L;
+
+ private String schema;
+ private long schemaId;
+ private long version;
+ private String idl;
+ private Dependency dependency;
+ private List<SubjectInfo> subjects;
+ // private List<FieldInfo> fields;
+
+ public void bindSubject(final SubjectInfo subjectInfo) {
+ if (getSubjects() == null) {
+ setSubjects(new ArrayList<>());
+ }
+ getSubjects().add(subjectInfo);
+ }
+
+ public void unbindSubject(final SubjectInfo subjectInfo) {
+ getSubjects().remove(subjectInfo);
+ }
+
+ public SubjectInfo lastBindSubject() {
+ if (getSubjects() == null) {
+ throw new SchemaException("Schema record haven't bind any subject");
+ }
+ return getSubjects().get(subjects.size() - 1);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java
new file mode 100644
index 0000000..491924e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+import io.swagger.annotations.ApiModelProperty;
+import java.io.Serializable;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SchemaStorageInfo implements Serializable {
+ private static final long serialVersionUID = -6655281552098217740L;
+
+ private String serdeProtocol;
+ // TODO delete?
+ private String serializationLib;
+ private Map<String, String> serdeInfo;
+ private String uri;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaType.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaType.java
new file mode 100644
index 0000000..785a05a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaType.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+public enum SchemaType {
+
+ /**
+ * Avro type
+ */
+ AVRO(1),
+ /**
+ * Protobuf type
+ */
+ PROTOBUF(2),
+ /**
+ * Thrift type
+ */
+ THRIFT(3),
+ /**
+ * Json type
+ */
+ JSON(4),
+ /**
+ * Text type for reserved
+ */
+ TEXT(5),
+ /**
+ * Binlog type for reserved
+ */
+ BINLOG(6);
+
+ private final int value;
+
+ SchemaType(final int value) {
+ this.value = value;
+ }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
new file mode 100644
index 0000000..794b9ce
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+public enum StorageType {
+
+ /**
+ * Rocketmq type
+ */
+ ROCKETMQ(1),
+ /**
+ * Mysql type
+ */
+ MYSQL(2);
+
+ private final int value;
+
+ StorageType(final int value) {
+ this.value = value;
+ }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
new file mode 100644
index 0000000..9790f13
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.model;
+
+import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SubjectInfo implements Serializable {
+ private static final long serialVersionUID = -92808722007777844L;
+
+ private String cluster;
+ private String subject;
+
+ public String fullName() {
+ return cluster + '/' + subject;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("{");
+ sb.append("\"cluster\":\"")
+ .append(cluster).append('\"');
+ sb.append(",\"subject\":\"")
+ .append(subject).append('\"');
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/AclProperties.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/AclProperties.java
new file mode 100644
index 0000000..8a427ad
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/AclProperties.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.properties;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+
+@Data
+public class AclProperties {
+ private boolean enabled;
+ private String aclStr;
+ private Map<QualifiedName, Set<String>> aclMap;
+
+ public Map<QualifiedName, Set<String>> getAclMap() {
+ if (aclMap == null) {
+ aclMap = aclStr == null ? new HashMap<>() : buildAclMap(aclStr);
+ }
+ return aclMap;
+ }
+
+ /**
+ * Parse the configuration to get operation control. The control is at userName level
+ * and the controlled operations include create, delete, and rename for table.
+ * The format is below.
+ * db1:user1,user2|db2:user1,user2
+ *
+ * @param aclStr the config strings for dbs
+ * @return acl config
+ */
+ @VisibleForTesting
+ private static Map<QualifiedName, Set<String>> buildAclMap(final String aclStr) {
+ final Map<QualifiedName, Set<String>> aclMap = new HashMap<>();
+ try {
+ for (String entity : StringUtils.split(aclStr, "|")) {
+
+ }
+ } catch (Exception e) {
+ throw new SchemaException("Schema acl property parsing error", e);
+ }
+ return aclMap;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/CacheProperties.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/CacheProperties.java
new file mode 100644
index 0000000..a606a08
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/CacheProperties.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.properties;
+
+import lombok.Data;
+
+@Data
+public class CacheProperties {
+ private boolean enabled;
+ private long ttl;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/DependencyProperties.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/DependencyProperties.java
new file mode 100644
index 0000000..9e6e4c6
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/DependencyProperties.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.properties;
+
+import lombok.Data;
+
+@Data
+public class DependencyProperties {
+ private boolean uploadEnabled;
+ private String compilePath;
+ private String jdkPath;
+ private String localRepositoryPath;
+ private String repositoryUrl;
+ private String username;
+ private String password;
+ private String template = "core/src/main/resources/template.pom";
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/GlobalConfig.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/GlobalConfig.java
new file mode 100644
index 0000000..c80ecf8
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/GlobalConfig.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.properties;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.model.StorageType;
+
+public interface GlobalConfig {
+
+ /**
+ * Enable schema storage cache.
+ *
+ * @return true if cache is enabled
+ */
+ boolean isCacheEnabled();
+
+ /**
+ * Enable schema upload to remote repository.
+ *
+ * @return true if cache is enabled
+ */
+ boolean isUploadEnabled();
+
+ /**
+ * File path for upload.
+ *
+ * @return file path
+ */
+ String getDependencyCompilePath();
+
+ /**
+ * File path for upload.
+ *
+ * @return file path
+ */
+ String getDependencyLocalRepositoryPath();
+
+ /**
+ * Jdk path for compile.
+ *
+ * @return jdk path
+ */
+ String getDependencyJdkPath();
+
+ /**
+ * Remote repository url.
+ *
+ * @return repository url
+ */
+ String getDependencyRepositoryUrl();
+
+ /**
+ * Remote repository username.
+ *
+ * @return repository username
+ */
+ String getDependencyUsername();
+
+ /**
+ * Remote repository password.
+ *
+ * @return repository password
+ */
+ String getDependencyPassword();
+
+ /**
+ * Upload template.
+ *
+ * @return Upload template
+ */
+ String getDependencyTemplate();
+
+ /**
+ * Enable schema acl.
+ *
+ * @return true if acl is enabled
+ */
+ boolean isAclEnabled();
+
+ /**
+ * Schema acl map.
+ *
+ * @return schema acl map
+ */
+ Map<QualifiedName, Set<String>> getAcl();
+
+ /**
+ * Schema storage layer type.
+ *
+ * @return schema type
+ */
+ StorageType getStorageType();
+
+ /**
+ * Schema storage config file.
+ *
+ * @return storage config file
+ */
+ String getStorageConfigPath();
+
+ /**
+ * Schema service region id.
+ *
+ * @return service region id
+ */
+ long getServiceRegionId();
+
+ /**
+ * Schema service node id
+ *
+ * @return service node id
+ */
+ long getServiceNodeId();
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/GlobalConfigImpl.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/GlobalConfigImpl.java
new file mode 100644
index 0000000..b82c55f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/GlobalConfigImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.properties;
+
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.model.StorageType;
+
+@Slf4j
+public class GlobalConfigImpl implements GlobalConfig {
+
+ private final SchemaProperties schemaProperties;
+
+ public GlobalConfigImpl(@Nonnull @NonNull final SchemaProperties schemaProperties) {
+ this.schemaProperties = schemaProperties;
+ }
+
+ @Override
+ public boolean isCacheEnabled() {
+ return schemaProperties.getCache().isEnabled();
+ }
+
+ @Override
+ public boolean isUploadEnabled() {
+ return schemaProperties.getDependency().isUploadEnabled();
+ }
+
+ @Override
+ public String getDependencyCompilePath() {
+ return schemaProperties.getDependency().getCompilePath();
+ }
+
+ @Override
+ public String getDependencyLocalRepositoryPath() {
+ return schemaProperties.getDependency().getLocalRepositoryPath();
+ }
+
+ @Override
+ public String getDependencyJdkPath() {
+ return schemaProperties.getDependency().getJdkPath();
+ }
+
+ @Override
+ public String getDependencyRepositoryUrl() {
+ return schemaProperties.getDependency().getRepositoryUrl();
+ }
+
+ @Override
+ public String getDependencyUsername() {
+ return schemaProperties.getDependency().getUsername();
+ }
+
+ @Override
+ public String getDependencyPassword() {
+ return schemaProperties.getDependency().getPassword();
+ }
+
+ public String getDependencyTemplate() {
+ return schemaProperties.getDependency().getTemplate();
+ }
+
+ @Override
+ public boolean isAclEnabled() {
+ return schemaProperties.getAcl().isEnabled();
+ }
+
+ @Override
+ public Map<QualifiedName, Set<String>> getAcl() {
+ return schemaProperties.getAcl().getAclMap();
+ }
+
+ @Override
+ public StorageType getStorageType() {
+ return schemaProperties.getStorage().getType();
+ }
+
+ @Override
+ public String getStorageConfigPath() {
+ return schemaProperties.getStorage().getConfigPath();
+ }
+
+ @Override
+ public long getServiceRegionId() {
+ return schemaProperties.getService().getRegionId();
+ }
+
+ @Override
+ public long getServiceNodeId() {
+ return schemaProperties.getService().getNodeId();
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/SchemaProperties.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/SchemaProperties.java
new file mode 100644
index 0000000..66a3207
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/SchemaProperties.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.properties;
+
+import lombok.Data;
+import lombok.NonNull;
+
+@Data
+public class SchemaProperties {
+
+ @NonNull
+ private CacheProperties cache = new CacheProperties();
+
+ @NonNull
+ private DependencyProperties dependency = new DependencyProperties();
+
+ @NonNull
+ private AclProperties acl = new AclProperties();
+
+ @NonNull
+ private StorageProperties storage = new StorageProperties();
+
+ @NonNull
+ private ServiceProperties service = new ServiceProperties();
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/ServiceProperties.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/ServiceProperties.java
new file mode 100644
index 0000000..bd84177
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/ServiceProperties.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.properties;
+
+import lombok.Data;
+import org.apache.rocketmq.schema.registry.common.model.StorageType;
+
+@Data
+public class ServiceProperties {
+ private long regionId = 0;
+ private long nodeId = 0;
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/StorageProperties.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/StorageProperties.java
new file mode 100644
index 0000000..052cc17
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/properties/StorageProperties.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.properties;
+
+import lombok.Data;
+import org.apache.rocketmq.schema.registry.common.model.StorageType;
+
+@Data
+public class StorageProperties {
+ private StorageType type;
+ private String configPath;
+
+ @Data
+ public static class Config {
+ private String path;
+ }
+
+ public StorageType getType() {
+ return type;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/DefaultStorageServiceImpl.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/DefaultStorageServiceImpl.java
new file mode 100644
index 0000000..2410129
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/DefaultStorageServiceImpl.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.storage;
+
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+
+public class DefaultStorageServiceImpl implements StorageService<SchemaInfo> {
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/SpringStorageFactory.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/SpringStorageFactory.java
new file mode 100644
index 0000000..2687a0b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/SpringStorageFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.storage;
+
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.common.model.StorageType;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.core.env.StandardEnvironment;
+
+public class SpringStorageFactory implements StorageFactory {
+
+ protected final AnnotationConfigApplicationContext ctx;
+ private final StoragePluginContext storagePluginContext;
+ private final StorageType storageType;
+
+ public SpringStorageFactory(final StoragePluginContext storagePluginContext) {
+ this.storagePluginContext = storagePluginContext;
+ this.storageType = storagePluginContext.getConfig().getStorageType();
+ this.ctx = new AnnotationConfigApplicationContext();
+ this.ctx.setEnvironment(new StandardEnvironment());
+ this.ctx.getBeanFactory().registerSingleton("StoragePluginContext", storagePluginContext);
+ }
+
+ /**
+ * register classes to context.
+ *
+ * @param clazz classes object.
+ */
+ protected void registerClazz(final Class<?>... clazz) {
+ this.ctx.register(clazz);
+ }
+
+ /**
+ * refresh the context.
+ */
+ public void refresh() {
+ this.ctx.refresh();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public StorageType getStorageType() {
+ return this.storageType;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void stop() {
+ this.ctx.close();
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageFactory.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageFactory.java
new file mode 100644
index 0000000..685cd8e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.storage;
+
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.StorageType;
+
+public interface StorageFactory {
+
+ /**
+ * Standard error message for all default implementations.
+ */
+ String ERROR_MESSAGE_DEFAULT = "Not supported by this storage layer";
+
+ /**
+ * Returns the storage service implementation of the config type.
+ *
+ * @return Returns the storage service implementation of the config type.
+ */
+ default StorageService<SchemaInfo> getStorageService() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+
+ /**
+ * Returns the type of the storage.
+ *
+ * @return Returns the type of the storage.
+ */
+ StorageType getStorageType();
+
+ /**
+ * Stop and clear the factory.
+ */
+ void stop();
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageManager.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageManager.java
new file mode 100644
index 0000000..3b15524
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageManager.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.storage;
+
+import java.util.ServiceLoader;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.model.PluginLoadState;
+import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.springframework.context.ApplicationContext;
+
+@Slf4j
+public class StorageManager {
+
+ private final GlobalConfig config;
+
+ private StoragePlugin plugin;
+ private StorageFactory factory;
+
+ private final AtomicReference<PluginLoadState> state = new AtomicReference<>(PluginLoadState.INIT);
+ private final AtomicBoolean connected = new AtomicBoolean();
+ private final AtomicBoolean stopped = new AtomicBoolean();
+
+ public StorageManager(final GlobalConfig config) {
+ this.config = config;
+ }
+
+ /**
+ * Returns the storage service for the configuration
+ *
+ * @return Returns the storage service
+ */
+ public StorageService<SchemaInfo> getStorageService() {
+ return this.factory.getStorageService();
+ }
+
+ /**
+ * Loads the storage plugin
+ *
+ * @throws Exception
+ */
+ public void loadPlugin() {
+ if (!state.compareAndSet(PluginLoadState.INIT, PluginLoadState.LOADING)) {
+ return;
+ }
+
+ final ServiceLoader<StoragePlugin> serviceLoader =
+ ServiceLoader.load(StoragePlugin.class, this.getClass().getClassLoader());
+ for (StoragePlugin storagePlugin : serviceLoader) {
+ if (config.getStorageType().equals(storagePlugin.getType())) {
+ log.info("Loading plugin {}", storagePlugin.getClass().getName());
+ this.plugin = storagePlugin;
+ log.info("Finished loading plugin {}", storagePlugin.getClass().getName());
+ }
+ }
+
+ state.set(PluginLoadState.LOADED);
+ }
+
+ /**
+ * Load storage and creates a connection.
+ *
+ * @param context schema storage service context
+ */
+ public void start(ApplicationContext context) {
+ if (!state.compareAndSet(PluginLoadState.LOADED, PluginLoadState.STARTING)) {
+ return;
+ }
+
+ // TODO 1. string constants
+ // TODO 2. validate
+ // TODO 3. encrypt / decrypt
+ StoragePluginContext pluginContext = new StoragePluginContext(config);
+ if (plugin != null) {
+ factory = plugin.load(pluginContext);
+ log.info("factory is loading" + factory);
+ } else {
+ log.warn("No plugin for storage with type {}", pluginContext.getConfig().getStorageType());
+ }
+
+ state.set(PluginLoadState.STARTED);
+ }
+
+ /**
+ * Stop the storage.
+ */
+ @PreDestroy
+ public void stop() {
+ if (stopped.getAndSet(true)) {
+ return;
+ }
+
+ try {
+ factory.stop();
+ } catch (Exception e) {
+ log.error("Error shutting down storage: {}", factory.getStorageType(), e);
+ }
+ }
+
+ public boolean isPluginLoaded() {
+ return state.get().equals(PluginLoadState.LOADED);
+ }
+
+ public boolean isConnected() {
+ return connected.get();
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StoragePlugin.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StoragePlugin.java
new file mode 100644
index 0000000..507cbbf
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StoragePlugin.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.storage;
+
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.common.model.StorageType;
+
+public interface StoragePlugin {
+
+ /**
+ * Returns the type of the storage plugin.
+ *
+ * @return type of the plugin.
+ */
+ StorageType getType();
+
+ /**
+ * Returns the storage service implementation for the type.
+ *
+ * @param storageContext registry for spectator
+ * @return connector factory
+ */
+ StorageFactory load(StoragePluginContext storageContext);
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StoragePluginManager.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StoragePluginManager.java
new file mode 100644
index 0000000..f3f3646
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StoragePluginManager.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.storage;
+
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.model.PluginLoadState;
+import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig;
+
+@Slf4j
+public class StoragePluginManager {
+
+ private final GlobalConfig config;
+ private final StorageManager storageManager;
+
+ private static final AtomicReference<PluginLoadState> state = new AtomicReference<>();
+
+ /**
+ * Constructor.
+ *
+ * @param storageManager storage manager
+ */
+ public StoragePluginManager(GlobalConfig config, StorageManager storageManager) {
+ this.config = config;
+ this.storageManager = storageManager;
+ }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
new file mode 100644
index 0000000..cdcc36b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.storage;
+
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
+import org.apache.rocketmq.schema.registry.common.model.BaseInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+
+public interface StorageService<T extends BaseInfo> {
+
+ /**
+ * Error message for all default implementations.
+ */
+ String ERROR_MESSAGE_DEFAULT = "Not supported method for this storage type";
+
+ /**
+ * Register a brand new schema.
+ *
+ * @param context The storage service needed context
+ * @param schemaInfo The schema information
+ * @throws UnsupportedOperationException If the storage type doesn't implement this method
+ */
+ default SchemaInfo register(final StorageServiceContext context, final T schemaInfo) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+
+ /**
+ * Delete schema with all version
+ *
+ * @param context The storage service needed context
+ * @param name Qualified name with tenant / name of schema
+ * @throws UnsupportedOperationException If the storage type doesn't implement this method
+ */
+ default void delete(final StorageServiceContext context, final QualifiedName name) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+
+ /**
+ * Update schema and generate a new version with the given information.
+ *
+ * @param context The request context
+ * @param schemaInfo schema information
+ * @throws UnsupportedOperationException If the connector doesn't implement this method
+ */
+ default T update(final StorageServiceContext context, final T schemaInfo) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+
+ /**
+ * Query a resource with the given tenant and name.
+ *
+ * @param context The storage service needed context
+ * @param name Qualified name with tenant / name of schema
+ * @return The resource metadata.
+ * @throws UnsupportedOperationException If the storage type doesn't implement this method
+ */
+ default T get(final StorageServiceContext context, final QualifiedName name) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+
+ default SchemaRecordInfo getBySubject(final StorageServiceContext context, final QualifiedName name) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
new file mode 100644
index 0000000..4ff223c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.storage;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.context.RequestContext;
+import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.context.RequestContextManager;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.common.utils.StorageUtil;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
+@Slf4j
+@CacheConfig(cacheNames = "storage")
+public class StorageServiceProxy {
+
+ private final StorageManager storageManager;
+ private final StorageUtil storageUtil;
+
+ /**
+ * Constructor for storage service proxy.
+ *
+ * @param storageManager storage manager
+ * @param storageUtil convert from Dto to storage instance or vice versa
+ */
+ public StorageServiceProxy(final StorageManager storageManager, final StorageUtil storageUtil) {
+ this.storageManager = storageManager;
+ this.storageUtil = storageUtil;
+ }
+
+ /**
+ * Proxy calls the StorageService's register method.
+ * @param qualifiedName Qualified name with tenant / name of schema
+ * @param schemaInfo schema object
+ */
+ public SchemaInfo register(
+ final QualifiedName qualifiedName,
+ final SchemaInfo schemaInfo
+ ) {
+ final RequestContext requestContext = RequestContextManager.getContext();
+ final StorageServiceContext storageContext = storageUtil.convertToStorageServiceContext(requestContext);
+ final StorageService<SchemaInfo> storageService = storageManager.getStorageService();
+
+ return storageService.register(storageContext, schemaInfo);
+ }
+
+ /**
+ * Proxy calls the StorageService's register method.
+ *
+ * @param name Qualified name with tenant / name of schema
+ */
+ @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.schema()")
+ public void delete(final QualifiedName name) {
+ final RequestContext requestContext = RequestContextManager.getContext();
+ final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
+ final StorageService<SchemaInfo> storageService = storageManager.getStorageService();
+
+ storageService.delete(storageServiceContext, name);
+ }
+
+ /**
+ * Proxy calls the StorageService's update method.
+ *
+ * @param name schema qualified name
+ * @param schemaInfo schema information instance
+ * @return true if errors after this should be ignored.
+ */
+ @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.schema()")
+ public SchemaInfo update(final QualifiedName name, final SchemaInfo schemaInfo) {
+ final RequestContext requestContext = RequestContextManager.getContext();
+ final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
+ final StorageService<SchemaInfo> service = storageManager.getStorageService();
+
+ return service.update(storageServiceContext, schemaInfo);
+ }
+
+ /**
+ * Proxy calls the StorageService's get method. Returns schema from store
+ * if <code>useCache</code> is false.
+ *
+ * @param name Qualified name with tenant / name of schema
+ * @param useCache if schema can be retrieved from cache
+ * @return schema information instance
+ */
+ @Cacheable(key = "'schema.' + #name.getTenant() + '/' + #name.schema()", condition = "#useCache")
+ public SchemaInfo get(final QualifiedName name, final boolean useCache) {
+ final RequestContext requestContext = RequestContextManager.getContext();
+ final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
+ final StorageService<SchemaInfo> storageService = storageManager.getStorageService();
+
+ return storageService.get(storageServiceContext, name);
+ }
+
+ @Cacheable(key = "'subject.' + #subject", condition = "#useCache")
+ public SchemaRecordInfo getBySubject(final QualifiedName name, final boolean useCache) {
+ final RequestContext requestContext = RequestContextManager.getContext();
+ final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
+ final StorageService<SchemaInfo> storageService = storageManager.getStorageService();
+
+ return storageService.getBySubject(storageServiceContext, name);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
new file mode 100644
index 0000000..ddc2731
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.utils;
+
+import com.google.common.base.Strings;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import java.util.stream.Collectors;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileObject;
+import javax.tools.StandardJavaFileManager;
+import javax.tools.ToolProvider;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaValidationException;
+import org.apache.avro.SchemaValidator;
+import org.apache.avro.SchemaValidatorBuilder;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException;
+import org.apache.rocketmq.schema.registry.common.model.Compatibility;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+
+@Slf4j
+public class CommonUtil {
+
+ public static void validateName(QualifiedName qualifiedName) {
+// Preconditions.checkNotNull(qualifiedName.getTenant(), "Tenant is null");
+// Preconditions.checkNotNull(qualifiedName.getSubject(), "Subject is null");
+// Preconditions.checkNotNull(qualifiedName.getName(), "Schema name is null");
+ }
+
+ public static boolean isQualifiedNameEmpty(QualifiedName qualifiedName) {
+ return Strings.isNullOrEmpty(qualifiedName.getTenant()) || Strings.isNullOrEmpty(qualifiedName.getSchema());
+ }
+
+ public static List<File> listFiles(File path) {
+ if (path != null && path.isDirectory()) {
+ File[] files = path.listFiles();
+ if (files != null) {
+ return Arrays.stream(files).collect(Collectors.toList());
+ }
+ }
+ return new ArrayList<>();
+ }
+
+ public static Properties loadProperties(File file) {
+ Properties properties = new Properties();
+ if (file.isFile()) {
+ try (InputStreamReader in = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)) {
+ properties.load(in);
+ } catch (IOException e) {
+ throw new SchemaException(String.format("Load properties failed: %s", file), e);
+ }
+ }
+ return properties;
+ }
+
+ public static File mkdir(String pathStr) {
+ return mkdir(new File(pathStr));
+ }
+
+ public static File mkdir(File fileDir) {
+ if (fileDir.exists() && !fileDir.isDirectory()) {
+ throw new SchemaException("Not a directory: " + fileDir.getAbsolutePath());
+ }
+
+ if (!fileDir.exists() && !fileDir.mkdirs()) {
+ throw new SchemaException("Couldn't create directory: " + fileDir.getAbsolutePath());
+ }
+
+ return fileDir;
+ }
+
+ /**
+ * can't modify
+ *
+ * @param value
+ * @return
+ * @throws SchemaException
+ */
+ private String md5hash(String value) throws SchemaException {
+ try {
+ MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+ byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
+ messageDigest.update(bytes, 0, bytes.length);
+ return new BigInteger(1, messageDigest.digest()).toString(16);
+ } catch (Exception e) {
+ throw new SchemaException("md5hash failed: " + value);
+ }
+ }
+
+ public static Map<String, ByteBuffer> compileJavaFile(List<File> javaFileList) throws IOException {
+ Map<String, ByteBuffer> results = new HashMap<String, ByteBuffer>();
+ JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+ StandardJavaFileManager stdManager = compiler.getStandardFileManager(null, null, null);
+ Iterable<? extends JavaFileObject> compilationUnits = stdManager.getJavaFileObjectsFromFiles(javaFileList);
+
+ MemoryJavaFileManager manager = new MemoryJavaFileManager(stdManager);
+ JavaCompiler.CompilationTask task = compiler.getTask(null, manager, null, null, null, compilationUnits);
+ if (task.call()) {
+ results.putAll(manager.getClassBytes());
+ }
+ return results;
+ }
+
+ public static void generateJarFile(String jarFilePath, Map<String, ByteBuffer> classResult) {
+ try (JarOutputStream jarOutStream = new JarOutputStream(Files.newOutputStream(Paths.get(jarFilePath)))) {
+ for (String name : classResult.keySet()) {
+ JarEntry entry = new JarEntry(name.replace(".", "/") + ".class");
+ jarOutStream.putNextEntry(entry);
+ jarOutStream.write(classResult.get(name).array());
+ jarOutStream.closeEntry();
+ }
+ } catch (IOException e) {
+ throw new SchemaException("Generate jar file: " + jarFilePath + " failed", e);
+ }
+ }
+
+ public static void generatePomFile(String dependencyTemplate,
+ String group, String artifact, String version, String pomFilePath) {
+ try (FileWriter fileWriter = new FileWriter(pomFilePath)) {
+ dependencyTemplate = dependencyTemplate.replace("$groupId", group)
+ .replace("$artifactId", artifact)
+ .replace("$version", version);
+ fileWriter.write(dependencyTemplate);
+ fileWriter.flush();
+ } catch (IOException e) {
+ throw new SchemaException("Generate pom file: " + pomFilePath + " failed", e);
+ }
+ }
+
+ public static void execCommand(String... command) throws IOException, InterruptedException {
+ ProcessBuilder pb = new ProcessBuilder(command);
+ Process process = pb.start();
+
+ ProcessStream infoThread = new ProcessStream(process.getInputStream());
+ ProcessStream errThread = new ProcessStream(process.getErrorStream());
+ infoThread.start();
+ errThread.start();
+
+ process.waitFor();
+ process.destroy();
+
+ String info = infoThread.getOutput();
+ String err = errThread.getOutput();
+ log.info("exec command " + Arrays.toString(command) + " info: " + info);
+ log.info("exec command " + Arrays.toString(command) + " error: " + err);
+ if (err.contains("Exception")) {
+ throw new SchemaException("exec command failed: " + err);
+ }
+ }
+
+ private static class ProcessStream extends Thread {
+ private final InputStream inputStream;
+ private String output;
+
+ public ProcessStream(InputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ public String getOutput() {
+ return this.output;
+ }
+
+ @Override
+ public void run() {
+ try (
+ InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
+ BufferedReader br = new BufferedReader(inputStreamReader);
+ ) {
+ String line;
+ StringBuilder sb = new StringBuilder();
+ while ((line = br.readLine()) != null) {
+ sb.append(line);
+ }
+ output = sb.toString();
+ } catch (Throwable e) {
+ log.error("", e);
+ }
+ }
+ }
+
+ public static void validateIdl(SchemaDto schemaDto) throws SchemaCompatibilityException {
+ switch (schemaDto.getMeta().getType()) {
+ case AVRO:
+ break;
+ case JSON:
+ throw new SchemaCompatibilityException("Unsupported schema type: " + schemaDto.getMeta().getType());
+ }
+ }
+
+ public static void validateCompatibility(SchemaInfo update, SchemaInfo current,
+ Compatibility expectCompatibility) {
+ switch (update.getMeta().getType()) {
+ case AVRO:
+ SchemaValidator validator = new SchemaValidatorBuilder().canReadStrategy().validateLatest();
+ try {
+ Schema toValidate = new Schema.Parser().parse(update.getLastRecordIdl());
+ List<Schema> existing = new ArrayList<>();
+ existing.add(new Schema.Parser().parse(current.getLastRecordIdl()));
+ validator.validate(toValidate, existing);
+ } catch (SchemaValidationException e) {
+ throw new SchemaCompatibilityException("Schema validation failed", e);
+ }
+ break;
+ default:
+ throw new SchemaCompatibilityException("Unsupported schema type: " + update.getMeta().getType());
+ }
+
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/IdGenerator.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/IdGenerator.java
new file mode 100644
index 0000000..8a4c5f6
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/IdGenerator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.utils;
+
+public interface IdGenerator {
+
+ /**
+ *
+ * @return unique id
+ */
+ public long nextId();
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/MemoryJavaFileManager.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/MemoryJavaFileManager.java
new file mode 100644
index 0000000..ae6c2df
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/MemoryJavaFileManager.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import javax.tools.FileObject;
+import javax.tools.ForwardingJavaFileManager;
+import javax.tools.JavaFileManager;
+import javax.tools.JavaFileObject;
+import javax.tools.SimpleJavaFileObject;
+
+public class MemoryJavaFileManager extends ForwardingJavaFileManager {
+
+ /** Java source file extension. */
+ private final static String EXT = ".java";
+
+ private Map<String, ByteBuffer> classBytes;
+
+ public MemoryJavaFileManager(JavaFileManager fileManager) {
+ super(fileManager);
+ classBytes = new HashMap<String, ByteBuffer>();
+ }
+
+ public Map<String, ByteBuffer> getClassBytes() {
+ return classBytes;
+ }
+
+ public void close() throws IOException {
+ classBytes = null;
+ }
+
+ public void flush() throws IOException {
+ }
+
+ /**
+ * A file object used to represent Java source coming from a string.
+ */
+ private static class StringInputBuffer extends SimpleJavaFileObject {
+ final String code;
+
+ StringInputBuffer(String fileName, String code) {
+ super(toURI(fileName), Kind.SOURCE);
+ this.code = code;
+ }
+
+ public CharBuffer getCharContent(boolean ignoreEncodingErrors) {
+ return CharBuffer.wrap(code);
+ }
+ }
+
+ /**
+ * A file object that stores Java bytecode into the classBytes map.
+ */
+ private class ClassOutputBuffer extends SimpleJavaFileObject {
+ private String name;
+
+ ClassOutputBuffer(String name) {
+ super(toURI(name), Kind.CLASS);
+ this.name = name;
+ }
+
+ public OutputStream openOutputStream() {
+ return new FilterOutputStream(new ByteArrayOutputStream()) {
+ public void close() throws IOException {
+ out.close();
+ ByteArrayOutputStream bos = (ByteArrayOutputStream)out;
+ classBytes.put(name, ByteBuffer.wrap(bos.toByteArray()));
+ }
+ };
+ }
+ }
+
+ public JavaFileObject getJavaFileForOutput(Location location, String className,
+ JavaFileObject.Kind kind, FileObject sibling) throws IOException {
+ if (kind == JavaFileObject.Kind.CLASS) {
+ return new ClassOutputBuffer(className);
+ } else {
+ return super.getJavaFileForOutput(location, className, kind, sibling);
+ }
+ }
+
+ public static JavaFileObject makeStringSource(String fileName, String code) {
+ return new StringInputBuffer(fileName, code);
+ }
+
+ static URI toURI(String name) {
+ File file = new File(name);
+ if (file.exists()) {
+ return file.toURI();
+ } else {
+ try {
+ final StringBuilder newUri = new StringBuilder();
+ newUri.append("mfm:///");
+ newUri.append(name.replace('.', '/'));
+ if(name.endsWith(EXT)) newUri.replace(newUri.length() - EXT.length(), newUri.length(), EXT);
+ return URI.create(newUri.toString());
+ } catch (Exception exp) {
+ return URI.create("mfm:///com/sun/script/java/java_source");
+ }
+ }
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/SnowFlakeIdGenerator.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/SnowFlakeIdGenerator.java
new file mode 100644
index 0000000..b8b51b2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/SnowFlakeIdGenerator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig;
+
+public class SnowFlakeIdGenerator implements IdGenerator {
+
+ // machine node id [0-63]
+ private final long nodeId;
+ // region id [0-15]
+ private final long regionId;
+ // sequence id [0-4095]
+ private long sequenceId = 0L;
+ private long lastTimestamp = -1L;
+
+ // startTime, UTC 2020-01-01 00:00:00
+ private final long startTime = 1577808000000L;
+
+ // the using bits number by region id of master-role cluster and machine-node id
+ private final long regionIdBits = 4L;
+ private final long nodeIdBits = 6L;
+ // max machine-node id: 63
+ private final long maxNodeId = ~(-1 << nodeIdBits);
+ // max region id: 15
+ private final long maxRegionId = ~(-1 << regionIdBits);
+
+ // the bits number of sequenceId used
+ private final long sequenceIdBits = 12L;
+ // the bits number of nodeId left moving, 12bits
+ private final long nodeIdMoveBits = sequenceIdBits;
+ // the bits number of regionId left moving, 18bits
+ private final long regionIdMoveBits = nodeIdMoveBits + nodeIdBits;
+ // the bits number of timestamp left moving, 22bits
+ private final long timestampMoveBits = regionIdMoveBits + regionIdBits;
+
+ // the mask of sequenceId, 4095
+ private final long sequenceIdMask = ~(-1L << sequenceIdBits);
+
+ public SnowFlakeIdGenerator(GlobalConfig config) {
+ this.regionId = config.getServiceRegionId();
+ this.nodeId = config.getServiceNodeId();
+
+ Preconditions.checkArgument(nodeId <= maxNodeId && nodeId >= 0,
+ "The NodeId can not be greater than %d or less than 0", maxNodeId);
+ Preconditions.checkArgument(regionId <= maxRegionId && regionId >= 0,
+ "The RegionId can not be greater than %d or less than 0", maxRegionId);
+ }
+
+ @Override
+ public synchronized long nextId() {
+ long timestamp = System.currentTimeMillis();
+ if (timestamp < lastTimestamp) {
+ throw new SchemaException("SchemaId generating error, clock moved backwards, please try later");
+ }
+
+ // if timestamp equals to lastTimestamp, diff the sequenceId
+ if (lastTimestamp == timestamp) {
+ // prevent sequenceId greater than 4095 (number of 'sequenceIdBits')
+ sequenceId = (sequenceId + 1) & sequenceIdMask;
+ // if sequenceId equals to 0, it means that
+ // the 'sequenceIdBits' has been exhausted at the current timestamp, then
+ // it would be blocked for a new timestamp
+ if (sequenceId == 0) {
+ timestamp = waitNextMillis(lastTimestamp);
+ }
+ } else {
+ sequenceId = 0L;
+ }
+
+ lastTimestamp = timestamp;
+
+ // computing the 64 bits unique id
+ return ((timestamp - startTime) << timestampMoveBits)
+ | (regionId << regionIdMoveBits)
+ | (nodeId << nodeIdMoveBits)
+ | sequenceId;
+ }
+
+ private long waitNextMillis(long lastTimestamp) {
+ long timestamp = System.currentTimeMillis();
+ while (timestamp <= lastTimestamp) {
+ timestamp = System.currentTimeMillis();
+ }
+ return timestamp;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/StorageUtil.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/StorageUtil.java
new file mode 100644
index 0000000..268458b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/StorageUtil.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.utils;
+
+import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.SubjectDto;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.context.RequestContext;
+import org.apache.rocketmq.schema.registry.common.dto.AuditDto;
+import org.apache.rocketmq.schema.registry.common.dto.FieldDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDetailDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaMetaDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaStorageDto;
+import org.apache.rocketmq.schema.registry.common.model.AuditInfo;
+import org.apache.rocketmq.schema.registry.common.model.FieldInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaDetailInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaMetaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaStorageInfo;
+import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
+import org.dozer.DozerBeanMapper;
+import org.dozer.Mapper;
+import org.dozer.loader.api.BeanMappingBuilder;
+
+public class StorageUtil {
+ private final Mapper mapper;
+
+ public StorageUtil() {
+ final DozerBeanMapper dozerBeanMapper = new DozerBeanMapper();
+ final BeanMappingBuilder builder = new BeanMappingBuilder() {
+ @Override
+ protected void configure() {
+ mapping(SchemaDto.class, SchemaInfo.class);
+ mapping(SchemaMetaDto.class, SchemaMetaInfo.class);
+ mapping(SchemaDetailDto.class, SchemaDetailInfo.class);
+ mapping(SchemaStorageDto.class, SchemaStorageInfo.class);
+ mapping(SchemaRecordDto.class, SchemaRecordInfo.class);
+ mapping(SubjectDto.class, SubjectInfo.class);
+ mapping(AuditDto.class, AuditInfo.class);
+ mapping(FieldDto.class, FieldInfo.class);
+ }
+ };
+ dozerBeanMapper.addMapping(builder);
+ this.mapper = dozerBeanMapper;
+ }
+
+ /**
+ * Converts from SchemaDto to SchemaInfo.
+ *
+ * @param schemaDto schema dto
+ * @return schema info
+ */
+ public SchemaInfo convertFromSchemaDto(final SchemaDto schemaDto) {
+ return mapper.map(schemaDto, SchemaInfo.class);
+ }
+
+ /**
+ * Converts from schemaInfo to SchemaDto.
+ *
+ * @param schemaInfo schema info
+ * @return schema dto
+ */
+ public SchemaDto convertToSchemaDto(final SchemaInfo schemaInfo) {
+ return mapper.map(schemaInfo, SchemaDto.class);
+ }
+
+ /**
+ * Converts from schemaInfo to SchemaDto.
+ *
+ * @param recordInfo schema record info
+ * @return schema dto
+ */
+ public SchemaRecordDto convertToSchemaRecordDto(final SchemaRecordInfo recordInfo) {
+ return mapper.map(recordInfo, SchemaRecordDto.class);
+ }
+
+ /**
+ * Converts to the storage service context.
+ *
+ * @param requestContext request context
+ * @return storage service context
+ */
+ public StorageServiceContext convertToStorageServiceContext(final RequestContext requestContext) {
+ return mapper.map(requestContext, StorageServiceContext.class);
+ }
+
+}
diff --git a/core/.gitignore b/core/.gitignore
new file mode 100644
index 0000000..549e00a
--- /dev/null
+++ b/core/.gitignore
@@ -0,0 +1,33 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
diff --git a/core/.mvn/wrapper/maven-wrapper.jar b/core/.mvn/wrapper/maven-wrapper.jar
new file mode 100644
index 0000000..c1dd12f
Binary files /dev/null and b/core/.mvn/wrapper/maven-wrapper.jar differ
diff --git a/core/.mvn/wrapper/maven-wrapper.properties b/core/.mvn/wrapper/maven-wrapper.properties
new file mode 100644
index 0000000..b7cb93e
--- /dev/null
+++ b/core/.mvn/wrapper/maven-wrapper.properties
@@ -0,0 +1,2 @@
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.4/apache-maven-3.8.4-bin.zip
+wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar
diff --git a/core/mvnw b/core/mvnw
new file mode 100644
index 0000000..8a8fb22
--- /dev/null
+++ b/core/mvnw
@@ -0,0 +1,316 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Maven Start Up Batch script
+#
+# Required ENV vars:
+# ------------------
+# JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+# M2_HOME - location of maven2's installed home dir
+# MAVEN_OPTS - parameters passed to the Java VM when running Maven
+# e.g. to debug Maven itself, use
+# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+ if [ -f /usr/local/etc/mavenrc ] ; then
+ . /usr/local/etc/mavenrc
+ fi
+
+ if [ -f /etc/mavenrc ] ; then
+ . /etc/mavenrc
+ fi
+
+ if [ -f "$HOME/.mavenrc" ] ; then
+ . "$HOME/.mavenrc"
+ fi
+
+fi
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+ MINGW*) mingw=true;;
+ Darwin*) darwin=true
+ # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+ # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+ if [ -z "$JAVA_HOME" ]; then
+ if [ -x "/usr/libexec/java_home" ]; then
+ export JAVA_HOME="`/usr/libexec/java_home`"
+ else
+ export JAVA_HOME="/Library/Java/Home"
+ fi
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=`java-config --jre-home`
+ fi
+fi
+
+if [ -z "$M2_HOME" ] ; then
+ ## resolve links - $0 may be a link to maven's home
+ PRG="$0"
+
+ # need this for relative symlinks
+ while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG="`dirname "$PRG"`/$link"
+ fi
+ done
+
+ saveddir=`pwd`
+
+ M2_HOME=`dirname "$PRG"`/..
+
+ # make it fully qualified
+ M2_HOME=`cd "$M2_HOME" && pwd`
+
+ cd "$saveddir"
+ # echo Using m2 at $M2_HOME
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --unix "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME="`(cd "$M2_HOME"; pwd)`"
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+ javaExecutable="`which javac`"
+ if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
+ # readlink(1) is not available as standard on Solaris 10.
+ readLink=`which readlink`
+ if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
+ if $darwin ; then
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
+ else
+ javaExecutable="`readlink -f \"$javaExecutable\"`"
+ fi
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaHome=`expr "$javaHome" : '\(.*\)/bin'`
+ JAVA_HOME="$javaHome"
+ export JAVA_HOME
+ fi
+ fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD="`\\unset -f command; \\command -v java`"
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo "Error: JAVA_HOME is not defined correctly." >&2
+ echo " We cannot execute $JAVACMD" >&2
+ exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+ echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+
+ if [ -z "$1" ]
+ then
+ echo "Path not specified to find_maven_basedir"
+ return 1
+ fi
+
+ basedir="$1"
+ wdir="$1"
+ while [ "$wdir" != '/' ] ; do
+ if [ -d "$wdir"/.mvn ] ; then
+ basedir=$wdir
+ break
+ fi
+ # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+ if [ -d "${wdir}" ]; then
+ wdir=`cd "$wdir/.."; pwd`
+ fi
+ # end of workaround
+ done
+ echo "${basedir}"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+ if [ -f "$1" ]; then
+ echo "$(tr -s '\n' ' ' < "$1")"
+ fi
+}
+
+BASE_DIR=`find_maven_basedir "$(pwd)"`
+if [ -z "$BASE_DIR" ]; then
+ exit 1;
+fi
+
+##########################################################################################
+# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+# This allows using the maven wrapper in projects that prohibit checking in binary data.
+##########################################################################################
+if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found .mvn/wrapper/maven-wrapper.jar"
+ fi
+else
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
+ fi
+ if [ -n "$MVNW_REPOURL" ]; then
+ jarUrl="$MVNW_REPOURL/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar"
+ else
+ jarUrl="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar"
+ fi
+ while IFS="=" read key value; do
+ case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
+ esac
+ done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties"
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Downloading from: $jarUrl"
+ fi
+ wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
+ if $cygwin; then
+ wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
+ fi
+
+ if command -v wget > /dev/null; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found wget ... using wget"
+ fi
+ if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+ wget "$jarUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath"
+ else
+ wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath"
+ fi
+ elif command -v curl > /dev/null; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found curl ... using curl"
+ fi
+ if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+ curl -o "$wrapperJarPath" "$jarUrl" -f
+ else
+ curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f
+ fi
+
+ else
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Falling back to using Java to download"
+ fi
+ javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
+ # For Cygwin, switch paths to Windows format before running javac
+ if $cygwin; then
+ javaClass=`cygpath --path --windows "$javaClass"`
+ fi
+ if [ -e "$javaClass" ]; then
+ if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo " - Compiling MavenWrapperDownloader.java ..."
+ fi
+ # Compiling the Java class
+ ("$JAVA_HOME/bin/javac" "$javaClass")
+ fi
+ if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+ # Running the downloader
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo " - Running MavenWrapperDownloader.java ..."
+ fi
+ ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR")
+ fi
+ fi
+ fi
+fi
+##########################################################################################
+# End of extension
+##########################################################################################
+
+export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
+if [ "$MVNW_VERBOSE" = true ]; then
+ echo $MAVEN_PROJECTBASEDIR
+fi
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --path --windows "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+ [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+ MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
+fi
+
+# Provide a "standardized" way to retrieve the CLI args that will
+# work with both Windows and non-Windows executions.
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
+export MAVEN_CMD_LINE_ARGS
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+exec "$JAVACMD" \
+ $MAVEN_OPTS \
+ $MAVEN_DEBUG_OPTS \
+ -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+ "-Dmaven.home=${M2_HOME}" \
+ "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+ ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/core/mvnw.cmd b/core/mvnw.cmd
new file mode 100644
index 0000000..1d8ab01
--- /dev/null
+++ b/core/mvnw.cmd
@@ -0,0 +1,188 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements. See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership. The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License. You may obtain a copy of the License at
+@REM
+@REM https://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied. See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%USERPROFILE%\mavenrc_pre.bat" call "%USERPROFILE%\mavenrc_pre.bat" %*
+if exist "%USERPROFILE%\mavenrc_pre.cmd" call "%USERPROFILE%\mavenrc_pre.cmd" %*
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar"
+
+FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+ IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Found %WRAPPER_JAR%
+ )
+) else (
+ if not "%MVNW_REPOURL%" == "" (
+ SET DOWNLOAD_URL="%MVNW_REPOURL%/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar"
+ )
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Couldn't find %WRAPPER_JAR%, downloading it ...
+ echo Downloading from: %DOWNLOAD_URL%
+ )
+
+ powershell -Command "&{"^
+ "$webclient = new-object System.Net.WebClient;"^
+ "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
+ "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
+ "}"^
+ "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
+ "}"
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Finished downloading %WRAPPER_JAR%
+ )
+)
+@REM End of extension
+
+@REM Provide a "standardized" way to retrieve the CLI args that will
+@REM work with both Windows and non-Windows executions.
+set MAVEN_CMD_LINE_ARGS=%*
+
+%MAVEN_JAVA_EXE% ^
+ %JVM_CONFIG_MAVEN_PROPS% ^
+ %MAVEN_OPTS% ^
+ %MAVEN_DEBUG_OPTS% ^
+ -classpath %WRAPPER_JAR% ^
+ "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" ^
+ %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%"=="" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%USERPROFILE%\mavenrc_post.bat" call "%USERPROFILE%\mavenrc_post.bat"
+if exist "%USERPROFILE%\mavenrc_post.cmd" call "%USERPROFILE%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%"=="on" pause
+
+if "%MAVEN_TERMINATE_CMD%"=="on" exit %ERROR_CODE%
+
+cmd /C exit /B %ERROR_CODE%
diff --git a/core/pom.xml b/core/pom.xml
new file mode 100644
index 0000000..51a2647
--- /dev/null
+++ b/core/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-schema-registry-all</artifactId>
+ <version>0.0.2-SNAPSHOT</version>
+ </parent>
+
+ <packaging>jar</packaging>
+ <artifactId>core</artifactId>
+ <name>rocketmq-schema-registry-core ${project.version}</name>
+ <description>rocketmq-schema-registry-core</description>
+
+ <properties>
+ <java.version>1.8</java.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>common</artifactId>
+ <version>0.0.2-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>schema-storage-rocketmq</artifactId>
+ <version>0.0.2-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.sun</groupId>
+ <artifactId>tools</artifactId>
+ <version>1.8</version>
+ <scope>system</scope>
+ <systemPath>${java.home}/../lib/tools.jar</systemPath>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>${spring-boot.version}</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/CoreApplication.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/CoreApplication.java
new file mode 100644
index 0000000..133695a
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/CoreApplication.java
@@ -0,0 +1,25 @@
+package org.apache.rocketmq.schema.registry.core;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.ComponentScan.Filter;
+import org.springframework.context.annotation.FilterType;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+import springfox.documentation.oas.annotations.EnableOpenApi;
+
+@SpringBootApplication
+@EnableScheduling
+@EnableOpenApi
+@ComponentScan(excludeFilters = @Filter(type = FilterType.ASPECTJ, pattern = "org.apache.rocketmq.schema.registry.storage..*"))
+public class CoreApplication {
+
+ public CoreApplication() {
+ }
+
+ public static void main(String[] args) {
+ SpringApplication.run(CoreApplication.class, args);
+ }
+
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/RequestProcessor.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/RequestProcessor.java
new file mode 100644
index 0000000..4b2e828
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/RequestProcessor.java
@@ -0,0 +1,81 @@
+/**
+ * Copyright 2022, Xiaomi.
+ * All rights reserved.
+ * Author: wangfan8@xiaomi.com
+ */
+
+package org.apache.rocketmq.schema.registry.core.api;
+
+import java.util.function.Supplier;
+
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Component
+public class RequestProcessor {
+
+ @Autowired
+ public RequestProcessor() {
+
+ }
+
+ /**
+ * Request processor to handle request.
+ *
+ * @param qualifiedName qualifiedName
+ * @param requestName request name
+ * @param supplier supplier
+ * @param <R> response
+ * @return response of supplier
+ */
+ public <R> R processRequest(
+ final QualifiedName qualifiedName,
+ final String requestName,
+ final Supplier<R> supplier) {
+ /**
+ * TODO: 1. add rate limiter and metrics statics
+ * TODO: 2. add async process
+ */
+
+ long time = System.currentTimeMillis();
+ try {
+ log.info("Handling request: {} for {}", requestName, qualifiedName);
+ return supplier.get();
+ } catch (SchemaException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw new SchemaException(String.format("%s request failed due to: %s %s", requestName, e.getMessage(), e.getCause()), e);
+ } finally {
+ log.info("Handle request: {} cost {}", requestName, (System.currentTimeMillis() - time));
+ }
+ }
+
+ /**
+ * Request processor to handle request.
+ *
+ * @param requestName request name
+ * @param supplier supplier
+ * @param <R> response
+ * @return response of supplier
+ */
+ public <R> R processRequest(
+ final String requestName,
+ final Supplier<R> supplier) {
+ long time = System.currentTimeMillis();
+ try {
+ log.info("Handling request: {}", requestName);
+ return supplier.get();
+ } catch (SchemaException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw new SchemaException(String.format("Request: %s failed due to %s %s", requestName, e.getMessage(), e.getCause()), e);
+ } finally {
+ log.info("Handle request: {} cost {}", requestName, (System.currentTimeMillis() - time));
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
new file mode 100644
index 0000000..c575e1d
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -0,0 +1,355 @@
+/**
+ * Copyright 2022, Xiaomi.
+ * All rights reserved.
+ * Author: wangfan8@xiaomi.com
+ */
+
+package org.apache.rocketmq.schema.registry.core.api.v1;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import java.net.HttpURLConnection;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
+import org.apache.rocketmq.schema.registry.core.api.RequestProcessor;
+import org.apache.rocketmq.schema.registry.core.service.SchemaService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * RocketMQ schema registry V1 API implementation.
+ */
+@RestController
+@RequestMapping(
+ path = "/schema-registry/v1",
+ produces = MediaType.APPLICATION_JSON_VALUE
+)
+@Api(
+ value = "SchemaRegistryV1",
+ produces = MediaType.APPLICATION_JSON_VALUE,
+ consumes = MediaType.APPLICATION_JSON_VALUE
+)
+@Slf4j
+public class SchemaController {
+
+ private final RequestProcessor requestProcessor;
+ private final SchemaService<SchemaDto> schemaService;
+
+ /**
+ * Constructor.
+ *
+ * @param requestProcessor request processor
+ * @param schemaService schema service
+ */
+ @Autowired
+ public SchemaController(
+ final RequestProcessor requestProcessor,
+ final SchemaService<SchemaDto> schemaService
+ ) {
+ this.requestProcessor = requestProcessor;
+ this.schemaService = schemaService;
+ }
+
+ @RequestMapping(
+ method = RequestMethod.POST,
+ path = "/cluster/{cluster-name}/subject/{subject-name}/schema/{schema-name}",
+ consumes = MediaType.APPLICATION_JSON_VALUE
+ )
+ @ResponseStatus(HttpStatus.CREATED)
+ @ApiOperation(
+ value = "Register a new schema",
+ notes = "Return success if there were no errors registering the schema"
+ )
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_CREATED,
+ message = "The schema was registered"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested schema cannot be registered"
+ )
+ }
+ )
+ public SchemaDto registerSchema(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(name = "cluster-name") final String clusterName,
+ @ApiParam(value = "The subject of the schema", required = true)
+ @PathVariable(name = "subject-name") final String subjectName,
+ @ApiParam(value = "The name of the schema", required = true)
+ @PathVariable("schema-name") final String schemaName,
+ @ApiParam(value = "The schema detail", required = true)
+ @RequestBody final SchemaDto schemaDto
+ ) {
+ return registerSchema(clusterName, "default", subjectName, schemaName, schemaDto);
+ }
+
+ @RequestMapping(
+ method = RequestMethod.POST,
+ path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}",
+ consumes = MediaType.APPLICATION_JSON_VALUE
+ )
+ @ResponseStatus(HttpStatus.CREATED)
+ @ApiOperation(
+ value = "Register a new schema",
+ notes = "Return success if there were no errors registering the schema"
+ )
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_CREATED,
+ message = "The schema was registered"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested schema cannot be registered"
+ )
+ }
+ )
+ public SchemaDto registerSchema(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable(value = "tenant-name") final String tenant,
+ @ApiParam(value = "The subject of the schema", required = true)
+ @PathVariable(name = "subject-name") final String subject,
+ @ApiParam(value = "The name of the schema", required = true)
+ @PathVariable("schema-name") final String schemaName,
+ @ApiParam(value = "The schema detail", required = true)
+ @RequestBody final SchemaDto schemaDto
+ ) {
+ // TODO: support register by sql
+ final QualifiedName name = new QualifiedName(cluster, tenant, subject, schemaName);
+ schemaDto.setQualifiedName(name);
+
+ return this.requestProcessor.processRequest(
+ name,
+ "register",
+ () -> {
+ return this.schemaService.register(name, schemaDto);
+ }
+ );
+ }
+
+ @RequestMapping(
+ path = "/tenant/{tenant-name}/schema/{schema-name}",
+ method = RequestMethod.DELETE
+ )
+ @ResponseStatus(HttpStatus.OK)
+ @ApiOperation(
+ value = "Delete schema",
+ notes = "Delete the schema under the given tenant"
+ )
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "Schema deleted success"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested schema cannot be found or it's still been used"
+ )
+ }
+ )
+ public SchemaDto deleteSchema(
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable("tenant-name") final String tenant,
+ @ApiParam(value = "The name of the schema", required = true)
+ @PathVariable("schema-name") final String schemaName
+ ) {
+ QualifiedName name = new QualifiedName(null, tenant, null, schemaName);
+ return this.requestProcessor.processRequest(
+ name,
+ "deleteSchema",
+ () -> this.schemaService.delete(name)
+ );
+ }
+
+ @RequestMapping(
+ path = "/cluster/{cluster-name}/subject/{subject-name}/schema/{schema-name}",
+ method = RequestMethod.PUT,
+ consumes = MediaType.APPLICATION_JSON_VALUE
+ )
+ @ApiOperation(
+ value = "Update schema and generate new schema version",
+ notes = "Update the given schema"
+ )
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "Update schema success"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested schema cannot be found"
+ )
+ }
+ )
+ public SchemaDto updateSchema(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable("cluster-name") final String cluster,
+ @ApiParam(value = "The subject of the schema", required = true)
+ @PathVariable("subject-name") final String subject,
+ @ApiParam(value = "The name of the schema", required = true)
+ @PathVariable("schema-name") final String schemaName,
+ @ApiParam(value = "The schema detail", required = true)
+ @RequestBody final SchemaDto schemaDto
+ ) {
+ return this.updateSchema(cluster, "default", subject, schemaName, schemaDto);
+ }
+
+ @RequestMapping(
+ path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}",
+ method = RequestMethod.PUT,
+ consumes = MediaType.APPLICATION_JSON_VALUE
+ )
+ @ApiOperation(
+ value = "Update schema and generate new schema version",
+ notes = "Update the given schema"
+ )
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "Update schema success"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested schema cannot be found"
+ )
+ }
+ )
+ public SchemaDto updateSchema(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable(value = "tenant-name") final String tenant,
+ @ApiParam(value = "The subject of the schema", required = true)
+ @PathVariable(name = "subject-name") final String subject,
+ @ApiParam(value = "The name of the schema", required = true)
+ @PathVariable("schema-name") final String schemaName,
+ @ApiParam(value = "The schema detail", required = true)
+ @RequestBody final SchemaDto schemaDto
+ ) {
+ QualifiedName name = new QualifiedName(cluster, tenant, subject, schemaName);
+ return this.requestProcessor.processRequest(
+ name,
+ "updateSchema",
+ () -> this.schemaService.update(name, schemaDto)
+ );
+ }
+
+ @RequestMapping(
+ method = RequestMethod.GET,
+ path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}"
+ )
+ @ApiOperation(
+ value = "Schema information",
+ notes = "Schema information for the given schema name under the tenant")
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "The schema is returned"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested tenant or schema cannot be found"
+ )
+ }
+ )
+ public SchemaDto getSchema(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable(value = "tenant-name") final String tenant,
+ @ApiParam(value = "The subject of the schema", required = true)
+ @PathVariable(name = "subject-name") final String subject,
+ @ApiParam(value = "The name of the schema", required = true)
+ @PathVariable("schema-name") final String schemaName
+ ) {
+ QualifiedName name = new QualifiedName(cluster, tenant, subject, schemaName);
+ log.info("Request for get schema for schema: {}", name.fullName());
+ return this.requestProcessor.processRequest(
+ name,
+ "getSchema",
+ () -> schemaService.get(name)
+ );
+ }
+
+ @RequestMapping(
+ method = RequestMethod.GET,
+ path = "/subject/{subject-name}"
+ )
+ @ApiOperation(
+ value = "Schema information",
+ notes = "Schema information for the given schema name under the subject")
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "The schema is returned"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested tenant or schema cannot be found"
+ )
+ }
+ )
+ public SchemaRecordDto getSchemaBySubject(
+ @ApiParam(value = "The name of the schema", required = true)
+ @PathVariable("subject-name") String subject
+ ) {
+ return getSchemaBySubject("default", subject);
+ }
+
+ @RequestMapping(
+ method = RequestMethod.GET,
+ path = "/cluster/{cluster-name}/subject/{subject-name}"
+ )
+ @ApiOperation(
+ value = "Schema information",
+ notes = "Schema information for the given schema name under the subject")
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "The schema is returned"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested tenant or schema cannot be found"
+ )
+ }
+ )
+ public SchemaRecordDto getSchemaBySubject(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable("cluster-name") String cluster,
+ @ApiParam(value = "The name of the subject", required = true)
+ @PathVariable("subject-name") String subject
+ ) {
+ QualifiedName name = new QualifiedName(cluster, null, subject, null);
+
+ return this.requestProcessor.processRequest(
+ "getSchemaBySubject",
+ () -> schemaService.getBySubject(name)
+ );
+ }
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SchemaManagerConfig.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SchemaManagerConfig.java
new file mode 100644
index 0000000..eee526d
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SchemaManagerConfig.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.config;
+
+import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig;
+import org.apache.rocketmq.schema.registry.core.dependency.ArtifactoryDependencyServiceImpl;
+import org.apache.rocketmq.schema.registry.core.dependency.DependencyService;
+import org.apache.rocketmq.schema.registry.common.storage.StorageManager;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class SchemaManagerConfig {
+
+ /**
+ * Manager of the storages.
+ *
+ * @param config global config
+ * @return The storage manager
+ */
+ @Bean
+ public StorageManager storageManager(final GlobalConfig config) {
+ return new StorageManager(config);
+ }
+
+ /**
+ * Manager of the dependencies.
+ *
+ * @param config global config
+ * @return The storage manager
+ */
+ @Bean
+ public DependencyService dependencyManager(final GlobalConfig config) {
+ return new ArtifactoryDependencyServiceImpl(config);
+ }
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SchemaPropertiesConfig.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SchemaPropertiesConfig.java
new file mode 100644
index 0000000..cae6d4f
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SchemaPropertiesConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.config;
+
+import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig;
+import org.apache.rocketmq.schema.registry.common.properties.GlobalConfigImpl;
+import org.apache.rocketmq.schema.registry.common.properties.SchemaProperties;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class SchemaPropertiesConfig {
+
+ /**
+ * Binding properties from configuration file.
+ *
+ * @return The schema properties object.
+ */
+ @Bean
+ @ConfigurationProperties(prefix = "schema")
+ public SchemaProperties schemaProperties() {
+ return new SchemaProperties();
+ }
+
+ /**
+ * Init the global config.
+ *
+ * @param schemaProperties The overall properties to use
+ * @return The global config object
+ */
+ @Bean
+ public GlobalConfig config(final SchemaProperties schemaProperties) {
+ return new GlobalConfigImpl(schemaProperties);
+ }
+
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SchemaServiceConfig.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SchemaServiceConfig.java
new file mode 100644
index 0000000..6dadcde
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SchemaServiceConfig.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.config;
+
+import org.apache.rocketmq.schema.registry.common.auth.AccessControlService;
+import org.apache.rocketmq.schema.registry.common.auth.DefaultAccessControlServiceImpl;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig;
+import org.apache.rocketmq.schema.registry.common.utils.IdGenerator;
+import org.apache.rocketmq.schema.registry.common.utils.SnowFlakeIdGenerator;
+import org.apache.rocketmq.schema.registry.core.dependency.DependencyService;
+import org.apache.rocketmq.schema.registry.common.storage.StorageManager;
+import org.apache.rocketmq.schema.registry.core.service.SchemaInitializationService;
+import org.apache.rocketmq.schema.registry.core.service.SchemaService;
+import org.apache.rocketmq.schema.registry.core.service.SchemaServiceImpl;
+import org.apache.rocketmq.schema.registry.common.storage.StorageServiceProxy;
+import org.apache.rocketmq.schema.registry.common.utils.StorageUtil;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class SchemaServiceConfig {
+
+ @Bean
+ public SchemaService<SchemaDto> schemaService(
+ final GlobalConfig config,
+ final AccessControlService accessController,
+ final StorageServiceProxy storageServiceProxy,
+ final StorageUtil storageUtil,
+ final DependencyService dependencyService,
+ final IdGenerator idGenerator
+ ) {
+ return new SchemaServiceImpl(
+ config,
+ accessController,
+ storageServiceProxy,
+ storageUtil,
+ dependencyService,
+ idGenerator
+ );
+ }
+
+ /**
+ * Handle startup and shutdown of schema service.
+ *
+ * @param storageManager Plugin manager to use
+ *
+ * @return The initialization service bean
+ */
+ @Bean
+ public SchemaInitializationService schemaInitializationService(StorageManager storageManager) {
+ return new SchemaInitializationService(storageManager);
+ }
+
+ /**
+ * Access controller.
+ *
+ * @param config global config
+ * @return authorization class based on config
+ */
+ @Bean
+ @ConditionalOnMissingBean(AccessControlService.class)
+ public AccessControlService accessController(final GlobalConfig config) {
+ return new DefaultAccessControlServiceImpl(config);
+ }
+
+ /**
+ * The storage service proxy bean.
+ *
+ * @param storageManager storage manager
+ * @param storageUtil storage utilities
+ * @return The schema service proxy bean
+ */
+ @Bean
+ public StorageServiceProxy storageServiceProxy(
+ final StorageManager storageManager,
+ final StorageUtil storageUtil
+ ) {
+ return new StorageServiceProxy(storageManager, storageUtil);
+ }
+
+ /**
+ * The id generator of the service.
+ *
+ * @param config global config
+ * @return The id generator
+ */
+ @Bean
+ public IdGenerator idGenerator(final GlobalConfig config) {
+ return new SnowFlakeIdGenerator(config);
+ }
+
+
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SchemaUtilsConfig.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SchemaUtilsConfig.java
new file mode 100644
index 0000000..8f43e3b
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SchemaUtilsConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.config;
+
+import org.apache.rocketmq.schema.registry.common.utils.StorageUtil;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class SchemaUtilsConfig {
+
+ /**
+ * Storage utility bean.
+ *
+ * @return The storage util instance
+ */
+ @Bean
+ public StorageUtil storageUtil() {
+ return new StorageUtil();
+ }
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SwaggerConfig.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SwaggerConfig.java
new file mode 100644
index 0000000..347176a
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/config/SwaggerConfig.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.config;
+
+import java.util.ArrayList;
+
+import java.util.Collection;
+import java.util.List;
+import org.springframework.boot.actuate.autoconfigure.endpoint.web.CorsEndpointProperties;
+import org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointProperties;
+import org.springframework.boot.actuate.autoconfigure.web.server.ManagementPortType;
+import org.springframework.boot.actuate.endpoint.ExposableEndpoint;
+import org.springframework.boot.actuate.endpoint.web.EndpointLinksResolver;
+import org.springframework.boot.actuate.endpoint.web.EndpointMapping;
+import org.springframework.boot.actuate.endpoint.web.EndpointMediaTypes;
+import org.springframework.boot.actuate.endpoint.web.ExposableWebEndpoint;
+import org.springframework.boot.actuate.endpoint.web.WebEndpointsSupplier;
+import org.springframework.boot.actuate.endpoint.web.annotation.ControllerEndpointsSupplier;
+import org.springframework.boot.actuate.endpoint.web.annotation.ServletEndpointsSupplier;
+import org.springframework.boot.actuate.endpoint.web.servlet.WebMvcEndpointHandlerMapping;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.env.Environment;
+import org.springframework.core.env.Profiles;
+
+import org.springframework.util.StringUtils;
+import springfox.documentation.builders.RequestHandlerSelectors;
+import springfox.documentation.oas.annotations.EnableOpenApi;
+import springfox.documentation.service.ApiInfo;
+import springfox.documentation.service.Contact;
+import springfox.documentation.spi.DocumentationType;
+import springfox.documentation.spring.web.plugins.Docket;
+import springfox.documentation.swagger2.annotations.EnableSwagger2;
+
+@Configuration
+@EnableOpenApi
+@EnableSwagger2
+public class SwaggerConfig {
+
+ @Bean
+ public Docket docket(Environment environment) {
+ Profiles profiles = Profiles.of("dev","test");
+ boolean isEnable = environment.acceptsProfiles(profiles);
+
+ return new Docket(DocumentationType.OAS_30)
+ .apiInfo(apiInfo())
+ .enable(isEnable)
+ .select()
+ .apis(RequestHandlerSelectors.basePackage("org.apache.rocketmq.schema.registry.core"))
+ .build();
+ }
+
+ private ApiInfo apiInfo(){
+
+ //ä½č
äæ”ęÆ
+ Contact contact = new Contact(
+ "ēåø",
+ "wangfan8@xiaomi.com",
+ "wangfan8@xiaomi.com");
+
+ return new ApiInfo(
+ "RocketMQ Schema Registry",
+ "RocketMQ Schema Registry Swagger API Document",
+ "V1.0",
+ "",
+ contact,
+ "Apache 2.0",
+ "http://www.apache.org/licenses/LICENSE-2.0",
+ new ArrayList<>());
+ }
+
+ @Bean
+ public WebMvcEndpointHandlerMapping webMvcEndpointHandlerMapping(
+ WebEndpointsSupplier webEndpointsSupplier,
+ ServletEndpointsSupplier servletEndpointsSupplier,
+ ControllerEndpointsSupplier controllerEndpointsSupplier,
+ EndpointMediaTypes endpointMediaTypes,
+ CorsEndpointProperties corsEndpointProperties,
+ WebEndpointProperties webEndpointProperties,
+ Environment environment
+ ) {
+ List<ExposableEndpoint<?>> allEndpoints = new ArrayList<>();
+ Collection<ExposableWebEndpoint> webEndpoints = webEndpointsSupplier.getEndpoints();
+
+ allEndpoints.addAll(webEndpoints);
+ allEndpoints.addAll(servletEndpointsSupplier.getEndpoints());
+ allEndpoints.addAll(controllerEndpointsSupplier.getEndpoints());
+ String basePath = webEndpointProperties.getBasePath();
+ EndpointMapping endpointMapping = new EndpointMapping(basePath);
+ boolean shouldRegister = should(webEndpointProperties, environment, basePath);
+ return new WebMvcEndpointHandlerMapping(endpointMapping, webEndpoints, endpointMediaTypes, corsEndpointProperties.toCorsConfiguration(), new EndpointLinksResolver(allEndpoints, basePath), shouldRegister, null);
+ }
+
+ private boolean should (WebEndpointProperties webEndpointProperties, Environment environment, String basePath) {
+ return webEndpointProperties.getDiscovery().isEnabled() && (StringUtils.hasText(basePath) || ManagementPortType.get(environment).equals(ManagementPortType.DIFFERENT));
+ }
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/ArtifactoryDependencyServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/ArtifactoryDependencyServiceImpl.java
new file mode 100644
index 0000000..684c7ea
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/ArtifactoryDependencyServiceImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.dependency;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.model.Dependency;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig;
+import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
+
+@Slf4j
+public class ArtifactoryDependencyServiceImpl implements DependencyService {
+
+ private final GlobalConfig config;
+ private final String parent;
+ private final String localRepo;
+ private final String jdkPath;
+ private String dependencyTemplate;
+
+
+ public ArtifactoryDependencyServiceImpl(final GlobalConfig config) {
+ this.config = config;
+ this.parent = config.getDependencyCompilePath();
+ this.localRepo = config.getDependencyLocalRepositoryPath();
+ this.jdkPath = config.getDependencyJdkPath();
+
+ if (config.isUploadEnabled()) {
+ try {
+ CommonUtil.mkdir(parent);
+ CommonUtil.mkdir(localRepo);
+ Path templatePath = Paths.get(config.getDependencyTemplate());
+ this.dependencyTemplate = new String(Files.readAllBytes(templatePath));
+ } catch (IOException e) {
+ throw new SchemaException("Init dependency template file failed", e);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Dependency compile(SchemaInfo schemaInfo) {
+ DependencyHelper dependencyHelper = DynamicCompileProvider
+ .compile(jdkPath, parent, schemaInfo, dependencyTemplate);
+
+ // upload jar file to remote repository
+ DynamicCompileProvider.upload(
+ localRepo,
+ dependencyHelper,
+ config.getDependencyUsername(),
+ config.getDependencyPassword(),
+ config.getDependencyRepositoryUrl()
+ );
+ return dependencyHelper.getDependency();
+ }
+
+
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DependencyHelper.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DependencyHelper.java
new file mode 100644
index 0000000..f095476
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DependencyHelper.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.dependency;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.model.Dependency;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
+
+@Slf4j
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class DependencyHelper {
+
+ private String schemaName;
+
+ private String schemaDir;
+ private String idlDir;
+ private String javaDir;
+
+ private String idlFilePath;
+ private String jarFilePath;
+ private String pomFilePath;
+ private String jdkPath;
+
+ private String idl;
+
+ Map<String, ByteBuffer> schemaJarClass;
+ private Dependency dependency;
+
+ public DependencyHelper(final String jdkPath, final String parentDir, final SchemaInfo schemaInfo) {
+ this.schemaName = schemaInfo.getSchemaName();
+
+ this.dependency = Dependency.builder()
+ .groupId(schemaInfo.getNamespace())
+ .artifactId(schemaInfo.getSchemaName())
+ .version(schemaInfo.getSchemaType() + "." + schemaInfo.getLastRecordVersion())
+ .build();
+
+ this.schemaDir = String.format("%s/%s/%d", parentDir, schemaInfo.getSchemaName(), schemaInfo.getLastRecordVersion());
+ this.idlDir = String.format("%s/avro", schemaDir);
+ this.javaDir = String.format("%s/java", schemaDir);
+
+ this.idlFilePath = String.format("%s/%s.avro", idlDir, schemaName);
+ this.jarFilePath = String.format("%s/%s.jar", schemaDir, dependency.getArtifactId());
+ this.pomFilePath = String.format("%s/%s.pom", schemaDir, dependency.getArtifactId());
+ this.jdkPath = jdkPath;
+
+ this.idl = schemaInfo.getLastRecordIdl();
+ }
+
+ public void initIdlFile() {
+ CommonUtil.mkdir(idlDir);
+ try (FileWriter fileWriter = new FileWriter(idlFilePath)) {
+ fileWriter.write(idl);
+ fileWriter.flush();
+ } catch (IOException e) {
+ throw new SchemaException("Init idl file failed", e);
+ }
+
+ log.info("write schema " + schemaName + "'s idl: " + idl + " to file " + idlFilePath + " success");
+ }
+
+ public Map<String, ByteBuffer> compileSchema() {
+ CommonUtil.mkdir(javaDir);
+ try (Stream<Path> pathStream = Files.walk(Paths.get(javaDir))) {
+ CommonUtil.execCommand(jdkPath, "-jar", "tools/avro-tools-1.11.0.jar", "compile", "schema", idlFilePath, javaDir);
+ List<File> javaFileList = pathStream
+ .filter(Files::isRegularFile)
+ .map(Path::toFile)
+ .collect(Collectors.toList());
+ log.info("Success flush java files: " + javaFileList);
+ return CommonUtil.compileJavaFile(javaFileList);
+ } catch (Throwable e){
+ throw new SchemaException("Compile schema failed", e);
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DependencyService.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DependencyService.java
new file mode 100644
index 0000000..e049543
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DependencyService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.dependency;
+
+import org.apache.rocketmq.schema.registry.common.model.Dependency;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+
+public interface DependencyService {
+
+ String ERROR_MESSAGE_DEFAULT = "Not implemented dependency service method";
+
+ /**
+ * Compile the dependency and upload generate file
+ *
+ * @param schemaInfo compile needed information
+ * @return generated file path
+ */
+ default Dependency compile(SchemaInfo schemaInfo) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DynamicCompileProvider.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DynamicCompileProvider.java
new file mode 100644
index 0000000..555127c
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DynamicCompileProvider.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.dependency;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.maven.repository.internal.MavenRepositorySystemUtils;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.properties.GlobalConfigImpl;
+import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
+import org.eclipse.aether.DefaultRepositorySystemSession;
+import org.eclipse.aether.RepositorySystem;
+import org.eclipse.aether.RepositorySystemSession;
+import org.eclipse.aether.artifact.Artifact;
+import org.eclipse.aether.artifact.DefaultArtifact;
+import org.eclipse.aether.connector.basic.BasicRepositoryConnectorFactory;
+import org.eclipse.aether.deployment.DeployRequest;
+import org.eclipse.aether.deployment.DeployResult;
+import org.eclipse.aether.deployment.DeploymentException;
+import org.eclipse.aether.impl.DefaultServiceLocator;
+import org.eclipse.aether.repository.Authentication;
+import org.eclipse.aether.repository.LocalRepository;
+import org.eclipse.aether.repository.RemoteRepository;
+import org.eclipse.aether.spi.connector.RepositoryConnectorFactory;
+import org.eclipse.aether.spi.connector.transport.TransporterFactory;
+import org.eclipse.aether.transport.file.FileTransporterFactory;
+import org.eclipse.aether.transport.http.HttpTransporterFactory;
+import org.eclipse.aether.util.artifact.SubArtifact;
+import org.eclipse.aether.util.repository.AuthenticationBuilder;
+
+@Data
+@Slf4j
+public class DynamicCompileProvider {
+ private final static RepositorySystem system = newRepositorySystem();
+
+ public static DependencyHelper compile(final String jdkPath, final String parentDir,
+ final SchemaInfo schemaInfo, final String dependencyTemplate) {
+ DependencyHelper dependencyHelper = new DependencyHelper(jdkPath, parentDir, schemaInfo);
+
+ dependencyHelper.initIdlFile();
+
+ Map<String, ByteBuffer> schemaJarClass = dependencyHelper.compileSchema();
+
+ CommonUtil.generateJarFile(dependencyHelper.getJarFilePath(), schemaJarClass);
+
+ CommonUtil.generatePomFile(
+ dependencyTemplate,
+ dependencyHelper.getDependency().getGroupId(),
+ dependencyHelper.getDependency().getArtifactId(),
+ dependencyHelper.getDependency().getVersion(),
+ dependencyHelper.getPomFilePath()
+ );
+
+ return dependencyHelper;
+ }
+
+ private static RepositorySystem newRepositorySystem() {
+ DefaultServiceLocator locator = MavenRepositorySystemUtils.newServiceLocator();
+ locator.addService(RepositoryConnectorFactory.class, BasicRepositoryConnectorFactory.class);
+ locator.addService(TransporterFactory.class, FileTransporterFactory.class);
+ locator.addService(TransporterFactory.class, HttpTransporterFactory.class);
+ locator.setErrorHandler(new DefaultServiceLocator.ErrorHandler() {
+ @Override
+ public void serviceCreationFailed(Class<?> tpe, Class<?> impl, Throwable cause) {
+ log.error("maven service locator create failed", cause);
+ }
+ });
+ return locator.getService(RepositorySystem.class);
+ }
+
+ public static RepositorySystemSession newSession(RepositorySystem rs, String versionCache) {
+ DefaultRepositorySystemSession session = MavenRepositorySystemUtils.newSession();
+ LocalRepository localRepo = new LocalRepository(versionCache);
+ session.setLocalRepositoryManager(rs.newLocalRepositoryManager(session, localRepo));
+ return session;
+ }
+
+ // TODO: replace to artifactory-client-java
+ public static void upload(
+ final String localRepo,
+ final DependencyHelper dependencyHelper,
+ final String userName,
+ final String passWord,
+ final String repoUrl
+ ) {
+ String groupId = dependencyHelper.getDependency().getGroupId();
+ String artifactId = dependencyHelper.getDependency().getArtifactId();
+ String version = dependencyHelper.getDependency().getVersion();
+ File jarFile = new File(dependencyHelper.getJarFilePath());
+ File pomFile = new File(dependencyHelper.getPomFilePath());
+
+ Artifact jarArtifact = new DefaultArtifact(groupId, artifactId, null, "jar", version, null, jarFile);
+ Artifact pomArtifact = new SubArtifact(jarArtifact, null, "pom", pomFile);
+
+ Authentication auth = new AuthenticationBuilder()
+ .addUsername(userName)
+ .addPassword(passWord)
+ .build();
+
+ RemoteRepository remoteRepo = new RemoteRepository
+ .Builder("central", "default", repoUrl)
+ .setAuthentication(auth)
+ .build();
+
+ try {
+ String versionCache = CommonUtil.mkdir(String.format("%s/version_cache/%d_resources",
+ localRepo, System.currentTimeMillis())).getCanonicalPath();
+ RepositorySystemSession session = newSession(system, versionCache);
+ DeployRequest deployRequest = new DeployRequest()
+ .setRepository(remoteRepo)
+ .addArtifact(jarArtifact)
+ .addArtifact(pomArtifact);
+ DeployResult result = system.deploy(session, deployRequest);
+ log.info("transfer artifact " + groupId + ":" + artifactId + ":" + version + " to central success.");
+ } catch (DeploymentException | IOException e) {
+ throw new SchemaException("Deploy jar file: " + jarFile + " failed", e);
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DynamicJarsProvider.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DynamicJarsProvider.java
new file mode 100644
index 0000000..104060f
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DynamicJarsProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.dependency;
+
+import java.io.File;
+import java.io.IOException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.maven.repository.internal.MavenRepositorySystemUtils;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
+import org.eclipse.aether.DefaultRepositorySystemSession;
+import org.eclipse.aether.RepositorySystem;
+import org.eclipse.aether.RepositorySystemSession;
+import org.eclipse.aether.artifact.Artifact;
+import org.eclipse.aether.artifact.DefaultArtifact;
+import org.eclipse.aether.connector.basic.BasicRepositoryConnectorFactory;
+import org.eclipse.aether.deployment.DeployRequest;
+import org.eclipse.aether.deployment.DeploymentException;
+import org.eclipse.aether.impl.DefaultServiceLocator;
+import org.eclipse.aether.repository.Authentication;
+import org.eclipse.aether.repository.LocalRepository;
+import org.eclipse.aether.repository.RemoteRepository;
+import org.eclipse.aether.spi.connector.RepositoryConnectorFactory;
+import org.eclipse.aether.spi.connector.transport.TransporterFactory;
+import org.eclipse.aether.transport.file.FileTransporterFactory;
+import org.eclipse.aether.transport.http.HttpTransporterFactory;
+import org.eclipse.aether.util.artifact.SubArtifact;
+import org.eclipse.aether.util.repository.AuthenticationBuilder;
+
+@Slf4j
+public class DynamicJarsProvider {
+
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/package-info.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/package-info.java
new file mode 100644
index 0000000..cd10ac9
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2017 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * talos connector.
+ *
+ * @author wangfan8
+ * @since 1.3.0
+ */
+
+@ParametersAreNonnullByDefault
+package org.apache.rocketmq.schema.registry.core;
+import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaInitializationService.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaInitializationService.java
new file mode 100644
index 0000000..17f851a
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaInitializationService.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.service;
+
+import javax.annotation.Nonnull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.storage.StorageManager;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.HealthIndicator;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
+
+@Slf4j
+public class SchemaInitializationService implements HealthIndicator {
+
+ protected static final String PLUGIN_LOAD_KEY = "pluginLoaded";
+ protected static final String STORAGE_CONNECT_KEY = "storageConnected";
+ protected static final String THRIFT_KEY = "thrift";
+
+ @Nonnull
+ private final StorageManager storageManager;
+
+ public SchemaInitializationService(@Nonnull StorageManager storageManager) {
+ this.storageManager = storageManager;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Health health() {
+ final boolean isPluginLoaded = storageManager.isPluginLoaded();
+ final boolean isConnected = storageManager.isConnected();
+ final Health.Builder builder = (isPluginLoaded && isConnected) ? Health.up() : Health.outOfService();
+
+ return builder
+ .withDetail(PLUGIN_LOAD_KEY, isPluginLoaded)
+ .withDetail(STORAGE_CONNECT_KEY, isConnected)
+ .build();
+ }
+
+ /**
+ * Shutdown schema service.
+ *
+ * @param event Shutting down event
+ */
+ @EventListener
+ public void stop(ContextClosedEvent event) {
+ log.info("Stopping schema service by {}.", event);
+ try {
+ this.storageManager.stop();
+ } catch (Exception e) {
+ log.error("Stop failed since {}", e.getMessage(), e);
+ }
+ log.info("Schema service finished.");
+ }
+
+ /**
+ * Init schema service.
+ *
+ * @param event Starting Event
+ */
+ @EventListener
+ public void start(ContextRefreshedEvent event) {
+ log.info("Starting schema service by {}.", event);
+ try {
+ this.storageManager.loadPlugin();
+ this.storageManager.start(event.getApplicationContext());
+ } catch (Exception e) {
+ log.error("Unable to initialize services due to {}", e.getMessage(), e);
+ }
+ log.info("Finished starting internal services.");
+ }
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
new file mode 100644
index 0000000..2618c29
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.service;
+
+import java.util.Optional;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.dto.BaseDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+
+public interface SchemaService<T extends BaseDto> {
+
+ /**
+ * Register the given schema.
+ *
+ * @param qualifiedName tenant / name of the schema
+ * @param dto register resource information
+ * @return registered schema object
+ */
+ T register(QualifiedName qualifiedName, T dto);
+
+ /**
+ * Register the schema.
+ *
+ * @param qualifiedName tenant / name of the schema
+ * @param dto update information
+ * @return updated schema object
+ */
+ T update(QualifiedName qualifiedName, T dto);
+
+ /**
+ * Deletes the schema.
+ *
+ * @param qualifiedName tenant / name of the schema
+ * @return deleted schema object
+ */
+ T delete(QualifiedName qualifiedName);
+
+ /**
+ * Query the schema object with the given name.
+ *
+ * @param qualifiedName tenant / name of the schema
+ * @return schema object with the schemaName
+ */
+ T get(QualifiedName qualifiedName);
+
+ /**
+ * Query the schema object with the given subject name.
+ *
+ * @param qualifiedName subject of the schema binding
+ * @return schema object with the schemaName
+ */
+ SchemaRecordDto getBySubject(QualifiedName qualifiedName);
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
new file mode 100644
index 0000000..4f4bfb2
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.service;
+
+import com.google.common.base.Strings;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.context.RequestContext;
+import org.apache.rocketmq.schema.registry.common.auth.AccessControlService;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException;
+import org.apache.rocketmq.schema.registry.common.model.Dependency;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
+import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaOperation;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
+import org.apache.rocketmq.schema.registry.common.context.RequestContextManager;
+import org.apache.rocketmq.schema.registry.common.utils.IdGenerator;
+import org.apache.rocketmq.schema.registry.core.dependency.DependencyService;
+import org.apache.rocketmq.schema.registry.common.storage.StorageServiceProxy;
+import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
+import org.apache.rocketmq.schema.registry.common.utils.StorageUtil;
+
+@Slf4j
+public class SchemaServiceImpl implements SchemaService<SchemaDto> {
+
+ private final GlobalConfig config;
+
+ private final AccessControlService accessController;
+ private final StorageServiceProxy storageServiceProxy;
+ private final StorageUtil storageUtil;
+
+ private final DependencyService dependencyService;
+
+ private final IdGenerator idGenerator;
+
+ public SchemaServiceImpl(
+ final GlobalConfig config,
+ final AccessControlService accessController,
+ final StorageServiceProxy storageServiceProxy,
+ final StorageUtil storageUtil,
+ final DependencyService dependencyService,
+ final IdGenerator idGenerator
+ ) {
+ this.config = config;
+ this.accessController = accessController;
+ this.storageServiceProxy = storageServiceProxy;
+ this.storageUtil = storageUtil;
+ this.dependencyService = dependencyService;
+ this.idGenerator = idGenerator;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SchemaDto register(QualifiedName qualifiedName, SchemaDto schemaDto) {
+ final RequestContext requestContext = RequestContextManager.getContext();
+ log.info("register get request context: " + requestContext);
+
+ checkSchemaValid(schemaDto);
+ checkSchemaExist(qualifiedName);
+
+ // TODO: add user and ak sk
+ accessController.checkPermission("", qualifiedName.getTenant(), SchemaOperation.REGISTER);
+
+ SchemaInfo current = storageServiceProxy.get(qualifiedName, config.isCacheEnabled());
+ if (current != null) {
+ throw new SchemaExistException(qualifiedName);
+ }
+
+ SchemaInfo schemaInfo = storageUtil.convertFromSchemaDto(schemaDto);
+ schemaInfo.setUniqueId(idGenerator.nextId());
+ schemaInfo.setLastRecordVersion(1L);
+ schemaInfo.getLastRecord().setSchema(qualifiedName.schemaFullName());
+ schemaInfo.getLastRecord().bindSubject(qualifiedName.subjectInfo());
+
+ if (config.isUploadEnabled()) {
+ // TODO: async upload to speed up register operation and keep atomic with register
+ Dependency dependency = dependencyService.compile(schemaInfo);
+ schemaInfo.setLastRecordDependency(dependency);
+ }
+
+ log.info("Creating schema info {}: {}", qualifiedName, schemaInfo);
+ storageServiceProxy.register(qualifiedName, schemaInfo);
+ return storageUtil.convertToSchemaDto(schemaInfo);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SchemaDto update(QualifiedName qualifiedName, SchemaDto schemaDto) {
+ final RequestContext requestContext = RequestContextManager.getContext();
+ log.info("update get request context: " + requestContext);
+
+ this.accessController.checkPermission("", "", SchemaOperation.UPDATE);
+
+ SchemaInfo current = storageServiceProxy.get(qualifiedName, config.isCacheEnabled());
+ if (current == null) {
+ throw new SchemaNotFoundException("Schema " + qualifiedName + " not exist, ignored update.");
+ }
+
+ SchemaInfo update = storageUtil.convertFromSchemaDto(schemaDto);
+
+ if (update.getDetails() != null) {
+ SubjectInfo subjectInfo = qualifiedName.subjectInfo();
+ SchemaRecordInfo updateRecord = update.getDetails().lastRecord();
+ updateRecord.setVersion(current.getLastRecordVersion() + 1);
+ updateRecord.setSchema(qualifiedName.schemaFullName());
+ updateRecord.setSchemaId(current.getUniqueId());
+ updateRecord.bindSubject(subjectInfo);
+ current.getLastRecord().unbindSubject(subjectInfo);
+
+ List<SchemaRecordInfo> currentRecords = new ArrayList<>(current.getDetails().getSchemaRecords());
+ currentRecords.add(updateRecord);
+ update.getDetails().setSchemaRecords(currentRecords);
+ }
+
+ if (update.getMeta() == null) {
+ update.setMeta(current.getMeta());
+ }
+
+ if (update.getStorage() == null) {
+ update.setStorage(current.getStorage());
+ }
+
+ if (update.getExtras() == null) {
+ update.setExtras(current.getExtras());
+ }
+
+ if (update.getAudit() == null) {
+ // todo
+ update.setAudit(current.getAudit());
+ }
+
+ if (update.getQualifiedName() == null) {
+ update.setQualifiedName(current.getQualifiedName());
+ }
+
+// checkSchemaValid(schemaDto);
+ CommonUtil.validateCompatibility(update, current, current.getMeta().getCompatibility());
+
+ if (config.isUploadEnabled()) {
+ Dependency dependency = dependencyService.compile(update);
+ update.setLastRecordDependency(dependency);
+ }
+
+ log.info("Updating schema info {}: {}", qualifiedName, update);
+ storageServiceProxy.update(qualifiedName, update);
+ return storageUtil.convertToSchemaDto(update);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SchemaDto delete(QualifiedName qualifiedName) {
+ final RequestContext requestContext = RequestContextManager.getContext();
+ log.info("delete request context: " + requestContext);
+
+ this.accessController.checkPermission("", qualifiedName.getTenant(), SchemaOperation.DELETE);
+
+ SchemaInfo current = storageServiceProxy.get(qualifiedName, config.isCacheEnabled());
+ if (current == null) {
+ throw new SchemaNotFoundException("Schema " + qualifiedName + " not exist, ignored update.");
+ }
+
+ log.info("delete schema {}", qualifiedName);
+ storageServiceProxy.delete(qualifiedName);
+ return storageUtil.convertToSchemaDto(current);
+ }
+
+ // TODO add get last record query
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SchemaDto get(QualifiedName qualifiedName) {
+ final RequestContext requestContext = RequestContextManager.getContext();
+ log.info("register get request context: " + requestContext);
+
+ CommonUtil.validateName(qualifiedName);
+
+ this.accessController.checkPermission("", qualifiedName.getTenant(), SchemaOperation.GET);
+
+ SchemaInfo schemaInfo = storageServiceProxy.get(qualifiedName, config.isCacheEnabled());
+ if (schemaInfo == null) {
+ throw new SchemaNotFoundException(qualifiedName);
+ }
+
+ log.info("get schema {}", qualifiedName);
+ return storageUtil.convertToSchemaDto(schemaInfo);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SchemaRecordDto getBySubject(QualifiedName qualifiedName) {
+ final RequestContext requestContext = RequestContextManager.getContext();
+ log.info("register get request context: " + requestContext);
+
+// CommonUtil.validateName(qualifiedName);
+ this.accessController.checkPermission("", qualifiedName.getSubject(), SchemaOperation.GET);
+
+ SchemaRecordInfo recordInfo = storageServiceProxy.getBySubject(qualifiedName, config.isCacheEnabled());
+ if (recordInfo == null) {
+ throw new SchemaException("Subject: " + qualifiedName + " not exist");
+ }
+
+ log.info("get schema by subject: {}", qualifiedName.getSubject());
+ return storageUtil.convertToSchemaRecordDto(recordInfo);
+ }
+
+ private void checkSchemaExist(final QualifiedName qualifiedName) {
+ if (storageServiceProxy.get(qualifiedName, config.isCacheEnabled()) != null) {
+ throw new SchemaExistException(qualifiedName);
+ }
+ }
+
+ private void checkSchemaValid(final SchemaDto schemaDto) {
+ CommonUtil.validateName(schemaDto.getQualifiedName());
+
+ // TODO: check and set namespace from idl
+ if (Strings.isNullOrEmpty(schemaDto.getMeta().getNamespace())) {
+ throw new SchemaCompatibilityException("Schema namespace is null, please check your config.");
+ }
+
+ if (schemaDto.getDetails().getSchemaRecords().size() > 1) {
+ throw new SchemaCompatibilityException("Can not register schema with multi records.");
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/Service.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/Service.java
new file mode 100644
index 0000000..4e8227e
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/Service.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.service;
+
+import org.apache.rocketmq.schema.registry.common.dto.BaseDto;
+
+public interface Service<T extends BaseDto> {
+
+
+}
diff --git a/core/src/main/resources/application.properties b/core/src/main/resources/application.properties
new file mode 100644
index 0000000..a5d7fdd
--- /dev/null
+++ b/core/src/main/resources/application.properties
@@ -0,0 +1,29 @@
+
+spring.profiles.active=dev
+spring.application.name=rocketmq-schema-registry
+server.port=8080
+logging.file.name = log/app.log
+
+schema.dependency.upload-enabled = false
+#schema.dependency.jdk-path
+#schema.dependency.compile-path
+#schema.dependency.local-repository-path
+#schema.dependency.repository-url
+#schema.dependency.username
+#schema.dependency.password
+schema.storage.type=rocketmq
+schema.storage.config-path=schema-storage-rocketmq/src/main/resources/rocketmq.properties
+
+springfox.documentation.swagger-ui.enabled=true
+management.health.db.enabled=true
+
+server.error.include-stacktrace = on_param
+server.error.include-message = always
+
+spring.mvc.pathmatch.matching-strategy = ANT_PATH_MATCHER
+
+spring.main.allow-bean-definition-overriding=true
+management.health.elasticsearch.enabled=false
+management.endpoints.enabled-by-default=true
+management.endpoints.web.exposure.include= health,info
+management.endpoints.web.exposure.exclude= httptrace,shutdown
\ No newline at end of file
diff --git a/core/src/main/resources/template.pom b/core/src/main/resources/template.pom
new file mode 100644
index 0000000..2bef620
--- /dev/null
+++ b/core/src/main/resources/template.pom
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>$groupId</groupId>
+ <artifactId>$artifactId</artifactId>
+ <version>$version</version>
+ <description>schema service auto generated POM</description>
+</project>
+
diff --git a/core/src/test/java/org/apache/rocketmq/schema/registry/core/CoreApplicationTests.java b/core/src/test/java/org/apache/rocketmq/schema/registry/core/CoreApplicationTests.java
new file mode 100644
index 0000000..a49fd05
--- /dev/null
+++ b/core/src/test/java/org/apache/rocketmq/schema/registry/core/CoreApplicationTests.java
@@ -0,0 +1,11 @@
+package org.apache.rocketmq.schema.registry.core;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class CoreApplicationTests {
+
+ @Test
+ void contextLoads() {}
+}
diff --git a/core/src/test/java/org/apache/rocketmq/schema/registry/core/controller/HelloControllerTest.java b/core/src/test/java/org/apache/rocketmq/schema/registry/core/controller/HelloControllerTest.java
new file mode 100644
index 0000000..4ed7622
--- /dev/null
+++ b/core/src/test/java/org/apache/rocketmq/schema/registry/core/controller/HelloControllerTest.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright 2022, Xiaomi.
+ * All rights reserved.
+ * Author: wangfan8@xiaomi.com
+ */
+
+package org.apache.rocketmq.schema.registry.core.controller;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+import org.junit.jupiter.api.Test;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.MediaType;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
+
+@SpringBootTest
+@AutoConfigureMockMvc
+public class HelloControllerTest {
+ @Autowired
+ private MockMvc mvc;
+
+ @Test
+ public void getHello() throws Exception {
+ mvc.perform(MockMvcRequestBuilders.get("/").accept(MediaType.APPLICATION_JSON))
+ .andExpect(status().isOk())
+ .andExpect(content().string(equalTo("Greetings from Spring Boot!")));
+ }
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..0e7d641
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,171 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>18</version>
+ </parent>
+
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-schema-registry-all</artifactId>
+ <version>0.0.2-SNAPSHOT</version>
+ <packaging>pom</packaging>
+ <name>rocketmq-schema-registry</name>
+ <description>rocketmq-schema-registry</description>
+
+ <modules>
+ <module>common</module>
+ <module>core</module>
+ <module>schema-storage-rocketmq</module>
+ <module>storage-war</module>
+ </modules>
+
+ <properties>
+ <java.version>1.8</java.version>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <!-- internal package versions -->
+ <spring-boot.version>2.6.7</spring-boot.version>
+ <spring.version>3.2.8-RELEASE</spring.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-dependencies</artifactId>
+ <version>${spring-boot.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ <version>${spring-boot.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <version>${spring-boot.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ <version>${spring-boot.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ <version>${spring-boot.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.aspectj</groupId>
+ <artifactId>aspectjweaver</artifactId>
+ <version>1.9.5</version>
+ </dependency>
+
+<!-- <dependency>-->
+<!-- <groupId>org.springframework</groupId>-->
+<!-- <artifactId>spring-aspects</artifactId>-->
+<!-- <version>${spring.version}</version>-->
+<!-- </dependency>-->
+
+<!-- <dependency>-->
+<!-- <groupId>org.springframework</groupId>-->
+<!-- <artifactId>spring-orm</artifactId>-->
+<!-- <version>${spring.version}</version>-->
+<!-- </dependency>-->
+
+<!-- <dependency>-->
+<!-- <groupId>org.springdoc</groupId>-->
+<!-- <artifactId>springdoc-openapi-ui</artifactId>-->
+<!-- <version>1.6.9</version>-->
+<!-- </dependency>-->
+
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-boot-starter</artifactId>
+ <version>3.0.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.24</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ <version>3.0.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>31.1-jre</version>
+ </dependency>
+
+ <dependency>
+ <groupId>net.sf.dozer</groupId>
+ <artifactId>dozer</artifactId>
+ <version>5.4.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ <version>7.2.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.0</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>${spring-boot.version}</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <distributionManagement>
+ <repository>
+ <id>central</id>
+ <name>maven-release-virtual</name>
+ <url>
+ https://pkgs.d.xiaomi.net/artifactory/maven-release-virtual
+ </url>
+ </repository>
+ <snapshotRepository>
+ <id>snapshots</id>
+ <name>maven-snapshot-virtual</name>
+ <url>
+ https://pkgs.d.xiaomi.net/artifactory/maven-snapshot-virtual
+ </url>
+ </snapshotRepository>
+ </distributionManagement>
+
+</project>
diff --git a/schema-storage-rocketmq/pom.xml b/schema-storage-rocketmq/pom.xml
new file mode 100644
index 0000000..d91306f
--- /dev/null
+++ b/schema-storage-rocketmq/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-schema-registry-all</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>0.0.2-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>schema-storage-rocketmq</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>common</artifactId>
+ <version>0.0.2-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>4.9.3</version>
+ </dependency>
+ </dependencies>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+</project>
\ No newline at end of file
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
new file mode 100644
index 0000000..1dbba4a
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.rocketmq;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
+import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
+import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.DELETE_KEYS;
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_LOCAL_CACHE_PATH;
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_LOCAL_CACHE_PATH_DEFAULT;
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_ROCKETMQ_CONSUMER_GROUP;
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_ROCKETMQ_CONSUMER_GROUP_DEFAULT;
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_ROCKETMQ_NAMESRV;
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_ROCKETMQ_NAMESRV_DEFAULT;
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_ROCKETMQ_PRODUCER_GROUP;
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_ROCKETMQ_PRODUCER_GROUP_DEFAULT;
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_ROCKETMQ_TOPIC;
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_ROCKETMQ_TOPIC_DEFAULT;
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_ROCKSDB_SCHEMA_COLUMN_FAMILY;
+import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.RocketmqConfigConstants.STORAGE_ROCKSDB_SUBJECT_COLUMN_FAMILY;
+
+@Slf4j
+public class RocketmqClient {
+
+ private Properties properties;
+ private DefaultMQProducer producer;
+ private DefaultMQPushConsumer scheduleConsumer;
+ private String storageTopic;
+ private String cachePath;
+ private JsonConverter converter;
+ private final List<ColumnFamilyHandle> cfHandleList = new ArrayList<>();
+ private final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+ private final Map<String, ColumnFamilyHandle> cfHandleMap = new HashMap<>();
+
+
+ /**
+ * RocksDB for cache
+ */
+ // TODO setCreateMissingColumnFamilies
+ private final Options options = new Options().setCreateIfMissing(true);
+ private final DBOptions dbOptions = new DBOptions().setCreateIfMissing(true);
+ private RocksDB cache;
+
+ public RocketmqClient(Properties props) {
+ init(props);
+ startRemoteStorage();
+ startLocalCache();
+ }
+
+ private void startLocalCache() {
+ try {
+ List<byte[]> cfs = RocksDB.listColumnFamilies(options, cachePath);
+ if (cfs.size() <= 1) {
+ List<byte[]> columnFamilies = Arrays.asList(STORAGE_ROCKSDB_SCHEMA_COLUMN_FAMILY,
+ STORAGE_ROCKSDB_SUBJECT_COLUMN_FAMILY);
+ // TODO: add default cf in handles when needed
+ cache = org.rocksdb.RocksDB.open(options, cachePath);
+ cfDescriptors.addAll(columnFamilies.stream()
+ .map(ColumnFamilyDescriptor::new)
+ .collect(Collectors.toList()));
+ cfHandleList.addAll(cache.createColumnFamilies(cfDescriptors));
+ } else {
+ cfDescriptors.addAll(cfs.stream()
+ .map(ColumnFamilyDescriptor::new)
+ .collect(Collectors.toList()));
+ cache = org.rocksdb.RocksDB.open(dbOptions, cachePath, cfDescriptors, cfHandleList);
+ }
+
+ cfHandleMap.putAll(
+ cfHandleList.stream().collect(Collectors.toMap(h -> {
+ try {
+ return new String(h.getName());
+ } catch (RocksDBException e) {
+ throw new SchemaException("Failed to open RocksDB", e);
+ }
+ }, h -> h)));
+
+ assert cfHandleList.size() >= 2;
+ } catch (RocksDBException e) {
+ throw new SchemaException("Failed to open RocksDB", e);
+ }
+ }
+
+ public void startRemoteStorage() {
+ try {
+ producer.start();
+
+ scheduleConsumer.subscribe(storageTopic, "*");
+ scheduleConsumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+ msgs.forEach(msg -> {
+ synchronized (this) {
+ try {
+ if (msg.getKeys().equals(DELETE_KEYS)) {
+ // delete
+ byte[] schemaFullName = msg.getBody();
+ byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
+ if (schemaInfoBytes != null) {
+ deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class));
+ cache.delete(schemaCfHandle(), schemaFullName);
+ }
+ } else {
+ byte[] schemaFullName = converter.toBytes(msg.getKeys());
+ byte[] schemaInfoBytes = msg.getBody();
+ SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class);
+ byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord());
+
+ byte[] result = cache.get(schemaCfHandle(), schemaFullName);
+ if (result == null) {
+ // register
+ cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
+ cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
+ } else {
+ SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
+ if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
+ return;
+ }
+ if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
+ throw new SchemaException("Schema version is invalid, update: "
+ + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
+ }
+
+ cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
+ update.getLastRecord().getSubjects().forEach(subject -> {
+ try {
+ cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes);
+ } catch (RocksDBException e) {
+ throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e);
+ }
+ });
+ }
+ }
+ } catch (Throwable e) {
+ throw new SchemaException("Rebuild schema cache failed", e);
+ }
+ }
+ });
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ scheduleConsumer.start();
+ } catch (MQClientException e) {
+ throw new SchemaException("Rocketmq client start failed", e);
+ }
+ }
+
+ // TODO: next query on other machine may can't found schema in cache since send is async with receive
+ public SchemaInfo registerSchema(SchemaInfo schema) {
+ byte[] subjectFullName = converter.toBytes(schema.subjectFullName());
+ byte[] schemaFullName = converter.toBytes(schema.schemaFullName());
+ byte[] schemaInfo = converter.toJsonAsBytes(schema);
+ byte[] lastRecord = converter.toJsonAsBytes(schema.getLastRecord());
+
+ try {
+ synchronized (this) {
+ if (cache.get(schemaCfHandle(), schemaFullName) != null) {
+ throw new SchemaExistException(schema.getQualifiedName());
+ }
+
+ SendResult result = producer.send(new Message(storageTopic, "", schema.schemaFullName(), schemaInfo));
+ if (!result.getSendStatus().equals(SendStatus.SEND_OK)) {
+ throw new SchemaException("Register schema: " + schema.getQualifiedName() + " failed: " + result.getSendStatus());
+ }
+
+ cache.put(schemaCfHandle(), schemaFullName, schemaInfo);
+ cache.put(subjectCfHandle(), subjectFullName, lastRecord);
+ }
+
+ return schema;
+ } catch (SchemaException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SchemaException("register schema failed", e);
+ }
+ }
+
+ public void delete(QualifiedName name) {
+ byte[] schemaFullName = converter.toBytes(name.schemaFullName());
+
+ try {
+ synchronized (this) {
+ byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
+ if (schemaInfoBytes == null) {
+ throw new SchemaNotFoundException(name);
+ }
+
+ Message msg = new Message(storageTopic, "", DELETE_KEYS, schemaFullName);
+ SendResult result = producer.send(msg);
+ if (!result.getSendStatus().equals(SendStatus.SEND_OK)) {
+ throw new SchemaException("Delete schema: " + name + " failed: " + result.getSendStatus());
+ }
+
+ cache.delete(schemaCfHandle(), schemaFullName);
+ deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class));
+ }
+ } catch (SchemaException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SchemaException("Delete schema " + name + " failed", e);
+ }
+ }
+
+ public SchemaInfo updateSchema(SchemaInfo update) {
+ byte[] schemaFullName = converter.toBytes(update.schemaFullName());
+ byte[] schemaInfo = converter.toJsonAsBytes(update);
+ byte[] lastRecord = converter.toJsonAsBytes(update.getLastRecord());
+
+ try {
+ synchronized (this) {
+ Message msg = new Message(storageTopic, "", update.schemaFullName(), schemaInfo);
+ SendResult result = producer.send(msg);
+ if (result.getSendStatus() != SendStatus.SEND_OK) {
+ throw new SchemaException("Update " + update.getQualifiedName() + " failed: " + result.getSendStatus());
+ }
+
+ cache.put(schemaCfHandle(), schemaFullName, schemaInfo);
+ update.getLastRecord().getSubjects().forEach(subject -> {
+ try {
+ cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecord);
+ } catch (RocksDBException e) {
+ throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed", e);
+ }
+ });
+ }
+ return update;
+ } catch (SchemaException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SchemaException("Update schema " + update.getQualifiedName() + " failed", e);
+ }
+ }
+
+ public byte[] getSchema(QualifiedName qualifiedName) {
+ try {
+ // TODO: get from rocketmq topic if cache not contain
+ return cache.get(schemaCfHandle(), converter.toBytes(qualifiedName.schemaFullName()));
+ } catch (RocksDBException e) {
+ throw new SchemaException("Get schema " + qualifiedName + " failed", e);
+ }
+ }
+
+ public byte[] getBySubject(QualifiedName qualifiedName) {
+ try {
+ return cache.get(subjectCfHandle(), converter.toBytes(qualifiedName.subjectFullName()));
+ } catch (RocksDBException e) {
+ throw new SchemaException("Get by subject " + qualifiedName + " failed", e);
+ }
+ }
+
+ private void init(Properties props) {
+ this.properties = props;
+ this.storageTopic = props.getProperty(STORAGE_ROCKETMQ_TOPIC, STORAGE_ROCKETMQ_TOPIC_DEFAULT);
+ this.cachePath = props.getProperty(STORAGE_LOCAL_CACHE_PATH, STORAGE_LOCAL_CACHE_PATH_DEFAULT);
+
+ this.producer = new DefaultMQProducer(
+ props.getProperty(STORAGE_ROCKETMQ_PRODUCER_GROUP, STORAGE_ROCKETMQ_PRODUCER_GROUP_DEFAULT)
+ );
+
+ this.producer.setNamesrvAddr(
+ props.getProperty(STORAGE_ROCKETMQ_NAMESRV, STORAGE_ROCKETMQ_NAMESRV_DEFAULT)
+ );
+
+ this.scheduleConsumer = new DefaultMQPushConsumer(
+ props.getProperty(STORAGE_ROCKETMQ_CONSUMER_GROUP, STORAGE_ROCKETMQ_CONSUMER_GROUP_DEFAULT)
+ );
+
+ this.scheduleConsumer.setNamesrvAddr(
+ props.getProperty(STORAGE_ROCKETMQ_NAMESRV, STORAGE_ROCKETMQ_NAMESRV_DEFAULT)
+ );
+
+ this.converter = new JsonConverterImpl();
+ }
+
+ private ColumnFamilyHandle schemaCfHandle() {
+ return cfHandleMap.get(new String(STORAGE_ROCKSDB_SCHEMA_COLUMN_FAMILY));
+ }
+
+ private ColumnFamilyHandle subjectCfHandle() {
+ return cfHandleMap.get(new String(STORAGE_ROCKSDB_SUBJECT_COLUMN_FAMILY));
+ }
+
+ private void deleteAllSubject(SchemaInfo current) {
+ // delete subjects bind to any version
+ List<SchemaRecordInfo> allSchemaRecords = current.getDetails().getSchemaRecords();
+ List<String> allSubjects = allSchemaRecords.parallelStream()
+ .flatMap(record -> record.getSubjects().stream().map(SubjectInfo::fullName))
+ .collect(Collectors.toList());
+
+ allSubjects.forEach(subject -> {
+ try {
+ cache.delete(subjectCfHandle(), converter.toBytes(subject));
+ } catch (RocksDBException e) {
+ throw new SchemaException("Delete schema " + current.getQualifiedName() + "'s subjects failed", e);
+ }
+ });
+ }
+}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
new file mode 100644
index 0000000..7eabe27
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.rocketmq;
+
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+
+public interface RocketmqStorageClient {
+
+ /**
+ * Standard error message for all default implementations.
+ */
+ String ERROR_MESSAGE_DEFAULT = "Not supported for rocketmq storage client";
+
+ /**
+ * Register rocketmq schema entity.
+ *
+ * @param schemaInfo schema info
+ */
+ default SchemaInfo register(SchemaInfo schemaInfo) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+
+ /**
+ * Delete rocketmq schema entity.
+ *
+ * @param qualifiedName schema name
+ */
+ default void delete(QualifiedName qualifiedName) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+
+ /**
+ * Update rocketmq schema entity.
+ *
+ * @param schemaInfo schema info
+ */
+ default SchemaInfo update(SchemaInfo schemaInfo) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+
+ /**
+ * Get rocketmq schema entity.
+ *
+ * @param qualifiedName schema name
+ */
+ default SchemaInfo getSchema(QualifiedName qualifiedName) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+
+ /**
+ * Get rocketmq schema entity by subject.
+ *
+ * @param qualifiedName schema name
+ */
+ default SchemaRecordInfo getBySubject(QualifiedName qualifiedName) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
new file mode 100644
index 0000000..8b4752f
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.rocketmq;
+
+import java.io.File;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
+import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
+
+@Slf4j
+public class RocketmqStorageClientImpl implements RocketmqStorageClient {
+
+ private Properties storageConfig;
+ private RocketmqClient rocketmqClient;
+ private JsonConverter jsonConverter;
+
+ public RocketmqStorageClientImpl(StoragePluginContext context) {
+ storageConfig = CommonUtil.loadProperties(new File(context.getConfig().getStorageConfigPath()));
+ rocketmqClient = new RocketmqClient(storageConfig);
+ jsonConverter = new JsonConverterImpl();
+ }
+
+ /**
+ * Register rocketmq schema entity.
+ *
+ * @param schemaInfo schema info
+ */
+ @Override
+ public SchemaInfo register(SchemaInfo schemaInfo) {
+ return rocketmqClient.registerSchema(schemaInfo);
+ }
+
+ /**
+ * Delete rocketmq schema entity.
+ *
+ * @param qualifiedName schema name
+ */
+ @Override
+ public void delete(QualifiedName qualifiedName) {
+ rocketmqClient.delete(qualifiedName);
+ }
+
+ /**
+ * Update rocketmq schema entity.
+ *
+ * @param schemaInfo schema info
+ */
+ @Override
+ public SchemaInfo update(SchemaInfo schemaInfo) {
+ return rocketmqClient.updateSchema(schemaInfo);
+ }
+
+ /**
+ * Get rocketmq schema entity.
+ *
+ * @param qualifiedName schema name
+ */
+ @Override
+ public SchemaInfo getSchema(QualifiedName qualifiedName) {
+ byte[] result = rocketmqClient.getSchema(qualifiedName);
+ return result == null ? null : jsonConverter.fromJson(result, SchemaInfo.class);
+ }
+
+ /**
+ * Get rocketmq schema entity from subject.
+ *
+ * @param qualifiedName schema name
+ */
+ @Override
+ public SchemaRecordInfo getBySubject(QualifiedName qualifiedName) {
+ byte[] result = rocketmqClient.getBySubject(qualifiedName);
+ return result == null ? null : jsonConverter.fromJson(result, SchemaRecordInfo.class);
+ }
+}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageFactory.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageFactory.java
new file mode 100644
index 0000000..1bb2852
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.rocketmq;
+
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.storage.SpringStorageFactory;
+import org.apache.rocketmq.schema.registry.common.storage.StorageService;
+import org.apache.rocketmq.schema.registry.storage.rocketmq.configs.ClientConfig;
+import org.apache.rocketmq.schema.registry.storage.rocketmq.configs.ServiceConfig;
+
+public class RocketmqStorageFactory extends SpringStorageFactory {
+
+ public RocketmqStorageFactory(final StoragePluginContext storageServiceContext) {
+ super(storageServiceContext);
+ super.registerClazz(ClientConfig.class, ServiceConfig.class);
+ super.refresh();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public StorageService<SchemaInfo> getStorageService() {
+ return this.ctx.getBean(RocketmqStorageService.class);
+ }
+}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStoragePlugin.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStoragePlugin.java
new file mode 100644
index 0000000..8fda76e
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStoragePlugin.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.rocketmq;
+
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.common.model.StorageType;
+import org.apache.rocketmq.schema.registry.common.storage.StorageFactory;
+import org.apache.rocketmq.schema.registry.common.storage.StoragePlugin;
+
+public class RocketmqStoragePlugin implements StoragePlugin {
+ private static final StorageType STORAGE_TYPE = StorageType.ROCKETMQ;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public StorageType getType() {
+ return STORAGE_TYPE;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public StorageFactory load(final StoragePluginContext storageContext) {
+ return new RocketmqStorageFactory(storageContext);
+ }
+}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
new file mode 100644
index 0000000..f15345c
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.rocketmq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.common.storage.StorageService;
+import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
+
+@Slf4j
+public class RocketmqStorageService implements StorageService<SchemaInfo> {
+
+ private final RocketmqStorageClient storageClient;
+
+ /**
+ * Constructor.
+ *
+ * @param storageClient rocketmq storage client
+ */
+ public RocketmqStorageService(RocketmqStorageClient storageClient) {
+ this.storageClient = storageClient;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public SchemaInfo register(StorageServiceContext context, SchemaInfo schemaInfo) {
+ return storageClient.register(schemaInfo);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void delete(StorageServiceContext context, QualifiedName name) {
+ storageClient.delete(name);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public SchemaInfo update(StorageServiceContext context, SchemaInfo schemaInfo) {
+ return storageClient.update(schemaInfo);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public SchemaInfo get(StorageServiceContext context, QualifiedName name) {
+ return storageClient.getSchema(name);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public SchemaRecordInfo getBySubject(StorageServiceContext context, QualifiedName name) {
+ return storageClient.getBySubject(name);
+ }
+}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageUtils.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageUtils.java
new file mode 100644
index 0000000..c42a542
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.rocketmq;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+public class RocketmqStorageUtils {
+
+ public static boolean validate(byte[] value, byte[] expect) throws RocksDBException {
+ // TODO
+ return true;
+ }
+
+ public static void checkAndPut(String key, byte[] value, RocksDB db) throws RocksDBException {
+ byte[] oldValue = db.get(key.getBytes(StandardCharsets.UTF_8));
+ if (oldValue == null || !Arrays.equals(oldValue, value)) {
+ db.put(key.getBytes(StandardCharsets.UTF_8), value);
+ }
+ }
+}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ClientConfig.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ClientConfig.java
new file mode 100644
index 0000000..9ca2489
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ClientConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.rocketmq.configs;
+
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.storage.rocketmq.RocketmqStorageClient;
+import org.apache.rocketmq.schema.registry.storage.rocketmq.RocketmqStorageClientImpl;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class ClientConfig {
+
+ /**
+ * Talos client instance.
+ *
+ * @param context connector context
+ * @return MetacatTalosClient
+ */
+ @Bean
+ public RocketmqStorageClient rocketmqStorageClient(StoragePluginContext context) {
+ return new RocketmqStorageClientImpl(context);
+ }
+}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
new file mode 100644
index 0000000..e541364
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.rocketmq.configs;
+
+import java.nio.charset.StandardCharsets;
+
+public class RocketmqConfigConstants {
+
+ public static final String STORAGE_TYPE = "storage.type";
+ public static final String STORAGE_TYPE_DEFAULT = "rocketmq";
+
+ public static final String STORAGE_ROCKETMQ_PRODUCER_GROUP = "storage.rocketmq.producer.group";
+ public static final String STORAGE_ROCKETMQ_PRODUCER_GROUP_DEFAULT = "default";
+
+ public static final String STORAGE_ROCKETMQ_CONSUMER_GROUP = "storage.rocketmq.consumer.group";
+ // TODO : ip
+ public static final String STORAGE_ROCKETMQ_CONSUMER_GROUP_DEFAULT = "default";
+
+ public static final String STORAGE_ROCKETMQ_NAMESRV = "storage.rocketmq.namesrv";
+ public static final String STORAGE_ROCKETMQ_NAMESRV_DEFAULT = "localhost:9876";
+
+ public static final String STORAGE_ROCKETMQ_TOPIC = "storage.rocketmq.topic";
+ public static final String STORAGE_ROCKETMQ_TOPIC_DEFAULT = "schema_registry_storage";
+
+ public static final String STORAGE_LOCAL_CACHE_PATH = "storage.local.cache.path";
+ public static final String STORAGE_LOCAL_CACHE_PATH_DEFAULT = "/tmp/schema-registry/cache";
+
+ public static final byte[] STORAGE_ROCKSDB_SCHEMA_DEFAULT_FAMILY = "default".getBytes(StandardCharsets.UTF_8);
+ public static final byte[] STORAGE_ROCKSDB_SCHEMA_COLUMN_FAMILY = "schema".getBytes(StandardCharsets.UTF_8);
+ public static final byte[] STORAGE_ROCKSDB_SUBJECT_COLUMN_FAMILY = "subject".getBytes(StandardCharsets.UTF_8);
+
+ public static final String DELETE_KEYS = "%DEL%";
+
+
+}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ServiceConfig.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ServiceConfig.java
new file mode 100644
index 0000000..b8905ea
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ServiceConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.rocketmq.configs;
+
+import org.apache.rocketmq.schema.registry.storage.rocketmq.RocketmqStorageClient;
+import org.apache.rocketmq.schema.registry.storage.rocketmq.RocketmqStorageService;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class ServiceConfig {
+
+ /**
+ * create rocketmq storage service.
+ *
+ * @param storageClient rocketmq storage Client
+ *
+ * @return talos connector table Service
+ */
+ @Bean
+ public RocketmqStorageService rocketmqStorageService(RocketmqStorageClient storageClient) {
+ return new RocketmqStorageService(storageClient);
+ }
+}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/package-info.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/package-info.java
new file mode 100644
index 0000000..fcd541a
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2017 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * talos connector.
+ *
+ * @author wangfan8
+ * @since 1.3.0
+ */
+
+@ParametersAreNonnullByDefault
+package org.apache.rocketmq.schema.registry.storage.rocketmq;
+import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/schema-storage-rocketmq/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.common.storage.StoragePlugin b/schema-storage-rocketmq/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.common.storage.StoragePlugin
new file mode 100644
index 0000000..e86d158
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.common.storage.StoragePlugin
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.rocketmq.schema.registry.storage.rocketmq.RocketmqStoragePlugin
+
\ No newline at end of file
diff --git a/schema-storage-rocketmq/src/main/resources/rocketmq.properties b/schema-storage-rocketmq/src/main/resources/rocketmq.properties
new file mode 100644
index 0000000..5070713
--- /dev/null
+++ b/schema-storage-rocketmq/src/main/resources/rocketmq.properties
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+storage.type=rocketmq
+#storage.local.cache.path
\ No newline at end of file
diff --git a/storage-war/pom.xml b/storage-war/pom.xml
new file mode 100644
index 0000000..d8be831
--- /dev/null
+++ b/storage-war/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-schema-registry-all</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>0.0.2-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>storage-war</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+ <packaging>war</packaging>
+
+ <dependencies>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-war-plugin</artifactId>
+ <configuration>
+ <failOnMissingWebXml>false</failOnMissingWebXml>
+ </configuration>
+ </plugin>
+ </plugins>
+
+ </build>
+
+
+
+</project>
\ No newline at end of file
diff --git a/storage-war/src/main/java/org/apache/rocketmq/schema/registry/StorageWar.java b/storage-war/src/main/java/org/apache/rocketmq/schema/registry/StorageWar.java
new file mode 100644
index 0000000..5e3a86e
--- /dev/null
+++ b/storage-war/src/main/java/org/apache/rocketmq/schema/registry/StorageWar.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry;
+
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.ComponentScan.Filter;
+import org.springframework.context.annotation.FilterType;
+
+@SpringBootApplication
+@ComponentScan(excludeFilters = @Filter(type = FilterType.ASPECTJ, pattern = "com.netflix.metacat.connector..*"))
+public class StorageWar extends SpringBootServletInitializer {
+
+ public StorageWar() {
+ }
+
+ public static void main(String[] args) {
+ new StorageWar().configure(new SpringApplicationBuilder(StorageWar.class))
+ .run(args);
+ }
+}