You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2023/02/17 07:55:05 UTC
[rocketmq-schema-registry] branch main updated: [ISSUE #49] Support cache for SchemaRegistryClient (#76)
This is an automated email from the ASF dual-hosted git repository.
huitong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
The following commit(s) were added to refs/heads/main by this push:
new 921cb9c [ISSUE #49] Support cache for SchemaRegistryClient (#76)
921cb9c is described below
commit 921cb9cdd18b1501d41ad9631607ffd843ec1675
Author: Ao Qiao <qi...@foxmail.com>
AuthorDate: Fri Feb 17 15:55:01 2023 +0800
[ISSUE #49] Support cache for SchemaRegistryClient (#76)
* copy normal client
* Add unit test
* Add licenses
* add test
* add comments
* add comments
* add new test
---
.../client/CachedSchemaRegistryClient.java | 488 +++++++++++++++++++++
.../client/SchemaRegistryClientFactory.java | 2 +-
.../client/CachedSchemaRegistryClientTest.java | 162 +++++++
3 files changed, 651 insertions(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java
new file mode 100644
index 0000000..8e87666
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
+
+public class CachedSchemaRegistryClient implements SchemaRegistryClient {
+
+ // Cluster and tenant used by the overloads that receive only a subject name.
+ private static final String DEFAULT_TENANT = "default";
+ private static final String DEFAULT_CLUSTER = "default";
+ // Cache size and expiry duration (in minutes) applied by the single-argument constructor.
+ private static final int DEFAULT_CAPACITY = 100;
+ private static final int DEFAULT_DURATION = 10;
+
+ // Remote client that every cache miss and every write operation is delegated to.
+ private final RestService restService;
+
+ /**
+ * Side indexes keyed by the full subject name ("cluster/tenant/subject"). When a whole subject is deleted,
+ * they are used to find and invalidate the matching SubjectAndId / SubjectAndVersion / SubjectAndSchema cache keys.
+ */
+ private final Map<String, Set<Long>> subjectToId; // stores the record ids whose SubjectAndId cache entries are invalidated when the subject is deleted
+
+ private final Map<String, Set<Long>> subjectToVersion; // stores the versions whose SubjectAndVersion cache entries are invalidated when the subject is deleted
+
+ private final Map<String, Set<String>> subjectToSchema; // stores the schema names whose SubjectAndSchema cache entries are invalidated when the subject is deleted
+
+ // Responses keyed by cluster, tenant, subject and version.
+ private final Cache<SubjectAndVersion, GetSchemaResponse> schemaCacheBySubjectAndVersion;
+
+ private final Cache<String, List<String>> subjectCache; // subject lists keyed by "cluster/tenant"
+
+ // Responses keyed by cluster, tenant, subject and record id.
+ private final Cache<SubjectAndId, GetSchemaResponse> schemaCacheBySubjectAndId;
+
+ private final Cache<String, GetSchemaResponse> schemaCacheBySubject; // responses keyed by the full subject name only
+
+ private final Cache<SubjectAndSchema, GetSchemaResponse> schemaCache; // responses keyed by SubjectAndSchema
+
+ // Tenant lists keyed by cluster name.
+ private final Cache<String, List<String>> tenantCache;
+
+ /**
+ * Creates a caching client with the default settings: caches of DEFAULT_CAPACITY entries whose
+ * entries expire DEFAULT_DURATION minutes after being written.
+ *
+ * @param restService client used for every request that cannot be answered from a cache
+ */
+ public CachedSchemaRegistryClient(RestService restService) {
+ this(restService, DEFAULT_CAPACITY, TimeUnit.MINUTES, DEFAULT_DURATION);
+ }
+
+    /**
+     * Creates a caching client with explicit cache sizing.
+     *
+     * @param restService client used for every request that cannot be answered from a cache
+     * @param capacity maximum number of entries held by each schema/subject cache
+     * @param unit time unit of {@code duration}; also the unit of the tenant cache's fixed expiry of 1
+     * @param duration time after which a written entry expires
+     */
+    public CachedSchemaRegistryClient(RestService restService, int capacity, TimeUnit unit, int duration) {
+        this.restService = restService;
+
+        this.subjectToId = new HashMap<>();
+        this.subjectToVersion = new HashMap<>();
+        this.subjectToSchema = new HashMap<>();
+
+        this.subjectCache = newCache(capacity, duration, unit);
+        this.schemaCache = newCache(capacity, duration, unit);
+        // The tenant cache is deliberately tiny: one entry, expiring after a single time unit.
+        this.tenantCache = newCache(1, 1, unit);
+        this.schemaCacheBySubjectAndVersion = newCache(capacity, duration, unit);
+        this.schemaCacheBySubjectAndId = newCache(capacity, duration, unit);
+        this.schemaCacheBySubject = newCache(capacity, duration, unit);
+    }
+
+    /** Builds a size-bounded cache whose entries expire a fixed time after being written. */
+    private static <K, V> Cache<K, V> newCache(long maxEntries, long ttl, TimeUnit ttlUnit) {
+        return CacheBuilder.newBuilder().maximumSize(maxEntries).expireAfterWrite(ttl, ttlUnit).build();
+    }
+
+    /**
+     * Registers a schema under {@code subject}. The call goes straight to the REST service;
+     * no cache is read or modified.
+     */
+    @Override
+    public RegisterSchemaResponse registerSchema(String subject, String schemaName,
+        RegisterSchemaRequest request) throws RestClientException, IOException {
+        final RegisterSchemaResponse registered = restService.registerSchema(subject, schemaName, request);
+        return registered;
+    }
+
+    /**
+     * Registers a schema under the given cluster, tenant and subject. The call goes straight to the
+     * REST service; no cache is read or modified.
+     */
+    @Override
+    public RegisterSchemaResponse registerSchema(String clusterName, String tenant, String subjectName,
+        String schemaName, RegisterSchemaRequest request) throws IOException, RestClientException {
+        final RegisterSchemaResponse registered =
+            restService.registerSchema(clusterName, tenant, subjectName, schemaName, request);
+        return registered;
+    }
+
+    /**
+     * Deletes every schema of a subject and drops all locally cached entries that belong to it.
+     *
+     * <p>Caches are invalidated before the remote call, so a failed deletion only costs a re-fetch.
+     *
+     * @throws IOException if the REST call fails at the transport level
+     * @throws RestClientException if the REST service reports an error
+     */
+    @Override
+    public DeleteSchemeResponse deleteSchema(String cluster, String tenant,
+        String subject) throws IOException, RestClientException {
+        String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
+
+        schemaCacheBySubject.invalidate(subjectFullName);
+        // NOTE(review): assumes deleting a subject can change the tenant's subject list; dropping the
+        // cached list is harmless if it does not.
+        subjectCache.invalidate(String.format("%s/%s", cluster, tenant));
+
+        // Each index entry is removed in one step; its content names the cache keys to invalidate.
+        Set<Long> cachedVersions = subjectToVersion.remove(subjectFullName);
+        if (cachedVersions != null) {
+            cachedVersions.forEach(version ->
+                schemaCacheBySubjectAndVersion.invalidate(new SubjectAndVersion(cluster, tenant, subject, version)));
+        }
+
+        Set<Long> cachedRecordIds = subjectToId.remove(subjectFullName);
+        if (cachedRecordIds != null) {
+            cachedRecordIds.forEach(recordId ->
+                schemaCacheBySubjectAndId.invalidate(new SubjectAndId(cluster, tenant, subject, recordId)));
+        }
+
+        Set<String> cachedSchemas = subjectToSchema.remove(subjectFullName);
+        if (cachedSchemas != null) {
+            cachedSchemas.forEach(schema ->
+                schemaCache.invalidate(new SubjectAndSchema(cluster, tenant, subject, schema)));
+        }
+
+        return restService.deleteSchema(cluster, tenant, subject);
+    }
+
+    /**
+     * Deletes one version of a subject and drops the cached entries for that version and for the
+     * subject's by-subject lookup.
+     *
+     * @throws IOException if the REST call fails at the transport level
+     * @throws RestClientException if the REST service reports an error
+     */
+    @Override
+    public DeleteSchemeResponse deleteSchema(String cluster, String tenant, String subject,
+        long version) throws IOException, RestClientException {
+        String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
+        schemaCacheBySubject.invalidate(subjectFullName);
+        schemaCacheBySubjectAndVersion.invalidate(new SubjectAndVersion(cluster, tenant, subject, version));
+
+        // The subject has no index entry when none of its versions was ever cached; guarding the
+        // lookup avoids a NullPointerException in that case.
+        Set<Long> cachedVersions = subjectToVersion.get(subjectFullName);
+        if (cachedVersions != null) {
+            cachedVersions.remove(version);
+        }
+
+        return restService.deleteSchema(cluster, tenant, subject, version);
+    }
+
+    /**
+     * Updates a schema of {@code subject} in the default cluster and tenant, dropping the cached
+     * entries the update can make stale.
+     *
+     * @throws RestClientException if the REST service reports an error
+     * @throws IOException if the REST call fails at the transport level
+     */
+    @Override
+    public UpdateSchemaResponse updateSchema(String subject, String schemaName,
+        UpdateSchemaRequest request) throws RestClientException, IOException {
+        schemaCache.invalidate(new SubjectAndSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName));
+        // NOTE(review): assumes an update changes what getSchemaBySubject returns for this subject.
+        // Without this, the by-subject cache keeps serving the pre-update response until it expires;
+        // invalidating is harmless if the assumption is wrong.
+        schemaCacheBySubject.invalidate(String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject));
+        return restService.updateSchema(subject, schemaName, request);
+    }
+
+    /**
+     * Updates a schema of the given cluster, tenant and subject, dropping the cached entries the
+     * update can make stale.
+     *
+     * @throws IOException if the REST call fails at the transport level
+     * @throws RestClientException if the REST service reports an error
+     */
+    @Override
+    public UpdateSchemaResponse updateSchema(String cluster, String tenant, String subjectName,
+        String schemaName, UpdateSchemaRequest request) throws IOException, RestClientException {
+        schemaCache.invalidate(new SubjectAndSchema(cluster, tenant, subjectName, schemaName));
+        // NOTE(review): assumes an update changes what getSchemaBySubject returns for this subject.
+        // Without this, the by-subject cache keeps serving the pre-update response until it expires;
+        // invalidating is harmless if the assumption is wrong.
+        schemaCacheBySubject.invalidate(String.format("%s/%s/%s", cluster, tenant, subjectName));
+        return restService.updateSchema(cluster, tenant, subjectName, schemaName, request);
+    }
+
+    /**
+     * Looks up the schema of {@code subject} in the default cluster and tenant, consulting the
+     * by-subject cache first and storing the remote answer on a miss.
+     */
+    @Override
+    public GetSchemaResponse getSchemaBySubject(String subject) throws RestClientException, IOException {
+        final String cacheKey = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject);
+        GetSchemaResponse response = schemaCacheBySubject.getIfPresent(cacheKey);
+        if (response == null) {
+            // Cache miss: ask the registry and remember the answer.
+            response = restService.getSchemaBySubject(subject);
+            schemaCacheBySubject.put(cacheKey, response);
+        }
+        return response;
+    }
+
+    /**
+     * Looks up the schema of a subject in the given cluster and tenant, consulting the by-subject
+     * cache first and storing the remote answer on a miss.
+     */
+    @Override
+    public GetSchemaResponse getSchemaBySubject(String cluster, String tenant,
+        String subject) throws RestClientException, IOException {
+        final String cacheKey = String.format("%s/%s/%s", cluster, tenant, subject);
+        GetSchemaResponse response = schemaCacheBySubject.getIfPresent(cacheKey);
+        if (response == null) {
+            // Cache miss: ask the registry and remember the answer.
+            response = restService.getSchemaBySubject(cluster, tenant, subject);
+            schemaCacheBySubject.put(cacheKey, response);
+        }
+        return response;
+    }
+
+    /**
+     * Returns the schema of a subject at a specific version, served from the by-version cache when
+     * present and fetched from the REST service (then cached) otherwise.
+     *
+     * @throws IOException if the REST call fails at the transport level
+     * @throws RestClientException if the REST service reports an error
+     */
+    @Override
+    public GetSchemaResponse getSchemaBySubjectAndVersion(String cluster, String tenant, String subject,
+        long version) throws IOException, RestClientException {
+        SubjectAndVersion subjectAndVersion = new SubjectAndVersion(cluster, tenant, subject, version);
+        GetSchemaResponse result = schemaCacheBySubjectAndVersion.getIfPresent(subjectAndVersion);
+        if (result != null) {
+            return result;
+        }
+
+        result = restService.getSchemaBySubject(cluster, tenant, subject, version);
+        schemaCacheBySubjectAndVersion.put(subjectAndVersion, result);
+
+        // Record the version in subjectToVersion, the index deleteSchema reads to invalidate
+        // by-version entries. It used to be written to subjectToId, which left those entries
+        // behind after a subject was deleted and mixed versions in with record ids.
+        String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
+        subjectToVersion.computeIfAbsent(subjectFullName, key -> new HashSet<>()).add(version);
+        return result;
+    }
+
+    /**
+     * Returns the schema of {@code subject} at a specific version in the default cluster and tenant,
+     * served from the by-version cache when present and fetched (then cached) otherwise.
+     *
+     * @throws IOException if the REST call fails at the transport level
+     * @throws RestClientException if the REST service reports an error
+     */
+    public GetSchemaResponse getSchemaBySubjectAndVersion(String subject, long version)
+        throws IOException, RestClientException {
+        SubjectAndVersion subjectAndVersion = new SubjectAndVersion(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, version);
+        GetSchemaResponse result = schemaCacheBySubjectAndVersion.getIfPresent(subjectAndVersion);
+        if (result != null) {
+            return result;
+        }
+
+        result = restService.getSchemaBySubject(subject, version);
+        schemaCacheBySubjectAndVersion.put(subjectAndVersion, result);
+
+        // Record the version in subjectToVersion, the index deleteSchema reads to invalidate
+        // by-version entries. It used to be written to subjectToId, which left those entries
+        // behind after a subject was deleted and mixed versions in with record ids.
+        String subjectFullName = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject);
+        subjectToVersion.computeIfAbsent(subjectFullName, key -> new HashSet<>()).add(version);
+        return result;
+    }
+
+    /**
+     * Returns the response for one named schema of a subject, using the SubjectAndSchema cache.
+     * On a miss the remote answer is cached and the schema name is indexed under the subject so
+     * that a later subject deletion can invalidate it.
+     */
+    @Override
+    public GetSchemaResponse getTargetSchema(String cluster, String tenant, String subject, String schema)
+        throws RestClientException, IOException {
+        final SubjectAndSchema cacheKey = new SubjectAndSchema(cluster, tenant, subject, schema);
+        final GetSchemaResponse cached = schemaCache.getIfPresent(cacheKey);
+        if (cached != null) {
+            return cached;
+        }
+
+        final GetSchemaResponse fetched = restService.getTargetSchema(cluster, tenant, subject, schema);
+        schemaCache.put(cacheKey, fetched);
+
+        final String indexKey = String.format("%s/%s/%s", cluster, tenant, subject);
+        subjectToSchema.computeIfAbsent(indexKey, ignored -> new HashSet<>()).add(schema);
+        return fetched;
+    }
+
+    /**
+     * Returns the response for one named schema of {@code subject} in the default cluster and
+     * tenant, using the SubjectAndSchema cache. On a miss the remote answer is cached and the
+     * schema name is indexed under the subject so that a later subject deletion can invalidate it.
+     */
+    @Override
+    public GetSchemaResponse getTargetSchema(String subject, String schema)
+        throws RestClientException, IOException {
+        final SubjectAndSchema cacheKey = new SubjectAndSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schema);
+        final GetSchemaResponse cached = schemaCache.getIfPresent(cacheKey);
+        if (cached != null) {
+            return cached;
+        }
+
+        final GetSchemaResponse fetched = restService.getTargetSchema(subject, schema);
+        schemaCache.put(cacheKey, fetched);
+
+        final String indexKey = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject);
+        subjectToSchema.computeIfAbsent(indexKey, ignored -> new HashSet<>()).add(schema);
+        return fetched;
+    }
+
+    /**
+     * Lists the schema records of a subject. Always asks the REST service; the result is not cached.
+     */
+    @Override
+    public List<SchemaRecordDto> getSchemaListBySubject(String cluster, String tenant,
+        String subject) throws RestClientException, IOException {
+        final List<SchemaRecordDto> records = restService.getSchemaListBySubject(cluster, tenant, subject);
+        return records;
+    }
+
+    /**
+     * Lists the subjects of a tenant, served from the subject cache when present and fetched from
+     * the REST service (then cached) otherwise.
+     *
+     * @throws RestClientException if the REST service reports an error
+     * @throws IOException if the REST call fails at the transport level
+     */
+    @Override
+    public List<String> getSubjectsByTenant(String cluster, String tenant)
+        throws RestClientException, IOException {
+        String fullName = String.format("%s/%s", cluster, tenant);
+        List<String> result = subjectCache.getIfPresent(fullName);
+        // getIfPresent returns null on a miss, so the check must be against null; calling
+        // isEmpty() on the result threw a NullPointerException on the first lookup of any tenant.
+        if (result != null) {
+            return result;
+        }
+        result = restService.getSubjectsByTenant(cluster, tenant);
+        subjectCache.put(fullName, result);
+        return result;
+    }
+
+    /**
+     * Lists the tenants of a cluster, consulting the tenant cache first and storing the remote
+     * answer on a miss.
+     */
+    @Override
+    public List<String> getAllTenants(String cluster) throws RestClientException, IOException {
+        List<String> tenants = tenantCache.getIfPresent(cluster);
+        if (tenants == null) {
+            // Cache miss: ask the registry and remember the answer.
+            tenants = restService.getAllTenants(cluster);
+            tenantCache.put(cluster, tenants);
+        }
+        return tenants;
+    }
+
+    /**
+     * Returns the schema with the given record id, using the SubjectAndId cache. On a miss the
+     * record id is indexed under the subject before the remote fetch, and the answer is then cached.
+     */
+    public GetSchemaResponse getSchemaByRecordId(String cluster, String tenant, String subject,
+        long recordId) throws RestClientException, IOException {
+        final SubjectAndId cacheKey = new SubjectAndId(cluster, tenant, subject, recordId);
+        final GetSchemaResponse cached = schemaCacheBySubjectAndId.getIfPresent(cacheKey);
+        if (cached != null) {
+            return cached;
+        }
+
+        // The index entry is written first, so it exists even if the fetch below throws.
+        final String indexKey = String.format("%s/%s/%s", cluster, tenant, subject);
+        subjectToId.computeIfAbsent(indexKey, ignored -> new HashSet<>()).add(recordId);
+
+        final GetSchemaResponse fetched = restService.getSchemaByRecordId(cluster, tenant, subject, recordId);
+        schemaCacheBySubjectAndId.put(cacheKey, fetched);
+        return fetched;
+    }
+
+    /**
+     * Returns the schema with the given record id in the default cluster and tenant, using the
+     * SubjectAndId cache. On a miss the record id is indexed under the subject before the remote
+     * fetch, and the answer is then cached.
+     */
+    @Override
+    public GetSchemaResponse getSchemaByRecordId(String subject, long recordId)
+        throws RestClientException, IOException {
+        final SubjectAndId cacheKey = new SubjectAndId(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, recordId);
+        final GetSchemaResponse cached = schemaCacheBySubjectAndId.getIfPresent(cacheKey);
+        if (cached != null) {
+            return cached;
+        }
+
+        // The index entry is written first, so it exists even if the fetch below throws.
+        final String indexKey = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject);
+        subjectToId.computeIfAbsent(indexKey, ignored -> new HashSet<>()).add(recordId);
+
+        final GetSchemaResponse fetched = restService.getSchemaByRecordId(subject, recordId);
+        schemaCacheBySubjectAndId.put(cacheKey, fetched);
+        return fetched;
+    }
+
+    /**
+     * Cache key identifying a schema by cluster, tenant, subject and record id.
+     * Equality and hash code cover all four components.
+     */
+    @AllArgsConstructor
+    static class SubjectAndId {
+        // Final: a key must not change while it is stored in a cache.
+        private final String cluster;
+        private final String tenant;
+        private final String subject;
+        private final long recordId;
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SubjectAndId that = (SubjectAndId) o;
+            return Objects.equals(subject, that.subject)
+                && Objects.equals(tenant, that.tenant)
+                && Objects.equals(cluster, that.cluster)
+                && recordId == that.recordId;
+        }
+
+        public String getCluster() {
+            return cluster;
+        }
+
+        public String getTenant() {
+            return tenant;
+        }
+
+        public String getSubject() {
+            return subject;
+        }
+
+        public long getRecordId() {
+            return recordId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(cluster, tenant, subject, recordId);
+        }
+
+        @Override
+        public String toString() {
+            // Fields are separated by ", "; the separators between the first three were missing.
+            return "SubjectAndId{" + "cluster=" + cluster + ", tenant=" + tenant + ", subject='" + subject + '\''
+                + ", recordId=" + recordId + '}';
+        }
+    }
+
+    /**
+     * Cache key identifying a schema by cluster, tenant, subject and version.
+     * Equality and hash code cover all four components.
+     */
+    @AllArgsConstructor
+    static class SubjectAndVersion {
+        // Final: a key must not change while it is stored in a cache.
+        private final String cluster;
+        private final String tenant;
+        private final String subject;
+        private final long version;
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SubjectAndVersion that = (SubjectAndVersion) o;
+            return Objects.equals(subject, that.subject)
+                && Objects.equals(tenant, that.tenant)
+                && Objects.equals(cluster, that.cluster)
+                && version == that.version;
+        }
+
+        public String getCluster() {
+            return cluster;
+        }
+
+        public String getTenant() {
+            return tenant;
+        }
+
+        public String getSubject() {
+            return subject;
+        }
+
+        public long getVersion() {
+            return version;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(cluster, tenant, subject, version);
+        }
+
+        @Override
+        public String toString() {
+            // Prints this class's own name (it used to print "SubjectAndId") and separates fields with ", ".
+            return "SubjectAndVersion{" + "cluster=" + cluster + ", tenant=" + tenant + ", subject='" + subject + '\''
+                + ", version=" + version + '}';
+        }
+    }
+
+    /**
+     * Cache key identifying a schema by cluster, tenant, subject and schema string.
+     * Equality and hash code cover all four components.
+     */
+    @AllArgsConstructor
+    static class SubjectAndSchema {
+        // Final: a key must not change while it is stored in a cache.
+        private final String cluster;
+        private final String tenant;
+        private final String subject;
+        private final String schema;
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SubjectAndSchema that = (SubjectAndSchema) o;
+            return Objects.equals(subject, that.subject)
+                && Objects.equals(tenant, that.tenant)
+                && Objects.equals(cluster, that.cluster)
+                && Objects.equals(schema, that.schema);
+        }
+
+        public String getCluster() {
+            return cluster;
+        }
+
+        public String getTenant() {
+            return tenant;
+        }
+
+        public String getSubject() {
+            return subject;
+        }
+
+        public String getSchema() {
+            return schema;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(cluster, tenant, subject, schema);
+        }
+
+        @Override
+        public String toString() {
+            // Prints this class's own name (it used to print "SubjectAndId") and separates fields with ", ".
+            return "SubjectAndSchema{" + "cluster=" + cluster + ", tenant=" + tenant + ", subject='" + subject + '\''
+                + ", schema=" + schema + '}';
+        }
+    }
+}
+
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java
index 26167c3..00e0d07 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java
@@ -30,6 +30,6 @@ public class SchemaRegistryClientFactory {
*/
public static SchemaRegistryClient newClient(String baseUrl, Map<String, String> map) {
RestService restService = null == map ? new RestService(baseUrl) : new RestService(baseUrl, map);
- return new NormalSchemaRegistryClient(restService);
+ return new CachedSchemaRegistryClient(restService);
}
}
diff --git a/client/src/test/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClientTest.java b/client/src/test/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClientTest.java
new file mode 100644
index 0000000..a377d34
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClientTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.model.Compatibility;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class CachedSchemaRegistryClientTest {
+
+ // Address handed to RestService; presumably a schema registry must be listening there for these tests to do anything — TODO confirm.
+ private final static String baseUrl = "http://localhost:8080";
+ // Subject name used by most tests.
+ private final static String topic = "TopicTest";
+
+ // Shared fixtures, created once in setUp().
+ static RestService restService;
+ // Uncached client whose answers serve as the reference in the assertions.
+ static NormalSchemaRegistryClient normalSchemaRegistryClient;
+ // Cached client built with the default settings.
+ static CachedSchemaRegistryClient cachedSchemaRegistryClient;
+
+ // Cached client built with capacity 2 and a 2-minute expiry.
+ static CachedSchemaRegistryClient cachedSchemaRegistryClientCapacity2;
+
+    /**
+     * Builds the shared clients, deletes the test subject through the cached client and then
+     * registers it again. A failure of the deletion aborts the set-up.
+     */
+    @BeforeAll
+    static void setUp() {
+        restService = new RestService(baseUrl);
+        normalSchemaRegistryClient = new NormalSchemaRegistryClient(restService);
+        cachedSchemaRegistryClient = new CachedSchemaRegistryClient(restService);
+        cachedSchemaRegistryClientCapacity2 = new CachedSchemaRegistryClient(restService, 2, TimeUnit.MINUTES, 2);
+        try {
+            cachedSchemaRegistryClient.deleteSchema("default", "default", topic);
+        } catch (IOException | RestClientException e) {
+            throw new RuntimeException(e);
+        }
+        registerSchema();
+    }
+
+ static void registerSchema() {
+ RegisterSchemaRequest request = RegisterSchemaRequest.builder()
+ .schemaIdl("{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\","
+ + "\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}")
+ .schemaType(SchemaType.AVRO)
+ .compatibility(Compatibility.BACKWARD)
+ .owner("test").build();
+ try {
+ RegisterSchemaResponse response
+ = cachedSchemaRegistryClient.registerSchema(topic, "Charge", request);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+    /**
+     * Checks that by-subject lookups through the cached client (first call, repeated call, and the
+     * explicit cluster/tenant overload) match each other and the uncached client.
+     *
+     * <p>Exceptions propagate so the test fails when the registry cannot be reached, as the sibling
+     * tests do. They used to be printed and swallowed, letting the test pass without asserting.
+     */
+    @Test
+    void getSchemaBySubject() throws IOException, RestClientException {
+        GetSchemaResponse normalResponse = normalSchemaRegistryClient.getSchemaBySubject(topic);
+        GetSchemaResponse cachedResponse = cachedSchemaRegistryClient.getSchemaBySubject(topic);
+        GetSchemaResponse cachedResponse2 = cachedSchemaRegistryClient.getSchemaBySubject(topic);
+        GetSchemaResponse cachedResponse3 = cachedSchemaRegistryClient.getSchemaBySubject("default", "default", topic);
+
+        assertEquals(normalResponse, cachedResponse2);
+        assertEquals(cachedResponse3, cachedResponse);
+    }
+
+ @Test
+ void getSchemaBySubjectAndVersion() {
+ try {
+ GetSchemaResponse normalResponse = normalSchemaRegistryClient.getSchemaBySubjectAndVersion("default", "default", topic, 1);
+ GetSchemaResponse cachedResponse = cachedSchemaRegistryClient.getSchemaBySubjectAndVersion("default", "default", topic, 1);
+ GetSchemaResponse cachedResponse2 = cachedSchemaRegistryClient.getSchemaBySubjectAndVersion("default", "default", topic, 1);
+ GetSchemaResponse cachedResponse3 = cachedSchemaRegistryClient.getSchemaBySubjectAndVersion(topic, 1);
+
+ assertEquals(normalResponse, cachedResponse2);
+ assertEquals(cachedResponse3, cachedResponse);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (RestClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ void getSchemaBySubjectAndId() {
+ try {
+ GetSchemaResponse normalResponse = normalSchemaRegistryClient.getSchemaByRecordId("default", "default", topic, Long.parseLong("135023078756319233"));
+ GetSchemaResponse cachedResponse = cachedSchemaRegistryClient.getSchemaByRecordId("default", "default", topic, Long.parseLong("135023078756319233"));
+ GetSchemaResponse cachedResponse2 = cachedSchemaRegistryClient.getSchemaByRecordId("default", "default", topic, Long.parseLong("135023078756319233"));
+ GetSchemaResponse cachedResponse3 = cachedSchemaRegistryClient.getSchemaByRecordId(topic, Long.parseLong("135023078756319233"));
+
+ assertEquals(normalResponse, cachedResponse2);
+ assertEquals(cachedResponse3, cachedResponse);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (RestClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ void getSchemaBySubjectWithCapacity2() {
+ RegisterSchemaRequest request = RegisterSchemaRequest.builder()
+ .schemaIdl("{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\","
+ + "\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}")
+ .schemaType(SchemaType.AVRO)
+ .compatibility(Compatibility.BACKWARD)
+ .owner("test").build();
+ try {
+ DeleteSchemeResponse responseD1
+ = cachedSchemaRegistryClient.deleteSchema("default", "default", "TopicTest2");
+ DeleteSchemeResponse responseD2
+ = cachedSchemaRegistryClient.deleteSchema("default", "default", "TopicTest3");
+ RegisterSchemaResponse response
+ = cachedSchemaRegistryClient.registerSchema("TopicTest2", "Charge2", request);
+ RegisterSchemaResponse response2
+ = cachedSchemaRegistryClient.registerSchema("TopicTest3", "Charge3", request);
+ GetSchemaResponse normalResponse = normalSchemaRegistryClient.getSchemaBySubject(topic);
+ GetSchemaResponse normalResponse2 = normalSchemaRegistryClient.getSchemaBySubject("TopicTest2");
+ GetSchemaResponse normalResponse3 = normalSchemaRegistryClient.getSchemaBySubject("TopicTest3");
+
+ GetSchemaResponse cachedResponse = cachedSchemaRegistryClientCapacity2.getSchemaBySubject(topic);
+ GetSchemaResponse cachedResponse2 = cachedSchemaRegistryClientCapacity2.getSchemaBySubject("TopicTest2");
+ GetSchemaResponse cachedResponse3 = cachedSchemaRegistryClientCapacity2.getSchemaBySubject("TopicTest3");
+
+ assertEquals(normalResponse, cachedResponse);
+ assertEquals(normalResponse2, cachedResponse2);
+ assertEquals(normalResponse3, cachedResponse3);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+
+ }
+
+}
\ No newline at end of file