You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/31 06:46:55 UTC
[rocketmq-schema-registry] 08/34: hide cluster and tenant in SchemaController
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit c7dffcdd857fc53b451ce117d76b568e9b64af7a
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Tue Jul 19 17:05:14 2022 +0800
hide cluster and tenant in SchemaController
---
.../schema/registry/common/QualifiedName.java | 4 +
.../registry/core/api/v1/SchemaController.java | 225 +++------------------
.../registry/storage/rocketmq/RocketmqClient.java | 9 +-
3 files changed, 29 insertions(+), 209 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
index 5cda52b..f97c2d0 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
@@ -34,6 +34,10 @@ import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
public class QualifiedName implements Serializable {
private static final long serialVersionUID = 2266514833942841209L;
+ public static final String DEFAULT_TENANT = "default";
+
+ public static final String DEFAULT_CLUSTER = "cluster";
+
private String cluster;
private String tenant;
private String subject;
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
index ab997fb..edbe75d 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -36,7 +36,7 @@ import org.springframework.web.bind.annotation.RestController;
*/
@RestController
@RequestMapping(
- path = "/schema-registry/v1/cluster/{cluster-name}",
+ path = "/schema-registry/v1",
produces = MediaType.APPLICATION_JSON_VALUE
)
@Api(
@@ -47,6 +47,8 @@ import org.springframework.web.bind.annotation.RestController;
@Slf4j
public class SchemaController {
+ private final String cluster;
+ private final String tenant;
private final RequestProcessor requestProcessor;
private final SchemaService<SchemaDto> schemaService;
@@ -61,6 +63,8 @@ public class SchemaController {
final RequestProcessor requestProcessor,
final SchemaService<SchemaDto> schemaService
) {
+ this.cluster = QualifiedName.DEFAULT_CLUSTER;
+ this.tenant = QualifiedName.DEFAULT_TENANT;
this.requestProcessor = requestProcessor;
this.schemaService = schemaService;
}
@@ -88,45 +92,6 @@ public class SchemaController {
}
)
public SchemaDto registerSchema(
- @ApiParam(value = "The cluster of the subject", required = true)
- @PathVariable(name = "cluster-name") final String clusterName,
- @ApiParam(value = "The subject of the schema", required = true)
- @PathVariable(name = "subject-name") final String subjectName,
- @ApiParam(value = "The name of the schema", required = true)
- @PathVariable("schema-name") final String schemaName,
- @ApiParam(value = "The schema detail", required = true)
- @RequestBody final SchemaDto schemaDto
- ) {
- return registerSchema(clusterName, "default", subjectName, schemaName, schemaDto);
- }
-
- @RequestMapping(
- method = RequestMethod.POST,
- path = "/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}",
- consumes = MediaType.APPLICATION_JSON_VALUE
- )
- @ResponseStatus(HttpStatus.CREATED)
- @ApiOperation(
- value = "Register a new schema",
- notes = "Return success if there were no errors registering the schema"
- )
- @ApiResponses(
- {
- @ApiResponse(
- code = HttpURLConnection.HTTP_CREATED,
- message = "The schema was registered"
- ),
- @ApiResponse(
- code = HttpURLConnection.HTTP_NOT_FOUND,
- message = "The requested schema cannot be registered"
- )
- }
- )
- public SchemaDto registerSchema(
- @ApiParam(value = "The cluster of the subject", required = true)
- @PathVariable(value = "cluster-name") final String cluster,
- @ApiParam(value = "The tenant of the schema", required = true)
- @PathVariable(value = "tenant-name") final String tenant,
@ApiParam(value = "The subject of the schema", required = true)
@PathVariable(name = "subject-name") final String subject,
@ApiParam(value = "The name of the schema", required = true)
@@ -148,13 +113,13 @@ public class SchemaController {
}
@RequestMapping(
- path = "/tenant/{tenant-name}/subject/{subject-name}/schema",
+ path = "/subject/{subject-name}/schema",
method = RequestMethod.DELETE
)
@ResponseStatus(HttpStatus.OK)
@ApiOperation(
value = "Delete schema",
- notes = "Delete the schema under the given tenant and subject"
+ notes = "Delete the schema under the given subject"
)
@ApiResponses(
{
@@ -169,10 +134,6 @@ public class SchemaController {
}
)
public SchemaDto deleteSchema(
- @ApiParam(value = "The cluster of the subject", required = true)
- @PathVariable("cluster-name") final String cluster,
- @ApiParam(value = "The tenant of the schema", required = true)
- @PathVariable("tenant-name") final String tenant,
@ApiParam(value = "The subject of the schema", required = true)
@PathVariable("subject-name") final String subject
) {
@@ -185,13 +146,13 @@ public class SchemaController {
}
@RequestMapping(
- path = "/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}",
+ path = "/subject/{subject-name}/schema/versions/{version}",
method = RequestMethod.DELETE
)
@ResponseStatus(HttpStatus.OK)
@ApiOperation(
value = "Delete schema",
- notes = "Delete the schema under the given tenant, subject and version"
+ notes = "Delete the schema under the given subject and version"
)
@ApiResponses(
{
@@ -206,10 +167,6 @@ public class SchemaController {
}
)
public SchemaDto deleteSchema(
- @ApiParam(value = "The cluster of the subject", required = true)
- @PathVariable("cluster-name") final String cluster,
- @ApiParam(value = "The tenant of the schema", required = true)
- @PathVariable("tenant-name") final String tenant,
@ApiParam(value = "The subject of the schema", required = true)
@PathVariable("subject-name") final String subject,
@ApiParam(value = "The version of the schema", required = true)
@@ -223,42 +180,9 @@ public class SchemaController {
);
}
- @RequestMapping(
- path = "/subject/{subject-name}/schema/{schema-name}",
- method = RequestMethod.PUT,
- consumes = MediaType.APPLICATION_JSON_VALUE
- )
- @ApiOperation(
- value = "Update schema and generate new schema version",
- notes = "Update the given schema"
- )
- @ApiResponses(
- {
- @ApiResponse(
- code = HttpURLConnection.HTTP_OK,
- message = "Update schema success"
- ),
- @ApiResponse(
- code = HttpURLConnection.HTTP_NOT_FOUND,
- message = "The requested schema cannot be found"
- )
- }
- )
- public SchemaDto updateSchema(
- @ApiParam(value = "The cluster of the subject", required = true)
- @PathVariable("cluster-name") final String cluster,
- @ApiParam(value = "The subject of the schema", required = true)
- @PathVariable("subject-name") final String subject,
- @ApiParam(value = "The name of the schema", required = true)
- @PathVariable("schema-name") final String schemaName,
- @ApiParam(value = "The schema detail", required = true)
- @RequestBody final SchemaDto schemaDto
- ) {
- return this.updateSchema(cluster, "default", subject, schemaName, schemaDto);
- }
@RequestMapping(
- path = "/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}",
+ path = "/subject/{subject-name}/schema/{schema-name}",
method = RequestMethod.PUT,
consumes = MediaType.APPLICATION_JSON_VALUE
)
@@ -279,10 +203,6 @@ public class SchemaController {
}
)
public SchemaDto updateSchema(
- @ApiParam(value = "The cluster of the subject", required = true)
- @PathVariable(value = "cluster-name") final String cluster,
- @ApiParam(value = "The tenant of the schema", required = true)
- @PathVariable(value = "tenant-name") final String tenant,
@ApiParam(value = "The subject of the schema", required = true)
@PathVariable(name = "subject-name") final String subject,
@ApiParam(value = "The name of the schema", required = true)
@@ -298,13 +218,14 @@ public class SchemaController {
);
}
+
@RequestMapping(
method = RequestMethod.GET,
- path = "/tenant/{tenant-name}/subject/{subject-name}/schema"
+ path = "/subject/{subject-name}/schema"
)
@ApiOperation(
value = "Schema information",
- notes = "Schema information with the latest version under the tenant and subject")
+ notes = "Schema information with the latest version under the subject")
@ApiResponses(
{
@ApiResponse(
@@ -317,13 +238,9 @@ public class SchemaController {
)
}
)
- public SchemaRecordDto getSchemaByTenantSubject(
- @ApiParam(value = "The cluster of the subject", required = true)
- @PathVariable(value = "cluster-name") final String cluster,
- @ApiParam(value = "The tenant of the schema", required = true)
- @PathVariable(value = "tenant-name") final String tenant,
- @ApiParam(value = "The subject of the schema", required = true)
- @PathVariable(name = "subject-name") final String subject
+ public SchemaRecordDto getSchemaBySubject(
+ @ApiParam(value = "The name of the subject", required = true)
+ @PathVariable("subject-name") String subject
) {
QualifiedName name = new QualifiedName(cluster, tenant, subject, null);
log.info("Request for get schema for subject: {}", name.subjectFullName());
@@ -336,11 +253,11 @@ public class SchemaController {
@RequestMapping(
method = RequestMethod.GET,
- path = "/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}"
+ path = "/subject/{subject-name}/schema/versions/{version}"
)
@ApiOperation(
value = "Schema information",
- notes = "Schema information with the given version under the tenant and the subject")
+ notes = "Schema information with the given version under the subject")
@ApiResponses(
{
@ApiResponse(
@@ -353,15 +270,11 @@ public class SchemaController {
)
}
)
- public SchemaRecordDto getSchemaByTenantSubject(
- @ApiParam(value = "The cluster of the subject", required = true)
- @PathVariable("cluster-name") final String cluster,
- @ApiParam(value = "The tenant of the schema", required = true)
- @PathVariable(value = "tenant-name") final String tenant,
+ public SchemaRecordDto getSchemaBySubject(
@ApiParam(value = "The name of the subject", required = true)
- @PathVariable("subject-name") final String subject,
+ @PathVariable("subject-name") String subject,
@ApiParam(value = "The version of the schema", required = true)
- @PathVariable("version") final String version
+ @PathVariable("version") String version
) {
QualifiedName name = new QualifiedName(cluster, tenant, subject, null, Long.parseLong(version));
@@ -373,11 +286,11 @@ public class SchemaController {
@RequestMapping(
method = RequestMethod.GET,
- path = "/tenant/{tenant-name}/subject/{subject-name}/schema/versions"
+ path = "/subject/{subject-name}/schema/versions"
)
@ApiOperation(
value = "Schema information",
- notes = "Schema information with a list of versions under the tenant and the subject")
+ notes = "Schema information with a list of versions under the subject")
@ApiResponses(
{
@ApiResponse(
@@ -390,11 +303,7 @@ public class SchemaController {
)
}
)
- public List<SchemaRecordDto> getSchemaListByTenantSubject(
- @ApiParam(value = "The cluster of the subject", required = true)
- @PathVariable("cluster-name") String cluster,
- @ApiParam(value = "The tenant of the schema", required = true)
- @PathVariable(value = "tenant-name") final String tenant,
+ public List<SchemaRecordDto> getSchemaListBySubject(
@ApiParam(value = "The name of the subject", required = true)
@PathVariable("subject-name") String subject
) {
@@ -405,90 +314,4 @@ public class SchemaController {
() -> schemaService.listBySubject(name)
);
}
-
- @RequestMapping(
- method = RequestMethod.GET,
- path = "/subject/{subject-name}/schema"
- )
- @ApiOperation(
- value = "Schema information",
- notes = "Schema information with the latest version under the subject")
- @ApiResponses(
- {
- @ApiResponse(
- code = HttpURLConnection.HTTP_OK,
- message = "The schema is returned"
- ),
- @ApiResponse(
- code = HttpURLConnection.HTTP_NOT_FOUND,
- message = "The requested tenant or schema cannot be found"
- )
- }
- )
- public SchemaRecordDto getSchemaBySubject(
- @ApiParam(value = "The cluster of the subject", required = true)
- @PathVariable("cluster-name") String cluster,
- @ApiParam(value = "The name of the subject", required = true)
- @PathVariable("subject-name") String subject
- ) {
- return getSchemaByTenantSubject(cluster, "default", subject);
- }
-
- @RequestMapping(
- method = RequestMethod.GET,
- path = "/subject/{subject-name}/schema/versions/{version}"
- )
- @ApiOperation(
- value = "Schema information",
- notes = "Schema information with the given version under the subject")
- @ApiResponses(
- {
- @ApiResponse(
- code = HttpURLConnection.HTTP_OK,
- message = "The schema is returned"
- ),
- @ApiResponse(
- code = HttpURLConnection.HTTP_NOT_FOUND,
- message = "The requested tenant or schema cannot be found"
- )
- }
- )
- public SchemaRecordDto getSchemaBySubject(
- @ApiParam(value = "The cluster of the subject", required = true)
- @PathVariable("cluster-name") String cluster,
- @ApiParam(value = "The name of the subject", required = true)
- @PathVariable("subject-name") String subject,
- @ApiParam(value = "The version of the schema", required = true)
- @PathVariable("version") String version
- ) {
- return getSchemaByTenantSubject(cluster, "default", subject, version);
- }
-
- @RequestMapping(
- method = RequestMethod.GET,
- path = "/subject/{subject-name}/schema/versions"
- )
- @ApiOperation(
- value = "Schema information",
- notes = "Schema information with a list of versions under the subject")
- @ApiResponses(
- {
- @ApiResponse(
- code = HttpURLConnection.HTTP_OK,
- message = "The schema is returned"
- ),
- @ApiResponse(
- code = HttpURLConnection.HTTP_NOT_FOUND,
- message = "The requested tenant or schema cannot be found"
- )
- }
- )
- public List<SchemaRecordDto> getSchemaListBySubject(
- @ApiParam(value = "The cluster of the subject", required = true)
- @PathVariable("cluster-name") String cluster,
- @ApiParam(value = "The name of the subject", required = true)
- @PathVariable("subject-name") String subject
- ) {
- return getSchemaListByTenantSubject(cluster, "default", subject);
- }
}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 1d07084..1a25063 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -24,16 +24,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.stream.Collectors;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
@@ -44,8 +39,6 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException;