You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by "ferrirW (via GitHub)" <gi...@apache.org> on 2023/01/30 07:35:51 UTC

[GitHub] [rocketmq-schema-registry] ferrirW commented on a diff in pull request #76: [ISSUE #49] Support cache for SchemaRegistryClient

ferrirW commented on code in PR #76:
URL: https://github.com/apache/rocketmq-schema-registry/pull/76#discussion_r1090228939


##########
client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
+
+public class CachedSchemaRegistryClient implements SchemaRegistryClient {
+
+    private static final String DEFAULT_TENANT = "default";
+    private static final String DEFAULT_CLUSTER = "default";
+    private static final int DEFAULT_CAPACITY = 100;
+    private static final int DEFAULT_DURATION = 10;
+
+    private final RestService restService;
+
+    private final Map<String, Set<Long>> subjectToId; // restore recordIds that cached in SubjectAndId
+
+    private final Map<String, Set<Long>> subjectToVersion; // restore versions that cached in SubjectAndVersion
+
+    private final Map<String, Set<String>> subjectToSchema; // restore schema that cached in SubjectAndSchema
+
+    private final Cache<SubjectAndVersion, GetSchemaResponse> schemaCacheBySubjectAndVersion;
+
+    private final Cache<String, List<String>> subjectCache; //cache for subject
+
+    private final Cache<SubjectAndId, GetSchemaResponse> schemaCacheBySubjectAndId;
+
+    private final Cache<String, GetSchemaResponse> schemaCacheBySubject; //schema cache by Subject only
+
+    private final Cache<SubjectAndSchema, GetSchemaResponse> schemaCache; //schema cache by SubjectAndSchema
+
+    private final Cache<String, List<String>> tenantCache;
+
+    public CachedSchemaRegistryClient(RestService restService) {
+        this(restService, DEFAULT_CAPACITY, TimeUnit.MINUTES, DEFAULT_DURATION);
+    }
+
+    public CachedSchemaRegistryClient(RestService restService, int capacity, TimeUnit unit, int duration) {
+        this.restService = restService;
+        subjectToId = new HashMap<>();
+        subjectToVersion = new HashMap<>();
+        subjectToSchema = new HashMap<>();
+        this.subjectCache = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+        this.schemaCache = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+        this.tenantCache = CacheBuilder.newBuilder().maximumSize(1).expireAfterWrite(1, unit).build();
+        this.schemaCacheBySubjectAndVersion = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+        this.schemaCacheBySubjectAndId = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+        this.schemaCacheBySubject = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+    }
+
+    @Override
+    public RegisterSchemaResponse registerSchema(String subject, String schemaName,
+        RegisterSchemaRequest request) throws RestClientException, IOException {
+        return restService.registerSchema(subject, schemaName, request);

Review Comment:
   Maybe the cache could be updated directly after `registerSchema` succeeds?



##########
client/src/test/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClientTest.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import java.io.IOException;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.model.Compatibility;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class CachedSchemaRegistryClientTest {

Review Comment:
   Suggest adding one test: a get when the number of cached entries exceeds the cache capacity. For example, the cache size is 5 and 6 different schemas are fetched.



##########
client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
+
+public class CachedSchemaRegistryClient implements SchemaRegistryClient {
+
+    private static final String DEFAULT_TENANT = "default";
+    private static final String DEFAULT_CLUSTER = "default";
+    private static final int DEFAULT_CAPACITY = 100;
+    private static final int DEFAULT_DURATION = 10;
+
+    private final RestService restService;
+
+    private final Map<String, Set<Long>> subjectToId; // restore recordIds that cached in SubjectAndId
+
+    private final Map<String, Set<Long>> subjectToVersion; // restore versions that cached in SubjectAndVersion
+
+    private final Map<String, Set<String>> subjectToSchema; // restore schema that cached in SubjectAndSchema
+
+    private final Cache<SubjectAndVersion, GetSchemaResponse> schemaCacheBySubjectAndVersion;
+
+    private final Cache<String, List<String>> subjectCache; //cache for subject
+
+    private final Cache<SubjectAndId, GetSchemaResponse> schemaCacheBySubjectAndId;
+
+    private final Cache<String, GetSchemaResponse> schemaCacheBySubject; //schema cache by Subject only
+
+    private final Cache<SubjectAndSchema, GetSchemaResponse> schemaCache; //schema cache by SubjectAndSchema
+
+    private final Cache<String, List<String>> tenantCache;
+
+    public CachedSchemaRegistryClient(RestService restService) {
+        this(restService, DEFAULT_CAPACITY, TimeUnit.MINUTES, DEFAULT_DURATION);
+    }
+
+    public CachedSchemaRegistryClient(RestService restService, int capacity, TimeUnit unit, int duration) {
+        this.restService = restService;
+        subjectToId = new HashMap<>();
+        subjectToVersion = new HashMap<>();
+        subjectToSchema = new HashMap<>();
+        this.subjectCache = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+        this.schemaCache = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+        this.tenantCache = CacheBuilder.newBuilder().maximumSize(1).expireAfterWrite(1, unit).build();
+        this.schemaCacheBySubjectAndVersion = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+        this.schemaCacheBySubjectAndId = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+        this.schemaCacheBySubject = CacheBuilder.newBuilder().maximumSize(capacity).expireAfterWrite(duration, unit).build();
+    }
+
+    @Override
+    public RegisterSchemaResponse registerSchema(String subject, String schemaName,
+        RegisterSchemaRequest request) throws RestClientException, IOException {
+        return restService.registerSchema(subject, schemaName, request);
+    }
+
+    @Override
+    public RegisterSchemaResponse registerSchema(String clusterName, String tenant, String subjectName,
+        String schemaName, RegisterSchemaRequest request) throws IOException, RestClientException {
+        return restService.registerSchema(clusterName, tenant, subjectName, schemaName, request);
+    }
+
+    @Override
+    public DeleteSchemeResponse deleteSchema(String cluster, String tenant,
+        String subject) throws IOException, RestClientException {
+        String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
+
+        schemaCacheBySubject.invalidate(subjectFullName);
+        //invalidate schemaCacheBySubjectAndVersion
+        if (subjectToVersion.get(subjectFullName) != null) {
+            subjectToVersion.get(subjectFullName).forEach(
+                version -> schemaCacheBySubjectAndVersion.invalidate(new SubjectAndVersion(cluster, tenant, subject, version)));
+        }
+        //invalidate schemaCacheBySubjectAndId
+        if (subjectToId.get(subjectFullName) != null) {
+            subjectToId.get(subjectFullName).forEach(
+                recordId -> schemaCacheBySubjectAndId.invalidate(new SubjectAndId(cluster, tenant, subject, recordId)));
+        }
+        // invalidate schemaCache
+        if (subjectToSchema.get(subjectFullName) != null) {
+            subjectToSchema.get(subjectFullName).forEach(
+                schema -> schemaCache.invalidate(new SubjectAndSchema(cluster, tenant, subject, schema)));
+        }
+        subjectToVersion.remove(subjectFullName);
+        subjectToId.remove(subjectFullName);
+        subjectToSchema.remove(subjectFullName);
+        return restService.deleteSchema(cluster, tenant, subject);
+    }
+
+    @Override
+    public DeleteSchemeResponse deleteSchema(String cluster, String tenant, String subject,
+        long version) throws IOException, RestClientException {
+        String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
+        schemaCacheBySubject.invalidate(subjectFullName);
+        schemaCacheBySubjectAndVersion.invalidate(new SubjectAndVersion(cluster, tenant, subject, version));
+        subjectToVersion.get(subjectFullName).remove(version);
+
+        return restService.deleteSchema(cluster, tenant, subject, version);
+    }
+
+    @Override
+    public UpdateSchemaResponse updateSchema(String subject, String schemaName,
+        UpdateSchemaRequest request) throws RestClientException, IOException {
+        // invalidate schemaCache
+        schemaCache.invalidate(new SubjectAndSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName));
+        return restService.updateSchema(subject, schemaName, request);
+    }
+
+    @Override
+    public UpdateSchemaResponse updateSchema(String cluster, String tenant, String subjectName,
+        String schemaName, UpdateSchemaRequest request) throws IOException, RestClientException {
+        // invalidate schemaCache
+        schemaCache.invalidate(new SubjectAndSchema(cluster, tenant, subjectName, schemaName));
+        return restService.updateSchema(cluster, tenant, subjectName, schemaName, request);
+    }
+
+    @Override
+    public GetSchemaResponse getSchemaBySubject(String subject) throws RestClientException, IOException {
+        String fullName = String.format("%s/%s/%s", DEFAULT_CLUSTER, DEFAULT_TENANT, subject);
+        GetSchemaResponse result = schemaCacheBySubject.getIfPresent(fullName);
+        if (result != null) {
+            return result;
+        }
+        result = restService.getSchemaBySubject(subject);
+        schemaCacheBySubject.put(fullName, result);

Review Comment:
   Maybe the cache update mechanism could be written with a `CacheLoader`, like:
   
   ```java
   LoadingCache<String, GetSchemaResponse> cache = CacheBuilder.newBuilder().build(
       new CacheLoader<String, GetSchemaResponse>() {
           @Override
           public GetSchemaResponse load(String subject) throws Exception {
               return restService.getSchemaBySubject(subject);
           }
       });
   ```



##########
client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
+
+public class CachedSchemaRegistryClient implements SchemaRegistryClient {
+
+    private static final String DEFAULT_TENANT = "default";
+    private static final String DEFAULT_CLUSTER = "default";
+    private static final int DEFAULT_CAPACITY = 100;
+    private static final int DEFAULT_DURATION = 10;
+
+    private final RestService restService;
+
+    private final Map<String, Set<Long>> subjectToId; // restore recordIds that cached in SubjectAndId

Review Comment:
   Are all of these caches necessary? Maybe we should add comments explaining what each one is for. Ignore this question if there is an actual use case for them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org