You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2023/02/17 07:55:05 UTC

[rocketmq-schema-registry] branch main updated: [ISSUE #49] Support cache for SchemaRegistryClient (#76)

This is an automated email from the ASF dual-hosted git repository.

huitong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git


The following commit(s) were added to refs/heads/main by this push:
     new 921cb9c  [ISSUE #49] Support cache for SchemaRegistryClient  (#76)
921cb9c is described below

commit 921cb9cdd18b1501d41ad9631607ffd843ec1675
Author: Ao Qiao <qi...@foxmail.com>
AuthorDate: Fri Feb 17 15:55:01 2023 +0800

    [ISSUE #49] Support cache for SchemaRegistryClient  (#76)
    
    * copy normal client
    
    * Add unit test
    
    * Add licenses
    
    * add test
    
    * add comments
    
    * add comments
    
    * add new test
---
 .../client/CachedSchemaRegistryClient.java         | 488 +++++++++++++++++++++
 .../client/SchemaRegistryClientFactory.java        |   2 +-
 .../client/CachedSchemaRegistryClientTest.java     | 162 +++++++
 3 files changed, 651 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java
new file mode 100644
index 0000000..8e87666
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
+
+public class CachedSchemaRegistryClient implements SchemaRegistryClient {
+
+    private static final String DEFAULT_TENANT = "default";
+    private static final String DEFAULT_CLUSTER = "default";
+    private static final int DEFAULT_CAPACITY = 100;
+    private static final int DEFAULT_DURATION = 10;
+
+    private final RestService restService;
+
+    /**
+     * when deleting schema by subject, use these maps to invalidate all caches by KEY SubjectAndId / SubjectAndVersion / SubjectAndSchema
+     */
+    private final Map<String, Set<Long>> subjectToId; // restore recordIds that cached in SubjectAndId, used when delete all subject caches
+
+    private final Map<String, Set<Long>> subjectToVersion; // restore versions that cached in SubjectAndVersion, used when delete all subject caches
+
+    private final Map<String, Set<String>> subjectToSchema; // restore schema that cached in SubjectAndSchema, used when delete all subject caches
+
+    private final Cache<SubjectAndVersion, GetSchemaResponse> schemaCacheBySubjectAndVersion;
+
+    private final Cache<String, List<String>> subjectCache; //cache for subject
+
+    private final Cache<SubjectAndId, GetSchemaResponse> schemaCacheBySubjectAndId;
+
+    private final Cache<String, GetSchemaResponse> schemaCacheBySubject; //schema cache by Subject only
+
+    private final Cache<SubjectAndSchema, GetSchemaResponse> schemaCache; //schema cache by SubjectAndSchema
+
+    private final Cache<String, List<String>> tenantCache;
+
+    public CachedSchemaRegistryClient(RestService restService) {
+        this(restService, DEFAULT_CAPACITY, TimeUnit.MINUTES, DEFAULT_DURATION);
+    }
+
+    public CachedSchemaRegistryClient(RestService restService, int capacity, TimeUnit unit, int duration) {
+        this.restService = restService;
+        subjectToId = new HashMap<>();
+        subjectToVersion = new HashMap<>();
+        subjectToSchema = new HashMap<>();
+        this.subjectCache = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+        this.schemaCache = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+        this.tenantCache = CacheBuilder.newBuilder().maximumSize(1).expireAfterWrite(1, unit).build();
+        this.schemaCacheBySubjectAndVersion = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+        this.schemaCacheBySubjectAndId = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+        this.schemaCacheBySubject = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+    }
+
+    @Override
+    public RegisterSchemaResponse registerSchema(String subject, String schemaName,
+        RegisterSchemaRequest request) throws RestClientException, IOException {
+        return restService.registerSchema(subject, schemaName, request);
+    }
+
+    @Override
+    public RegisterSchemaResponse registerSchema(String clusterName, String tenant, String subjectName,
+        String schemaName, RegisterSchemaRequest request) throws IOException, RestClientException {
+        return restService.registerSchema(clusterName, tenant, subjectName, schemaName, request);
+    }
+
+    @Override
+    public DeleteSchemeResponse deleteSchema(String cluster, String tenant,
+        String subject) throws IOException, RestClientException {
+        String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
+
+        schemaCacheBySubject.invalidate(subjectFullName);
+        //invalidate schemaCacheBySubjectAndVersion
+        if (subjectToVersion.get(subjectFullName) != null) {
+            subjectToVersion.get(subjectFullName).forEach(
+                version -> schemaCacheBySubjectAndVersion.invalidate(new SubjectAndVersion(cluster, tenant, subject, version)));
+        }
+        //invalidate schemaCacheBySubjectAndId
+        if (subjectToId.get(subjectFullName) != null) {
+            subjectToId.get(subjectFullName).forEach(
+                recordId -> schemaCacheBySubjectAndId.invalidate(new SubjectAndId(cluster, tenant, subject, recordId)));
+        }
+        // invalidate schemaCache
+        if (subjectToSchema.get(subjectFullName) != null) {
+            subjectToSchema.get(subjectFullName).forEach(
+                schema -> schemaCache.invalidate(new SubjectAndSchema(cluster, tenant, subject, schema)));
+        }
+        subjectToVersion.remove(subjectFullName);
+        subjectToId.remove(subjectFullName);
+        subjectToSchema.remove(subjectFullName);
+        return restService.deleteSchema(cluster, tenant, subject);
+    }
+
+    @Override
+    public DeleteSchemeResponse deleteSchema(String cluster, String tenant, String subject,
+        long version) throws IOException, RestClientException {
+        String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
+        schemaCacheBySubject.invalidate(subjectFullName);
+        schemaCacheBySubjectAndVersion.invalidate(new SubjectAndVersion(cluster, tenant, subject, version));
+        subjectToVersion.get(subjectFullName).remove(version);
+
+        return restService.deleteSchema(cluster, tenant, subject, version);
+    }
+
+    @Override
+    public UpdateSchemaResponse updateSchema(String subject, String schemaName,
+        UpdateSchemaRequest request) throws RestClientException, IOException {
+        // invalidate schemaCache
+        schemaCache.invalidate(new SubjectAndSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName));
+        return restService.updateSchema(subject, schemaName, request);
+    }
+
+    @Override
+    public UpdateSchemaResponse updateSchema(String cluster, String tenant, String subjectName,
+        String schemaName, UpdateSchemaRequest request) throws IOException, RestClientException {
+        // invalidate schemaCache
+        schemaCache.invalidate(new SubjectAndSchema(cluster, tenant, subjectName, schemaName));
+        return restService.updateSchema(cluster, tenant, subjectName, schemaName, request);
+    }
+
+    @Override
+    public GetSchemaResponse getSchemaBySubject(String subject) throws RestClientException, IOException {
+        String fullName = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject);
+        GetSchemaResponse result = schemaCacheBySubject.getIfPresent(fullName);
+        if (result != null) {
+            return result;
+        }
+        result = restService.getSchemaBySubject(subject);
+        schemaCacheBySubject.put(fullName, result);
+        return result;
+    }
+
+    @Override
+    public GetSchemaResponse getSchemaBySubject(String cluster, String tenant,
+        String subject) throws RestClientException, IOException {
+        String fullName = String.format("%s/%s/%s", cluster, tenant, subject);
+        GetSchemaResponse result = schemaCacheBySubject.getIfPresent(fullName);
+        if (result != null) {
+            return result;
+        }
+        result = restService.getSchemaBySubject(cluster, tenant, subject);
+        schemaCacheBySubject.put(fullName, result);
+        return result;
+    }
+
+    @Override
+    public GetSchemaResponse getSchemaBySubjectAndVersion(String cluster, String tenant, String subject,
+        long version) throws IOException, RestClientException {
+        SubjectAndVersion subjectAndVersion = new SubjectAndVersion(cluster, tenant, subject, version);
+        GetSchemaResponse result = schemaCacheBySubjectAndVersion.getIfPresent(subjectAndVersion);
+        if (result != null) {
+            return result;
+        }
+
+        String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
+        Set<Long> versions = subjectToId.get(subjectFullName);
+        if (versions == null) {
+            versions = new HashSet<>();
+        }
+        versions.add(version);
+        subjectToId.put(subjectFullName, versions);
+
+        result = restService.getSchemaBySubject(cluster, tenant, subject, version);
+        schemaCacheBySubjectAndVersion.put(subjectAndVersion, result);
+        return result;
+    }
+
+    public GetSchemaResponse getSchemaBySubjectAndVersion(String subject, long version)
+        throws IOException, RestClientException {
+        SubjectAndVersion subjectAndVersion = new SubjectAndVersion(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, version);
+        GetSchemaResponse result = schemaCacheBySubjectAndVersion.getIfPresent(subjectAndVersion);
+        if (result != null) {
+            return result;
+        }
+
+        String subjectFullName = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject);
+        Set<Long> versions = subjectToId.get(subjectFullName);
+        if (versions == null) {
+            versions = new HashSet<>();
+        }
+        versions.add(version);
+        subjectToId.put(subjectFullName, versions);
+
+        result = restService.getSchemaBySubject(subject, version);
+        schemaCacheBySubjectAndVersion.put(subjectAndVersion, result);
+        return result;
+    }
+
+    @Override
+    public GetSchemaResponse getTargetSchema(String cluster, String tenant, String subject, String schema)
+        throws RestClientException, IOException {
+        SubjectAndSchema subjectAndSchema = new SubjectAndSchema(cluster, tenant, subject, schema);
+        GetSchemaResponse result = schemaCache.getIfPresent(subjectAndSchema);
+        if (result != null) {
+            return result;
+        }
+        String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
+        result = restService.getTargetSchema(cluster, tenant, subject, schema);
+        schemaCache.put(subjectAndSchema, result);
+
+        Set<String> schemas = subjectToSchema.get(subjectFullName);
+        if (schemas == null) {
+            schemas = new HashSet<>();
+        }
+        schemas.add(schema);
+        subjectToSchema.put(subjectFullName, schemas);
+
+        return result;
+    }
+
+    @Override
+    public GetSchemaResponse getTargetSchema(String subject, String schema)
+        throws RestClientException, IOException {
+        SubjectAndSchema subjectAndSchema = new SubjectAndSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schema);
+        GetSchemaResponse result = schemaCache.getIfPresent(subjectAndSchema);
+        if (result != null) {
+            return result;
+        }
+        result = restService.getTargetSchema(subject, schema);
+        schemaCache.put(subjectAndSchema, result);
+
+        String subjectFullName = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject);
+        Set<String> schemas = subjectToSchema.get(subjectFullName);
+        if (schemas == null) {
+            schemas = new HashSet<>();
+        }
+        schemas.add(schema);
+        subjectToSchema.put(subjectFullName, schemas);
+
+        return result;
+    }
+
+    @Override
+    public List<SchemaRecordDto> getSchemaListBySubject(String cluster, String tenant,
+        String subject) throws RestClientException, IOException {
+        return restService.getSchemaListBySubject(cluster, tenant, subject);
+    }
+
+    @Override
+    public List<String> getSubjectsByTenant(String cluster, String tenant)
+        throws RestClientException, IOException {
+        String fullName = String.format("%s/%s", cluster, tenant);
+        List<String> result = subjectCache.getIfPresent(fullName);
+        if (!result.isEmpty()) {
+            return result;
+        }
+        result = restService.getSubjectsByTenant(cluster, tenant);
+        subjectCache.put(fullName, result);
+        return result;
+    }
+
+    @Override
+    public List<String> getAllTenants(String cluster) throws RestClientException, IOException {
+        List<String> result = tenantCache.getIfPresent(cluster);
+        if (result != null) {
+            return result;
+        }
+        result = restService.getAllTenants(cluster);
+        tenantCache.put(cluster, result);
+        return result;
+    }
+
+    public GetSchemaResponse getSchemaByRecordId(String cluster, String tenant, String subject,
+        long recordId) throws RestClientException, IOException {
+        SubjectAndId subjectAndId = new SubjectAndId(cluster, tenant, subject, recordId);
+        GetSchemaResponse result = schemaCacheBySubjectAndId.getIfPresent(subjectAndId);
+        if (result != null) {
+            return result;
+        }
+        String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
+
+        Set<Long> recordIds = subjectToId.get(subjectFullName);
+        if (recordIds == null) {
+            recordIds = new HashSet<>();
+        }
+        recordIds.add(recordId);
+        subjectToId.put(subjectFullName, recordIds);
+
+        result = restService.getSchemaByRecordId(cluster, tenant, subject, recordId);
+        schemaCacheBySubjectAndId.put(subjectAndId, result);
+        return result;
+    }
+
+    @Override
+    public GetSchemaResponse getSchemaByRecordId(String subject, long recordId)
+        throws RestClientException, IOException {
+        SubjectAndId subjectAndId = new SubjectAndId(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, recordId);
+        GetSchemaResponse result = schemaCacheBySubjectAndId.getIfPresent(subjectAndId);
+        if (result != null) {
+            return result;
+        }
+
+        String subjectFullName = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject);
+
+        Set<Long> recordIds = subjectToId.get(subjectFullName);
+        if (recordIds == null) {
+            recordIds = new HashSet<>();
+        }
+        recordIds.add(recordId);
+        subjectToId.put(subjectFullName, recordIds);
+
+        result = restService.getSchemaByRecordId(subject, recordId);
+        schemaCacheBySubjectAndId.put(subjectAndId, result);
+        return result;
+    }
+
+    @AllArgsConstructor
+    static class SubjectAndId {
+        private String cluster;
+        private String tenant;
+        private String subject;
+        private long recordId;
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SubjectAndId that = (SubjectAndId) o;
+            return Objects.equals(subject, that.subject)
+                && Objects.equals(tenant, that.tenant)
+                && Objects.equals(cluster, that.cluster)
+                && recordId == that.recordId;
+        }
+
+        public String getCluster() {
+            return cluster;
+        }
+
+        public String getTenant() {
+            return tenant;
+        }
+
+        public String getSubject() {
+            return subject;
+        }
+
+        public long getRecordId() {
+            return recordId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(cluster, tenant, subject, recordId);
+        }
+
+        @Override
+        public String toString() {
+            return "SubjectAndId{" + "cluster=" + cluster + "tenant=" + tenant + "subject='" + subject + '\'' + ", recordId=" + recordId + '}';
+        }
+    }
+
+    @AllArgsConstructor
+    static class SubjectAndVersion {
+        private String cluster;
+        private String tenant;
+        private String subject;
+        private long version;
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SubjectAndVersion that = (SubjectAndVersion) o;
+            return Objects.equals(subject, that.subject)
+                && Objects.equals(tenant, that.tenant)
+                && Objects.equals(cluster, that.cluster)
+                && version == that.version;
+        }
+
+        public String getCluster() {
+            return cluster;
+        }
+
+        public String getTenant() {
+            return tenant;
+        }
+
+        public String getSubject() {
+            return subject;
+        }
+
+        public long getVersion() {
+            return version;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(cluster, tenant, subject, version);
+        }
+
+        @Override
+        public String toString() {
+            return "SubjectAndId{" + "cluster=" + cluster + "tenant=" + tenant + "subject='" + subject + '\'' + ", version=" + version + '}';
+        }
+    }
+
+    @AllArgsConstructor
+    static class SubjectAndSchema {
+        private String cluster;
+        private String tenant;
+        private String subject;
+        private String schema;
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SubjectAndSchema that = (SubjectAndSchema) o;
+            return Objects.equals(subject, that.subject)
+                && Objects.equals(tenant, that.tenant)
+                && Objects.equals(cluster, that.cluster)
+                && Objects.equals(schema, that.schema);
+        }
+
+        public String getCluster() {
+            return cluster;
+        }
+
+        public String getTenant() {
+            return tenant;
+        }
+
+        public String getSubject() {
+            return subject;
+        }
+
+        public String getSchema() {
+            return schema;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(cluster, tenant, subject, schema);
+        }
+
+        @Override
+        public String toString() {
+            return "SubjectAndId{" + "cluster=" + cluster + "tenant=" + tenant + "subject='" + subject + '\'' + ", schema=" + schema + '}';
+        }
+    }
+}
+
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java
index 26167c3..00e0d07 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java
@@ -30,6 +30,6 @@ public class SchemaRegistryClientFactory {
      */
     public static SchemaRegistryClient newClient(String baseUrl, Map<String, String> map) {
         RestService restService = null == map ? new RestService(baseUrl) : new RestService(baseUrl, map);
-        return new NormalSchemaRegistryClient(restService);
+        return new CachedSchemaRegistryClient(restService);
     }
 }
