You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/30 18:36:23 UTC

[GitHub] sijie closed pull request #2669: Clarify and add tests for schema change scenarios

sijie closed pull request #2669: Clarify and add tests for schema change scenarios
URL: https://github.com/apache/pulsar/pull/2669
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 21451d6bba..ab565a5d83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -571,7 +571,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) {
                 try {
                     TopicName topicName = TopicName.get(topic);
                     if (bundle.includes(topicName)) {
-                        CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic, null);
+                        CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic);
                         if (future != null) {
                             persistentTopics.add(future);
                         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 800545cfef..8cd0f01630 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1225,7 +1225,7 @@ private Topic getTopicReference(TopicName topicName) {
     }
 
     private Topic getOrCreateTopic(TopicName topicName) {
-        return pulsar().getBrokerService().getOrCreateTopic(topicName.toString(), null).join();
+        return pulsar().getBrokerService().getOrCreateTopic(topicName.toString()).join();
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 406d3e55c8..9a5fc29a8f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -112,7 +112,6 @@
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.common.schema.SchemaData;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -451,19 +450,14 @@ public void unloadNamespaceBundlesGracefully() {
     }
 
     public CompletableFuture<Optional<Topic>> getTopicIfExists(final String topic) {
-        return getTopic(topic, false /* createIfMissing */, null /* schemaData */ );
+        return getTopic(topic, false /* createIfMissing */);
     }
 
     public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
-        return getOrCreateTopic(topic, null);
+        return getTopic(topic, true /* createIfMissing */).thenApply(Optional::get);
     }
 
-    public CompletableFuture<Topic> getOrCreateTopic(final String topic, SchemaData schemaData) {
-        return getTopic(topic, true /* createIfMissing */, schemaData ).thenApply(Optional::get);
-    }
-
-    private CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing,
-            SchemaData schemaData) {
+    private CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) {
         try {
             CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
             if (topicFuture != null) {
@@ -477,8 +471,8 @@ public void unloadNamespaceBundlesGracefully() {
             }
             final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
             return topics.computeIfAbsent(topic, (topicName) -> {
-                return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing, schemaData)
-                        : createNonPersistentTopic(topicName, schemaData);
+                    return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing)
+                        : createNonPersistentTopic(topicName);
             });
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Illegalargument exception when loading topic", topic, e);
@@ -495,7 +489,7 @@ public void unloadNamespaceBundlesGracefully() {
         }
     }
 
-    private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic, SchemaData schemaData) {
+    private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
         CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
 
         if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
@@ -526,15 +520,7 @@ public void unloadNamespaceBundlesGracefully() {
             return null;
         });
 
-        return topicFuture.thenCompose(ot -> {
-            if (ot.isPresent()) {
-                // If a schema is provided, add or validate it before the
-                // topic is "visible"
-                return ot.get().addSchema(schemaData).thenApply(schemaVersion -> ot);
-            } else {
-                return CompletableFuture.completedFuture(ot);
-            }
-        });
+        return topicFuture;
     }
 
     private static <T> CompletableFuture<T> failedFuture(Throwable t) {
@@ -592,7 +578,7 @@ public PulsarClient getReplicationClient(String cluster) {
      * @throws RuntimeException
      */
     protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic,
-            boolean createIfMissing, SchemaData schemaData) throws RuntimeException {
+            boolean createIfMissing) throws RuntimeException {
         checkTopicNsOwnership(topic);
 
         final CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
@@ -620,15 +606,7 @@ public PulsarClient getReplicationClient(String cluster) {
                 log.debug("topic-loading for {} added into pending queue", topic);
             }
         }
-        return topicFuture.thenCompose(ot -> {
-            if (ot.isPresent()) {
-                // If a schema is provided, add or validate it before the
-                // topic is "visible"
-                return ot.get().addSchema(schemaData).thenApply(schemaVersion -> ot);
-            } else {
-                return CompletableFuture.completedFuture(ot);
-            }
-        });
+        return topicFuture;
     }
 
     private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture<Optional<Topic>> topicFuture) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 51c3ee01a4..500ee97427 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -57,6 +57,7 @@
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -585,20 +586,23 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                             }
                         }
 
