You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/01 17:54:12 UTC

[pulsar] branch branch-2.3 updated: Issue #3211: Fix NPE when creating schema after deleting a schema (#3836)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 47f85a1  Issue #3211: Fix NPE when creating schema after deleting a schema (#3836)
47f85a1 is described below

commit 47f85a1ddf27ca2b952ac8820537e9e0abd667b7
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Tue Mar 19 23:19:05 2019 +0800

    Issue #3211: Fix NPE when creating schema after deleting a schema (#3836)
    
    Fixes #3211
    Fixes #2786
    
    *Motivation*
    
    When a schema is deleted, it is not actually removed.
    The latest schema can still be fetched, but its state is marked as `deleted`.
    
    Therefore, the schema compatibility check must ignore a deleted schema.
    
    *Modifications*
    
    Ignore deleted schemas when performing the schema compatibility check.
---
 .../service/schema/SchemaRegistryServiceImpl.java  |  15 ++-
 .../tests/integration/schema/SchemaTest.java       | 110 +++++++++++++++++++++
 .../pulsar/tests/integration/schema/Schemas.java   |  79 +++++++++++++++
 3 files changed, 200 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index f066908..9b47368 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -85,8 +85,12 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     @NotNull
     public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
                                                               SchemaCompatibilityStrategy strategy) {
-        return getSchema(schemaId).thenApply(
-                (existingSchema) -> existingSchema == null || isCompatible(existingSchema, schema, strategy))
+        return getSchema(schemaId)
+            .thenApply(
+                (existingSchema) ->
+                    existingSchema == null
+                        || existingSchema.schema.isDeleted()
+                        || isCompatible(existingSchema, schema, strategy))
             .thenCompose(isCompatible -> {
                     if (isCompatible) {
                         byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
@@ -151,8 +155,11 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
 
     private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema,
                                                                     SchemaCompatibilityStrategy strategy) {
-        return getSchema(schemaId).thenApply(
-                (existingSchema) -> existingSchema != null && isCompatible(existingSchema, schema, strategy));
+        return getSchema(schemaId)
+            .thenApply(
+                (existingSchema) ->
+                    !(existingSchema == null || existingSchema.schema.isDeleted())
+                        && isCompatible(existingSchema, schema, strategy));
     }
 
     interface Functions {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
new file mode 100644
index 0000000..fab2542
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.schema;
+
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.schema.Schemas.Person;
+import org.apache.pulsar.tests.integration.schema.Schemas.Student;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test Pulsar Schema.
+ */
+@Slf4j
+public class SchemaTest extends PulsarTestSuite {
+
+    private PulsarClient client;
+    private PulsarAdmin admin;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        this.client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        this.admin = PulsarAdmin.builder()
+            .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+            .build();
+    }
+
+    @Test
+    public void testCreateSchemaAfterDeletion() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topic = "test-create-schema-after-deletion";
+        final String fqtn = TopicName.get(
+             TopicDomain.persistent.value(),
+             tenant,
+             namespace,
+             topic
+         ).toString();
+
+        admin.namespaces().createNamespace(
+            tenant + "/" + namespace,
+            Sets.newHashSet(pulsarCluster.getClusterName())
+        );
+
+        // Step 1: create the topic by publishing with the Avro `Person` schema
+        try (Producer<Person> producer = client.newProducer(Schema.AVRO(Person.class))
+             .topic(fqtn)
+             .create()
+        ) {
+            Person person = new Person();
+            person.setName("Tom Hanks");
+            person.setAge(60);
+
+            producer.send(person);
+
+            log.info("Successfully published person : {}", person);
+        }
+
+        log.info("Deleting schema of topic {}", fqtn);
+        // Step 2: delete the schema registered for the topic (the topic itself is kept)
+        admin.schemas().deleteSchema(fqtn);
+        log.info("Successfully deleted schema of topic {}", fqtn);
+
+        // Step 3: after deleting the schema, a producer with a different schema (`Student`) must be accepted
+        try (Producer<Student> producer = client.newProducer(Schema.AVRO(Student.class))
+             .topic(fqtn)
+             .create()
+        ) {
+            Student student = new Student();
+            student.setName("Tom Jerry");
+            student.setAge(30);
+            student.setGpa(6);
+            student.setGrade(10);
+
+            producer.send(student);
+
+            log.info("Successfully published student : {}", student);
+        }
+    }
+
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
new file mode 100644
index 0000000..ebe798d
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.tests.integration.schema;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * Non-instantiable holder for the POJO types used as schemas in the integration tests.
+ */
+public final class Schemas {
+
+    /**
+     * A person described by a name and an age.
+     */
+    @Data
+    @Getter
+    @Setter
+    @ToString
+    @EqualsAndHashCode
+    public static class Person {
+
+        private String name;
+        private int age;
+
+    }
+
+    /**
+     * A student: the same name and age fields as {@link Person} plus gpa and grade (declared standalone, not a subclass).
+     */
+    @Data
+    @Getter
+    @Setter
+    @ToString
+    @EqualsAndHashCode
+    public static class Student {
+
+        private String name;
+        private int age;
+        private int gpa;
+        private int grade;
+
+    }
+
+    private Schemas() {} // private constructor: this class only groups the nested types
+
+}