You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/30 08:14:27 UTC
[rocketmq-schema-registry] 09/27: fix storage client
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit 3abfa0201efce802bd4a9b1e50315f4601a27e5a
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Tue Jul 19 22:10:43 2022 +0800
fix storage client
---
.DS_Store | Bin 6148 -> 0 bytes
.../registry/common/dto/SchemaRecordDto.java | 3 +-
.../registry/common/model/SchemaRecordInfo.java | 2 +-
.../common/storage/StorageServiceProxy.java | 8 +-
.../registry/core/api/v1/SchemaController.java | 7 +-
.../registry/core/service/SchemaServiceImpl.java | 2 +-
.../registry/storage/rocketmq/RocketmqClient.java | 112 ++++++++++++---------
.../src/main/resources/rocketmq.properties | 2 +-
8 files changed, 77 insertions(+), 59 deletions(-)
diff --git a/.DS_Store b/.DS_Store
deleted file mode 100644
index a1ac880..0000000
Binary files a/.DS_Store and /dev/null differ
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java
index f844188..585629c 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java
@@ -25,6 +25,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.rocketmq.schema.registry.common.model.Dependency;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
@Data
@EqualsAndHashCode(callSuper = false)
@@ -52,5 +53,5 @@ public class SchemaRecordDto {
private List<SubjectDto> subjects;
@ApiModelProperty(value = "Schema type")
- private String type;
+ private SchemaType type;
}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java
index 2ec4d53..02af194 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java
@@ -37,7 +37,7 @@ public class SchemaRecordInfo implements Serializable {
private String idl;
private Dependency dependency;
private List<SubjectInfo> subjects;
- private String type;
+ private SchemaType type;
// private List<FieldInfo> fields;
public void bindSubject(final SubjectInfo subjectInfo) {
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
index bd134d8..41feb2c 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
@@ -70,7 +70,7 @@ public class StorageServiceProxy {
*
* @param name Qualified name with tenant / name of schema
*/
- @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()")
+ @CacheEvict(key = "'schema.' + #name.getSchema()")
public void delete(final QualifiedName name) {
final RequestContext requestContext = RequestContextManager.getContext();
final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -86,7 +86,7 @@ public class StorageServiceProxy {
* @param schemaInfo schema information instance
* @return true if errors after this should be ignored.
*/
- @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()")
+ @CacheEvict(key = "'schema.' + #name.getSchema()")
public SchemaInfo update(final QualifiedName name, final SchemaInfo schemaInfo) {
final RequestContext requestContext = RequestContextManager.getContext();
final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -103,7 +103,7 @@ public class StorageServiceProxy {
* @param useCache if schema can be retrieved from cache
* @return schema information instance
*/
- @Cacheable(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()", condition = "#useCache")
+ @Cacheable(key = "'schema.' + #name.getSchema()", condition = "#useCache")
public SchemaInfo get(final QualifiedName name, final boolean useCache) {
final RequestContext requestContext = RequestContextManager.getContext();
final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -112,7 +112,6 @@ public class StorageServiceProxy {
return storageService.get(storageServiceContext, name);
}
- @Cacheable(key = "'subject.' + #name.getSubject() + '/' + #name.getVersion()", condition = "#useCache && #name.getVersion() != null")
public SchemaRecordInfo getBySubject(final QualifiedName name, final boolean useCache) {
final RequestContext requestContext = RequestContextManager.getContext();
final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
@@ -121,7 +120,6 @@ public class StorageServiceProxy {
return storageService.getBySubject(storageServiceContext, name);
}
- @Cacheable(key = "'subject.' + #name.getSubject()", condition = "#useCache")
public List<SchemaRecordInfo> listBySubject(final QualifiedName name, final boolean useCache) {
final RequestContext requestContext = RequestContextManager.getContext();
final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext);
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
index edbe75d..335294c 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -6,19 +6,18 @@
package org.apache.rocketmq.schema.registry.core.api.v1;
+import java.net.HttpURLConnection;
+import java.util.List;
+
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
-import java.net.HttpURLConnection;
-import java.util.List;
-import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
-import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
import org.apache.rocketmq.schema.registry.core.api.RequestProcessor;
import org.apache.rocketmq.schema.registry.core.service.SchemaService;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index 4073431..5f72d34 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -98,7 +98,7 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
schemaInfo.setUniqueId(idGenerator.nextId());
schemaInfo.setLastRecordVersion(1L);
schemaInfo.getLastRecord().setSchema(qualifiedName.schemaFullName());
- schemaInfo.getLastRecord().setType(schemaInfo.getMeta().getType().name());
+ schemaInfo.getLastRecord().setType(schemaInfo.getMeta().getType());
schemaInfo.getLastRecord().bindSubject(qualifiedName.subjectInfo());
if (config.isUploadEnabled()) {
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 1a25063..913c360 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -24,6 +24,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -33,6 +36,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
@@ -48,6 +52,7 @@ import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
+import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@@ -83,6 +88,9 @@ public class RocketmqClient {
private final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
private final Map<String, ColumnFamilyHandle> cfHandleMap = new HashMap<>();
+ private ScheduledExecutorService scheduledExecutorService;
+
+ private static final Integer PULL_TASK_INTERVAL = 5 * 1000;
/**
* RocksDB for cache
@@ -178,61 +186,70 @@ public class RocketmqClient {
e.printStackTrace();
}
});
- while (true) {
- List<MessageExt> msgList = scheduleConsumer.poll(1000);
- if (msgList != null) {
- msgList.forEach(this::consumeMessage);
- }
- }
+ this.scheduledExecutorService.scheduleAtFixedRate(new RocketmqStoragePullTask(),
+ 0, PULL_TASK_INTERVAL, TimeUnit.MILLISECONDS);
+
} catch (MQClientException e) {
throw new SchemaException("Rocketmq client start failed", e);
}
}
- private void consumeMessage(MessageExt msg) {
- synchronized (this) {
- try {
- if (msg.getKeys().equals(DELETE_KEYS)) {
- // delete
- byte[] schemaFullName = msg.getBody();
- byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
- if (schemaInfoBytes != null) {
- deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class));
- cache.delete(schemaCfHandle(), schemaFullName);
- }
- } else {
- byte[] schemaFullName = converter.toBytes(msg.getKeys());
- byte[] schemaInfoBytes = msg.getBody();
- SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class);
- byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord());
-
- byte[] result = cache.get(schemaCfHandle(), schemaFullName);
- if (result == null) {
- // register
- cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
- cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
- } else {
- SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
- if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
- return;
- }
- if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
- throw new SchemaException("Schema version is invalid, update: "
- + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
- }
+ public class RocketmqStoragePullTask implements Runnable {
+
+ @Override
+ public void run() {
+ List<MessageExt> msgList = scheduleConsumer.poll(1000);
+ if (CollectionUtils.isNotEmpty(msgList)) {
+ msgList.forEach(this::consumeMessage);
+ }
+ }
- cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
- update.getLastRecord().getSubjects().forEach(subject -> {
- try {
- cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes);
- } catch (RocksDBException e) {
- throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e);
+ private void consumeMessage(MessageExt msg) {
+ synchronized (this) {
+ try {
+ log.info("receive msg, the content is {}", new String(msg.getBody()));
+ if (DELETE_KEYS.equals(msg.getKeys())) {
+ // delete
+ byte[] schemaFullName = msg.getBody();
+ byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
+ if (schemaInfoBytes != null) {
+ deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class));
+ cache.delete(schemaCfHandle(), schemaFullName);
+ }
+ } else {
+ byte[] schemaFullName = converter.toBytes(msg.getKeys());
+ byte[] schemaInfoBytes = msg.getBody();
+ SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class);
+ byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord());
+
+ byte[] result = cache.get(schemaCfHandle(), schemaFullName);
+ if (result == null) {
+ // register
+ cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
+ cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
+ } else {
+ SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
+ if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
+ return;
+ }
+ if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
+ throw new SchemaException("Schema version is invalid, update: "
+ + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
}
- });
+
+ cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes);
+ update.getLastRecord().getSubjects().forEach(subject -> {
+ try {
+ cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes);
+ } catch (RocksDBException e) {
+ throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e);
+ }
+ });
+ }
}
+ } catch (Throwable e) {
+ throw new SchemaException("Rebuild schema cache failed", e);
}
- } catch (Throwable e) {
- throw new SchemaException("Rebuild schema cache failed", e);
}
}
}
@@ -386,6 +403,9 @@ public class RocketmqClient {
);
this.converter = new JsonConverterImpl();
+
+ this.scheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RocketmqStoragePullTask"));
}
private ColumnFamilyHandle schemaCfHandle() {
diff --git a/schema-storage-rocketmq/src/main/resources/rocketmq.properties b/schema-storage-rocketmq/src/main/resources/rocketmq.properties
index 3a94c6a..5070713 100644
--- a/schema-storage-rocketmq/src/main/resources/rocketmq.properties
+++ b/schema-storage-rocketmq/src/main/resources/rocketmq.properties
@@ -16,4 +16,4 @@
#
storage.type=rocketmq
-storage.local.cache.path=/Users/xyb/app/schema-registry/cache
\ No newline at end of file
+#storage.local.cache.path
\ No newline at end of file