You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/02/08 05:58:04 UTC

[pulsar] branch branch-2.11 updated: Fix flakyness of testAutoSchemaFunction by creating the subscription before the function. (#17353)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 949465bd183 Fix flakyness of testAutoSchemaFunction by creating the subscription before the function. (#17353)
949465bd183 is described below

commit 949465bd1835bda93d35d01ee4604082c71e5048
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Wed Aug 31 10:53:09 2022 +0200

    Fix flakyness of testAutoSchemaFunction by creating the subscription before the function. (#17353)
    
    Otherwise, if the function creates the subscription, there is no schema on the topic
    and, because of a bug, a compatibility check fails on the new schema and the new consumer
    can't be created.
---
 .../tests/integration/functions/PulsarFunctionsTest.java   | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 2b837b55f49..908d95784d6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -809,6 +809,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                                     String outputSchemaType,
                                     SubscriptionInitialPosition subscriptionInitialPosition) throws Exception {
 
+        if (StringUtils.isNotEmpty(inputTopicName)) {
+            ensureSubscriptionCreated(
+                inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
+        }
+
         CommandGenerator generator;
         log.info("------- INPUT TOPIC: '{}', customSchemaInputs: {}", inputTopicName, customSchemaInputs);
         if (inputTopicName.endsWith(".*")) {
@@ -854,11 +859,6 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
                 commands);
         assertTrue(result.getStdout().contains("Created successfully"));
-
-        if (StringUtils.isNotEmpty(inputTopicName)) {
-            ensureSubscriptionCreated(
-                    inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
-        }
     }
 
     private void updateFunctionParallelism(String functionName, int parallelism) throws Exception {
@@ -1536,6 +1536,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                                            String logTopicName,
                                            String functionName,
                                            Schema<?> schema) throws Exception {
+        ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema);
+
         CommandGenerator generator;
         log.info("------- INPUT TOPIC: '{}'", inputTopicName);
         if (inputTopicName.endsWith(".*")) {
@@ -1556,8 +1558,6 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
                 commands);
         assertTrue(result.getStdout().contains("Created successfully"));
-
-        ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema);
     }
 
     private void publishAndConsumeMessages(String inputTopic,