You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/02/08 05:58:04 UTC
[pulsar] branch branch-2.11 updated: Fix flakyness of testAutoSchemaFunction by creating the subscription before the function. (#17353)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 949465bd183 Fix flakyness of testAutoSchemaFunction by creating the subscription before the function. (#17353)
949465bd183 is described below
commit 949465bd1835bda93d35d01ee4604082c71e5048
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Wed Aug 31 10:53:09 2022 +0200
Fix flakyness of testAutoSchemaFunction by creating the subscription before the function. (#17353)
Otherwise, if the function creates the subscription, there is no schema on the topic,
and, because of a bug, a compatibility check fails on the new schema and the new consumer
can't be created.
---
.../tests/integration/functions/PulsarFunctionsTest.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 2b837b55f49..908d95784d6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -809,6 +809,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String outputSchemaType,
SubscriptionInitialPosition subscriptionInitialPosition) throws Exception {
+ if (StringUtils.isNotEmpty(inputTopicName)) {
+ ensureSubscriptionCreated(
+ inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
+ }
+
CommandGenerator generator;
log.info("------- INPUT TOPIC: '{}', customSchemaInputs: {}", inputTopicName, customSchemaInputs);
if (inputTopicName.endsWith(".*")) {
@@ -854,11 +859,6 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
assertTrue(result.getStdout().contains("Created successfully"));
-
- if (StringUtils.isNotEmpty(inputTopicName)) {
- ensureSubscriptionCreated(
- inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
- }
}
private void updateFunctionParallelism(String functionName, int parallelism) throws Exception {
@@ -1536,6 +1536,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String logTopicName,
String functionName,
Schema<?> schema) throws Exception {
+ ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema);
+
CommandGenerator generator;
log.info("------- INPUT TOPIC: '{}'", inputTopicName);
if (inputTopicName.endsWith(".*")) {
@@ -1556,8 +1558,6 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
assertTrue(result.getStdout().contains("Created successfully"));
-
- ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema);
}
private void publishAndConsumeMessages(String inputTopic,