You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/31 06:46:54 UTC
[rocketmq-schema-registry] 07/34: fix delete schema
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit 4550c9a2562e3bcfaac327067c0837239068b1e2
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Tue Jul 19 16:37:17 2022 +0800
fix delete schema
---
.../schema/registry/common/QualifiedName.java | 4 +-
.../registry/storage/rocketmq/RocketmqClient.java | 71 ++++++++++++++--------
.../rocketmq/RocketmqStorageClientImpl.java | 20 +++---
3 files changed, 55 insertions(+), 40 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
index e945909..5cda52b 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
@@ -71,11 +71,11 @@ public class QualifiedName implements Serializable {
}
public String fullName() {
- return cluster + '/' + tenant + '/' + subject + '/' + schema + '/' + version;
+ return cluster + '/' + tenant + '/' + subject + '/' + schema;
}
public String schemaFullName() {
- return schema + '/' + version;
+ return schema;
}
public String subjectFullName() {
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 8e1a21f..1d07084 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -27,6 +27,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -245,10 +246,8 @@ public class RocketmqClient {
// TODO: the next query on another machine may not find the schema in its cache, since sending is asynchronous with receiving
public SchemaInfo registerSchema(SchemaInfo schema) {
- byte[] subjectFullName = converter.toBytes(schema.subjectFullName());
byte[] schemaFullName = converter.toBytes(schema.schemaFullName());
byte[] schemaInfo = converter.toJsonAsBytes(schema);
- byte[] lastRecord = converter.toJsonAsBytes(schema.getLastRecord());
try {
synchronized (this) {
@@ -260,9 +259,6 @@ public class RocketmqClient {
if (!result.getSendStatus().equals(SendStatus.SEND_OK)) {
throw new SchemaException("Register schema: " + schema.getQualifiedName() + " failed: " + result.getSendStatus());
}
-
- cache.put(schemaCfHandle(), schemaFullName, schemaInfo);
- cache.put(subjectCfHandle(), subjectFullName, lastRecord);
}
return schema;
@@ -273,24 +269,22 @@ public class RocketmqClient {
}
}
- public void delete(QualifiedName name) {
- byte[] schemaFullName = converter.toBytes(name.schemaFullName());
+ public void deleteBySubject(QualifiedName name) {
+
+ SchemaInfo schemaInfo = getSchemaInfoBySubject(name.subjectFullName());
+ if (schemaInfo == null) {
+ throw new SchemaNotFoundException(name);
+ }
try {
synchronized (this) {
- byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName);
- if (schemaInfoBytes == null) {
- throw new SchemaNotFoundException(name);
- }
+ byte[] schemaFullName = converter.toBytes(schemaInfo.schemaFullName());
Message msg = new Message(storageTopic, "", DELETE_KEYS, schemaFullName);
SendResult result = producer.send(msg);
if (!result.getSendStatus().equals(SendStatus.SEND_OK)) {
throw new SchemaException("Delete schema: " + name + " failed: " + result.getSendStatus());
}
-
- cache.delete(schemaCfHandle(), schemaFullName);
- deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class));
}
} catch (SchemaException e) {
throw e;
@@ -299,10 +293,36 @@ public class RocketmqClient {
}
}
+ /**
+  * Deletes one version of the schema bound to the subject of {@code name}.
+  * <p>
+  * The matching version is removed from the schema's record list. If that leaves no
+  * records, the whole schema is deleted via {@link #deleteBySubject(QualifiedName)} and
+  * nothing else is published; otherwise the trimmed schema is sent to the storage topic.
+  *
+  * @param name qualified name carrying the subject and the version to delete
+  * @throws SchemaNotFoundException if no schema, details or records are found for the subject
+  * @throws SchemaException if sending the updated schema fails
+  */
+ public void deleteByVersion(QualifiedName name) {
+
+     SchemaInfo schemaInfo = getSchemaInfoBySubject(name.subjectFullName());
+     if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
+         throw new SchemaNotFoundException(name);
+     }
+     List<SchemaRecordInfo> schemaRecords = schemaInfo.getDetails().getSchemaRecords();
+     schemaRecords.removeIf(record -> record.getVersion() == name.getVersion());
+     if (CollectionUtils.isEmpty(schemaRecords)) {
+         // Last version removed: delete the schema itself and stop. Falling through would
+         // publish an update with an empty record list right after the delete message.
+         deleteBySubject(name);
+         return;
+     }
+     byte[] schemaInfoBytes = converter.toJsonAsBytes(schemaInfo);
+
+     try {
+         // Same monitor as the other send paths in this class.
+         synchronized (this) {
+             Message msg = new Message(storageTopic, "", schemaInfo.schemaFullName(), schemaInfoBytes);
+             SendResult result = producer.send(msg);
+             if (result.getSendStatus() != SendStatus.SEND_OK) {
+                 throw new SchemaException("Update " + name + " failed: " + result.getSendStatus());
+             }
+         }
+     } catch (SchemaException e) {
+         // Already carries the right context; do not re-wrap.
+         throw e;
+     } catch (Exception e) {
+         throw new SchemaException("Update schema " + name + " failed", e);
+     }
+ }
+
public SchemaInfo updateSchema(SchemaInfo update) {
- byte[] schemaFullName = converter.toBytes(update.schemaFullName());
byte[] schemaInfo = converter.toJsonAsBytes(update);
- byte[] lastRecord = converter.toJsonAsBytes(update.getLastRecord());
try {
synchronized (this) {
@@ -311,15 +331,6 @@ public class RocketmqClient {
if (result.getSendStatus() != SendStatus.SEND_OK) {
throw new SchemaException("Update " + update.getQualifiedName() + " failed: " + result.getSendStatus());
}
-
- cache.put(schemaCfHandle(), schemaFullName, schemaInfo);
- update.getLastRecord().getSubjects().forEach(subject -> {
- try {
- cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecord);
- } catch (RocksDBException e) {
- throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed", e);
- }
- });
}
return update;
} catch (SchemaException e) {
@@ -346,6 +357,16 @@ public class RocketmqClient {
}
}
+ public SchemaInfo getSchemaInfoBySubject(String subjectFullName) { // Resolves a subject name to its SchemaInfo; returns null when either lookup below finds nothing.
+ byte[] lastRecordBytes = getBySubject(subjectFullName); // first lookup: JSON bytes of the record returned for the subject
+ if (lastRecordBytes == null) {
+ return null; // no entry for this subject
+ }
+ SchemaRecordInfo lastRecord = converter.fromJson(lastRecordBytes, SchemaRecordInfo.class);
+ byte[] result = getSchema(lastRecord.getSchema()); // second lookup: the schema named by that record
+ return result == null ? null : converter.fromJson(result, SchemaInfo.class);
+ }
+
private void init(Properties props) {
this.storageTopic = props.getProperty(STORAGE_ROCKETMQ_TOPIC, STORAGE_ROCKETMQ_TOPIC_DEFAULT);
this.cachePath = props.getProperty(STORAGE_LOCAL_CACHE_PATH, STORAGE_LOCAL_CACHE_PATH_DEFAULT);
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
index 2fa82ff..2b0987e 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
@@ -64,7 +64,11 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient {
*/
@Override
public void delete(QualifiedName qualifiedName) {
- rocketmqClient.delete(qualifiedName);
+ if (qualifiedName.getVersion() == null) {
+ rocketmqClient.deleteBySubject(qualifiedName);
+ } else {
+ rocketmqClient.deleteByVersion(qualifiedName);
+ }
}
/**
@@ -101,7 +105,7 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient {
}
// schema version is given
- SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName());
+ SchemaInfo schemaInfo = rocketmqClient.getSchemaInfoBySubject(qualifiedName.subjectFullName());
if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
return null;
}
@@ -112,20 +116,10 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient {
@Override
public List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) {
- SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName());
+ SchemaInfo schemaInfo = rocketmqClient.getSchemaInfoBySubject(qualifiedName.subjectFullName());
if (schemaInfo == null || schemaInfo.getDetails() == null) {
return null;
}
return schemaInfo.getDetails().getSchemaRecords();
}
-
- private SchemaInfo getSchemaInfoBySubject(String subjectFullName) {
- byte[] lastRecordBytes = rocketmqClient.getBySubject(subjectFullName);
- if (lastRecordBytes == null) {
- return null;
- }
- SchemaRecordInfo lastRecord = jsonConverter.fromJson(lastRecordBytes, SchemaRecordInfo.class);
- byte[] result = rocketmqClient.getSchema(lastRecord.getSchema());
- return result == null ? null : jsonConverter.fromJson(result, SchemaInfo.class);
- }
}