You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/31 06:46:54 UTC

[rocketmq-schema-registry] 07/34: fix delete schema

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git

commit 4550c9a2562e3bcfaac327067c0837239068b1e2
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Tue Jul 19 16:37:17 2022 +0800

    fix delete schema
---
 .../schema/registry/common/QualifiedName.java      |  4 +-
 .../registry/storage/rocketmq/RocketmqClient.java  | 71 ++++++++++++++--------
 .../rocketmq/RocketmqStorageClientImpl.java        | 20 +++---
 3 files changed, 55 insertions(+), 40 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
index e945909..5cda52b 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
@@ -71,11 +71,11 @@ public class QualifiedName implements Serializable {
     }
 
     public String fullName() {
-        return cluster + '/' + tenant + '/' + subject + '/' + schema + '/' + version;
+        return cluster + '/' + tenant + '/' + subject + '/' + schema;
     }
 
     public String schemaFullName() {
-        return schema + '/' + version;
+        return schema;
     }
 
     public String subjectFullName() {
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 8e1a21f..1d07084 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -27,6 +27,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -245,10 +246,8 @@ public class RocketmqClient {
 
     // TODO: next query on other machine may can't found schema in cache since send is async with receive
     public SchemaInfo registerSchema(SchemaInfo schema) {
-        byte[] subjectFullName = converter.toBytes(schema.subjectFullName());
         byte[] schemaFullName = converter.toBytes(schema.schemaFullName());
         byte[] schemaInfo = converter.toJsonAsBytes(schema);
-        byte[] lastRecord = converter.toJsonAsBytes(schema.getLastRecord());
 
         try {
             synchronized (this) {
@@ -260,9 +259,6 @@ public class RocketmqClient {
                 if (!result.getSendStatus().equals(SendStatus.SEND_OK)) {
                     throw new SchemaException("Register schema: " + schema.getQualifiedName() + " failed: " + result.getSendStatus());
                 }
-
-                cache.put(schemaCfHandle(), schemaFullName, schemaInfo);
-                cache.put(subjectCfHandle(), subjectFullName, lastRecord);
             }
 
             return schema;
@@ -273,24 +269,22 @@ public class RocketmqClient {
         }
     }
 
-    public void delete(QualifiedName name) {
-        byte[] schemaFullName = converter.toBytes(name.schemaFullName());
+    public void deleteBySubject(QualifiedName name) {
+
+        SchemaInfo schemaInfo = getSchemaInfoBySubject(name.subjectFullName());
+        if (schemaInfo == null) {
+            throw new SchemaNotFoundException(name);
+        }
 
         try {
             synchronized (this) {
-                byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
-                if (schemaInfoBytes == null) {
-                    throw new SchemaNotFoundException(name);
-                }
 
+                byte[] schemaFullName = converter.toBytes(schemaInfo.schemaFullName());
                 Message msg = new Message(storageTopic, "", DELETE_KEYS, schemaFullName);
                 SendResult result = producer.send(msg);
                 if (!result.getSendStatus().equals(SendStatus.SEND_OK)) {
                     throw new SchemaException("Delete schema: " + name + " failed: " + result.getSendStatus());
                 }
-
-                cache.delete(schemaCfHandle(), schemaFullName);
-                deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class));
             }
         } catch (SchemaException e) {
             throw e;
@@ -299,10 +293,36 @@ public class RocketmqClient {
         }
     }
 
+    public void deleteByVersion(QualifiedName name) {
+
+        SchemaInfo schemaInfo = getSchemaInfoBySubject(name.subjectFullName());
+        if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
+            throw new SchemaNotFoundException(name);
+        }
+        List<SchemaRecordInfo> schemaRecords = schemaInfo.getDetails().getSchemaRecords();
+        schemaRecords.removeIf(record -> record.getVersion() == name.getVersion());
+        if (CollectionUtils.isEmpty(schemaRecords)) {
+            deleteBySubject(name);
+        }
+        byte[] schemaInfoBytes = converter.toJsonAsBytes(schemaInfo);
+
+        try {
+            synchronized (this) {
+                Message msg = new Message(storageTopic, "", schemaInfo.schemaFullName(), schemaInfoBytes);
+                SendResult result = producer.send(msg);
+                if (result.getSendStatus() != SendStatus.SEND_OK) {
+                    throw new SchemaException("Update " + name + " failed: " + result.getSendStatus());
+                }
+            }
+        } catch (SchemaException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new SchemaException("Update schema " + name + " failed", e);
+        }
+    }
+
     public SchemaInfo updateSchema(SchemaInfo update) {
-        byte[] schemaFullName = converter.toBytes(update.schemaFullName());
         byte[] schemaInfo = converter.toJsonAsBytes(update);
-        byte[] lastRecord = converter.toJsonAsBytes(update.getLastRecord());
 
         try {
             synchronized (this) {
@@ -311,15 +331,6 @@ public class RocketmqClient {
                 if (result.getSendStatus() != SendStatus.SEND_OK) {
                     throw new SchemaException("Update " + update.getQualifiedName() + " failed: " + result.getSendStatus());
                 }
-
-                cache.put(schemaCfHandle(), schemaFullName, schemaInfo);
-                update.getLastRecord().getSubjects().forEach(subject -> {
-                    try {
-                        cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecord);
-                    } catch (RocksDBException e) {
-                        throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed", e);
-                    }
-                });
             }
             return update;
         } catch (SchemaException e) {
@@ -346,6 +357,16 @@ public class RocketmqClient {
         }
     }
 
+    public SchemaInfo getSchemaInfoBySubject(String subjectFullName) {
+        byte[] lastRecordBytes = getBySubject(subjectFullName);
+        if (lastRecordBytes == null) {
+            return null;
+        }
+        SchemaRecordInfo lastRecord = converter.fromJson(lastRecordBytes, SchemaRecordInfo.class);
+        byte[] result = getSchema(lastRecord.getSchema());
+        return result == null ? null : converter.fromJson(result, SchemaInfo.class);
+    }
+
     private void init(Properties props) {
         this.storageTopic = props.getProperty(STORAGE_ROCKETMQ_TOPIC, STORAGE_ROCKETMQ_TOPIC_DEFAULT);
         this.cachePath = props.getProperty(STORAGE_LOCAL_CACHE_PATH, STORAGE_LOCAL_CACHE_PATH_DEFAULT);
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
index 2fa82ff..2b0987e 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
@@ -64,7 +64,11 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient {
      */
     @Override
     public void delete(QualifiedName qualifiedName) {
-        rocketmqClient.delete(qualifiedName);
+        if (qualifiedName.getVersion() == null) {
+            rocketmqClient.deleteBySubject(qualifiedName);
+        } else {
+            rocketmqClient.deleteByVersion(qualifiedName);
+        }
     }
 
     /**
@@ -101,7 +105,7 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient {
         }
 
         // schema version is given
-        SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName());
+        SchemaInfo schemaInfo = rocketmqClient.getSchemaInfoBySubject(qualifiedName.subjectFullName());
         if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
             return null;
         }
@@ -112,20 +116,10 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient {
 
     @Override
     public List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) {
-        SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName());
+        SchemaInfo schemaInfo = rocketmqClient.getSchemaInfoBySubject(qualifiedName.subjectFullName());
         if (schemaInfo == null || schemaInfo.getDetails() == null) {
             return null;
         }
         return schemaInfo.getDetails().getSchemaRecords();
     }
-
-    private SchemaInfo getSchemaInfoBySubject(String subjectFullName) {
-        byte[] lastRecordBytes = rocketmqClient.getBySubject(subjectFullName);
-        if (lastRecordBytes == null) {
-            return null;
-        }
-        SchemaRecordInfo lastRecord = jsonConverter.fromJson(lastRecordBytes, SchemaRecordInfo.class);
-        byte[] result = rocketmqClient.getSchema(lastRecord.getSchema());
-        return result == null ? null : jsonConverter.fromJson(result, SchemaInfo.class);
-    }
 }