You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/07 14:57:24 UTC

[GitHub] [flink] syhily opened a new pull request, #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

syhily opened a new pull request, #21252:
URL: https://github.com/apache/flink/pull/21252

   ## What is the purpose of the change
   
   The tests in `PulsarSinkITCase` may failed with some error logs like below. This is caused by we may consume messages on a blank topic with no schema. Pulsar topic's schema is define by the first message sent to it. Or you can create schema on topic manually.
   
   ```
   Failed to subscribe for topic [persistent://public/default/bCQuHnEp] in topics consumer, subscribe error: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: {"errorMsg":"Topic does not have schema to check","reqId":2052942634743575747, "remote":"localhost/127.0.0.1:44835", "local":"/127.0.0.1:41334"}
   ```
   
   In this test, we will manually create the topic schema before the test which fixes this race condition.
   
   ## Brief change log
   
   - Create schema in `PulsarSinkITCase`.
   - Log all the exceptions in `ControlSource` for better debugging.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as:
   
   - PulsarSinkITCase
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] syhily commented on a diff in pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21252:
URL: https://github.com/apache/flink/pull/21252#discussion_r1019821626


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##########
@@ -180,6 +180,10 @@ public void createTopic(String topic, int numberOfPartitions) {
         }
     }
 
+    public void createSchema(String topic, Schema<?> schema) {

Review Comment:
   This is a tools in testing code. The end user can't touch these code. Should we add this `@VisibleForTesting` annotation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] syhily commented on pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
syhily commented on PR #21252:
URL: https://github.com/apache/flink/pull/21252#issuecomment-1306066610

   The CI failed in e2e tests. But it's not related to Pulsar. So I didn't trigger the CI again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] syhily commented on a diff in pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21252:
