You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/31 08:39:10 UTC

[rocketmq-schema-registry] 10/37: optimize controller

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git

commit bccec3ffeab1648d744c0958e2af5cf20f7f8aff
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Wed Jul 20 16:42:47 2022 +0800

    optimize controller
---
 .../schema/registry/common/QualifiedName.java      |   8 +-
 .../schema/registry/common/dto/SubjectDto.java     |   3 +
 .../schema/registry/common/model/SubjectInfo.java  |   5 +-
 .../schema/registry/common/utils/CommonUtil.java   |   7 +-
 .../registry/core/api/v1/SchemaController.java     | 135 +++++++++++++++++++--
 .../registry/core/service/SchemaServiceImpl.java   |  29 ++---
 .../registry/storage/rocketmq/RocketmqClient.java  |  22 +++-
 7 files changed, 169 insertions(+), 40 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
index f97c2d0..06b5059 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
@@ -34,10 +34,6 @@ import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
 public class QualifiedName implements Serializable {
     private static final long serialVersionUID = 2266514833942841209L;
 
-    public static final String DEFAULT_TENANT = "default";
-
-    public static final String DEFAULT_CLUSTER = "cluster";
-
     private String cluster;
     private String tenant;
     private String subject;
@@ -71,7 +67,7 @@ public class QualifiedName implements Serializable {
     }
 
     public SubjectInfo subjectInfo() {
-        return new SubjectInfo(cluster, subject);
+        return new SubjectInfo(cluster, tenant, subject);
     }
 
     public String fullName() {
@@ -79,7 +75,7 @@ public class QualifiedName implements Serializable {
     }
 
     public String schemaFullName() {
-        return schema;
+        return tenant + '/' + schema;
     }
 
     public String subjectFullName() {
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java
index 2ff3ad2..4d19d5f 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java
@@ -35,6 +35,9 @@ public class SubjectDto {
     @ApiModelProperty(value = "Cluster of this subject", required = true)
     private String cluster;
 
+    @ApiModelProperty(value = "Tenant of this subject", required = true)
+    private String tenant;
+
     @ApiModelProperty(value = "Name of this subject", required = true)
     private String subject;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
index 9790f13..cc61275 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
@@ -33,10 +33,11 @@ public class SubjectInfo implements Serializable {
     private static final long serialVersionUID = -92808722007777844L;
 
     private String cluster;
+    private String tenant;
     private String subject;
 
     public String fullName() {
-        return cluster + '/' + subject;
+        return cluster + '/' + tenant + '/' + subject;
     }
 
     @Override
@@ -44,6 +45,8 @@ public class SubjectInfo implements Serializable {
         final StringBuilder sb = new StringBuilder("{");
         sb.append("\"cluster\":\"")
             .append(cluster).append('\"');
+        sb.append(",\"tenant\":\"")
+            .append(tenant).append('\"');
         sb.append(",\"subject\":\"")
             .append(subject).append('\"');
         sb.append('}');
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
index ddc2731..a29d6e6 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.schema.registry.common.utils;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import java.io.BufferedReader;
 import java.io.File;
@@ -60,9 +61,9 @@ import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
 public class CommonUtil {
 
     public static void validateName(QualifiedName qualifiedName) {
-//        Preconditions.checkNotNull(qualifiedName.getTenant(), "Tenant is null");
-//        Preconditions.checkNotNull(qualifiedName.getSubject(), "Subject is null");
-//        Preconditions.checkNotNull(qualifiedName.getName(), "Schema name is null");
+        Preconditions.checkNotNull(qualifiedName.getTenant(), "Tenant is null");
+        Preconditions.checkNotNull(qualifiedName.getSubject(), "Subject is null");
+        Preconditions.checkNotNull(qualifiedName.getSchema(), "Schema name is null");
     }
 
     public static boolean isQualifiedNameEmpty(QualifiedName qualifiedName) {
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
index 335294c..558d652 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -46,11 +46,13 @@ import org.springframework.web.bind.annotation.RestController;
 @Slf4j
 public class SchemaController {
 
-    private final String cluster;
-    private final String tenant;
     private final RequestProcessor requestProcessor;
     private final SchemaService<SchemaDto> schemaService;
 
+    public static final String DEFAULT_TENANT = "default";
+
+    public static final String DEFAULT_CLUSTER = "default";
+
     /**
      * Constructor.
      *
@@ -62,8 +64,6 @@ public class SchemaController {
         final RequestProcessor requestProcessor,
         final SchemaService<SchemaDto> schemaService
     ) {
-        this.cluster = QualifiedName.DEFAULT_CLUSTER;
-        this.tenant = QualifiedName.DEFAULT_TENANT;
         this.requestProcessor = requestProcessor;
         this.schemaService = schemaService;
     }
@@ -97,6 +97,43 @@ public class SchemaController {
         @PathVariable("schema-name") final String schemaName,
         @ApiParam(value = "The schema detail", required = true)
         @RequestBody final SchemaDto schemaDto
+    ) {
+        return registerSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName, schemaDto);
+    }
+
+    @RequestMapping(
+        method = RequestMethod.POST,
+        path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}",
+        consumes = MediaType.APPLICATION_JSON_VALUE
+    )
+    @ResponseStatus(HttpStatus.CREATED)
+    @ApiOperation(
+        value = "Register a new schema",
+        notes = "Return success if there were no errors registering the schema"
+    )
+    @ApiResponses(
+        {
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_CREATED,
+                message = "The schema was registered"
+            ),
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_NOT_FOUND,
+                message = "The requested schema cannot be registered"
+            )
+        }
+    )
+    public SchemaDto registerSchema(
+        @ApiParam(value = "The cluster of the subject", required = true)
+        @PathVariable(value = "cluster-name") final String cluster,
+        @ApiParam(value = "The tenant of the schema", required = true)
+        @PathVariable(value = "tenant-name") final String tenant,
+        @ApiParam(value = "The subject of the schema", required = true)
+        @PathVariable(name = "subject-name") final String subject,
+        @ApiParam(value = "The name of the schema", required = true)
+        @PathVariable("schema-name") final String schemaName,
+        @ApiParam(value = "The schema detail", required = true)
+        @RequestBody final SchemaDto schemaDto
     ) {
         // TODO: support register by sql
         final QualifiedName name = new QualifiedName(cluster, tenant, subject, schemaName);
@@ -112,7 +149,7 @@ public class SchemaController {
     }
 
     @RequestMapping(
-        path = "/subject/{subject-name}/schema",
+        path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema",
         method = RequestMethod.DELETE
     )
     @ResponseStatus(HttpStatus.OK)
@@ -133,6 +170,10 @@ public class SchemaController {
         }
     )
     public SchemaDto deleteSchema(
+        @ApiParam(value = "The cluster of the subject", required = true)
+        @PathVariable(value = "cluster-name") final String cluster,
+        @ApiParam(value = "The tenant of the schema", required = true)
+        @PathVariable(value = "tenant-name") final String tenant,
         @ApiParam(value = "The subject of the schema", required = true)
         @PathVariable("subject-name") final String subject
     ) {
@@ -145,7 +186,7 @@ public class SchemaController {
     }
 
     @RequestMapping(
-        path = "/subject/{subject-name}/schema/versions/{version}",
+        path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}",
         method = RequestMethod.DELETE
     )
     @ResponseStatus(HttpStatus.OK)
@@ -166,6 +207,10 @@ public class SchemaController {
         }
     )
     public SchemaDto deleteSchema(
+        @ApiParam(value = "The cluster of the subject", required = true)
+        @PathVariable(value = "cluster-name") final String cluster,
+        @ApiParam(value = "The tenant of the schema", required = true)
+        @PathVariable(value = "tenant-name") final String tenant,
         @ApiParam(value = "The subject of the schema", required = true)
         @PathVariable("subject-name") final String subject,
         @ApiParam(value = "The version of the schema", required = true)
@@ -208,6 +253,42 @@ public class SchemaController {
         @PathVariable("schema-name") final String schemaName,
         @ApiParam(value = "The schema detail", required = true)
         @RequestBody final SchemaDto schemaDto
+    ) {
+        return updateSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName, schemaDto);
+    }
+
+    @RequestMapping(
+        path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}",
+        method = RequestMethod.PUT,
+        consumes = MediaType.APPLICATION_JSON_VALUE
+    )
+    @ApiOperation(
+        value = "Update schema and generate new schema version",
+        notes = "Update the given schema"
+    )
+    @ApiResponses(
+        {
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_OK,
+                message = "Update schema success"
+            ),
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_NOT_FOUND,
+                message = "The requested schema cannot be found"
+            )
+        }
+    )
+    public SchemaDto updateSchema(
+        @ApiParam(value = "The cluster of the subject", required = true)
+        @PathVariable(value = "cluster-name") final String cluster,
+        @ApiParam(value = "The tenant of the schema", required = true)
+        @PathVariable(value = "tenant-name") final String tenant,
+        @ApiParam(value = "The subject of the schema", required = true)
+        @PathVariable(name = "subject-name") final String subject,
+        @ApiParam(value = "The name of the schema", required = true)
+        @PathVariable("schema-name") final String schemaName,
+        @ApiParam(value = "The schema detail", required = true)
+        @RequestBody final SchemaDto schemaDto
     ) {
         QualifiedName name = new QualifiedName(cluster, tenant, subject, schemaName);
         return this.requestProcessor.processRequest(
@@ -240,6 +321,36 @@ public class SchemaController {
     public SchemaRecordDto getSchemaBySubject(
         @ApiParam(value = "The name of the subject", required = true)
         @PathVariable("subject-name") String subject
+    ) {
+        return getSchemaBySubject(DEFAULT_CLUSTER, DEFAULT_TENANT, subject);
+    }
+
+    @RequestMapping(
+        method = RequestMethod.GET,
+        path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema"
+    )
+    @ApiOperation(
+        value = "Schema information",
+        notes = "Schema information with the latest version under the subject")
+    @ApiResponses(
+        {
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_OK,
+                message = "The schema is returned"
+            ),
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_NOT_FOUND,
+                message = "The requested tenant or schema cannot be found"
+            )
+        }
+    )
+    public SchemaRecordDto getSchemaBySubject(
+        @ApiParam(value = "The cluster of the subject", required = true)
+        @PathVariable(value = "cluster-name") final String cluster,
+        @ApiParam(value = "The tenant of the schema", required = true)
+        @PathVariable(value = "tenant-name") final String tenant,
+        @ApiParam(value = "The name of the subject", required = true)
+        @PathVariable("subject-name") String subject
     ) {
         QualifiedName name = new QualifiedName(cluster, tenant, subject, null);
         log.info("Request for get schema for subject: {}", name.subjectFullName());
@@ -252,7 +363,7 @@ public class SchemaController {
 
     @RequestMapping(
         method = RequestMethod.GET,
-        path = "/subject/{subject-name}/schema/versions/{version}"
+        path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}"
     )
     @ApiOperation(
         value = "Schema information",
@@ -270,6 +381,10 @@ public class SchemaController {
         }
     )
     public SchemaRecordDto getSchemaBySubject(
+        @ApiParam(value = "The cluster of the subject", required = true)
+        @PathVariable(value = "cluster-name") final String cluster,
+        @ApiParam(value = "The tenant of the schema", required = true)
+        @PathVariable(value = "tenant-name") final String tenant,
         @ApiParam(value = "The name of the subject", required = true)
         @PathVariable("subject-name") String subject,
         @ApiParam(value = "The version of the schema", required = true)
@@ -285,7 +400,7 @@ public class SchemaController {
 
     @RequestMapping(
         method = RequestMethod.GET,
-        path = "/subject/{subject-name}/schema/versions"
+        path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions"
     )
     @ApiOperation(
         value = "Schema information",
@@ -303,6 +418,10 @@ public class SchemaController {
         }
     )
     public List<SchemaRecordDto> getSchemaListBySubject(
+        @ApiParam(value = "The cluster of the subject", required = true)
+        @PathVariable(value = "cluster-name") final String cluster,
+        @ApiParam(value = "The tenant of the schema", required = true)
+        @PathVariable(value = "tenant-name") final String tenant,
         @ApiParam(value = "The name of the subject", required = true)
         @PathVariable("subject-name") String subject
     ) {
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index 5f72d34..9d368b9 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -17,34 +17,33 @@
 
 package org.apache.rocketmq.schema.registry.core.service;
 
-import com.google.common.base.Strings;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
+import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
-import org.apache.rocketmq.schema.registry.common.context.RequestContext;
 import org.apache.rocketmq.schema.registry.common.auth.AccessControlService;
+import org.apache.rocketmq.schema.registry.common.context.RequestContext;
+import org.apache.rocketmq.schema.registry.common.context.RequestContextManager;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
 import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException;
 import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
 import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException;
-import org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
 import org.apache.rocketmq.schema.registry.common.model.Dependency;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaOperation;
 import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
 import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
 import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig;
-import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
-import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
-import org.apache.rocketmq.schema.registry.common.model.SchemaOperation;
-import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
-import org.apache.rocketmq.schema.registry.common.context.RequestContextManager;
-import org.apache.rocketmq.schema.registry.common.utils.IdGenerator;
-import org.apache.rocketmq.schema.registry.core.dependency.DependencyService;
 import org.apache.rocketmq.schema.registry.common.storage.StorageServiceProxy;
 import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
+import org.apache.rocketmq.schema.registry.common.utils.IdGenerator;
 import org.apache.rocketmq.schema.registry.common.utils.StorageUtil;
+import org.apache.rocketmq.schema.registry.core.dependency.DependencyService;
 
 @Slf4j
 public class SchemaServiceImpl implements SchemaService<SchemaDto> {
@@ -83,6 +82,7 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
         final RequestContext requestContext = RequestContextManager.getContext();
         log.info("register get request context: " + requestContext);
 
+        schemaDto.setQualifiedName(qualifiedName);
         checkSchemaValid(schemaDto);
         checkSchemaExist(qualifiedName);
 
@@ -120,6 +120,8 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
         final RequestContext requestContext = RequestContextManager.getContext();
         log.info("update get request context: " + requestContext);
 
+        schemaDto.setQualifiedName(qualifiedName);
+
         this.accessController.checkPermission("", "", SchemaOperation.UPDATE);
 
         SchemaInfo current = storageServiceProxy.get(qualifiedName, config.isCacheEnabled());
@@ -156,14 +158,9 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
         }
 
         if (update.getAudit() == null) {
-            // todo
             update.setAudit(current.getAudit());
         }
 
-        if (update.getQualifiedName() == null) {
-            update.setQualifiedName(current.getQualifiedName());
-        }
-
 //        checkSchemaValid(schemaDto);
         CommonUtil.validateCompatibility(update, current, current.getMeta().getCompatibility());
 
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 913c360..5af299d 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -103,8 +103,8 @@ public class RocketmqClient {
     public RocketmqClient(Properties props) {
         init(props);
         createStorageTopic();
-        startRemoteStorage();
         startLocalCache();
+        startRemoteStorage();
     }
 
     private void createStorageTopic() {
@@ -198,17 +198,25 @@ public class RocketmqClient {
 
         @Override
         public void run() {
-            List<MessageExt> msgList = scheduleConsumer.poll(1000);
-            if (CollectionUtils.isNotEmpty(msgList)) {
-                msgList.forEach(this::consumeMessage);
+            try {
+                List<MessageExt> msgList = scheduleConsumer.poll(1000);
+                if (CollectionUtils.isNotEmpty(msgList)) {
+                    msgList.forEach(this::consumeMessage);
+                }
+                scheduleConsumer.commitSync();
+            } catch (Exception e) {
+                log.error("consume message exception, consume offset may not commit", e);
             }
         }
 
         private void consumeMessage(MessageExt msg) {
+            if (msg.getKeys() == null) {
+                return;
+            }
             synchronized (this) {
                 try {
                     log.info("receive msg, the content is {}", new String(msg.getBody()));
-                    if (DELETE_KEYS.equals(msg.getKeys())) {
+                    if (msg.getKeys().equals(DELETE_KEYS)) {
                         // delete
                         byte[] schemaFullName = msg.getBody();
                         byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
@@ -230,6 +238,7 @@ public class RocketmqClient {
                         } else {
                             SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
                             if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
+                                log.info("Schema version is the same, no need to update.");
                                 return;
                             }
                             if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
@@ -248,7 +257,8 @@ public class RocketmqClient {
                         }
                     }
                 } catch (Throwable e) {
-                    throw new SchemaException("Rebuild schema cache failed", e);
+                    log.error("Update schema cache failed, msg {}", new String(msg.getBody()), e);
+                    throw new SchemaException("Update schema " + msg.getKeys() + " failed.", e);
                 }
             }
         }