You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/25 03:45:09 UTC

[rocketmq-schema-registry] 09/14: fix storage client

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git

commit 3abfa0201efce802bd4a9b1e50315f4601a27e5a
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Tue Jul 19 22:10:43 2022 +0800

    fix storage client
---
 .DS_Store                                          | Bin 6148 -> 0 bytes
 .../registry/common/dto/SchemaRecordDto.java       |   3 +-
 .../registry/common/model/SchemaRecordInfo.java    |   2 +-
 .../common/storage/StorageServiceProxy.java        |   8 +-
 .../registry/core/api/v1/SchemaController.java     |   7 +-
 .../registry/core/service/SchemaServiceImpl.java   |   2 +-
 .../registry/storage/rocketmq/RocketmqClient.java  | 112 ++++++++++++---------
 .../src/main/resources/rocketmq.properties         |   2 +-
 8 files changed, 77 insertions(+), 59 deletions(-)

diff --git a/.DS_Store b/.DS_Store
deleted file mode 100644
index a1ac880..0000000
Binary files a/.DS_Store and /dev/null differ
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java
index f844188..585629c 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java
@@ -25,6 +25,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.rocketmq.schema.registry.common.model.Dependency;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
 
 @Data
 @EqualsAndHashCode(callSuper = false)
@@ -52,5 +53,5 @@ public class SchemaRecordDto {
     private List<SubjectDto> subjects;
 
     @ApiModelProperty(value = "Schema type")
-    private String type;
+    private SchemaType type;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java
index 2ec4d53..02af194 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java
@@ -37,7 +37,7 @@ public class SchemaRecordInfo implements Serializable {
     private String idl;
     private Dependency dependency;
     private List<SubjectInfo> subjects;
-    private String type;
+    private SchemaType type;
     //    private List<FieldInfo> fields;
 
     public void bindSubject(final SubjectInfo subjectInfo) {
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
index bd134d8..41feb2c 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
@@ -70,7 +70,7 @@ public class StorageServiceProxy {
      *
      * @param name Qualified name with tenant / name of schema
      */
-    @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()")
+    @CacheEvict(key = "'schema.' + #name.getSchema()")
     public void delete(final QualifiedName name) {
         final RequestContext requestContext = RequestContextManager.getContext();
         final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -86,7 +86,7 @@ public class StorageServiceProxy {
      * @param schemaInfo schema information instance
      * @return true if errors after this should be ignored.
      */
-    @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()")
+    @CacheEvict(key = "'schema.' + #name.getSchema()")
     public SchemaInfo update(final QualifiedName name, final SchemaInfo schemaInfo) {
         final RequestContext requestContext = RequestContextManager.getContext();
         final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -103,7 +103,7 @@ public class StorageServiceProxy {
      * @param useCache if schema can be retrieved from cache
      * @return schema information instance
      */
-    @Cacheable(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()", condition = "#useCache")
+    @Cacheable(key = "'schema.' + #name.getSchema()", condition = "#useCache")
     public SchemaInfo get(final QualifiedName name, final boolean useCache) {
         final RequestContext requestContext = RequestContextManager.getContext();
         final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -112,7 +112,6 @@ public class StorageServiceProxy {
         return storageService.get(storageServiceContext, name);
     }
 
-    @Cacheable(key = "'subject.' + #name.getSubject()  + '/' + #name.getVersion()", condition = "#useCache && #name.getVersion() != null")
     public SchemaRecordInfo getBySubject(final QualifiedName name, final boolean useCache) {
         final RequestContext requestContext = RequestContextManager.getContext();
         final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -121,7 +120,6 @@ public class StorageServiceProxy {
         return storageService.getBySubject(storageServiceContext, name);
     }
 
-    @Cacheable(key = "'subject.' + #name.getSubject()", condition = "#useCache")
     public List<SchemaRecordInfo> listBySubject(final QualifiedName name, final boolean useCache) {
         final RequestContext requestContext = RequestContextManager.getContext();
         final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
index edbe75d..335294c 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -6,19 +6,18 @@
 
 package org.apache.rocketmq.schema.registry.core.api.v1;
 
+import java.net.HttpURLConnection;
+import java.util.List;
+
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-import java.net.HttpURLConnection;
-import java.util.List;
-import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
 import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
-import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
 import org.apache.rocketmq.schema.registry.core.api.RequestProcessor;
 import org.apache.rocketmq.schema.registry.core.service.SchemaService;
 import org.springframework.beans.factory.annotation.Autowired;
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index 4073431..5f72d34 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -98,7 +98,7 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
         schemaInfo.setUniqueId(idGenerator.nextId());
         schemaInfo.setLastRecordVersion(1L);
         schemaInfo.getLastRecord().setSchema(qualifiedName.schemaFullName());
-        schemaInfo.getLastRecord().setType(schemaInfo.getMeta().getType().name());
+        schemaInfo.getLastRecord().setType(schemaInfo.getMeta().getType());
         schemaInfo.getLastRecord().bindSubject(qualifiedName.subjectInfo());
 
         if (config.isUploadEnabled()) {
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 1a25063..913c360 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -24,6 +24,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
@@ -33,6 +36,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -48,6 +52,7 @@ import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
 import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
 import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
 import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
+import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -83,6 +88,9 @@ public class RocketmqClient {
     private final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
     private final Map<String, ColumnFamilyHandle> cfHandleMap = new HashMap<>();
 
+    private ScheduledExecutorService scheduledExecutorService;
+
+    private static final Integer PULL_TASK_INTERVAL = 5 * 1000;
 
     /**
      * RocksDB for cache
@@ -178,61 +186,70 @@ public class RocketmqClient {
                     e.printStackTrace();
                 }
             });
-            while (true) {
-                List<MessageExt> msgList = scheduleConsumer.poll(1000);
-                if (msgList != null) {
-                    msgList.forEach(this::consumeMessage);
-                }
-            }
+            this.scheduledExecutorService.scheduleAtFixedRate(new RocketmqStoragePullTask(),
+                0, PULL_TASK_INTERVAL, TimeUnit.MILLISECONDS);
+
         } catch (MQClientException e) {
             throw new SchemaException("Rocketmq client start failed", e);
         }
     }
 
-    private void consumeMessage(MessageExt msg) {
-        synchronized (this) {
-            try {
-                if (msg.getKeys().equals(DELETE_KEYS)) {
-                    // delete
-                    byte[] schemaFullName = msg.getBody();
-                    byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
-                    if (schemaInfoBytes != null) {
-                        deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class));
-                        cache.delete(schemaCfHandle(), schemaFullName);
-                    }
-                } else {
-                    byte[] schemaFullName = converter.toBytes(msg.getKeys());
-                    byte[] schemaInfoBytes = msg.getBody();
-                    SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class);
-                    byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord());
-
-                    byte[] result = cache.get(schemaCfHandle(), schemaFullName);
-                    if (result == null) {
-                        // register
-                        cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
-                        cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
-                    } else {
-                        SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
-                        if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
-                            return;
-                        }
-                        if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
-                            throw new SchemaException("Schema version is invalid, update: "
-                                + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
-                        }
+    public class RocketmqStoragePullTask implements Runnable {
+
+        @Override
+        public void run() {
+            List<MessageExt> msgList = scheduleConsumer.poll(1000);
+            if (CollectionUtils.isNotEmpty(msgList)) {
+                msgList.forEach(this::consumeMessage);
+            }
+        }
 
-                        cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
-                        update.getLastRecord().getSubjects().forEach(subject -> {
-                            try {
-                                cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes);
-                            } catch (RocksDBException e) {
-                                throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e);
+        private void consumeMessage(MessageExt msg) {
+            synchronized (this) {
+                try {
+                    log.info("receive msg, the content is {}", new String(msg.getBody()));
+                    if (DELETE_KEYS.equals(msg.getKeys())) {
+                        // delete
+                        byte[] schemaFullName = msg.getBody();
+                        byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
+                        if (schemaInfoBytes != null) {
+                            deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class));
+                            cache.delete(schemaCfHandle(), schemaFullName);
+                        }
+                    } else {
+                        byte[] schemaFullName = converter.toBytes(msg.getKeys());
+                        byte[] schemaInfoBytes = msg.getBody();
+                        SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class);
+                        byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord());
+
+                        byte[] result = cache.get(schemaCfHandle(), schemaFullName);
+                        if (result == null) {
+                            // register
+                            cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
+                            cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
+                        } else {
+                            SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
+                            if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
+                                return;
+                            }
+                            if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
+                                throw new SchemaException("Schema version is invalid, update: "
+                                    + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
                             }
-                        });
+
+                            cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
+                            update.getLastRecord().getSubjects().forEach(subject -> {
+                                try {
+                                    cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes);
+                                } catch (RocksDBException e) {
+                                    throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e);
+                                }
+                            });
+                        }
                     }
+                } catch (Throwable e) {
+                    throw new SchemaException("Rebuild schema cache failed", e);
                 }
-            } catch (Throwable e) {
-                throw new SchemaException("Rebuild schema cache failed", e);
             }
         }
     }
@@ -386,6 +403,9 @@ public class RocketmqClient {
         );
 
         this.converter = new JsonConverterImpl();
+
+        this.scheduledExecutorService =
+            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RocketmqStoragePullTask"));
     }
 
     private ColumnFamilyHandle schemaCfHandle() {
diff --git a/schema-storage-rocketmq/src/main/resources/rocketmq.properties b/schema-storage-rocketmq/src/main/resources/rocketmq.properties
index 3a94c6a..5070713 100644
--- a/schema-storage-rocketmq/src/main/resources/rocketmq.properties
+++ b/schema-storage-rocketmq/src/main/resources/rocketmq.properties
@@ -16,4 +16,4 @@
 #
 
 storage.type=rocketmq
-storage.local.cache.path=/Users/xyb/app/schema-registry/cache
\ No newline at end of file
+#storage.local.cache.path
\ No newline at end of file