You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/31 08:39:13 UTC

[rocketmq-schema-registry] 13/37: fix delete version

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git

commit 05bcf9d8f243572efc598dae6237afc54cb2bda3
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Mon Jul 25 11:20:10 2022 +0800

    fix delete version
---
 .../registry/common/exception/SchemaExistException.java     |  2 +-
 .../rocketmq/schema/registry/common/model/SchemaInfo.java   |  3 +++
 .../schema/registry/storage/rocketmq/RocketmqClient.java    | 13 ++++++++++---
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
index 462f0a0..f8a9071 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java
@@ -27,7 +27,7 @@ public class SchemaExistException extends SchemaException {
   private final int errorCode = 40401;
 
   public SchemaExistException(final QualifiedName qualifiedName) {
-    this(String.format("Schema: %s is exist, please check your configuration.", qualifiedName));
+    this(String.format("Schema: %s is exist, please check your configuration.", qualifiedName.schemaFullName()));
   }
 
   public SchemaExistException(final String msg) {
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
index 56f1953..6fc1ecf 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
@@ -81,4 +81,7 @@ public class SchemaInfo extends BaseInfo {
         getLastRecord().setVersion(version);
     }
 
+    public int getRecordCount() {
+        return getDetails().getSchemaRecords().size();
+    }
 }
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 9c0bc7a..f671a7b 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -234,7 +234,8 @@ public class RocketmqClient {
             }
             synchronized (this) {
                 try {
-                    log.info("receive msg, the content is {}", new String(msg.getBody()));
+                    log.info("receive msg, queue={}, offset={}, key={}, the content is {}", msg.getQueueId(),
+                        msg.getQueueOffset(), msg.getKeys(), new String(msg.getBody()));
                     if (msg.getKeys().equals(DELETE_KEYS)) {
                         // delete
                         byte[] schemaFullName = msg.getBody();
@@ -256,11 +257,12 @@ public class RocketmqClient {
                             cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes);
                         } else {
                             SchemaInfo current = converter.fromJson(result, SchemaInfo.class);
-                            if (current.getLastRecordVersion() == update.getLastRecordVersion()) {
+                            boolean isDeleted = current.getRecordCount() > update.getRecordCount();
+                            if (current.getLastRecordVersion() == update.getLastRecordVersion() && !isDeleted) {
                                 log.info("Schema version is the same, no need to update.");
                                 return;
                             }
-                            if (current.getLastRecordVersion() > update.getLastRecordVersion()) {
+                            if (current.getLastRecordVersion() > update.getLastRecordVersion() && !isDeleted) {
                                 throw new SchemaException("Schema version is invalid, update: "
                                     + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
                             }
@@ -338,11 +340,16 @@ public class RocketmqClient {
         if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
             throw new SchemaNotFoundException(name);
         }
+        List<SubjectInfo> subjects = schemaInfo.getLastRecord().getSubjects();
         List<SchemaRecordInfo> schemaRecords = schemaInfo.getDetails().getSchemaRecords();
         schemaRecords.removeIf(record -> record.getVersion() == name.getVersion());
         if (CollectionUtils.isEmpty(schemaRecords)) {
             deleteBySubject(name);
         }
+        // delete but still need bind subject
+        if (schemaInfo.getLastRecord().getSubjects().isEmpty()) {
+            schemaInfo.getLastRecord().setSubjects(subjects);
+        }
         byte[] schemaInfoBytes = converter.toJsonAsBytes(schemaInfo);
 
         try {