You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/29 11:49:24 UTC
[rocketmq-schema-registry] 10/23: optimize controller
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit bccec3ffeab1648d744c0958e2af5cf20f7f8aff
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Wed Jul 20 16:42:47 2022 +0800
optimize controller
---
.../schema/registry/common/QualifiedName.java | 8 +-
.../schema/registry/common/dto/SubjectDto.java | 3 +
.../schema/registry/common/model/SubjectInfo.java | 5 +-
.../schema/registry/common/utils/CommonUtil.java | 7 +-
.../registry/core/api/v1/SchemaController.java | 135 +++++++++++++++++++--
.../registry/core/service/SchemaServiceImpl.java | 29 ++---
.../registry/storage/rocketmq/RocketmqClient.java | 22 +++-
7 files changed, 169 insertions(+), 40 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
index f97c2d0..06b5059 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
@@ -34,10 +34,6 @@ import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
public class QualifiedName implements Serializable {
private static final long serialVersionUID = 2266514833942841209L;
- public static final String DEFAULT_TENANT = "default";
-
- public static final String DEFAULT_CLUSTER = "cluster";
-
private String cluster;
private String tenant;
private String subject;
@@ -71,7 +67,7 @@ public class QualifiedName implements Serializable {
}
public SubjectInfo subjectInfo() {
- return new SubjectInfo(cluster, subject);
+ return new SubjectInfo(cluster, tenant, subject);
}
public String fullName() {
@@ -79,7 +75,7 @@ public class QualifiedName implements Serializable {
}
public String schemaFullName() {
- return schema;
+ return tenant + '/' + schema;
}
public String subjectFullName() {
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java
index 2ff3ad2..4d19d5f 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java
@@ -35,6 +35,9 @@ public class SubjectDto {
@ApiModelProperty(value = "Cluster of this subject", required = true)
private String cluster;
+ @ApiModelProperty(value = "Tenant of this subject", required = true)
+ private String tenant;
+
@ApiModelProperty(value = "Name of this subject", required = true)
private String subject;
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
index 9790f13..cc61275 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
@@ -33,10 +33,11 @@ public class SubjectInfo implements Serializable {
private static final long serialVersionUID = -92808722007777844L;
private String cluster;
+ private String tenant;
private String subject;
public String fullName() {
- return cluster + '/' + subject;
+ return cluster + '/' + tenant + '/' + subject;
}
@Override
@@ -44,6 +45,8 @@ public class SubjectInfo implements Serializable {
final StringBuilder sb = new StringBuilder("{");
sb.append("\"cluster\":\"")
.append(cluster).append('\"');
+ sb.append("\"tenant\":\"")
+ .append(tenant).append('\"');
sb.append(",\"subject\":\"")
.append(subject).append('\"');
sb.append('}');
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
index ddc2731..a29d6e6 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.schema.registry.common.utils;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.io.BufferedReader;
import java.io.File;
@@ -60,9 +61,9 @@ import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
public class CommonUtil {
public static void validateName(QualifiedName qualifiedName) {
-// Preconditions.checkNotNull(qualifiedName.getTenant(), "Tenant is null");
-// Preconditions.checkNotNull(qualifiedName.getSubject(), "Subject is null");
-// Preconditions.checkNotNull(qualifiedName.getName(), "Schema name is null");
+ Preconditions.checkNotNull(qualifiedName.getTenant(), "Tenant is null");
+ Preconditions.checkNotNull(qualifiedName.getSubject(), "Subject is null");
+ Preconditions.checkNotNull(qualifiedName.getSchema(), "Schema name is null");
}
public static boolean isQualifiedNameEmpty(QualifiedName qualifiedName) {
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
index 335294c..558d652 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -46,11 +46,13 @@ import org.springframework.web.bind.annotation.RestController;
@Slf4j
public class SchemaController {
- private final String cluster;
- private final String tenant;
private final RequestProcessor requestProcessor;
private final SchemaService<SchemaDto> schemaService;
+ public static final String DEFAULT_TENANT = "default";
+
+ public static final String DEFAULT_CLUSTER = "default";
+
/**
* Constructor.
*
@@ -62,8 +64,6 @@ public class SchemaController {
final RequestProcessor requestProcessor,
final SchemaService<SchemaDto> schemaService
) {
- this.cluster = QualifiedName.DEFAULT_CLUSTER;
- this.tenant = QualifiedName.DEFAULT_TENANT;
this.requestProcessor = requestProcessor;
this.schemaService = schemaService;
}
@@ -97,6 +97,43 @@ public class SchemaController {
@PathVariable("schema-name") final String schemaName,
@ApiParam(value = "The schema detail", required = true)
@RequestBody final SchemaDto schemaDto
+ ) {
+ return registerSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName, schemaDto);
+ }
+
+ @RequestMapping(
+ method = RequestMethod.POST,
+ path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}",
+ consumes = MediaType.APPLICATION_JSON_VALUE
+ )
+ @ResponseStatus(HttpStatus.CREATED)
+ @ApiOperation(
+ value = "Register a new schema",
+ notes = "Return success if there were no errors registering the schema"
+ )
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_CREATED,
+ message = "The schema was registered"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested schema cannot be registered"
+ )
+ }
+ )
+ public SchemaDto registerSchema(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable(value = "tenant-name") final String tenant,
+ @ApiParam(value = "The subject of the schema", required = true)
+ @PathVariable(name = "subject-name") final String subject,
+ @ApiParam(value = "The name of the schema", required = true)
+ @PathVariable("schema-name") final String schemaName,
+ @ApiParam(value = "The schema detail", required = true)
+ @RequestBody final SchemaDto schemaDto
) {
// TODO: support register by sql
final QualifiedName name = new QualifiedName(cluster, tenant, subject, schemaName);
@@ -112,7 +149,7 @@ public class SchemaController {
}
@RequestMapping(
- path = "/subject/{subject-name}/schema",
+ path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema",
method = RequestMethod.DELETE
)
@ResponseStatus(HttpStatus.OK)
@@ -133,6 +170,10 @@ public class SchemaController {
}
)
public SchemaDto deleteSchema(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable(value = "tenant-name") final String tenant,
@ApiParam(value = "The subject of the schema", required = true)
@PathVariable("subject-name") final String subject
) {
@@ -145,7 +186,7 @@ public class SchemaController {
}
@RequestMapping(
- path = "/subject/{subject-name}/schema/versions/{version}",
+ path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}",
method = RequestMethod.DELETE
)
@ResponseStatus(HttpStatus.OK)
@@ -166,6 +207,10 @@ public class SchemaController {
}
)
public SchemaDto deleteSchema(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable(value = "tenant-name") final String tenant,
@ApiParam(value = "The subject of the schema", required = true)
@PathVariable("subject-name") final String subject,
@ApiParam(value = "The version of the schema", required = true)
@@ -208,6 +253,42 @@ public class SchemaController {
@PathVariable("schema-name") final String schemaName,
@ApiParam(value = "The schema detail", required = true)
@RequestBody final SchemaDto schemaDto
+ ) {
+ return updateSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName, schemaDto);
+ }
+
+ @RequestMapping(
+ path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}",
+ method = RequestMethod.PUT,
+ consumes = MediaType.APPLICATION_JSON_VALUE
+ )
+ @ApiOperation(
+ value = "Update schema and generate new schema version",
+ notes = "Update the given schema"
+ )
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "Update schema success"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested schema cannot be found"
+ )
+ }
+ )
+ public SchemaDto updateSchema(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable(value = "tenant-name") final String tenant,
+ @ApiParam(value = "The subject of the schema", required = true)
+ @PathVariable(name = "subject-name") final String subject,
+ @ApiParam(value = "The name of the schema", required = true)
+ @PathVariable("schema-name") final String schemaName,
+ @ApiParam(value = "The schema detail", required = true)
+ @RequestBody final SchemaDto schemaDto
) {
QualifiedName name = new QualifiedName(cluster, tenant, subject, schemaName);
return this.requestProcessor.processRequest(
@@ -240,6 +321,36 @@ public class SchemaController {
public SchemaRecordDto getSchemaBySubject(
@ApiParam(value = "The name of the subject", required = true)
@PathVariable("subject-name") String subject
+ ) {
+ return getSchemaBySubject(DEFAULT_CLUSTER, DEFAULT_CLUSTER, subject);
+ }
+
+ @RequestMapping(
+ method = RequestMethod.GET,
+ path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema"
+ )
+ @ApiOperation(
+ value = "Schema information",
+ notes = "Schema information with the latest version under the subject")
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "The schema is returned"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested tenant or schema cannot be found"
+ )
+ }
+ )
+ public SchemaRecordDto getSchemaBySubject(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable(value = "tenant-name") final String tenant,
+ @ApiParam(value = "The name of the subject", required = true)
+ @PathVariable("subject-name") String subject
) {
QualifiedName name = new QualifiedName(cluster, tenant, subject, null);
log.info("Request for get schema for subject: {}", name.subjectFullName());
@@ -252,7 +363,7 @@ public class SchemaController {
@RequestMapping(
method = RequestMethod.GET,
- path = "/subject/{subject-name}/schema/versions/{version}"
+ path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}"
)
@ApiOperation(
value = "Schema information",
@@ -270,6 +381,10 @@ public class SchemaController {
}
)
public SchemaRecordDto getSchemaBySubject(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable(value = "tenant-name") final String tenant,
@ApiParam(value = "The name of the subject", required = true)
@PathVariable("subject-name") String subject,
@ApiParam(value = "The version of the schema", required = true)
@@ -285,7 +400,7 @@ public class SchemaController {
@RequestMapping(
method = RequestMethod.GET,
- path = "/subject/{subject-name}/schema/versions"
+ path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions"
)
@ApiOperation(
value = "Schema information",
@@ -303,6 +418,10 @@ public class SchemaController {
}
)
public List<SchemaRecordDto> getSchemaListBySubject(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable(value = "tenant-name") final String tenant,
@ApiParam(value = "The name of the subject", required = true)
@PathVariable("subject-name") String subject
) {
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index 5f72d34..9d368b9 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -17,34 +17,33 @@
package org.apache.rocketmq.schema.registry.core.service;
-import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import java.util.stream.Collectors;
+import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
-import org.apache.rocketmq.schema.registry.common.context.RequestContext;
import org.apache.rocketmq.schema.registry.common.auth.AccessControlService;
+import org.apache.rocketmq.schema.registry.common.context.RequestContext;
+import org.apache.rocketmq.schema.registry.common.context.RequestContextManager;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException;
import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException;
-import org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
import org.apache.rocketmq.schema.registry.common.model.Dependency;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaOperation;
import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig;
-import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
-import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
-import org.apache.rocketmq.schema.registry.common.model.SchemaOperation;
-import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
-import org.apache.rocketmq.schema.registry.common.context.RequestContextManager;
-import org.apache.rocketmq.schema.registry.common.utils.IdGenerator;
-import org.apache.rocketmq.schema.registry.core.dependency.DependencyService;
import org.apache.rocketmq.schema.registry.common.storage.StorageServiceProxy;
import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
+import org.apache.rocketmq.schema.registry.common.utils.IdGenerator;
import org.apache.rocketmq.schema.registry.common.utils.StorageUtil;
+import org.apache.rocketmq.schema.registry.core.dependency.DependencyService;
@Slf4j
public class SchemaServiceImpl implements SchemaService<SchemaDto> {
@@ -83,6 +82,7 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
final RequestContext requestContext = RequestContextManager.getContext();
log.info("register get request context: " + requestContext);
+ schemaDto.setQualifiedName(qualifiedName);
checkSchemaValid(schemaDto);
checkSchemaExist(qualifiedName);
@@ -120,6 +120,8 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
final RequestContext requestContext = RequestContextManager.getContext();
log.info("update get request context: " + requestContext);
+ schemaDto.setQualifiedName(qualifiedName);
+
this.accessController.checkPermission("", "", SchemaOperation.UPDATE);
SchemaInfo current = storageServiceProxy.get(qualifiedName, config.isCacheEnabled());
@@ -156,14 +158,9 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
}
if (update.getAudit() == null) {
- // todo
update.setAudit(current.getAudit());
}
- if (update.getQualifiedName() == null) {
- update.setQualifiedName(current.getQualifiedName());
- }
-
// checkSchemaValid(schemaDto);
CommonUtil.validateCompatibility(update, current, current.getMeta().getCompatibility());
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 913c360..5af299d 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -103,8 +103,8 @@ public class RocketmqClient {
public RocketmqClient(Properties props) {
init(props);
createStorageTopic();
- startRemoteStorage();
startLocalCache();
+ startRemoteStorage();
}
private void createStorageTopic() {
@@ -198,17 +198,25 @@ public class RocketmqClient {
@Override
public void run() {
- List<MessageExt> msgList = scheduleConsumer.poll(1000);
- if (CollectionUtils.isNotEmpty(msgList)) {
- msgList.forEach(this::consumeMessage);
+ try {
+ List<MessageExt> msgList = scheduleConsumer.poll(1000);
+ if (CollectionUtils.isNotEmpty(msgList)) {
+ msgList.forEach(this::consumeMessage);
+ }
+ scheduleConsumer.commitSync();
+ } catch (Exception e) {
+ log.error("consume message exception, consume offset may not commit");
}
}
private void consumeMessage(MessageExt msg) {
+ if (msg.getKeys() == null) {
+ return;
+ }
synchronized (this) {
try {
log.info("receive msg, the content is {}", new String(msg.getBody()));
- if (DELETE_KEYS.equals(msg.getKeys())) {
+ if (msg.getKeys().equals(DELETE_KEYS)) {
// delete
byte[] schemaFullName = msg.getBody();
byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
@@ -230,6 +238,7 @@ public class RocketmqClient {
} else {
SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
+ log.info("Schema version is the same, no need to update.");
return;
}
if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
@@ -248,7 +257,8 @@ public class RocketmqClient {
}
}
} catch (Throwable e) {
- throw new SchemaException("Rebuild schema cache failed", e);
+ log.error("Update schema cache failed, msg {}", new String(msg.getBody()), e);
+ throw new SchemaException("Update schema " + msg.getKeys() + " failed.", e);
}
}
}