You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/30 08:14:23 UTC
[rocketmq-schema-registry] 05/27: add getSchema method and fix storage consumer
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit b1a1fc95e5fddedfd0b6607f591b8e03e9cda3da
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Mon Jul 18 17:22:09 2022 +0800
add getSchema method and fix storage consumer
---
.DS_Store | Bin 0 -> 6148 bytes
.../schema/registry/common/QualifiedName.java | 21 ++-
.../exception/SchemaAuthorizedException.java | 5 +
.../exception/SchemaCompatibilityException.java | 4 +
.../registry/common/exception/SchemaException.java | 5 +
.../common/exception/SchemaExistException.java | 4 +
.../common/exception/SchemaNotFoundException.java | 4 +
.../registry/common/storage/StorageService.java | 6 +
.../common/storage/StorageServiceProxy.java | 19 ++-
core/pom.xml | 8 -
.../registry/core/api/v1/SchemaController.java | 79 ++++++++-
.../core/expection}/RequestExceptionHandler.java | 15 +-
.../registry/core/service/SchemaService.java | 9 +
.../registry/core/service/SchemaServiceImpl.java | 19 +++
schema-storage-rocketmq/pom.xml | 6 +
.../registry/storage/rocketmq/RocketmqClient.java | 182 ++++++++++++++-------
.../storage/rocketmq/RocketmqStorageClient.java | 11 ++
.../rocketmq/RocketmqStorageClientImpl.java | 42 ++++-
.../storage/rocketmq/RocketmqStorageService.java | 7 +
.../rocketmq/configs/RocketmqConfigConstants.java | 2 +-
.../src/main/resources/rocketmq.properties | 2 +-
21 files changed, 352 insertions(+), 98 deletions(-)
diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000..a1ac880
Binary files /dev/null and b/.DS_Store differ
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
index 39c812f..0fb34bf 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
@@ -38,6 +38,7 @@ public class QualifiedName implements Serializable {
private String tenant;
private String subject;
private String schema;
+ private Long version;
public QualifiedName(
@Nullable final String cluster,
@@ -51,16 +52,30 @@ public class QualifiedName implements Serializable {
this.schema = schema;
}
+ public QualifiedName(
+ @Nullable final String cluster,
+ @Nullable final String tenant,
+ @Nullable final String subject,
+ @Nullable final String schema,
+ @Nullable final Long version
+ ) {
+ this.cluster= cluster;
+ this.tenant= tenant;
+ this.subject= subject;
+ this.schema = schema;
+ this.version = version;
+ }
+
public SubjectInfo subjectInfo() {
return new SubjectInfo(cluster, subject);
}
public String fullName() {
- return cluster + '/' + tenant + '/' + subject + '/' + schema;
+ return cluster + '/' + tenant + '/' + subject + '/' + schema + '/' + version;
}
public String schemaFullName() {
- return tenant + '/' + schema;
+ return tenant + '/' + schema + '/' + version;
}
public String subjectFullName() {
@@ -78,6 +93,8 @@ public class QualifiedName implements Serializable {
.append(subject).append('\"');
sb.append(",\"name\":\"")
.append(schema).append('\"');
+ sb.append(",\"version\":\"")
+ .append(version).append('\"');
sb.append('}');
return sb.toString();
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java
index b190017..c1cea96 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java
@@ -17,9 +17,14 @@
package org.apache.rocketmq.schema.registry.common.exception;
+import lombok.Getter;
+
+@Getter
public class SchemaAuthorizedException extends SchemaException {
private static final long serialVersionUID = 204882338833006991L;
+ private final int errorCode = 40101;
+
public SchemaAuthorizedException(final String tenant, final String schemaName) {
this(String.format("Schema: %s/%s not found, please check your configuration.", tenant, schemaName));
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java
index c694cb0..cf2aafd 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java
@@ -17,11 +17,15 @@
package org.apache.rocketmq.schema.registry.common.exception;
+import lombok.Getter;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
+@Getter
public class SchemaCompatibilityException extends SchemaException {
private static final long serialVersionUID = 2602020608319903212L;
+ private final int errorCode = 40901;
+
public SchemaCompatibilityException(final QualifiedName qualifiedName) {
this(String.format("Schema: %s validate failed.", qualifiedName));
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java
index 524b54c..a95fb90 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java
@@ -17,8 +17,13 @@
package org.apache.rocketmq.schema.registry.common.exception;
+import lombok.Getter;
+
+@Getter
public class SchemaException extends RuntimeException {
+ private final int errorCode = 50001;
+
/** Constructor. */
public SchemaException() {
super();
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
index bcea88c..462f0a0 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
@@ -17,11 +17,15 @@
package org.apache.rocketmq.schema.registry.common.exception;
+import lombok.Getter;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
+@Getter
public class SchemaExistException extends SchemaException {
private static final long serialVersionUID = -9177284523006645052L;
+ private final int errorCode = 40401;
+
public SchemaExistException(final QualifiedName qualifiedName) {
this(String.format("Schema: %s is exist, please check your configuration.", qualifiedName));
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java
index 12bcedb..0a3b7b5 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java
@@ -17,11 +17,15 @@
package org.apache.rocketmq.schema.registry.common.exception;
+import lombok.Getter;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
+@Getter
public class SchemaNotFoundException extends SchemaException {
private static final long serialVersionUID = 554251224980156176L;
+ private final int errorCode = 40402;
+
public SchemaNotFoundException(final QualifiedName qualifiedName) {
this(String.format("Schema: %s not found, please check your configuration.", qualifiedName));
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
index cdcc36b..7e57d1f 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.schema.registry.common.storage;
+import java.util.List;
+
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
import org.apache.rocketmq.schema.registry.common.model.BaseInfo;
@@ -78,4 +80,8 @@ public interface StorageService<T extends BaseInfo> {
default SchemaRecordInfo getBySubject(final StorageServiceContext context, final QualifiedName name) {
throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
}
+
+ default List<SchemaRecordInfo> listBySubject(final StorageServiceContext context, final QualifiedName name) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
index 4ff223c..bd134d8 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.schema.registry.common.storage;
+import java.util.List;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.context.RequestContext;
@@ -68,7 +70,7 @@ public class StorageServiceProxy {
*
* @param name Qualified name with tenant / name of schema
*/
- @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.schema()")
+ @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()")
public void delete(final QualifiedName name) {
final RequestContext requestContext = RequestContextManager.getContext();
final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -84,7 +86,7 @@ public class StorageServiceProxy {
* @param schemaInfo schema information instance
* @return true if errors after this should be ignored.
*/
- @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.schema()")
+ @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()")
public SchemaInfo update(final QualifiedName name, final SchemaInfo schemaInfo) {
final RequestContext requestContext = RequestContextManager.getContext();
final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -101,7 +103,7 @@ public class StorageServiceProxy {
* @param useCache if schema can be retrieved from cache
* @return schema information instance
*/
- @Cacheable(key = "'schema.' + #name.getTenant() + '/' + #name.schema()", condition = "#useCache")
+ @Cacheable(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()", condition = "#useCache")
public SchemaInfo get(final QualifiedName name, final boolean useCache) {
final RequestContext requestContext = RequestContextManager.getContext();
final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -110,7 +112,7 @@ public class StorageServiceProxy {
return storageService.get(storageServiceContext, name);
}
- @Cacheable(key = "'subject.' + #subject", condition = "#useCache")
+ @Cacheable(key = "'subject.' + #name.getSubject() + '/' + #name.getVersion()", condition = "#useCache && #name.getVersion() != null")
public SchemaRecordInfo getBySubject(final QualifiedName name, final boolean useCache) {
final RequestContext requestContext = RequestContextManager.getContext();
final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -118,4 +120,13 @@ public class StorageServiceProxy {
return storageService.getBySubject(storageServiceContext, name);
}
+
+ @Cacheable(key = "'subject.' + #name.getSubject()", condition = "#useCache")
+ public List<SchemaRecordInfo> listBySubject(final QualifiedName name, final boolean useCache) {
+ final RequestContext requestContext = RequestContextManager.getContext();
+ final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
+ final StorageService<SchemaInfo> storageService = storageManager.getStorageService();
+
+ return storageService.listBySubject(storageServiceContext, name);
+ }
}
diff --git a/core/pom.xml b/core/pom.xml
index 51a2647..2d2f203 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -30,14 +30,6 @@
<version>0.0.2-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>com.sun</groupId>
- <artifactId>tools</artifactId>
- <version>1.8</version>
- <scope>system</scope>
- <systemPath>${java.home}/../lib/tools.jar</systemPath>
- </dependency>
-
</dependencies>
<build>
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
index c575e1d..109775b 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -12,6 +12,7 @@ import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.net.HttpURLConnection;
+import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
@@ -296,11 +297,11 @@ public class SchemaController {
@RequestMapping(
method = RequestMethod.GET,
- path = "/subject/{subject-name}"
+ path = "/subject/{subject-name}/schema"
)
@ApiOperation(
value = "Schema information",
- notes = "Schema information for the given schema name under the subject")
+ notes = "Schema information with the latest version under the subject")
@ApiResponses(
{
@ApiResponse(
@@ -314,7 +315,7 @@ public class SchemaController {
}
)
public SchemaRecordDto getSchemaBySubject(
- @ApiParam(value = "The name of the schema", required = true)
+ @ApiParam(value = "The name of the subject", required = true)
@PathVariable("subject-name") String subject
) {
return getSchemaBySubject("default", subject);
@@ -322,11 +323,11 @@ public class SchemaController {
@RequestMapping(
method = RequestMethod.GET,
- path = "/cluster/{cluster-name}/subject/{subject-name}"
+ path = "/cluster/{cluster-name}/subject/{subject-name}/schema"
)
@ApiOperation(
value = "Schema information",
- notes = "Schema information for the given schema name under the subject")
+ notes = "Schema information with the latest version under the subject")
@ApiResponses(
{
@ApiResponse(
@@ -352,4 +353,72 @@ public class SchemaController {
() -> schemaService.getBySubject(name)
);
}
+
+ @RequestMapping(
+ method = RequestMethod.GET,
+ path = "/cluster/{cluster-name}/subject/{subject-name}/schema/versions/{version}"
+ )
+ @ApiOperation(
+ value = "Schema information",
+ notes = "Schema information with the given version under the subject")
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "The schema is returned"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested tenant or schema cannot be found"
+ )
+ }
+ )
+ public SchemaRecordDto getSchemaBySubject(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable("cluster-name") String cluster,
+ @ApiParam(value = "The name of the subject", required = true)
+ @PathVariable("subject-name") String subject,
+ @ApiParam(value = "The version of the schema", required = true)
+ @PathVariable("version") String version
+ ) {
+ QualifiedName name = new QualifiedName(cluster, null, subject, null, Long.parseLong(version));
+
+ return this.requestProcessor.processRequest(
+ "getSchemaBySubject",
+ () -> schemaService.getBySubject(name)
+ );
+ }
+
+ @RequestMapping(
+ method = RequestMethod.GET,
+ path = "/cluster/{cluster-name}/subject/{subject-name}/schema/versions"
+ )
+ @ApiOperation(
+ value = "Schema information",
+ notes = "Schema information with a list of versions under the subject")
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "The schema is returned"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested tenant or schema cannot be found"
+ )
+ }
+ )
+ public List<SchemaRecordDto> getSchemaListBySubject(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable("cluster-name") String cluster,
+ @ApiParam(value = "The name of the subject", required = true)
+ @PathVariable("subject-name") String subject
+ ) {
+ QualifiedName name = new QualifiedName(cluster, null, subject, null);
+
+ return this.requestProcessor.processRequest(
+ "getSchemaListBySubject",
+ () -> schemaService.listBySubject(name)
+ );
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/RequestExceptionHandler.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/expection/RequestExceptionHandler.java
similarity index 80%
rename from common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/RequestExceptionHandler.java
rename to core/src/main/java/org/apache/rocketmq/schema/registry/core/expection/RequestExceptionHandler.java
index 0c0cb29..04a74f3 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/RequestExceptionHandler.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/expection/RequestExceptionHandler.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.common.exception;
+package org.apache.rocketmq.schema.registry.core.expection;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
@@ -37,21 +38,13 @@ public class RequestExceptionHandler {
* @param e The inner exception to handle
* @throws IOException on error in sending error
*/
- @ExceptionHandler({SchemaException.class})
+ @ExceptionHandler(SchemaException.class)
public void handleException(
final HttpServletResponse response,
final SchemaException e
) throws IOException {
- final int status;
-
- if (e instanceof SchemaNotFoundException) {
- status = HttpStatus.NOT_FOUND.value();
- } else {
- status = HttpStatus.INTERNAL_SERVER_ERROR.value();
- }
-
log.error("Global handle SchemaException: " + e.getMessage(), e);
- response.sendError(status, e.getMessage());
+ response.sendError(e.getErrorCode(), e.getMessage());
}
}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
index 2618c29..84c70cc 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.schema.registry.core.service;
+import java.util.List;
import java.util.Optional;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.dto.BaseDto;
@@ -67,4 +68,12 @@ public interface SchemaService<T extends BaseDto> {
* @return schema object with the schemaName
*/
SchemaRecordDto getBySubject(QualifiedName qualifiedName);
+
+ /**
+ * Query the list of schema records under the given subject.
+ *
+ * @param qualifiedName qualified name identifying the subject
+ * @return list of schema records found under the subject
+ */
+ List<SchemaRecordDto> listBySubject(QualifiedName qualifiedName);
}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index 4f4bfb2..56efb9c 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -21,6 +21,8 @@ import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.context.RequestContext;
@@ -236,6 +238,23 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
return storageUtil.convertToSchemaRecordDto(recordInfo);
}
+ @Override
+ public List<SchemaRecordDto> listBySubject(QualifiedName qualifiedName) {
+ final RequestContext requestContext = RequestContextManager.getContext();
+ log.info("register get request context: " + requestContext);
+
+ // CommonUtil.validateName(qualifiedName);
+ this.accessController.checkPermission("", qualifiedName.getSubject(), SchemaOperation.GET);
+
+ List<SchemaRecordInfo> recordInfos = storageServiceProxy.listBySubject(qualifiedName, config.isCacheEnabled());
+ if (recordInfos == null) {
+ throw new SchemaException("Subject: " + qualifiedName + " not exist");
+ }
+
+ log.info("list schema by subject: {}", qualifiedName.getSubject());
+ return recordInfos.stream().map(storageUtil::convertToSchemaRecordDto).collect(Collectors.toList());
+ }
+
private void checkSchemaExist(final QualifiedName qualifiedName) {
if (storageServiceProxy.get(qualifiedName, config.isCacheEnabled()) != null) {
throw new SchemaExistException(qualifiedName);
diff --git a/schema-storage-rocketmq/pom.xml b/schema-storage-rocketmq/pom.xml
index d91306f..a3f4b17 100644
--- a/schema-storage-rocketmq/pom.xml
+++ b/schema-storage-rocketmq/pom.xml
@@ -40,6 +40,12 @@
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ <version>4.9.3</version>
+ </dependency>
</dependencies>
<properties>
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 1dbba4a..8e1a21f 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -19,12 +19,16 @@ package org.apache.rocketmq.schema.registry.storage.rocketmq;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -33,8 +37,14 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException;
@@ -44,6 +54,7 @@ import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
@@ -68,9 +79,9 @@ import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.Rocke
@Slf4j
public class RocketmqClient {
- private Properties properties;
private DefaultMQProducer producer;
- private DefaultMQPushConsumer scheduleConsumer;
+ private DefaultLitePullConsumer scheduleConsumer;
+ private DefaultMQAdminExt mqAdminExt;
private String storageTopic;
private String cachePath;
private JsonConverter converter;
@@ -89,10 +100,40 @@ public class RocketmqClient {
public RocketmqClient(Properties props) {
init(props);
+ createStorageTopic();
startRemoteStorage();
startLocalCache();
}
+ private void createStorageTopic() {
+
+ try {
+ mqAdminExt.start();
+
+ try {
+ ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+ HashMap<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
+ for (BrokerData brokerData : brokerAddrTable.values()) {
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setTopicName(storageTopic);
+ topicConfig.setReadQueueNums(8);
+ topicConfig.setWriteQueueNums(8);
+ // TODO compact topic (TopicAttributes)
+ String brokerAddr = brokerData.selectBrokerAddr();
+ mqAdminExt.createAndUpdateTopicConfig(brokerAddr, topicConfig);
+ }
+ } catch (Exception e) {
+ throw new SchemaException("Failed to create storage rocketmq topic", e);
+ } finally {
+ mqAdminExt.shutdown();
+ }
+
+ } catch (MQClientException e) {
+ throw new SchemaException("Rocketmq admin tool start failed", e);
+ }
+
+ }
+
private void startLocalCache() {
try {
List<byte[]> cfs = RocksDB.listColumnFamilies(options, cachePath);
@@ -131,66 +172,77 @@ public class RocketmqClient {
try {
producer.start();
- scheduleConsumer.subscribe(storageTopic, "*");
- scheduleConsumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- msgs.forEach(msg -> {
- synchronized (this) {
- try {
- if (msg.getKeys().equals(DELETE_KEYS)) {
- // delete
- byte[] schemaFullName = msg.getBody();
- byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
- if (schemaInfoBytes != null) {
- deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class));
- cache.delete(schemaCfHandle(), schemaFullName);
- }
- } else {
- byte[] schemaFullName = converter.toBytes(msg.getKeys());
- byte[] schemaInfoBytes = msg.getBody();
- SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class);
- byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord());
-
- byte[] result = cache.get(schemaCfHandle(), schemaFullName);
- if (result == null) {
- // register
- cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
- cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
- } else {
- SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
- if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
- return;
- }
- if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
- throw new SchemaException("Schema version is invalid, update: "
- + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
- }
-
- cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
- update.getLastRecord().getSubjects().forEach(subject -> {
- try {
- cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes);
- } catch (RocksDBException e) {
- throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e);
- }
- });
- }
- }
- } catch (Throwable e) {
- throw new SchemaException("Rebuild schema cache failed", e);
- }
- }
- });
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ scheduleConsumer.setPullThreadNums(4);
+ scheduleConsumer.start();
+
+ Collection<MessageQueue> messageQueueList = scheduleConsumer.fetchMessageQueues(storageTopic);
+ scheduleConsumer.assign(messageQueueList);
+ messageQueueList.forEach(mq -> {
+ try {
+ scheduleConsumer.seekToBegin(mq);
+ } catch (MQClientException e) {
+ e.printStackTrace();
}
});
- scheduleConsumer.start();
+ while (true) {
+ List<MessageExt> msgList = scheduleConsumer.poll(1000);
+ if (msgList != null) {
+ msgList.forEach(this::consumeMessage);
+ }
+ }
} catch (MQClientException e) {
throw new SchemaException("Rocketmq client start failed", e);
}
}
+ private void consumeMessage(MessageExt msg) {
+ synchronized (this) {
+ try {
+ if (msg.getKeys().equals(DELETE_KEYS)) {
+ // delete
+ byte[] schemaFullName = msg.getBody();
+ byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
+ if (schemaInfoBytes != null) {
+ deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class));
+ cache.delete(schemaCfHandle(), schemaFullName);
+ }
+ } else {
+ byte[] schemaFullName = converter.toBytes(msg.getKeys());
+ byte[] schemaInfoBytes = msg.getBody();
+ SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class);
+ byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord());
+
+ byte[] result = cache.get(schemaCfHandle(), schemaFullName);
+ if (result == null) {
+ // register
+ cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
+ cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
+ } else {
+ SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
+ if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
+ return;
+ }
+ if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
+ throw new SchemaException("Schema version is invalid, update: "
+ + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
+ }
+
+ cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
+ update.getLastRecord().getSubjects().forEach(subject -> {
+ try {
+ cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes);
+ } catch (RocksDBException e) {
+ throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e);
+ }
+ });
+ }
+ }
+ } catch (Throwable e) {
+ throw new SchemaException("Rebuild schema cache failed", e);
+ }
+ }
+ }
+
// TODO: next query on other machine may can't found schema in cache since send is async with receive
public SchemaInfo registerSchema(SchemaInfo schema) {
byte[] subjectFullName = converter.toBytes(schema.subjectFullName());
@@ -277,25 +329,24 @@ public class RocketmqClient {
}
}
+ /**
+ * Looks up the raw bytes stored in the local cache for a schema.
+ *
+ * @param schemaFullName full name of the schema; converted to bytes and used as the cache key
+ * @return the bytes returned by the cache lookup (callers in this change treat null as "not found")
+ * @throws SchemaException if the cache lookup raises a RocksDBException
+ */
- public byte[] getSchema(QualifiedName qualifiedName) {
+ public byte[] getSchema(String schemaFullName) {
try {
// TODO: get from rocketmq topic if cache not contain
- return cache.get(schemaCfHandle(), converter.toBytes(qualifiedName.schemaFullName()));
+ return cache.get(schemaCfHandle(), converter.toBytes(schemaFullName));
} catch (RocksDBException e) {
- throw new SchemaException("Get schema " + qualifiedName + " failed", e);
+ throw new SchemaException("Get schema " + schemaFullName + " failed", e);
}
}
+ /**
+ * Looks up the raw bytes stored in the local cache for a subject.
+ *
+ * @param subjectFullName full name of the subject; converted to bytes and used as the cache key
+ * @return the bytes returned by the cache lookup (callers in this change treat null as "not found")
+ * @throws SchemaException if the cache lookup raises a RocksDBException
+ */
- public byte[] getBySubject(QualifiedName qualifiedName) {
+ public byte[] getBySubject(String subjectFullName) {
try {
- return cache.get(subjectCfHandle(), converter.toBytes(qualifiedName.subjectFullName()));
+ return cache.get(subjectCfHandle(), converter.toBytes(subjectFullName));
} catch (RocksDBException e) {
- throw new SchemaException("Get by subject " + qualifiedName + " failed", e);
+ throw new SchemaException("Get by subject " + subjectFullName + " failed", e);
}
}
private void init(Properties props) {
- this.properties = props;
this.storageTopic = props.getProperty(STORAGE_ROCKETMQ_TOPIC, STORAGE_ROCKETMQ_TOPIC_DEFAULT);
this.cachePath = props.getProperty(STORAGE_LOCAL_CACHE_PATH, STORAGE_LOCAL_CACHE_PATH_DEFAULT);
@@ -307,7 +358,7 @@ public class RocketmqClient {
props.getProperty(STORAGE_ROCKETMQ_NAMESRV, STORAGE_ROCKETMQ_NAMESRV_DEFAULT)
);
- this.scheduleConsumer = new DefaultMQPushConsumer(
+ this.scheduleConsumer = new DefaultLitePullConsumer(
props.getProperty(STORAGE_ROCKETMQ_CONSUMER_GROUP, STORAGE_ROCKETMQ_CONSUMER_GROUP_DEFAULT)
);
@@ -315,6 +366,11 @@ public class RocketmqClient {
props.getProperty(STORAGE_ROCKETMQ_NAMESRV, STORAGE_ROCKETMQ_NAMESRV_DEFAULT)
);
+ this.mqAdminExt = new DefaultMQAdminExt();
+ mqAdminExt.setNamesrvAddr(
+ props.getProperty(STORAGE_ROCKETMQ_NAMESRV, STORAGE_ROCKETMQ_NAMESRV_DEFAULT)
+ );
+
this.converter = new JsonConverterImpl();
}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
index 7eabe27..ecb3fb6 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.schema.registry.storage.rocketmq;
+import java.util.List;
+
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
@@ -72,4 +74,13 @@ public interface RocketmqStorageClient {
default SchemaRecordInfo getBySubject(QualifiedName qualifiedName) {
throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
}
+
+ /**
+ * List all versions of the rocketmq schema entity registered under a subject.
+ * This default implementation is unsupported and always throws.
+ *
+ * @param qualifiedName schema name
+ * @return the schema records of the subject
+ * @throws UnsupportedOperationException unless the implementing class overrides this method
+ */
+ default List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
index 8b4752f..2fa82ff 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
@@ -18,8 +18,14 @@
package org.apache.rocketmq.schema.registry.storage.rocketmq;
import java.io.File;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
@@ -78,7 +84,7 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient {
*/
@Override
public SchemaInfo getSchema(QualifiedName qualifiedName) {
- byte[] result = rocketmqClient.getSchema(qualifiedName);
+ // The client is keyed by the schema full-name string; a null lookup result is passed through as null.
+ byte[] result = rocketmqClient.getSchema(qualifiedName.schemaFullName());
return result == null ? null : jsonConverter.fromJson(result, SchemaInfo.class);
}
@@ -89,7 +95,37 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient {
*/
@Override
public SchemaRecordInfo getBySubject(QualifiedName qualifiedName) {
- byte[] result = rocketmqClient.getBySubject(qualifiedName);
- return result == null ? null : jsonConverter.fromJson(result, SchemaRecordInfo.class);
+ // No version requested: return the record stored directly under the subject key.
+ if (qualifiedName.getVersion() == null) {
+ byte[] result = rocketmqClient.getBySubject(qualifiedName.subjectFullName());
+ return result == null ? null : jsonConverter.fromJson(result, SchemaRecordInfo.class);
+ }
+
+ // schema version is given
+ SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName());
+ if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
+ return null;
+ }
+ // Supply a merge function: without one, Collectors.toMap throws IllegalStateException
+ // as soon as two records share a version, which would fail the whole lookup.
+ // Keeping the first record seen is a defensive choice; duplicates are not expected.
+ Map<Long, SchemaRecordInfo> versionSchemaMap = schemaInfo.getDetails().getSchemaRecords()
+ .stream().collect(Collectors.toMap(SchemaRecordInfo::getVersion, Function.identity(),
+ (first, duplicate) -> first));
+ return versionSchemaMap.get(qualifiedName.getVersion());
+ }
+
+ /**
+ * Lists every schema record of the schema that the given subject resolves to.
+ *
+ * @param qualifiedName name whose subject full name is used for the lookup
+ * @return the record list held in the schema details, or null when the subject
+ * cannot be resolved to a schema or the schema has no details
+ */
+ @Override
+ public List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) {
+ final SchemaInfo resolved = getSchemaInfoBySubject(qualifiedName.subjectFullName());
+ final boolean hasDetails = resolved != null && resolved.getDetails() != null;
+ return hasDetails ? resolved.getDetails().getSchemaRecords() : null;
+ }
+
+ /**
+ * Resolves a subject to its schema in two cache lookups: the subject entry is
+ * decoded as a SchemaRecordInfo, and the schema name it carries is then used to
+ * fetch the SchemaInfo.
+ *
+ * @param subjectFullName full name of the subject to resolve
+ * @return the decoded schema, or null when either lookup finds nothing
+ */
+ private SchemaInfo getSchemaInfoBySubject(String subjectFullName) {
+ final byte[] subjectEntry = rocketmqClient.getBySubject(subjectFullName);
+ if (subjectEntry == null) {
+ return null;
+ }
+ final SchemaRecordInfo subjectRecord = jsonConverter.fromJson(subjectEntry, SchemaRecordInfo.class);
+ final byte[] schemaEntry = rocketmqClient.getSchema(subjectRecord.getSchema());
+ if (schemaEntry == null) {
+ return null;
+ }
+ return jsonConverter.fromJson(schemaEntry, SchemaInfo.class);
}
}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
index f15345c..ac0b2e9 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.schema.registry.storage.rocketmq;
+import java.util.List;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
@@ -77,4 +79,9 @@ public class RocketmqStorageService implements StorageService<SchemaInfo> {
public SchemaRecordInfo getBySubject(StorageServiceContext context, QualifiedName name) {
return storageClient.getBySubject(name);
}
+
+ /**
+ * Lists the schema records of a subject by delegating to the storage client.
+ *
+ * @param context storage service context; not read by this implementation
+ * @param name qualified name passed through to the storage client
+ * @return whatever the storage client returns for {@code name}
+ */
+ @Override
+ public List<SchemaRecordInfo> listBySubject(StorageServiceContext context, QualifiedName name) {
+ return storageClient.listBySubject(name);
+ }
}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
index e541364..31e6744 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java
@@ -35,7 +35,7 @@ public class RocketmqConfigConstants {
public static final String STORAGE_ROCKETMQ_NAMESRV_DEFAULT = "localhost:9876";
public static final String STORAGE_ROCKETMQ_TOPIC = "storage.rocketmq.topic";
- public static final String STORAGE_ROCKETMQ_TOPIC_DEFAULT = "schema_registry_storage";
+ public static final String STORAGE_ROCKETMQ_TOPIC_DEFAULT = "RMQ_SYS_schema_registry_storage";
public static final String STORAGE_LOCAL_CACHE_PATH = "storage.local.cache.path";
public static final String STORAGE_LOCAL_CACHE_PATH_DEFAULT = "/tmp/schema-registry/cache";
diff --git a/schema-storage-rocketmq/src/main/resources/rocketmq.properties b/schema-storage-rocketmq/src/main/resources/rocketmq.properties
index 5070713..3a94c6a 100644
--- a/schema-storage-rocketmq/src/main/resources/rocketmq.properties
+++ b/schema-storage-rocketmq/src/main/resources/rocketmq.properties
@@ -16,4 +16,4 @@
#
storage.type=rocketmq
-#storage.local.cache.path
\ No newline at end of file
+storage.local.cache.path=/tmp/schema-registry/cache
\ No newline at end of file