You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/27 08:06:37 UTC
[pulsar] 02/03: Fix primitive schema upload for ALWAYS_COMPATIBLE
strategy. (#10386)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 592c3f8040a10826ec5488a6836209cddaddd263
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Apr 27 06:56:36 2021 +0800
Fix primitive schema upload for ALWAYS_COMPATIBLE strategy. (#10386)
* Fix primary schema upload for ALWAYS_COMPATIBLE strategy.
* Fix checkstyle.
(cherry picked from commit f8716c258f095f8cb0ec44ea7e6b56cb431224f3)
---
.../service/schema/BookkeeperSchemaStorage.java | 7 +------
.../SchemaTypeCompatibilityCheckTest.java | 23 ++++++++++++++++++++--
2 files changed, 22 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 0911ac6..2ecc927 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -294,12 +293,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
if (optLocatorEntry.isPresent()) {
- // Schema locator was already present
+
SchemaStorageFormat.SchemaLocator locator = optLocatorEntry.get().locator;
- byte[] storedHash = locator.getInfo().getHash().toByteArray();
- if (storedHash.length > 0 && Arrays.equals(storedHash, hash)) {
- return completedFuture(locator.getInfo().getVersion());
- }
if (log.isDebugEnabled()) {
log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
index bc711c5..61cc989 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.schema.compatibility;
import com.google.common.collect.Sets;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
@@ -313,17 +315,34 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
};
for (Schema<?> schema : schemas) {
- pulsarClient.newProducer(schema)
+ Producer<?> p = pulsarClient.newProducer(schema)
.topic(topicName)
.create();
+ p.close();
}
for (Schema<?> schema : schemas) {
- pulsarClient.newConsumer(schema)
+ Consumer<?> c = pulsarClient.newConsumer(schema)
.topic(topicName)
.subscriptionName(UUID.randomUUID().toString())
.subscribe();
+ c.close();
}
+
+ List<SchemaInfo> schemasOfTopic = admin.schemas().getAllSchemas(topicName);
+
+ // bytes[] schema and bytebuffer schema does not upload schema info to the schema registry
+ assertEquals(schemasOfTopic.size(), schemas.length - 2);
+
+ // Try to upload the schema again.
+ for (Schema<?> schema : schemas) {
+ Producer<?> p = pulsarClient.newProducer(schema)
+ .topic(topicName)
+ .create();
+ p.close();
+ }
+
+ assertEquals(schemasOfTopic.size(), schemas.length - 2);
}
}