You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/24 06:35:01 UTC
[pulsar] 03/03: [Websocket] Fix ``ClassCastException`` when user create ``MultiTopicReader``. (#14316)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 20c8a1e67dff2ee14dfd36e51d80574c3c797aaf
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Thu Feb 17 13:23:03 2022 +0800
[Websocket] Fix ``ClassCastException`` when user create ``MultiTopicReader``. (#14316)
(cherry picked from commit 7a7cf54b01420aeac855eea91529ea13bd753e52)
---
.../org/apache/pulsar/websocket/ReaderHandler.java | 10 +-
.../apache/pulsar/websocket/ReaderHandlerTest.java | 214 +++++++++++++++++++++
2 files changed, 222 insertions(+), 2 deletions(-)
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index ef0279d..2b87802 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -103,8 +104,13 @@ public class ReaderHandler extends AbstractWebSocketHandler {
}
this.reader = builder.create();
-
- this.subscription = ((ReaderImpl<?>) this.reader).getConsumer().getSubscription();
+ if (reader instanceof MultiTopicsReaderImpl) {
+ this.subscription = ((MultiTopicsReaderImpl<?>) reader).getMultiTopicsConsumer().getSubscription();
+ } else if (reader instanceof ReaderImpl) {
+ this.subscription = ((ReaderImpl<?>) reader).getConsumer().getSubscription();
+ } else {
+ throw new IllegalArgumentException(String.format("Illegal Reader Type %s", reader.getClass()));
+ }
if (!this.service.addReader(this)) {
log.warn("[{}:{}] Failed to add reader handler for topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic);
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
new file mode 100644
index 0000000..0d2a13d
--- /dev/null
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ReaderHandlerTest { // Exercises the ReaderHandler constructor against each Reader type that the mocked ReaderBuilder.create() returns.
+
+ @Test // Case 1: create() returns a ReaderImpl; the handler must expose the subscription of that reader's ConsumerImpl.
+ @SuppressWarnings("unchecked")
+ public void testCreateReaderImp() throws IOException {
+ final String subName = "readerImpSubscription";
+ // mock data: service -> client -> reader builder; each stubbed builder call returns the builder itself
+ WebSocketService wss = mock(WebSocketService.class);
+ PulsarClient mockedClient = mock(PulsarClient.class);
+ when(wss.getPulsarClient()).thenReturn(mockedClient);
+ ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class);
+ when(mockedClient.newReader()).thenReturn(mockedReaderBuilder);
+ when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder);
+ when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder);
+ when(mockedReaderBuilder.receiverQueueSize(anyInt())).thenReturn(mockedReaderBuilder);
+ ReaderImpl<byte[]> mockedReader = mock(ReaderImpl.class);
+ when(mockedReaderBuilder.create()).thenReturn(mockedReader);
+ ConsumerImpl<byte[]> consumerImp = mock(ConsumerImpl.class);
+ when(consumerImp.getSubscription()).thenReturn(subName);
+ when(mockedReader.getConsumer()).thenReturn(consumerImp);
+ HttpServletRequest request = mock(HttpServletRequest.class);
+ when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic"); // NOTE(review): path says "producer" although a ReaderHandler is built - confirm this is intentional
+ // create reader handler; the servlet response is a spy wrapped in a ServletUpgradeResponse
+ HttpServletResponse response = spy(HttpServletResponse.class);
+ ServletUpgradeResponse servletUpgradeResponse = new ServletUpgradeResponse(response);
+ ReaderHandler readerHandler = new ReaderHandler(wss, request, servletUpgradeResponse);
+ // verify success: the handler reports the subscription name stubbed on the mocked ConsumerImpl
+ Assert.assertEquals(readerHandler.getSubscription(), subName);
+ }
+
+ @Test // Case 2: create() returns a MultiTopicsReaderImpl; the subscription must come from getMultiTopicsConsumer().
+ @SuppressWarnings("unchecked")
+ public void testCreateMultipleTopicReaderImp() throws IOException {
+ final String subName = "multipleTopicReaderImpSubscription";
+ // mock data: same service -> client -> reader builder chain as the ReaderImpl test
+ WebSocketService wss = mock(WebSocketService.class);
+ PulsarClient mockedClient = mock(PulsarClient.class);
+ when(wss.getPulsarClient()).thenReturn(mockedClient);
+ ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class);
+ when(mockedClient.newReader()).thenReturn(mockedReaderBuilder);
+ when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder);
+ when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder);
+ when(mockedReaderBuilder.receiverQueueSize(anyInt())).thenReturn(mockedReaderBuilder);
+ MultiTopicsReaderImpl<byte[]> mockedReader = mock(MultiTopicsReaderImpl.class);
+ when(mockedReaderBuilder.create()).thenReturn(mockedReader);
+ MultiTopicsConsumerImpl<byte[]> consumerImp = mock(MultiTopicsConsumerImpl.class);
+ when(consumerImp.getSubscription()).thenReturn(subName);
+ when(mockedReader.getMultiTopicsConsumer()).thenReturn(consumerImp);
+ HttpServletRequest request = mock(HttpServletRequest.class);
+ when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic"); // NOTE(review): path says "producer" although a ReaderHandler is built - confirm this is intentional
+ // create reader handler; the servlet response is a spy wrapped in a ServletUpgradeResponse
+ HttpServletResponse response = spy(HttpServletResponse.class);
+ ServletUpgradeResponse servletUpgradeResponse = new ServletUpgradeResponse(response);
+ ReaderHandler readerHandler = new ReaderHandler(wss, request, servletUpgradeResponse);
+ // verify success: the handler reports the subscription name stubbed on the mocked MultiTopicsConsumerImpl
+ Assert.assertEquals(readerHandler.getSubscription(), subName);
+ }
+
+ @Test // Case 3: create() returns a Reader that is neither supported implementation; an error must be sent on the response.
+ @SuppressWarnings("unchecked")
+ public void testCreateIllegalReaderImp() throws IOException {
+ // mock data: same builder chain, but create() hands back the hand-written IllegalReader stub
+ WebSocketService wss = mock(WebSocketService.class);
+ PulsarClient mockedClient = mock(PulsarClient.class);
+ when(wss.getPulsarClient()).thenReturn(mockedClient);
+ ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class);
+ when(mockedClient.newReader()).thenReturn(mockedReaderBuilder);
+ when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder);
+ when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder);
+ when(mockedReaderBuilder.receiverQueueSize(anyInt())).thenReturn(mockedReaderBuilder);
+ IllegalReader illegalReader = new IllegalReader();
+ when(mockedReaderBuilder.create()).thenReturn(illegalReader);
+ HttpServletRequest request = mock(HttpServletRequest.class);
+ when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic"); // NOTE(review): path says "producer" although a ReaderHandler is built - confirm this is intentional
+ // create reader handler; the response is a spy so the sendError call can be verified afterwards
+ HttpServletResponse response = spy(HttpServletResponse.class);
+ ServletUpgradeResponse servletUpgradeResponse = new ServletUpgradeResponse(response);
+ new ReaderHandler(wss, request, servletUpgradeResponse); // the test expects the constructor to return normally; presumably the failure is handled inside it - not visible here
+ // verify get error: sendError(int, String) must have been called exactly once on the spied response
+ verify(response, times(1)).sendError(anyInt(), anyString());
+ }
+
+
+ static class IllegalReader implements Reader<byte[]> { // Stub Reader that is neither ReaderImpl nor MultiTopicsReaderImpl; every method is a no-op returning null or false.
+
+ @Override
+ public String getTopic() {
+ return null;
+ }
+
+ @Override
+ public Message<byte[]> readNext() throws PulsarClientException {
+ return null;
+ }
+
+ @Override
+ public Message<byte[]> readNext(int timeout, TimeUnit unit) throws PulsarClientException {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Message<byte[]>> readNextAsync() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return null;
+ }
+
+ @Override
+ public boolean hasReachedEndOfTopic() {
+ return false;
+ }
+
+ @Override
+ public boolean hasMessageAvailable() {
+ return false;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+ return null;
+ }
+
+ @Override
+ public boolean isConnected() {
+ return false;
+ }
+
+ @Override
+ public void seek(MessageId messageId) throws PulsarClientException {
+
+ }
+
+ @Override
+ public void seek(long timestamp) throws PulsarClientException {
+
+ }
+
+ @Override
+ public void seek(Function<String, Object> function) throws PulsarClientException {
+
+ }
+
+ @Override
+ public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> seekAsync(MessageId messageId) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> seekAsync(long timestamp) {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+ }
+}