diff --git a/client/src/test/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClientTest.java b/client/src/test/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClientTest.java
new file mode 100644
index 0000000..a377d34
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClientTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.model.Compatibility;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class CachedSchemaRegistryClientTest {
+
+    private final static String baseUrl = "http://localhost:8080";
+    private final static String topic = "TopicTest";
+
+    static RestService restService;
+    static NormalSchemaRegistryClient normalSchemaRegistryClient;
+    static CachedSchemaRegistryClient cachedSchemaRegistryClient;
+
+    static CachedSchemaRegistryClient cachedSchemaRegistryClientCapacity2;
+
+    @BeforeAll
+    static void setUp() {
+        restService = new RestService(baseUrl);
+        normalSchemaRegistryClient = new NormalSchemaRegistryClient(restService);
+        cachedSchemaRegistryClient = new CachedSchemaRegistryClient(restService);
+        cachedSchemaRegistryClientCapacity2 = new CachedSchemaRegistryClient(restService, 2, TimeUnit.MINUTES, 2);
+        try {
+            DeleteSchemeResponse response1
+                = cachedSchemaRegistryClient.deleteSchema("default", "default", topic);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } catch (RestClientException e) {
+            throw new RuntimeException(e);
+        }
+        registerSchema();
+    }
+
+    static void registerSchema() {
+        RegisterSchemaRequest request = RegisterSchemaRequest.builder()
+            .schemaIdl("{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\","
+                + "\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}")
+            .schemaType(SchemaType.AVRO)
+            .compatibility(Compatibility.BACKWARD)
+            .owner("test").build();
+        try {
+            RegisterSchemaResponse response
+                = cachedSchemaRegistryClient.registerSchema(topic, "Charge", request);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    void getSchemaBySubject() {
+        try {
+            GetSchemaResponse normalResponse = normalSchemaRegistryClient.getSchemaBySubject(topic);
+            GetSchemaResponse cachedResponse = cachedSchemaRegistryClient.getSchemaBySubject(topic);
+            GetSchemaResponse cachedResponse2 = cachedSchemaRegistryClient.getSchemaBySubject(topic);
+            GetSchemaResponse cachedResponse3 = cachedSchemaRegistryClient.getSchemaBySubject("default", "default", topic);
+
+            assertEquals(normalResponse, cachedResponse2);
+            assertEquals(cachedResponse3, cachedResponse);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    void getSchemaBySubjectAndVersion() {
+        try {
+            GetSchemaResponse normalResponse = normalSchemaRegistryClient.getSchemaBySubjectAndVersion("default", "default", topic, 1);
+            GetSchemaResponse cachedResponse = cachedSchemaRegistryClient.getSchemaBySubjectAndVersion("default", "default", topic, 1);
+            GetSchemaResponse cachedResponse2 = cachedSchemaRegistryClient.getSchemaBySubjectAndVersion("default", "default", topic, 1);
+            GetSchemaResponse cachedResponse3 = cachedSchemaRegistryClient.getSchemaBySubjectAndVersion(topic, 1);
+
+            assertEquals(normalResponse, cachedResponse2);
+            assertEquals(cachedResponse3, cachedResponse);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } catch (RestClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    void getSchemaBySubjectAndId() {
+        try {
+            GetSchemaResponse normalResponse = normalSchemaRegistryClient.getSchemaByRecordId("default", "default", topic, Long.parseLong("135023078756319233"));
+            GetSchemaResponse cachedResponse = cachedSchemaRegistryClient.getSchemaByRecordId("default", "default", topic, Long.parseLong("135023078756319233"));
+            GetSchemaResponse cachedResponse2 = cachedSchemaRegistryClient.getSchemaByRecordId("default", "default", topic, Long.parseLong("135023078756319233"));
+            GetSchemaResponse cachedResponse3 = cachedSchemaRegistryClient.getSchemaByRecordId(topic, Long.parseLong("135023078756319233"));
+
+            assertEquals(normalResponse, cachedResponse2);
+            assertEquals(cachedResponse3, cachedResponse);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } catch (RestClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    void getSchemaBySubjectWithCapacity2() {
+        RegisterSchemaRequest request = RegisterSchemaRequest.builder()
+            .schemaIdl("{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\","
+                + "\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}")
+            .schemaType(SchemaType.AVRO)
+            .compatibility(Compatibility.BACKWARD)
+            .owner("test").build();
+        try {
+            DeleteSchemeResponse responseD1
+                = cachedSchemaRegistryClient.deleteSchema("default", "default", "TopicTest2");
+            DeleteSchemeResponse responseD2
+                = cachedSchemaRegistryClient.deleteSchema("default", "default", "TopicTest3");
+            RegisterSchemaResponse response
+                = cachedSchemaRegistryClient.registerSchema("TopicTest2", "Charge2", request);
+            RegisterSchemaResponse response2
+                = cachedSchemaRegistryClient.registerSchema("TopicTest3", "Charge3", request);
+            GetSchemaResponse normalResponse = normalSchemaRegistryClient.getSchemaBySubject(topic);
+            GetSchemaResponse normalResponse2 = normalSchemaRegistryClient.getSchemaBySubject("TopicTest2");
+            GetSchemaResponse normalResponse3 = normalSchemaRegistryClient.getSchemaBySubject("TopicTest3");
+
+            GetSchemaResponse cachedResponse = cachedSchemaRegistryClientCapacity2.getSchemaBySubject(topic);
+            GetSchemaResponse cachedResponse2 = cachedSchemaRegistryClientCapacity2.getSchemaBySubject("TopicTest2");
+            GetSchemaResponse cachedResponse3 = cachedSchemaRegistryClientCapacity2.getSchemaBySubject("TopicTest3");
+
+            assertEquals(normalResponse, cachedResponse);
+            assertEquals(normalResponse2, cachedResponse2);
+            assertEquals(normalResponse3, cachedResponse3);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        
+    }
+    
+}
\ No newline at end of file