-                        service.getOrCreateTopic(topicName.toString(), schema)
+                        service.getOrCreateTopic(topicName.toString())
                                 .thenCompose(topic -> {
                                     if (schema != null) {
-                                        return topic.isSchemaCompatible(schema).thenCompose(isCompatible -> {
-                                            if (isCompatible) {
-                                                return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
-                                                    subType, priorityLevel, consumerName, isDurable,
-                                                    startMessageId, metadata, readCompacted, initialPosition);
-                                            } else {
-                                                return FutureUtil.failedFuture(new BrokerServiceException(
-                                                    "Trying to subscribe with incompatible schema"
-                                                ));
-                                            }
-                                        });
+                                        return topic.addSchemaIfIdleOrCheckCompatible(schema)
+                                            .thenCompose(isCompatible -> {
+                                                    if (isCompatible) {
+                                                        return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
+                                                                subType, priorityLevel, consumerName, isDurable,
+                                                                startMessageId, metadata,
+                                                                readCompacted, initialPosition);
+                                                    } else {
+                                                        return FutureUtil.failedFuture(
+                                                                new BrokerServiceException(
+                                                                        "Trying to subscribe with incompatible schema"
+                                                        ));
+                                                    }
+                                                });
                                     } else {
                                         return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
                                             subType, priorityLevel, consumerName, isDurable,
@@ -792,7 +796,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
 
                         log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
 
-                        service.getOrCreateTopic(topicName.toString(), schema).thenAccept((Topic topic) -> {
+                        service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
                             // Before creating producer, check if backlog quota exceeded
                             // on topic
                             if (topic.isBacklogQuotaExceeded(producerName)) {
@@ -827,7 +831,16 @@ protected void handleProducer(final CommandProducer cmdProducer) {
                             if (schema != null) {
                                 schemaVersionFuture = topic.addSchema(schema);
                             } else {
-                                schemaVersionFuture = CompletableFuture.completedFuture(SchemaVersion.Empty);
+                                schemaVersionFuture = topic.hasSchema().thenCompose((hasSchema) -> {
+                                        CompletableFuture<SchemaVersion> result = new CompletableFuture<>();
+                                        if (hasSchema) {
+                                            result.completeExceptionally(new IncompatibleSchemaException(
+                                                "Producers cannot connect without a schema to topics with a schema"));
+                                        } else {
+                                            result.complete(SchemaVersion.Empty);
+                                        }
+                                        return result;
+                                    });
                             }
 
                             schemaVersionFuture.exceptionally(exception -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 7488204fca..5837898a58 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -131,9 +131,28 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats
 
     Position getLastMessageId();
 
+    /**
+     * Whether a topic has had a schema defined for it.
+     */
+    CompletableFuture<Boolean> hasSchema();
+
+    /**
+     * Add a schema to the topic. This will fail if the new schema is incompatible with the current
+     * schema.
+     */
     CompletableFuture<SchemaVersion> addSchema(SchemaData schema);
 
+    /**
+     * Check if schema is compatible with current topic schema.
+     */
     CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema);
 
+    /**
+     * If the topic is idle (no producers, no entries, no subscribers and no existing schema),
+     * add the passed schema to the topic. Otherwise, check that the passed schema is compatible
+     * with what the topic already has.
+     */
+    CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema);
+
     CompletableFuture<Void> deleteForcefully();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 281d8438db..9527191cea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -1002,6 +1002,15 @@ public void markBatchMessagePublished() {
 
     private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
 
+    @Override
+    public CompletableFuture<Boolean> hasSchema() {
+        String base = TopicName.get(getName()).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        return brokerService.pulsar()
+            .getSchemaRegistryService()
+            .getSchema(id).thenApply((schema) -> schema != null);
+    }
+
     @Override
     public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
         if (schema == null) {
@@ -1023,4 +1032,16 @@ public void markBatchMessagePublished() {
             .getSchemaRegistryService()
             .isCompatibleWithLatestVersion(id, schema);
     }
+
+    @Override
+    public CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
+        return hasSchema()
+            .thenCompose((hasSchema) -> {
+                    if (hasSchema || isActive() || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
+                        return isSchemaCompatible(schema);
+                    } else {
+                        return addSchema(schema).thenApply((ignore) -> true);
+                    }
+                });
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4d289c4260..6c12296a6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1790,6 +1790,15 @@ public synchronized OffloadProcessStatus offloadStatus() {
 
     private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
 
+    @Override
+    public CompletableFuture<Boolean> hasSchema() {
+        String base = TopicName.get(getName()).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        return brokerService.pulsar()
+            .getSchemaRegistryService()
+            .getSchema(id).thenApply((schema) -> schema != null);
+    }
+
     @Override
     public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
         if (schema == null) {
@@ -1811,4 +1820,16 @@ public synchronized OffloadProcessStatus offloadStatus() {
             .getSchemaRegistryService()
             .isCompatibleWithLatestVersion(id, schema);
     }
+
+    @Override
+    public CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
+        return hasSchema()
+            .thenCompose((hasSchema) -> {
+                    if (hasSchema || isActive() || ledger.getTotalSize() != 0) {
+                        return isSchemaCompatible(schema);
+                    } else {
+                        return addSchema(schema).thenApply((ignore) -> true);
+                    }
+                });
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 8cecad4b22..aa0a39eba2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -83,23 +83,25 @@
     @Override
     @NotNull
     public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema) {
-        return checkCompatibilityWithLatest(schemaId, schema).thenCompose(isCompatible -> {
-            if (isCompatible) {
-                byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
-                SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
-                    .setType(Functions.convertFromDomainType(schema.getType()))
-                    .setSchema(ByteString.copyFrom(schema.getData()))
-                    .setSchemaId(schemaId)
-                    .setUser(schema.getUser())
-                    .setDeleted(false)
-                    .setTimestamp(clock.millis())
-                    .addAllProps(toPairs(schema.getProps()))
-                    .build();
-                return schemaStorage.put(schemaId, info.toByteArray(), context);
-            } else {
-                return FutureUtil.failedFuture(new IncompatibleSchemaException());
-            }
-        });
+        return getSchema(schemaId).thenApply(
+                (existingSchema) -> existingSchema == null || isCompatible(existingSchema, schema))
+            .thenCompose(isCompatible -> {
+                    if (isCompatible) {
+                        byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
+                        SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
+                            .setType(Functions.convertFromDomainType(schema.getType()))
+                            .setSchema(ByteString.copyFrom(schema.getData()))
+                            .setSchemaId(schemaId)
+                            .setUser(schema.getUser())
+                            .setDeleted(false)
+                            .setTimestamp(clock.millis())
+                            .addAllProps(toPairs(schema.getProps()))
+                            .build();
+                        return schemaStorage.put(schemaId, info.toByteArray(), context);
+                    } else {
+                        return FutureUtil.failedFuture(new IncompatibleSchemaException());
+                    }
+                });
     }
 
     @Override
@@ -135,15 +137,14 @@ public void close() throws Exception {
             .build();
     }
 
-    private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema) {
+    private boolean isCompatible(SchemaAndMetadata existingSchema, SchemaData newSchema) {
+        return compatibilityChecks.getOrDefault(newSchema.getType(), SchemaCompatibilityCheck.DEFAULT)
+            .isCompatible(existingSchema.schema, newSchema);
+    }
 
-        return getSchema(schemaId).thenApply(storedSchema ->
-            (storedSchema == null) ||
-                compatibilityChecks.getOrDefault(
-                    schema.getType(),
-                    SchemaCompatibilityCheck.DEFAULT
-                ).isCompatible(storedSchema.schema, schema)
-        );
+    private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema) {
+        return getSchema(schemaId).thenApply(
+                (existingSchema) -> existingSchema != null && isCompatible(existingSchema, schema));
     }
 
     interface Functions {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 7a068234c6..94823ad08e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -171,7 +171,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
             consumer.acknowledge(msg);
         }
 
-        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
         ManagedCursorImpl cursor = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
         retryStrategically((test) -> cursor.getState().equals("Open"), 5, 100);
 
@@ -206,7 +206,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
         }
 
         // (5) Broker should create new cursor-ledger and remove old cursor-ledger
-        topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
+        topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
         final ManagedCursorImpl cursor1 = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
         retryStrategically((test) -> cursor1.getState().equals("Open"), 5, 100);
         long newCursorLedgerId = cursor1.getCursorLedger();
@@ -261,7 +261,7 @@ public void testSkipCorruptDataLedger() throws Exception {
         Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
                 .receiverQueueSize(5).subscribe();
 
-        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1, null).get();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
         ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
         ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
         Field configField = ManagedCursorImpl.class.getDeclaredField("config");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index b9b1b847d6..54d1644975 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -105,7 +105,7 @@ public void testOwnedNsCheck() throws Exception {
         BrokerService service = pulsar.getBrokerService();
 
         final CountDownLatch latch1 = new CountDownLatch(1);
-        service.getOrCreateTopic(topic, null).thenAccept(t -> {
+        service.getOrCreateTopic(topic).thenAccept(t -> {
             latch1.countDown();
             fail("should fail as NS is not owned");
         }).exceptionally(exception -> {
@@ -118,7 +118,7 @@ public void testOwnedNsCheck() throws Exception {
         admin.lookups().lookupTopic(topic);
 
         final CountDownLatch latch2 = new CountDownLatch(1);
-        service.getOrCreateTopic(topic, null).thenAccept(t -> {
+        service.getOrCreateTopic(topic).thenAccept(t -> {
             try {
                 assertNotNull(service.getTopicReference(topic));
             } catch (Exception e) {
@@ -747,7 +747,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {
 
         // try to create topic which should fail as bundle is disable
         CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService()
-                .loadOrCreatePersistentTopic(topicName, true, null);
+                .loadOrCreatePersistentTopic(topicName, true);
 
         try {
             futureResult.get();
@@ -790,7 +790,7 @@ public void testTopicFailureShouldNotHaveDeadLock() {
 
         // create topic async and wait on the future completion
         executor.submit(() -> {
-            service.getOrCreateTopic(deadLockTestTopic, null).thenAccept(topic -> topicCreation.complete(null)).exceptionally(e -> {
+            service.getOrCreateTopic(deadLockTestTopic).thenAccept(topic -> topicCreation.complete(null)).exceptionally(e -> {
                 topicCreation.completeExceptionally(e.getCause());
                 return null;
             });
@@ -842,7 +842,7 @@ public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception {
 
         // create topic async and wait on the future completion
         executor.submit(() -> {
-            service.getOrCreateTopic(deadLockTestTopic, null).thenAccept(topic -> topicCreation.complete(null)).exceptionally(e -> {
+            service.getOrCreateTopic(deadLockTestTopic).thenAccept(topic -> topicCreation.complete(null)).exceptionally(e -> {
                 topicCreation.completeExceptionally(e.getCause());
                 return null;
             });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 6b01cd6d83..e6de6bf62d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pulsar.client.api;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -41,23 +43,147 @@ protected void cleanup() throws Exception {
 
     @Test
     public void testString() throws Exception {
-        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic1").subscriptionName("my-subscriber-name").subscribe();
+        try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic1")
+                .subscriptionName("my-subscriber-name").subscribe();
+             Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic1").create()) {
+            int N = 10;
 
-        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic1").create();
+            for (int i = 0; i < N; i++) {
+                producer.send("my-message-" + i);
+            }
 
-        int N = 10;
+            for (int i = 0; i < N; i++) {
+                Message<String> msg = consumer.receive();
+                assertEquals(msg.getValue(), "my-message-" + i);
 
-        for (int i = 0; i < N; i++) {
-            producer.send("my-message-" + i);
+                consumer.acknowledge(msg);
+            }
         }
+    }
+
+    static class V1Data {
+        int i;
+
+        V1Data() {
+            this.i = 0;
+        }
+
+        V1Data(int i) {
+            this.i = i;
+        }
+
+        @Override
+        public int hashCode() {
+            return i;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            return (other instanceof V1Data) && i == ((V1Data)other).i;
+        }
+    }
+
+    @Test
+    public void newProducerNewTopicNewSchema() throws Exception {
+        String topic = "my-property/my-ns/schema-test";
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
+                .topic(topic).create()) {
+            p.send(new V1Data(0));
+        }
+    }
+
+    @Test
+    public void newProducerTopicExistsWithoutSchema() throws Exception {
+        String topic = "my-property/my-ns/schema-test";
+        try (Producer<byte[]> p = pulsarClient.newProducer().topic(topic).create()) {
+            p.send(topic.getBytes(UTF_8));
+        }
+
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
+                .topic(topic).create()) {
+            p.send(new V1Data(0));
+        }
+    }
+
+    @Test
+    public void newProducerTopicExistsWithSchema() throws Exception {
+        String topic = "my-property/my-ns/schema-test";
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
+                .topic(topic).create()) {
+            p.send(new V1Data(1));
+        }
+
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
+                .topic(topic).create()) {
+            p.send(new V1Data(0));
+        }
+    }
+
+    @Test
+    public void newProducerWithoutSchemaOnTopicWithSchema() throws Exception {
+        String topic = "my-property/my-ns/schema-test";
 
-        for (int i = 0; i < N; i++) {
-            Message<String> msg = consumer.receive();
-            assertEquals(msg.getValue(), "my-message-" + i);
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
+                .topic(topic).create()) {
+            p.send(new V1Data(0));
+        }
+
+        try (Producer<byte[]> p = pulsarClient.newProducer().topic(topic).create()) {
+            Assert.fail("Shouldn't be able to connect to a schema'd topic with no schema");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
+        }
+    }
+
+    @Test
+    public void newConsumerWithSchemaOnNewTopic() throws Exception {
+        String topic = "my-property/my-ns/schema-test";
+
+        try (Consumer<V1Data> c = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic).subscriptionName("sub1").subscribe();
+             Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topic).create()) {
+            V1Data toSend = new V1Data(1);
+            p.send(toSend);
+            Assert.assertEquals(toSend, c.receive().getValue());
+        }
+    }
+
+    @Test
+    public void newConsumerWithSchemaOnExistingTopicWithoutSchema() throws Exception {
+        String topic = "my-property/my-ns/schema-test";
+
+        try (Producer<byte[]> p = pulsarClient.newProducer().topic(topic).create();
+             Consumer<V1Data> c = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic).subscriptionName("sub1").subscribe()) {
+            Assert.fail("Shouldn't be able to consume with a schema from a topic which has no schema set");
+        } catch (PulsarClientException e) {
+            Assert.assertTrue(e.getMessage().contains("Trying to subscribe with incompatible schema"));
+        }
+    }
+
+    @Test
+    public void newConsumerWithSchemaTopicHasSchema() throws Exception {
+        String topic = "my-property/my-ns/schema-test";
+
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topic).create();
+             Consumer<V1Data> c = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic).subscriptionName("sub1").subscribe()) {
+            V1Data toSend = new V1Data(1);
+            p.send(toSend);
+            Assert.assertEquals(toSend, c.receive().getValue());
+        }
+    }
+
+    @Test
+    public void newBytesConsumerWithTopicWithSchema() throws Exception {
+        String topic = "my-property/my-ns/schema-test";
 
-            consumer.acknowledge(msg);
+        try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topic).create();
+             Consumer<byte[]> c = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe()) {
+            p.send(new V1Data(1));
+            Assert.assertTrue(c.receive().getValue().length > 0);
         }
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services