You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/30 08:14:23 UTC

[rocketmq-schema-registry] 05/27: add getSchema method and fix storage consumer

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git

commit b1a1fc95e5fddedfd0b6607f591b8e03e9cda3da
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Mon Jul 18 17:22:09 2022 +0800

    add getSchema method and fix storage consumer
---
 .DS_Store                                          | Bin 0 -> 6148 bytes
 .../schema/registry/common/QualifiedName.java      |  21 ++-
 .../exception/SchemaAuthorizedException.java       |   5 +
 .../exception/SchemaCompatibilityException.java    |   4 +
 .../registry/common/exception/SchemaException.java |   5 +
 .../common/exception/SchemaExistException.java     |   4 +
 .../common/exception/SchemaNotFoundException.java  |   4 +
 .../registry/common/storage/StorageService.java    |   6 +
 .../common/storage/StorageServiceProxy.java        |  19 ++-
 core/pom.xml                                       |   8 -
 .../registry/core/api/v1/SchemaController.java     |  79 ++++++++-
 .../core/expection}/RequestExceptionHandler.java   |  15 +-
 .../registry/core/service/SchemaService.java       |   9 +
 .../registry/core/service/SchemaServiceImpl.java   |  19 +++
 schema-storage-rocketmq/pom.xml                    |   6 +
 .../registry/storage/rocketmq/RocketmqClient.java  | 182 ++++++++++++++-------
 .../storage/rocketmq/RocketmqStorageClient.java    |  11 ++
 .../rocketmq/RocketmqStorageClientImpl.java        |  42 ++++-
 .../storage/rocketmq/RocketmqStorageService.java   |   7 +
 .../rocketmq/configs/RocketmqConfigConstants.java  |   2 +-
 .../src/main/resources/rocketmq.properties         |   2 +-
 21 files changed, 352 insertions(+), 98 deletions(-)

diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000..a1ac880
Binary files /dev/null and b/.DS_Store differ
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
index 39c812f..0fb34bf 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
@@ -38,6 +38,7 @@ public class QualifiedName implements Serializable {
     private String tenant;
     private String subject;
     private String schema;
+    private Long version;
 
     public QualifiedName(
         @Nullable final String cluster,
@@ -51,16 +52,30 @@ public class QualifiedName implements Serializable {
         this.schema = schema;
     }
 
+    public QualifiedName(
+        @Nullable final String cluster,
+        @Nullable final String tenant,
+        @Nullable final String subject,
+        @Nullable final String schema,
+        @Nullable final Long version
+    ) {
+        this.cluster= cluster;
+        this.tenant= tenant;
+        this.subject= subject;
+        this.schema = schema;
+        this.version = version;
+    }
+
     public SubjectInfo subjectInfo() {
         return new SubjectInfo(cluster, subject);
     }
 
     public String fullName() {
-        return cluster + '/' + tenant + '/' + subject + '/' + schema;
+        return cluster + '/' + tenant + '/' + subject + '/' + schema + '/' + version;
     }
 
     public String schemaFullName() {
-        return tenant + '/' + schema;
+        return tenant + '/' + schema + '/' + version;
     }
 
     public String subjectFullName() {
@@ -78,6 +93,8 @@ public class QualifiedName implements Serializable {
             .append(subject).append('\"');
         sb.append(",\"name\":\"")
             .append(schema).append('\"');
+        sb.append(",\"version\":\"")
+            .append(version).append('\"');
         sb.append('}');
         return sb.toString();
     }
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java
index b190017..c1cea96 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java
@@ -17,9 +17,14 @@
 
 package org.apache.rocketmq.schema.registry.common.exception;
 
+import lombok.Getter;
+
+@Getter
 public class SchemaAuthorizedException extends SchemaException {
     private static final long serialVersionUID = 204882338833006991L;
 
+    private final int errorCode = 40101;
+
     public SchemaAuthorizedException(final String tenant, final String schemaName) {
         this(String.format("Schema: %s/%s not found, please check your configuration.", tenant, schemaName));
     }
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java
index c694cb0..cf2aafd 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java
@@ -17,11 +17,15 @@
 
 package org.apache.rocketmq.schema.registry.common.exception;
 
+import lombok.Getter;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 
+@Getter
 public class SchemaCompatibilityException extends SchemaException {
     private static final long serialVersionUID = 2602020608319903212L;
 
+    private final int errorCode = 40901;
+
     public SchemaCompatibilityException(final QualifiedName qualifiedName) {
         this(String.format("Schema: %s validate failed.", qualifiedName));
     }
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java
index 524b54c..a95fb90 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java
@@ -17,8 +17,13 @@
 
 package org.apache.rocketmq.schema.registry.common.exception;
 
+import lombok.Getter;
+
+@Getter
 public class SchemaException extends RuntimeException {
 
+    private final int errorCode = 50001;
+
     /** Constructor. */
     public SchemaException() {
         super();
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
index bcea88c..462f0a0 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
@@ -17,11 +17,15 @@
 
 package org.apache.rocketmq.schema.registry.common.exception;
 
+import lombok.Getter;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 
+@Getter
 public class SchemaExistException extends SchemaException {
   private static final long serialVersionUID = -9177284523006645052L;
 
+  private final int errorCode = 40401;
+
   public SchemaExistException(final QualifiedName qualifiedName) {
     this(String.format("Schema: %s is exist, please check your configuration.", qualifiedName));
   }
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java
index 12bcedb..0a3b7b5 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java
@@ -17,11 +17,15 @@
 
 package org.apache.rocketmq.schema.registry.common.exception;
 
+import lombok.Getter;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 
+@Getter
 public class SchemaNotFoundException extends SchemaException {
     private static final long serialVersionUID = 554251224980156176L;
 
+    private final int errorCode = 40402;
+
     public SchemaNotFoundException(final QualifiedName qualifiedName) {
         this(String.format("Schema: %s not found, please check your configuration.", qualifiedName));
     }
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
index cdcc36b..7e57d1f 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.schema.registry.common.storage;
 
+import java.util.List;
+
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
 import org.apache.rocketmq.schema.registry.common.model.BaseInfo;
@@ -78,4 +80,8 @@ public interface StorageService<T extends BaseInfo> {
     default SchemaRecordInfo getBySubject(final StorageServiceContext context, final QualifiedName name) {
         throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
     }
+
+    default List<SchemaRecordInfo> listBySubject(final StorageServiceContext context, final QualifiedName name) {
+        throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
index 4ff223c..bd134d8 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.schema.registry.common.storage;
 
+import java.util.List;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.context.RequestContext;
@@ -68,7 +70,7 @@ public class StorageServiceProxy {
      *
      * @param name Qualified name with tenant / name of schema
      */
-    @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.schema()")
+    @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()")
     public void delete(final QualifiedName name) {
         final RequestContext requestContext = RequestContextManager.getContext();
         final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -84,7 +86,7 @@ public class StorageServiceProxy {
      * @param schemaInfo schema information instance
      * @return true if errors after this should be ignored.
      */
-    @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.schema()")
+    @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()")
     public SchemaInfo update(final QualifiedName name, final SchemaInfo schemaInfo) {
         final RequestContext requestContext = RequestContextManager.getContext();
         final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -101,7 +103,7 @@ public class StorageServiceProxy {
      * @param useCache if schema can be retrieved from cache
      * @return schema information instance
      */
-    @Cacheable(key = "'schema.' + #name.getTenant() + '/' + #name.schema()", condition = "#useCache")
+    @Cacheable(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()", condition = "#useCache")
     public SchemaInfo get(final QualifiedName name, final boolean useCache) {
         final RequestContext requestContext = RequestContextManager.getContext();
         final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -110,7 +112,7 @@ public class StorageServiceProxy {
         return storageService.get(storageServiceContext, name);
     }
 
-    @Cacheable(key = "'subject.' + #subject", condition = "#useCache")
+    @Cacheable(key = "'subject.' + #name.getSubject()  + '/' + #name.getVersion()", condition = "#useCache && #name.getVersion() != null")
     public SchemaRecordInfo getBySubject(final QualifiedName name, final boolean useCache) {
         final RequestContext requestContext = RequestContextManager.getContext();
         final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -118,4 +120,13 @@ public class StorageServiceProxy {
 
         return storageService.getBySubject(storageServiceContext, name);
     }
+
+    @Cacheable(key = "'subject.' + #name.getSubject()", condition = "#useCache")
+    public List<SchemaRecordInfo> listBySubject(final QualifiedName name, final boolean useCache) {
+        final RequestContext requestContext = RequestContextManager.getContext();
+        final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
+        final StorageService<SchemaInfo> storageService = storageManager.getStorageService();
+
+        return storageService.listBySubject(storageServiceContext, name);
+    }
 }
diff --git a/core/pom.xml b/core/pom.xml
index 51a2647..2d2f203 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -30,14 +30,6 @@
             <version>0.0.2-SNAPSHOT</version>
         </dependency>
 
-        <dependency>
-            <groupId>com.sun</groupId>
-            <artifactId>tools</artifactId>
-            <version>1.8</version>
-            <scope>system</scope>
-            <systemPath>${java.home}/../lib/tools.jar</systemPath>
-        </dependency>
-
     </dependencies>
 
     <build>
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
index c575e1d..109775b 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -12,6 +12,7 @@ import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.net.HttpURLConnection;
+import java.util.List;
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
@@ -296,11 +297,11 @@ public class SchemaController {
 
     @RequestMapping(
         method = RequestMethod.GET,
-        path = "/subject/{subject-name}"
+        path = "/subject/{subject-name}/schema"
     )
     @ApiOperation(
         value = "Schema information",
-        notes = "Schema information for the given schema name under the subject")
+        notes = "Schema information with the latest version under the subject")
     @ApiResponses(
         {
             @ApiResponse(
@@ -314,7 +315,7 @@ public class SchemaController {
         }
     )
     public SchemaRecordDto getSchemaBySubject(
-        @ApiParam(value = "The name of the schema", required = true)
+        @ApiParam(value = "The name of the subject", required = true)
         @PathVariable("subject-name") String subject
     ) {
         return getSchemaBySubject("default", subject);
@@ -322,11 +323,11 @@ public class SchemaController {
 
     @RequestMapping(
         method = RequestMethod.GET,
-        path = "/cluster/{cluster-name}/subject/{subject-name}"
+        path = "/cluster/{cluster-name}/subject/{subject-name}/schema"
     )
     @ApiOperation(
         value = "Schema information",
-        notes = "Schema information for the given schema name under the subject")
+        notes = "Schema information with the latest version under the subject")
     @ApiResponses(
         {
             @ApiResponse(
@@ -352,4 +353,72 @@ public class SchemaController {
             () -> schemaService.getBySubject(name)
         );
     }
+
+    @RequestMapping(
+        method = RequestMethod.GET,
+        path = "/cluster/{cluster-name}/subject/{subject-name}/schema/versions/{version}"
+    )
+    @ApiOperation(
+        value = "Schema information",
+        notes = "Schema information with the given version under the subject")
+    @ApiResponses(
+        {
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_OK,
+                message = "The schema is returned"
+            ),
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_NOT_FOUND,
+                message = "The requested tenant or schema cannot be found"
+            )
+        }
+    )
+    public SchemaRecordDto getSchemaBySubject(
+        @ApiParam(value = "The cluster of the subject", required = true)
+        @PathVariable("cluster-name") String cluster,
+        @ApiParam(value = "The name of the subject", required = true)
+        @PathVariable("subject-name") String subject,
+        @ApiParam(value = "The version of the schema", required = true)
+        @PathVariable("version") String version
+    ) {
+        QualifiedName name = new QualifiedName(cluster, null, subject, null, Long.parseLong(version));
+
+        return this.requestProcessor.processRequest(
+            "getSchemaBySubject",
+            () -> schemaService.getBySubject(name)
+        );
+    }
+
+    @RequestMapping(
+        method = RequestMethod.GET,
+        path = "/cluster/{cluster-name}/subject/{subject-name}/schema/versions"
+    )
+    @ApiOperation(
+        value = "Schema information",
+        notes = "Schema information with a list of versions under the subject")
+    @ApiResponses(
+        {
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_OK,
+                message = "The schema is returned"
+            ),
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_NOT_FOUND,
+                message = "The requested tenant or schema cannot be found"
+            )
+        }
+    )
+    public List<SchemaRecordDto> getSchemaListBySubject(
+        @ApiParam(value = "The cluster of the subject", required = true)
+        @PathVariable("cluster-name") String cluster,
+        @ApiParam(value = "The name of the subject", required = true)
+        @PathVariable("subject-name") String subject
+    ) {
+        QualifiedName name = new QualifiedName(cluster, null, subject, null);
+
+        return this.requestProcessor.processRequest(
+            "getSchemaListBySubject",
+            () -> schemaService.listBySubject(name)
+        );
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/RequestExceptionHandler.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/expection/RequestExceptionHandler.java
similarity index 80%
rename from common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/RequestExceptionHandler.java
rename to core/src/main/java/org/apache/rocketmq/schema/registry/core/expection/RequestExceptionHandler.java
index 0c0cb29..04a74f3 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/RequestExceptionHandler.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/expection/RequestExceptionHandler.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.common.exception;
+package org.apache.rocketmq.schema.registry.core.expection;
 
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.RestControllerAdvice;
@@ -37,21 +38,13 @@ public class RequestExceptionHandler {
      * @param e        The inner exception to handle
      * @throws IOException on error in sending error
      */
-    @ExceptionHandler({SchemaException.class})
+    @ExceptionHandler(SchemaException.class)
     public void handleException(
         final HttpServletResponse response,
         final SchemaException e
     ) throws IOException {
-        final int status;
-
-        if (e instanceof SchemaNotFoundException) {
-            status = HttpStatus.NOT_FOUND.value();
-        } else  {
-            status = HttpStatus.INTERNAL_SERVER_ERROR.value();
-        }
-
         log.error("Global handle SchemaException: " + e.getMessage(), e);
-        response.sendError(status, e.getMessage());
+        response.sendError(e.getErrorCode(), e.getMessage());
     }
 
 }
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
index 2618c29..84c70cc 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.schema.registry.core.service;
 
+import java.util.List;
 import java.util.Optional;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.dto.BaseDto;
@@ -67,4 +68,12 @@ public interface SchemaService<T extends BaseDto> {
      * @return schema object with the schemaName
      */
     SchemaRecordDto getBySubject(QualifiedName qualifiedName);
+
+    /**
+     * Query the schema object with the given subject name.
+     *
+     * @param qualifiedName subject of the schema binding
+     * @return schema object with the schemaName
+     */
+    List<SchemaRecordDto> listBySubject(QualifiedName qualifiedName);
 }
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index 4f4bfb2..56efb9c 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -21,6 +21,8 @@ import com.google.common.base.Strings;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.context.RequestContext;
@@ -236,6 +238,23 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
         return storageUtil.convertToSchemaRecordDto(recordInfo);
     }
 
+    @Override
+    public List<SchemaRecordDto> listBySubject(QualifiedName qualifiedName) {
+        final RequestContext requestContext = RequestContextManager.getContext();
+        log.info("register get request context: " + requestContext);
+
+        //        CommonUtil.validateName(qualifiedName);
+        this.accessController.checkPermission("", qualifiedName.getSubject(), SchemaOperation.GET);
+
+        List<SchemaRecordInfo> recordInfos = storageServiceProxy.listBySubject(qualifiedName, config.isCacheEnabled());
+        if (recordInfos == null) {
+            throw new SchemaException("Subject: " + qualifiedName + " not exist");
+        }
+
+        log.info("list schema by subject: {}", qualifiedName.getSubject());
+        return recordInfos.stream().map(storageUtil::convertToSchemaRecordDto).collect(Collectors.toList());
+    }
+
     private void checkSchemaExist(final QualifiedName qualifiedName) {
         if (storageServiceProxy.get(qualifiedName, config.isCacheEnabled()) != null) {
             throw new SchemaExistException(qualifiedName);
diff --git a/schema-storage-rocketmq/pom.xml b/schema-storage-rocketmq/pom.xml
index d91306f..a3f4b17 100644
--- a/schema-storage-rocketmq/pom.xml
+++ b/schema-storage-rocketmq/pom.xml
@@ -40,6 +40,12 @@
             <artifactId>rocketmq-client</artifactId>
             <version>4.9.3</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+            <version>4.9.3</version>
+        </dependency>
     </dependencies>
 
     <properties>
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 1dbba4a..8e1a21f 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -19,12 +19,16 @@ package org.apache.rocketmq.schema.registry.storage.rocketmq;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -33,8 +37,14 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
 import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException;
@@ -44,6 +54,7 @@ import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
 import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
 import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
 import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.DBOptions;
@@ -68,9 +79,9 @@ import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.Rocke
 @Slf4j
 public class RocketmqClient {
 
-    private Properties properties;
     private DefaultMQProducer producer;
-    private DefaultMQPushConsumer scheduleConsumer;
+    private DefaultLitePullConsumer scheduleConsumer;
+    private DefaultMQAdminExt mqAdminExt;
     private String storageTopic;
     private String cachePath;
     private JsonConverter converter;
@@ -89,10 +100,40 @@ public class RocketmqClient {
 
     public RocketmqClient(Properties props) {
         init(props);
+        createStorageTopic();
         startRemoteStorage();
         startLocalCache();
     }
 
+    private void createStorageTopic() {
+
+        try {
+            mqAdminExt.start();
+
+            try {
+                ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+                HashMap<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
+                for (BrokerData brokerData : brokerAddrTable.values()) {
+                    TopicConfig topicConfig = new TopicConfig();
+                    topicConfig.setTopicName(storageTopic);
+                    topicConfig.setReadQueueNums(8);
+                    topicConfig.setWriteQueueNums(8);
+                    // TODO compact topic (TopicAttributes)
+                    String brokerAddr = brokerData.selectBrokerAddr();
+                    mqAdminExt.createAndUpdateTopicConfig(brokerAddr, topicConfig);
+                }
+            } catch (Exception e) {
+                throw new SchemaException("Failed to create storage rocketmq topic", e);
+            } finally {
+                mqAdminExt.shutdown();
+            }
+
+        } catch (MQClientException e) {
+            throw new SchemaException("Rocketmq admin tool start failed", e);
+        }
+
+    }
+
     private void startLocalCache() {
         try {
             List<byte[]> cfs = RocksDB.listColumnFamilies(options, cachePath);
@@ -131,66 +172,77 @@ public class RocketmqClient {
         try {
             producer.start();
 
-            scheduleConsumer.subscribe(storageTopic, "*");
-            scheduleConsumer.registerMessageListener(new MessageListenerConcurrently() {
-                @Override
-                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
-                    msgs.forEach(msg -> {
-                        synchronized (this) {
-                            try {
-                                if (msg.getKeys().equals(DELETE_KEYS)) {
-                                    // delete
-                                    byte[] schemaFullName = msg.getBody();
-                                    byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
-                                    if (schemaInfoBytes != null) {
-                                        deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class));
-                                        cache.delete(schemaCfHandle(), schemaFullName);
-                                    }
-                                } else {
-                                    byte[] schemaFullName = converter.toBytes(msg.getKeys());
-                                    byte[] schemaInfoBytes = msg.getBody();
-                                    SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class);
-                                    byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord());
-
-                                    byte[] result = cache.get(schemaCfHandle(), schemaFullName);
-                                    if (result == null) {
-                                        // register
-                                        cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
-                                        cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
-                                    } else {
-                                        SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
-                                        if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
-                                            return;
-                                        }
-                                        if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
-                                            throw new SchemaException("Schema version is invalid, update: "
-                                                + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
-                                        }
-
-                                        cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
-                                        update.getLastRecord().getSubjects().forEach(subject -> {
-                                            try {
-                                                cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes);
-                                            } catch (RocksDBException e) {
-                                                throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e);
-                                            }
-                                        });
-                                    }
-                                }
-                            } catch (Throwable e) {
-                                throw new SchemaException("Rebuild schema cache failed", e);
-                            }
-                        }
-                    });
-                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            scheduleConsumer.setPullThreadNums(4);
+            scheduleConsumer.start();
+
+            Collection<MessageQueue> messageQueueList = scheduleConsumer.fetchMessageQueues(storageTopic);
+            scheduleConsumer.assign(messageQueueList);
+            messageQueueList.forEach(mq -> {
+                try {
+                    scheduleConsumer.seekToBegin(mq);
+                } catch (MQClientException e) {
+                    e.printStackTrace();
                 }
             });
-            scheduleConsumer.start();
+            while (true) {
+                List<MessageExt> msgList = scheduleConsumer.poll(1000);
+                if (msgList != null) {
+                    msgList.forEach(this::consumeMessage);
+                }
+            }
         } catch (MQClientException e) {
             throw new SchemaException("Rocketmq client start failed", e);
         }
     }
 
+    private void consumeMessage(MessageExt msg) {
+        synchronized (this) {
+            try {
+                if (msg.getKeys().equals(DELETE_KEYS)) {
+                    // delete
+                    byte[] schemaFullName = msg.getBody();
+                    byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
+                    if (schemaInfoBytes != null) {
+                        deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class));
+                        cache.delete(schemaCfHandle(), schemaFullName);
+                    }
+                } else {
+                    byte[] schemaFullName = converter.toBytes(msg.getKeys());
+                    byte[] schemaInfoBytes = msg.getBody();
+                    SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class);
+                    byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord());
+
+                    byte[] result = cache.get(schemaCfHandle(), schemaFullName);
+                    if (result == null) {
+                        // register
+                        cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
+                        cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
+                    } else {
+                        SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
+                        if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
+                            return;
+                        }
+                        if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
+                            throw new SchemaException("Schema version is invalid, update: "
+                                + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
+                        }
+
+                        cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
+                        update.getLastRecord().getSubjects().forEach(subject -> {
+                            try {
+                                cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes);
+                            } catch (RocksDBException e) {
+                                throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e);
+                            }
+                        });
+                    }
+                }
+            } catch (Throwable e) {
+                throw new SchemaException("Rebuild schema cache failed", e);
+            }
+        }
+    }
+
     // TODO: next query on other machine may can't found schema in cache since send is async with receive
     public SchemaInfo registerSchema(SchemaInfo schema) {
         byte[] subjectFullName = converter.toBytes(schema.subjectFullName());
@@ -277,25 +329,24 @@ public class RocketmqClient {
         }
     }
 
-    public byte[] getSchema(QualifiedName qualifiedName) {
+    public byte[] getSchema(String schemaFullName) {
         try {
             // TODO: get from rocketmq topic if cache not contain
-            return cache.get(schemaCfHandle(), converter.toBytes(qualifiedName.schemaFullName()));
+            return cache.get(schemaCfHandle(), converter.toBytes(schemaFullName));
         } catch (RocksDBException e) {
-            throw new SchemaException("Get schema " + qualifiedName + " failed", e);
+            throw new SchemaException("Get schema " + schemaFullName + " failed", e);
         }
     }
 
-    public byte[] getBySubject(QualifiedName qualifiedName) {
+    public byte[] getBySubject(String subjectFullName) {
         try {
-            return cache.get(subjectCfHandle(), converter.toBytes(qualifiedName.subjectFullName()));
+            return cache.get(subjectCfHandle(), converter.toBytes(subjectFullName));
         } catch (RocksDBException e) {
-            throw new SchemaException("Get by subject " + qualifiedName + " failed", e);
+            throw new SchemaException("Get by subject " + subjectFullName + " failed", e);
         }
     }
 
     private void init(Properties props) {
-        this.properties = props;
         this.storageTopic = props.getProperty(STORAGE_ROCKETMQ_TOPIC, STORAGE_ROCKETMQ_TOPIC_DEFAULT);
         this.cachePath = props.getProperty(STORAGE_LOCAL_CACHE_PATH, STORAGE_LOCAL_CACHE_PATH_DEFAULT);
 
@@ -307,7 +358,7 @@ public class RocketmqClient {
             props.getProperty(STORAGE_ROCKETMQ_NAMESRV, STORAGE_ROCKETMQ_NAMESRV_DEFAULT)
         );
 
-        this.scheduleConsumer = new DefaultMQPushConsumer(
+        this.scheduleConsumer = new DefaultLitePullConsumer(
             props.getProperty(STORAGE_ROCKETMQ_CONSUMER_GROUP, STORAGE_ROCKETMQ_CONSUMER_GROUP_DEFAULT)
         );
 
@@ -315,6 +366,11 @@ public class RocketmqClient {
             props.getProperty(STORAGE_ROCKETMQ_NAMESRV, STORAGE_ROCKETMQ_NAMESRV_DEFAULT)
         );
 
+        this.mqAdminExt = new DefaultMQAdminExt();
+        mqAdminExt.setNamesrvAddr(
+            props.getProperty(STORAGE_ROCKETMQ_NAMESRV, STORAGE_ROCKETMQ_NAMESRV_DEFAULT)
+        );
+
         this.converter = new JsonConverterImpl();
     }
 
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
index 7eabe27..ecb3fb6 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.schema.registry.storage.rocketmq;
 
+import java.util.List;
+
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
 import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
@@ -72,4 +74,13 @@ public interface RocketmqStorageClient {
     default SchemaRecordInfo getBySubject(QualifiedName qualifiedName) {
         throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
     }
+
+    /**
+     * list all versions of rocketmq schema entity by subject.
+     *
+     * @param qualifiedName schema name
+     */
+    default List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) {
+        throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+    }
 }
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
index 8b4752f..2fa82ff 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
@@ -18,8 +18,14 @@
 package org.apache.rocketmq.schema.registry.storage.rocketmq;
 
 import java.io.File;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
 import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
@@ -78,7 +84,7 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient {
      */
     @Override
     public SchemaInfo getSchema(QualifiedName qualifiedName) {
-        byte[] result = rocketmqClient.getSchema(qualifiedName);
+        byte[] result = rocketmqClient.getSchema(qualifiedName.schemaFullName());
         return result == null ? null : jsonConverter.fromJson(result, SchemaInfo.class);
     }
 
@@ -89,7 +95,37 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient {
      */
     @Override
     public SchemaRecordInfo getBySubject(QualifiedName qualifiedName) {
-        byte[] result = rocketmqClient.getBySubject(qualifiedName);
-        return result == null ? null : jsonConverter.fromJson(result, SchemaRecordInfo.class);
+        if (qualifiedName.getVersion() == null) {
+            byte[] result = rocketmqClient.getBySubject(qualifiedName.subjectFullName());
+            return result == null ? null : jsonConverter.fromJson(result, SchemaRecordInfo.class);
+        }
+
+        // schema version is given
+        SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName());
+        if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
+            return null;
+        }
+        Map<Long, SchemaRecordInfo> versionSchemaMap = schemaInfo.getDetails().getSchemaRecords()
+            .stream().collect(Collectors.toMap(SchemaRecordInfo::getVersion, Function.identity()));
+        return versionSchemaMap.get(qualifiedName.getVersion());
+    }
+
+    @Override
+    public List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) {
+        SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName());
+        if (schemaInfo == null || schemaInfo.getDetails() == null) {
+            return null;
+        }
+        return schemaInfo.getDetails().getSchemaRecords();
+    }
+
+    private SchemaInfo getSchemaInfoBySubject(String subjectFullName) {
+        byte[] lastRecordBytes = rocketmqClient.getBySubject(subjectFullName);
+        if (lastRecordBytes == null) {
+            return null;
+        }
+        SchemaRecordInfo lastRecord = jsonConverter.fromJson(lastRecordBytes, SchemaRecordInfo.class);
+        byte[] result = rocketmqClient.getSchema(lastRecord.getSchema());
+        return result == null ? null : jsonConverter.fromJson(result, SchemaInfo.class);
     }
 }
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
index f15345c..ac0b2e9 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.schema.registry.storage.rocketmq;
 
+import java.util.List;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
@@ -77,4 +79,9 @@ public class RocketmqStorageService implements StorageService<SchemaInfo> {
     public SchemaRecordInfo getBySubject(StorageServiceContext context, QualifiedName name) {
         return storageClient.getBySubject(name);
     }
+
+    @Override
+    public List<SchemaRecordInfo> listBySubject(StorageServiceContext context, QualifiedName name) {
+        return storageClient.listBySubject(name);
+    }
 }
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
index e541364..31e6744 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
@@ -35,7 +35,7 @@ public class RocketmqConfigConstants {
     public static final String STORAGE_ROCKETMQ_NAMESRV_DEFAULT = "localhost:9876";
 
     public static final String STORAGE_ROCKETMQ_TOPIC = "storage.rocketmq.topic";
-    public static final String STORAGE_ROCKETMQ_TOPIC_DEFAULT = "schema_registry_storage";
+    public static final String STORAGE_ROCKETMQ_TOPIC_DEFAULT = "RMQ_SYS_schema_registry_storage";
 
     public static final String STORAGE_LOCAL_CACHE_PATH = "storage.local.cache.path";
     public static final String STORAGE_LOCAL_CACHE_PATH_DEFAULT = "/tmp/schema-registry/cache";
diff --git a/schema-storage-rocketmq/src/main/resources/rocketmq.properties b/schema-storage-rocketmq/src/main/resources/rocketmq.properties
index 5070713..3a94c6a 100644
--- a/schema-storage-rocketmq/src/main/resources/rocketmq.properties
+++ b/schema-storage-rocketmq/src/main/resources/rocketmq.properties
@@ -16,4 +16,4 @@
 #
 
 storage.type=rocketmq
-#storage.local.cache.path
\ No newline at end of file
+storage.local.cache.path=/Users/xyb/app/schema-registry/cache
\ No newline at end of file