You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/05/11 03:46:50 UTC

[pulsar] branch master updated: [WebSocket] Fix MultiTopicReader#getConsumer ClassCastException (#15534)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dd6d3720f0a [WebSocket] Fix MultiTopicReader#getConsumer ClassCastException (#15534)
dd6d3720f0a is described below

commit dd6d3720f0a1d10df9885592ff0e0f9481325f2a
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Tue May 10 22:46:44 2022 -0500

    [WebSocket] Fix MultiTopicReader#getConsumer ClassCastException (#15534)
    
    ### Motivation
    
    This fixes an issue similar to the one solved in https://github.com/apache/pulsar/pull/14316. When the `reader` is a `MultiTopicsReaderImpl`, the `ReaderHandler#getConsumer()` method currently throws a `ClassCastException` because it unconditionally casts the `reader` to `ReaderImpl`.
    
    ### Modifications
    
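    * Update `ReaderHandler#getConsumer` so that it checks the `reader` type before casting, returning the underlying consumer for both `MultiTopicsReaderImpl` and `ReaderImpl` (and `null` for any other type).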
    * Update the `ReaderHandler` constructor to use the `getConsumer` method.
    
    ### Verifying this change
    
    I expanded existing tests to cover the scenario that would have previously failed.
    
    ### Does this pull request potentially affect one of the following parts:
    
    No, this is not a breaking change.
---
 .../java/org/apache/pulsar/websocket/ReaderHandler.java  | 16 ++++++++++------
 .../org/apache/pulsar/websocket/ReaderHandlerTest.java   |  4 ++++
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 54688ddb6b9..6510386aef9 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -104,13 +104,11 @@ public class ReaderHandler extends AbstractWebSocketHandler {
             }
 
             this.reader = builder.create();
-            if (reader instanceof MultiTopicsReaderImpl) {
-                this.subscription = ((MultiTopicsReaderImpl<?>) reader).getMultiTopicsConsumer().getSubscription();
-            } else if (reader instanceof ReaderImpl) {
-                this.subscription = ((ReaderImpl<?>) reader).getConsumer().getSubscription();
-            } else {
+            Consumer<?> consumer = getConsumer();
+            if (consumer == null) {
                 throw new IllegalArgumentException(String.format("Illegal Reader Type %s", reader.getClass()));
             }
+            this.subscription = consumer.getSubscription();
             if (!this.service.addReader(this)) {
                 log.warn("[{}:{}] Failed to add reader handler for topic {}", request.getRemoteAddr(),
                         request.getRemotePort(), topic);
@@ -272,7 +270,13 @@ public class ReaderHandler extends AbstractWebSocketHandler {
     }
 
     public Consumer<?> getConsumer() {
-        return reader != null ? ((ReaderImpl<?>) reader).getConsumer() : null;
+        if (reader instanceof MultiTopicsReaderImpl) {
+            return ((MultiTopicsReaderImpl<?>) reader).getMultiTopicsConsumer();
+        } else if (reader instanceof ReaderImpl) {
+            return ((ReaderImpl<?>) reader).getConsumer();
+        } else {
+            return null;
+        }
     }
 
     public String getSubscription() {
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
index 0d2a13d1a74..7dfa8b6e314 100644
--- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
@@ -74,6 +74,8 @@ public class ReaderHandlerTest {
         ReaderHandler readerHandler = new ReaderHandler(wss, request, servletUpgradeResponse);
         // verify success
         Assert.assertEquals(readerHandler.getSubscription(), subName);
+        // Verify consumer is returned
+        readerHandler.getConsumer();
     }
 
     @Test
@@ -102,6 +104,8 @@ public class ReaderHandlerTest {
         ReaderHandler readerHandler = new ReaderHandler(wss, request, servletUpgradeResponse);
         // verify success
         Assert.assertEquals(readerHandler.getSubscription(), subName);
+        // Verify consumer is successfully returned
+        readerHandler.getConsumer();
     }
 
     @Test