URL: https://github.com/apache/flink/pull/21252#discussion_r1019826795


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##########
@@ -183,37 +191,58 @@ public List<String> getExpectedRecords() {
     private static class StopSignal implements Closeable {
         private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
 
-        private final String topic;
         private final int desiredCounts;
         // This is a thread-safe list.
         private final List<String> consumedRecords;
         private final AtomicLong deadline;
         private final ExecutorService executor;
+        private final Consumer<String> consumer;
+        private final AtomicReference<PulsarClientException> throwableException;
 
         public StopSignal(
                 PulsarRuntimeOperator operator, String topic, int messageCounts, Duration timeout) {
-            this.topic = topic;
             this.desiredCounts = messageCounts;
             this.consumedRecords = Collections.synchronizedList(new ArrayList<>(messageCounts));
             this.deadline = new AtomicLong(timeout.toMillis() + System.currentTimeMillis());
             this.executor = Executors.newSingleThreadExecutor();
+            ConsumerBuilder<String> consumerBuilder =
+                    operator.client()
+                            .newConsumer(Schema.STRING)
+                            .topic(topic)
+                            .subscriptionName(randomAlphanumeric(10))
+                            .subscriptionMode(Durable)
+                            .subscriptionType(Exclusive)
+                            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+            this.consumer = sneakyClient(consumerBuilder::subscribe);
+            this.throwableException = new AtomicReference<>();

Review Comment:
   I think it's ok to use `CompletableFuture` with better readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] tisonkun merged pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
tisonkun merged PR #21252:
URL: https://github.com/apache/flink/pull/21252


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] syhily commented on a diff in pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21252:
URL: https://github.com/apache/flink/pull/21252#discussion_r1019822514


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##########
@@ -180,6 +180,10 @@ public void createTopic(String topic, int numberOfPartitions) {
         }
     }
 
+    public void createSchema(String topic, Schema<?> schema) {
+        sneakyAdmin(() -> admin().schemas().createSchema(topic, schema.getSchemaInfo()));

Review Comment:
   These checked exceptions should be handled in some situation. But I `sneaky` them just because of the poor design in Pulsar API. We can leave it here and work through all the code to see which `sneaky` should be better removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] syhily commented on a diff in pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21252:
URL: https://github.com/apache/flink/pull/21252#discussion_r1019825570


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##########
@@ -183,37 +191,58 @@ public List<String> getExpectedRecords() {
     private static class StopSignal implements Closeable {
         private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
 
-        private final String topic;
         private final int desiredCounts;
         // This is a thread-safe list.
         private final List<String> consumedRecords;
         private final AtomicLong deadline;
         private final ExecutorService executor;
+        private final Consumer<String> consumer;

Review Comment:
   This `Consumer` is created and used in `ControlSource` with an `SharedReference`. Because we may use it both in a Flink testing instance and the testing code for consuming the messages from Pulsar and judge if we can stop this application.
   
   It's quite ok to remove it. But I prefer to keep it with a close method. I'll add it in next commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] syhily commented on a diff in pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21252:
URL: https://github.com/apache/flink/pull/21252#discussion_r1019857265


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##########
@@ -183,37 +191,58 @@ public List<String> getExpectedRecords() {
     private static class StopSignal implements Closeable {
         private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
 
-        private final String topic;
         private final int desiredCounts;
         // This is a thread-safe list.
         private final List<String> consumedRecords;
         private final AtomicLong deadline;
         private final ExecutorService executor;
+        private final Consumer<String> consumer;
+        private final AtomicReference<PulsarClientException> throwableException;
 
         public StopSignal(
                 PulsarRuntimeOperator operator, String topic, int messageCounts, Duration timeout) {
-            this.topic = topic;
             this.desiredCounts = messageCounts;
             this.consumedRecords = Collections.synchronizedList(new ArrayList<>(messageCounts));
             this.deadline = new AtomicLong(timeout.toMillis() + System.currentTimeMillis());
             this.executor = Executors.newSingleThreadExecutor();
+            ConsumerBuilder<String> consumerBuilder =
+                    operator.client()
+                            .newConsumer(Schema.STRING)
+                            .topic(topic)
+                            .subscriptionName(randomAlphanumeric(10))
+                            .subscriptionMode(Durable)
+                            .subscriptionType(Exclusive)
+                            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+            this.consumer = sneakyClient(consumerBuilder::subscribe);
+            this.throwableException = new AtomicReference<>();
 
             // Start consuming.
             executor.execute(
                     () -> {
                         while (consumedRecords.size() < desiredCounts) {
                             // This method would block until we consumed a message.
                             int counts = desiredCounts - consumedRecords.size();
-                            List<Message<String>> messages =
-                                    operator.receiveMessages(this.topic, Schema.STRING, counts);
-                            for (Message<String> message : messages) {
-                                consumedRecords.add(message.getValue());
+                            for (int i = 0; i < counts; i++) {
+                                try {
+                                    Message<String> message = consumer.receive();
+                                    consumedRecords.add(message.getValue());
+                                } catch (PulsarClientException e) {
+                                    throwableException.set(e);
+                                    break;
+                                }
                             }
                         }
                     });
         }
 
         public boolean canStop() {
+            PulsarClientException exception = throwableException.get();
+            if (exception != null) {
+                LOG.error("Error in consuming messages from Pulsar.");
+                LOG.error("", exception);
+                return true;
+            }

Review Comment:
   Yep. I think you are right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on a diff in pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21252:
URL: https://github.com/apache/flink/pull/21252#discussion_r1016457270


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##########
@@ -183,37 +191,58 @@ public List<String> getExpectedRecords() {
     private static class StopSignal implements Closeable {
         private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
 
-        private final String topic;
         private final int desiredCounts;
         // This is a thread-safe list.
         private final List<String> consumedRecords;
         private final AtomicLong deadline;
         private final ExecutorService executor;
+        private final Consumer<String> consumer;
+        private final AtomicReference<PulsarClientException> throwableException;
 
         public StopSignal(
                 PulsarRuntimeOperator operator, String topic, int messageCounts, Duration timeout) {
-            this.topic = topic;
             this.desiredCounts = messageCounts;
             this.consumedRecords = Collections.synchronizedList(new ArrayList<>(messageCounts));
             this.deadline = new AtomicLong(timeout.toMillis() + System.currentTimeMillis());
             this.executor = Executors.newSingleThreadExecutor();
+            ConsumerBuilder<String> consumerBuilder =

Review Comment:
   ```suggestion
               final ConsumerBuilder<String> consumerBuilder =
   ```
   nit



##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##########
@@ -180,6 +180,10 @@ public void createTopic(String topic, int numberOfPartitions) {
         }
     }
 
+    public void createSchema(String topic, Schema<?> schema) {

Review Comment:
   ```suggestion
       @VisibleForTesting
       public void createSchema(String topic, Schema<?> schema) {
   ```



##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##########
@@ -180,6 +180,10 @@ public void createTopic(String topic, int numberOfPartitions) {
         }
     }
 
+    public void createSchema(String topic, Schema<?> schema) {
+        sneakyAdmin(() -> admin().schemas().createSchema(topic, schema.getSchemaInfo()));

Review Comment:
   Why do we use these `sneaky` utility functions everywhere? It feels a bit fishy to hide checked exceptions. Either we have to document the reason properly or we should get rid of it and do proper exception handling. WDYT?
   I understand that such a change would be out-of-scope of this PR. We would need to create a follow-up ticket.



##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##########
@@ -183,37 +191,58 @@ public List<String> getExpectedRecords() {
     private static class StopSignal implements Closeable {
         private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
 
-        private final String topic;
         private final int desiredCounts;
         // This is a thread-safe list.
         private final List<String> consumedRecords;
         private final AtomicLong deadline;
         private final ExecutorService executor;
+        private final Consumer<String> consumer;
+        private final AtomicReference<PulsarClientException> throwableException;
 
         public StopSignal(
                 PulsarRuntimeOperator operator, String topic, int messageCounts, Duration timeout) {
-            this.topic = topic;
             this.desiredCounts = messageCounts;
             this.consumedRecords = Collections.synchronizedList(new ArrayList<>(messageCounts));
             this.deadline = new AtomicLong(timeout.toMillis() + System.currentTimeMillis());
             this.executor = Executors.newSingleThreadExecutor();
+            ConsumerBuilder<String> consumerBuilder =
+                    operator.client()
+                            .newConsumer(Schema.STRING)
+                            .topic(topic)
+                            .subscriptionName(randomAlphanumeric(10))
+                            .subscriptionMode(Durable)
+                            .subscriptionType(Exclusive)
+                            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+            this.consumer = sneakyClient(consumerBuilder::subscribe);
+            this.throwableException = new AtomicReference<>();

Review Comment:
   What about working with `CompletableFutures` here? It feels more natural to use rather than utilizing an `AtomicReference` here.



##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##########
@@ -183,37 +191,58 @@ public List<String> getExpectedRecords() {
     private static class StopSignal implements Closeable {
         private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
 
-        private final String topic;
         private final int desiredCounts;
         // This is a thread-safe list.
         private final List<String> consumedRecords;
         private final AtomicLong deadline;
         private final ExecutorService executor;
+        private final Consumer<String> consumer;
+        private final AtomicReference<PulsarClientException> throwableException;
 
         public StopSignal(
                 PulsarRuntimeOperator operator, String topic, int messageCounts, Duration timeout) {
-            this.topic = topic;
             this.desiredCounts = messageCounts;
             this.consumedRecords = Collections.synchronizedList(new ArrayList<>(messageCounts));
             this.deadline = new AtomicLong(timeout.toMillis() + System.currentTimeMillis());
             this.executor = Executors.newSingleThreadExecutor();
+            ConsumerBuilder<String> consumerBuilder =
+                    operator.client()
+                            .newConsumer(Schema.STRING)
+                            .topic(topic)
+                            .subscriptionName(randomAlphanumeric(10))
+                            .subscriptionMode(Durable)
+                            .subscriptionType(Exclusive)
+                            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+            this.consumer = sneakyClient(consumerBuilder::subscribe);
+            this.throwableException = new AtomicReference<>();
 
             // Start consuming.
             executor.execute(
                     () -> {
                         while (consumedRecords.size() < desiredCounts) {
                             // This method would block until we consumed a message.
                             int counts = desiredCounts - consumedRecords.size();
-                            List<Message<String>> messages =
-                                    operator.receiveMessages(this.topic, Schema.STRING, counts);
-                            for (Message<String> message : messages) {
-                                consumedRecords.add(message.getValue());
+                            for (int i = 0; i < counts; i++) {
+                                try {
+                                    Message<String> message = consumer.receive();
+                                    consumedRecords.add(message.getValue());
+                                } catch (PulsarClientException e) {
+                                    throwableException.set(e);
+                                    break;
+                                }
                             }
                         }
                     });
         }
 
         public boolean canStop() {
+            PulsarClientException exception = throwableException.get();
+            if (exception != null) {
+                LOG.error("Error in consuming messages from Pulsar.");
+                LOG.error("", exception);
+                return true;
+            }

Review Comment:
   I feel like there is a non-optimal separation of concerns here: we shouldn't log in a method that checks the state but rather have the logging be done in the calling method.



##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##########
@@ -183,37 +191,58 @@ public List<String> getExpectedRecords() {
     private static class StopSignal implements Closeable {
         private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
 
-        private final String topic;
         private final int desiredCounts;
         // This is a thread-safe list.
         private final List<String> consumedRecords;
         private final AtomicLong deadline;
         private final ExecutorService executor;
+        private final Consumer<String> consumer;
+        private final AtomicReference<PulsarClientException> throwableException;
 
         public StopSignal(
                 PulsarRuntimeOperator operator, String topic, int messageCounts, Duration timeout) {
-            this.topic = topic;
             this.desiredCounts = messageCounts;
             this.consumedRecords = Collections.synchronizedList(new ArrayList<>(messageCounts));
             this.deadline = new AtomicLong(timeout.toMillis() + System.currentTimeMillis());
             this.executor = Executors.newSingleThreadExecutor();
+            ConsumerBuilder<String> consumerBuilder =
+                    operator.client()
+                            .newConsumer(Schema.STRING)
+                            .topic(topic)
+                            .subscriptionName(randomAlphanumeric(10))
+                            .subscriptionMode(Durable)
+                            .subscriptionType(Exclusive)

Review Comment:
   ```suggestion
                               .subscriptionMode(SubscriptionMode.Durable)
                               .subscriptionType(SubscriptionType.Exclusive)
   ```
   nit: The enum usage here is not obvious because the enum identifiers are not uppercased. Could we replace the static import for these with a qualified access to improve readability?



##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java:
##########
@@ -183,37 +191,58 @@ public List<String> getExpectedRecords() {
     private static class StopSignal implements Closeable {
         private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
 
-        private final String topic;
         private final int desiredCounts;
         // This is a thread-safe list.
         private final List<String> consumedRecords;
         private final AtomicLong deadline;
         private final ExecutorService executor;
+        private final Consumer<String> consumer;

Review Comment:
   The Consumer is only accessed in the constructor. I don't see a necessity to create a dedicated field for that one. Or am I missing something in this aspect? :thinking:  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] syhily commented on a diff in pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21252:
URL: https://github.com/apache/flink/pull/21252#discussion_r1019822514


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##########
@@ -180,6 +180,10 @@ public void createTopic(String topic, int numberOfPartitions) {
         }
     }
 
+    public void createSchema(String topic, Schema<?> schema) {
+        sneakyAdmin(() -> admin().schemas().createSchema(topic, schema.getSchemaInfo()));

Review Comment:
   These checked exceptions should be in some situation. But I `sneaky` them just because of the poor design in Pulsar API. We can leave it here and work through all the code to see which `sneaky` should be better removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #21252:
URL: https://github.com/apache/flink/pull/21252#issuecomment-1321488646

   Thanks for taking over @tisonkun. I was offline for a week anyway. So, good that you unblocked PR #21249 by merging this PR.
   
   > Style concern, if it's not enforced by the spotless plugin, varies from person to person. I believe who writes the most part of the code has the right to write code in his/her style.
   
   I agree with your view on that. Most of my comments were of cosmetic nature and not really blocking the PR. I created a follow-up issue FLINK-30109 to discuss the sneaky utility methods, though.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] tisonkun commented on a diff in pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21252:
URL: https://github.com/apache/flink/pull/21252#discussion_r1019864714


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##########
@@ -180,6 +180,10 @@ public void createTopic(String topic, int numberOfPartitions) {
         }
     }
 
+    public void createSchema(String topic, Schema<?> schema) {

Review Comment:
   Since it's already a test scope class, I think we don't need this annotation :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on a diff in pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21252:
URL: https://github.com/apache/flink/pull/21252#discussion_r1027592579


##########
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##########
@@ -180,6 +180,10 @@ public void createTopic(String topic, int numberOfPartitions) {
         }
     }
 
+    public void createSchema(String topic, Schema<?> schema) {
+        sneakyAdmin(() -> admin().schemas().createSchema(topic, schema.getSchemaInfo()));

Review Comment:
   Thanks for the brief clarification, @syhily . I created FLINK-30109 to cover the issue. We either should add more documentation clarifying why we do this or removing it as it's a workaround that might raise questions to readers. Let's move the discussion into FLINK-30109. I'm curious about your view on that and why the Pulsar API is forcing us to do this exception transformation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #21252: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21252:
URL: https://github.com/apache/flink/pull/21252#issuecomment-1305750844

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "79cf1b3b4d3e071f821be08f6469bb7cfd6202c2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "79cf1b3b4d3e071f821be08f6469bb7cfd6202c2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 79cf1b3b4d3e071f821be08f6469bb7cfd6202c2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org