You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/19 15:19:11 UTC
[pulsar] branch master updated: Issue #3211: Fix NPE when creating
schema after deleting a schema (#3836)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8997375 Issue #3211: Fix NPE when creating schema after deleting a schema (#3836)
8997375 is described below
commit 8997375ef6a0b6c6286478b495c0ca785ae41710
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Tue Mar 19 23:19:05 2019 +0800
Issue #3211: Fix NPE when creating schema after deleting a schema (#3836)
Fixes #3211
Fixes #2786
*Motivation*
When a schema is deleted, it is not physically removed from the registry.
The latest schema can still be fetched, but its state is marked as `deleted`.
The schema compatibility check must therefore ignore a schema that is marked as deleted.
*Modifications*
Ignore deleted schemas when performing the schema compatibility check.
---
.../service/schema/SchemaRegistryServiceImpl.java | 15 ++-
.../tests/integration/schema/SchemaTest.java | 110 +++++++++++++++++++++
.../pulsar/tests/integration/schema/Schemas.java | 79 +++++++++++++++
3 files changed, 200 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index dc5a681..b54b0c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -85,8 +85,12 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
@NotNull
public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy) {
- return getSchema(schemaId).thenApply(
- (existingSchema) -> existingSchema == null || isCompatible(existingSchema, schema, strategy))
+ return getSchema(schemaId)
+ .thenApply(
+ (existingSchema) ->
+ existingSchema == null
+ || existingSchema.schema.isDeleted()
+ || isCompatible(existingSchema, schema, strategy))
.thenCompose(isCompatible -> {
if (isCompatible) {
byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
@@ -151,8 +155,11 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy) {
- return getSchema(schemaId).thenApply(
- (existingSchema) -> existingSchema != null && isCompatible(existingSchema, schema, strategy));
+ return getSchema(schemaId)
+ .thenApply(
+ (existingSchema) ->
+ !(existingSchema == null || existingSchema.schema.isDeleted())
+ && isCompatible(existingSchema, schema, strategy));
}
interface Functions {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
new file mode 100644
index 0000000..fab2542
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.schema;
+
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.schema.Schemas.Person;
+import org.apache.pulsar.tests.integration.schema.Schemas.Student;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for Pulsar schemas: re-creating a schema on a topic after its previous schema was deleted.
+ */
+@Slf4j
+public class SchemaTest extends PulsarTestSuite {
+
+    private PulsarClient client;
+    private PulsarAdmin admin;
+    /** Builds the client and admin before each test. NOTE(review): neither is closed in this class -- confirm. */
+    @BeforeMethod
+    public void setup() throws Exception {
+        this.client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        this.admin = PulsarAdmin.builder()
+            .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+            .build();
+    }
+    /** Publishes with one Avro schema, deletes it, then creates a producer with a different Avro schema and publishes. */
+    @Test
+    public void testCreateSchemaAfterDeletion() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topic = "test-create-schema-after-deletion";
+        final String fqtn = TopicName.get(
+            TopicDomain.persistent.value(),
+            tenant,
+            namespace,
+            topic
+        ).toString();
+
+        admin.namespaces().createNamespace(
+            tenant + "/" + namespace,
+            Sets.newHashSet(pulsarCluster.getClusterName())
+        );
+
+        // Create a topic with `Person`
+        try (Producer<Person> producer = client.newProducer(Schema.AVRO(Person.class))
+            .topic(fqtn)
+            .create()
+        ) {
+            Person person = new Person();
+            person.setName("Tom Hanks");
+            person.setAge(60);
+
+            producer.send(person);
+
+            log.info("Successfully published person : {}", person);
+        }
+
+        log.info("Deleting schema of topic {}", fqtn);
+        // delete the schema
+        admin.schemas().deleteSchema(fqtn);
+        log.info("Successfully deleted schema of topic {}", fqtn);
+
+        // after deleting the schema, try to create a producer on the same topic with a different schema
+        try (Producer<Student> producer = client.newProducer(Schema.AVRO(Student.class))
+            .topic(fqtn)
+            .create()
+        ) {
+            Student student = new Student();
+            student.setName("Tom Jerry");
+            student.setAge(30);
+            student.setGpa(6);
+            student.setGrade(10);
+
+            producer.send(student);
+
+            log.info("Successfully published student : {}", student);
+        }
+    }
+
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
new file mode 100644
index 0000000..ebe798d
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.tests.integration.schema;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * Holder for the plain data classes that the integration tests use as schemas.
+ */
+public final class Schemas {
+
+ /**
+ * A Person struct (name, age). NOTE(review): {@code @Data} already bundles the four Lombok annotations stacked after it -- confirm they can go.
+ */
+ @Data
+ @Getter
+ @Setter
+ @ToString
+ @EqualsAndHashCode
+ public static class Person {
+
+ private String name;
+ private int age;
+
+ }
+
+ /**
+ * A Student struct (name, age, gpa, grade). NOTE(review): same redundant Lombok annotations after {@code @Data} as on Person.
+ */
+ @Data
+ @Getter
+ @Setter
+ @ToString
+ @EqualsAndHashCode
+ public static class Student {
+
+ private String name;
+ private int age;
+ private int gpa;
+ private int grade;
+
+ }
+ // Private constructor: this class only groups the nested types and is not meant to be instantiated.
+ private Schemas() {}
+
+}