Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/28 10:21:39 UTC

[GitHub] [pulsar] freeznet opened a new pull request #14491: [pulsar-io] throw exceptions when kafka offset backing store failed to start

freeznet opened a new pull request #14491:
URL: https://github.com/apache/pulsar/pull/14491


   ### Motivation
   
   `PulsarOffsetBackingStore` uses a try-catch block in `start` when initializing the service, but it only catches `PulsarClientException` and ignores other possible exceptions. This can cause unexpected failures when starting connectors such as Debezium.
   
   ### Modifications
   
   - catch other exceptions and wrap them in `RuntimeException`
   - add a null check in `processMessage` to prevent an NPE when processing messages
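   A minimal sketch of the two modifications, using a simplified stand-in class. `OffsetStoreSketch`, its `start(Callable<Void>)` signature, and the `processMessage(String, byte[])` signature are hypothetical and are not the real `PulsarOffsetBackingStore` API. Initialization failures of any type are wrapped in a `RuntimeException`, and messages without a key are skipped instead of causing an NPE.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import java.util.Map;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical, simplified stand-in for PulsarOffsetBackingStore (not the real class).
   class OffsetStoreSketch {

       // Offsets keyed by the message key bytes, as in processMessage.
       private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<>();

       // Runs a hypothetical initialization step. Any failure, not only a
       // client exception, is wrapped in a RuntimeException so that the
       // connector fails fast with the original cause attached.
       public void start(Callable<Void> init) {
           try {
               init.call();
           } catch (Exception e) {
               throw new RuntimeException("Failed to start offset store", e);
           }
       }

       // Stores a keyed message and returns true; a message without a key is
       // skipped (the PR logs it at debug level) instead of causing an NPE.
       public boolean processMessage(String key, byte[] value) {
           if (key == null) {
               return false;
           }
           data.put(ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)), ByteBuffer.wrap(value));
           return true;
       }

       public int size() {
           return data.size();
       }
   }
   ```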
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below or label this PR directly (if you have committer privilege).
   
   Need to update docs? 
   
   - [ ] `doc-required` 
     
     (If you need help on updating docs, create a doc issue)
     
   - [ ] `no-need-doc` 
     
     (Please explain why)
     
   - [ ] `doc` 
     
     (If this PR contains doc changes)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #14491: [pulsar-io] throw exceptions when kafka offset backing store failed to start

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #14491:
URL: https://github.com/apache/pulsar/pull/14491#discussion_r817319831



##########
File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
##########
@@ -126,10 +127,13 @@ private void readNext(CompletableFuture<Void> endFuture) {
     }
 
     void processMessage(Message<byte[]> message) {
-        synchronized (data) {
+        if (message.getKey() != null) {
             data.put(
                 ByteBuffer.wrap(message.getKey().getBytes(UTF_8)),
                 ByteBuffer.wrap(message.getValue()));
+        } else {
+            log.debug("Got message without key from the offset storage topic, skip it. message value: {}",

Review comment:
       I see.
   
   BTW, I think we should introduce a mechanism to prevent clients from writing to some internal topics (like system topics) in the future.







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #14491: [pulsar-io] throw exceptions when kafka offset backing store failed to start

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #14491:
URL: https://github.com/apache/pulsar/pull/14491#discussion_r816518706



##########
File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
##########
@@ -126,10 +127,13 @@ private void readNext(CompletableFuture<Void> endFuture) {
     }
 
     void processMessage(Message<byte[]> message) {
-        synchronized (data) {
+        if (message.getKey() != null) {
             data.put(
                 ByteBuffer.wrap(message.getKey().getBytes(UTF_8)),
                 ByteBuffer.wrap(message.getValue()));
+        } else {
+            log.debug("Got message without key from the offset storage topic, skip it. message value: {}",

Review comment:
       It looks like the topic is written by a Pulsar producer. See the `set` method:
   
   ```java
               producer.newMessage()
                   .key(new String(keyBytes, UTF_8))
   ```
   
   So I think the key could never be null.
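   To illustrate the point above, here is a hypothetical helper (`KeyRoundTrip` is not part of Pulsar) that mirrors the two encoding steps in the diff: `set` builds the message key with `new String(keyBytes, UTF_8)`, and `processMessage` turns it back into a `ByteBuffer` with `getKey().getBytes(UTF_8)`. A key produced this way is never null, and for valid UTF-8 key bytes the resulting map key equals the original bytes.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;

   // Illustrative only: mirrors the key encoding in the quoted `set` snippet
   // and the decoding in processMessage. Not a Pulsar class.
   class KeyRoundTrip {

       // Producer side: raw key bytes -> String message key (never null).
       static String toMessageKey(byte[] keyBytes) {
           return new String(keyBytes, StandardCharsets.UTF_8);
       }

       // Reader side: String message key -> ByteBuffer map key.
       static ByteBuffer toMapKey(String messageKey) {
           return ByteBuffer.wrap(messageKey.getBytes(StandardCharsets.UTF_8));
       }
   }
   ```

   Bytes that are not valid UTF-8 are replaced during decoding and would not round-trip, so the sketch assumes the offset keys written by `set` are valid UTF-8.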







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #14491: [pulsar-io] throw exceptions when kafka offset backing store failed to start

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #14491:
URL: https://github.com/apache/pulsar/pull/14491#discussion_r816506004



##########
File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
##########
@@ -126,10 +127,13 @@ private void readNext(CompletableFuture<Void> endFuture) {
     }
 
     void processMessage(Message<byte[]> message) {
-        synchronized (data) {
+        if (message.getKey() != null) {
             data.put(
                 ByteBuffer.wrap(message.getKey().getBytes(UTF_8)),
                 ByteBuffer.wrap(message.getValue()));
+        } else {
+            log.debug("Got message without key from the offset storage topic, skip it. message value: {}",

Review comment:
       For the offset topic, the key must not be `null`, which is guaranteed by Kafka itself: Kafka's coordinator only writes keyed messages to the offset topic. In addition, the Kafka broker prevents Kafka clients from writing to the offset topic.
   
   Therefore, I think there is no need to perform a null check here. If you insist on checking for null keys, the log level here should be error, because it's an unexpected case.

##########
File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
##########
@@ -49,7 +50,7 @@
 @Slf4j
 public class PulsarOffsetBackingStore implements OffsetBackingStore {
 
-    private Map<ByteBuffer, ByteBuffer> data;
+    private ConcurrentHashMap<ByteBuffer, ByteBuffer> data;

Review comment:
       I think it's better to make it `final`. There is also no need to change the type to `ConcurrentHashMap`, because we only use the common methods of `Map`.
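   A sketch of the suggested declaration, assuming the map can be created when the store is constructed (a `final` field must be assigned in its initializer or in the constructor). `OffsetData` and its methods are illustrative names only. The field is declared as `Map`, while `ConcurrentHashMap` is assumed as the concrete class because the diff removes the `synchronized (data)` block.

   ```java
   import java.nio.ByteBuffer;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Illustrative holder; OffsetData is not a Pulsar class.
   class OffsetData {

       // final: the reference is assigned once and never replaced.
       // Declared as Map: callers only need the common Map methods.
       // ConcurrentHashMap (assumed) keeps put/get thread-safe without a synchronized block.
       private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<>();

       void put(ByteBuffer key, ByteBuffer value) {
           data.put(key, value);
       }

       ByteBuffer get(ByteBuffer key) {
           return data.get(key);
       }

       int size() {
           return data.size();
       }
   }
   ```

   Because `ByteBuffer.equals` and `hashCode` depend on the buffer's remaining content, a freshly wrapped copy of the same key bytes finds the stored entry.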







[GitHub] [pulsar] github-actions[bot] commented on pull request #14491: [pulsar-io] throw exceptions when kafka offset backing store failed to start

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14491:
URL: https://github.com/apache/pulsar/pull/14491#issuecomment-1054122844


   @freeznet: Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)





[GitHub] [pulsar] github-actions[bot] commented on pull request #14491: [pulsar-io] throw exceptions when kafka offset backing store failed to start

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14491:
URL: https://github.com/apache/pulsar/pull/14491#issuecomment-1054112746


   @freeznet: Thanks for providing doc info!





[GitHub] [pulsar] github-actions[bot] commented on pull request #14491: [pulsar-io] throw exceptions when kafka offset backing store failed to start

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14491:
URL: https://github.com/apache/pulsar/pull/14491#issuecomment-1054123568


   @freeznet: Thanks for providing doc info!





[GitHub] [pulsar] freeznet commented on a change in pull request #14491: [pulsar-io] throw exceptions when kafka offset backing store failed to start

Posted by GitBox <gi...@apache.org>.
freeznet commented on a change in pull request #14491:
URL: https://github.com/apache/pulsar/pull/14491#discussion_r817265415



##########
File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
##########
@@ -126,10 +127,13 @@ private void readNext(CompletableFuture<Void> endFuture) {
     }
 
     void processMessage(Message<byte[]> message) {
-        synchronized (data) {
+        if (message.getKey() != null) {
             data.put(
                 ByteBuffer.wrap(message.getKey().getBytes(UTF_8)),
                 ByteBuffer.wrap(message.getValue()));
+        } else {
+            log.debug("Got message without key from the offset storage topic, skip it. message value: {}",

Review comment:
       Messages written by the backing store service itself do have these guarantees, but we cannot rely on the same guarantees from users. If a user sends test messages to the offset topic (e.g. with `pulsar-client produce`), it could cause unexpected issues. In that scenario, the unexpected messages should be ignored without affecting the backing store, which is why they are logged at debug level.







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #14491: [pulsar-io] throw exceptions when kafka offset backing store failed to start

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #14491:
URL: https://github.com/apache/pulsar/pull/14491#discussion_r816510866



##########
File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
##########
@@ -153,6 +157,9 @@ public void start() {
         } catch (PulsarClientException e) {
             log.error("Failed to setup pulsar producer/reader to cluster", e);
             throw new RuntimeException("Failed to setup pulsar producer/reader to cluster ",  e);
+        } catch (Exception e) {
+            log.error("Failed to start PulsarOffsetBackingStore", e);
+            throw new RuntimeException("Failed to start PulsarOffsetBackingStore",  e);

Review comment:
       It's better to change `endFuture.join()` to `endFuture.get()` and catch the specific exceptions (like `ExecutionException`) instead of catching a generic `Exception`.
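   A sketch of the suggested change, assuming `endFuture` is the `CompletableFuture<Void>` that completes once the initial read of the offset topic finishes; `StartSketch` and `awaitInitialRead` are hypothetical names. Unlike `join()`, which throws an unchecked `CompletionException`, `get()` declares `InterruptedException` and `ExecutionException`, so each case can be handled explicitly.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;

   // Hypothetical helper illustrating get() with specific exception handling.
   class StartSketch {

       static void awaitInitialRead(CompletableFuture<Void> endFuture) {
           try {
               // get() surfaces failures as checked exceptions instead of CompletionException.
               endFuture.get();
           } catch (InterruptedException e) {
               // Restore the interrupt flag before propagating.
               Thread.currentThread().interrupt();
               throw new RuntimeException("Interrupted while reading the offset topic", e);
           } catch (ExecutionException e) {
               // Propagate the underlying failure as the cause.
               throw new RuntimeException("Failed to read the offset topic", e.getCause());
           }
       }
   }
   ```

   Restoring the interrupt flag keeps the interruption visible to the caller. Note that `get()` can still throw the unchecked `CancellationException` if the future was cancelled.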




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] github-actions[bot] commented on pull request #14491: [pulsar-io] throw exceptions when kafka offset backing store failed to start

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14491:
URL: https://github.com/apache/pulsar/pull/14491#issuecomment-1054107754









[GitHub] [pulsar] codelipenghui merged pull request #14491: [pulsar-io] throw exceptions when kafka offset backing store failed to start

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #14491:
URL: https://github.com/apache/pulsar/pull/14491


   

