You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/30 08:14:31 UTC
[rocketmq-schema-registry] 13/27: fix delete version
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit 05bcf9d8f243572efc598dae6237afc54cb2bda3
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Mon Jul 25 11:20:10 2022 +0800
fix delete version
---
.../registry/common/exception/SchemaExistException.java | 2 +-
.../rocketmq/schema/registry/common/model/SchemaInfo.java | 3 +++
.../schema/registry/storage/rocketmq/RocketmqClient.java | 13 ++++++++++---
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
index 462f0a0..f8a9071 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
@@ -27,7 +27,7 @@ public class SchemaExistException extends SchemaException {
private final int errorCode = 40401;
public SchemaExistException(final QualifiedName qualifiedName) {
- this(String.format("Schema: %s is exist, please check your configuration.", qualifiedName));
+ this(String.format("Schema: %s is exist, please check your configuration.", qualifiedName.schemaFullName()));
}
public SchemaExistException(final String msg) {
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
index 56f1953..6fc1ecf 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
@@ -81,4 +81,7 @@ public class SchemaInfo extends BaseInfo {
getLastRecord().setVersion(version);
}
+ public int getRecordCount() {
+ return getDetails().getSchemaRecords().size();
+ }
}
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 9c0bc7a..f671a7b 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -234,7 +234,8 @@ public class RocketmqClient {
}
synchronized (this) {
try {
- log.info("receive msg, the content is {}", new String(msg.getBody()));
+ log.info("receive msg, queue={}, offset={}, key={}, the content is {}", msg.getQueueId(),
+ msg.getQueueOffset(), msg.getKeys(), new String(msg.getBody()));
if (msg.getKeys().equals(DELETE_KEYS)) {
// delete
byte[] schemaFullName = msg.getBody();
@@ -256,11 +257,12 @@ public class RocketmqClient {
cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
} else {
SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
- if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
+ boolean isDeleted = current.getRecordCount() > update.getRecordCount();
+ if (current.getLastRecordVersion() == update.getLastRecordVersion() && !isDeleted) {
log.info("Schema version is the same, no need to update.");
return;
}
- if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
+ if (current.getLastRecordVersion() > update.getLastRecordVersion() && !isDeleted) {
throw new SchemaException("Schema version is invalid, update: "
+ update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
}
@@ -338,11 +340,16 @@ public class RocketmqClient {
if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
throw new SchemaNotFoundException(name);
}
+ List<SubjectInfo> subjects = schemaInfo.getLastRecord().getSubjects();
List<SchemaRecordInfo> schemaRecords = schemaInfo.getDetails().getSchemaRecords();
schemaRecords.removeIf(record -> record.getVersion() == name.getVersion());
if (CollectionUtils.isEmpty(schemaRecords)) {
deleteBySubject(name);
}
+ // a version was deleted, but the remaining last record must still carry the subject bindings
+ if (schemaInfo.getLastRecord().getSubjects().isEmpty()) {
+ schemaInfo.getLastRecord().setSubjects(subjects);
+ }
byte[] schemaInfoBytes = converter.toJsonAsBytes(schemaInfo);
try {