You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/20 15:06:06 UTC
[pulsar] 24/31: Fix http produce msg redirect issue. (#15551)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 530fafccaec8e665cf51fca2689f7b7cf827aeef
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri May 13 10:36:33 2022 +0800
Fix http produce msg redirect issue. (#15551)
Master Issue: #15546
### Motivation
When lookup the topic ownership using REST produce, the redirect URI is incorrect, because :
```
uri.getPath(false); //Get the path of the current request relative to the base URI as a string.
```
So the redirect URI does not contain the base path:
```
URI redirectURI = new URI(String.format("%s%s", redirectAddresses.get(0), uri.getPath(false)))
```
(cherry picked from commit 7f976da1b51cd868ec49b5ab43259fea4d48c8e9)
---
.../main/java/org/apache/pulsar/broker/rest/TopicsBase.java | 11 ++++++++---
.../test/java/org/apache/pulsar/broker/admin/TopicsTest.java | 8 ++++----
2 files changed, 12 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
index 770d77794d5..86e8956d950 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.broker.rest;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
+import java.net.URL;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
@@ -41,6 +41,7 @@ import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
@@ -378,10 +379,14 @@ public class TopicsBase extends PersistentTopicsBase {
log.debug("Redirect rest produce request for topic {} from {} to {}.",
topicName, pulsar().getWebServiceAddress(), redirectAddresses.get(0));
}
- URI redirectURI = new URI(String.format("%s%s", redirectAddresses.get(0), uri.getPath(false)));
+ URL redirectAddress = new URL(redirectAddresses.get(0));
+ URI redirectURI = UriBuilder.fromUri(uri.getRequestUri())
+ .host(redirectAddress.getHost())
+ .port(redirectAddress.getPort())
+ .build();
asyncResponse.resume(Response.temporaryRedirect(redirectURI).build());
future.complete(true);
- } catch (URISyntaxException | NullPointerException e) {
+ } catch (Exception e) {
if (log.isDebugEnabled()) {
log.error("Error in preparing redirect url with rest produce message request for topic {}: {}",
topicName, e.getMessage(), e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 4edfbfd17db..b0273714c01 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -77,6 +77,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import java.io.ByteArrayOutputStream;
+import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -313,13 +314,13 @@ public class TopicsTest extends MockedPulsarServiceBaseTest {
@Test
public void testLookUpWithRedirect() throws Exception {
String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
- String requestPath = "/admin/v3/topics/my-tenant/my-namespace/my-topic";
+ URI requestPath = URI.create(pulsar.getWebServiceAddress() + "/topics/my-tenant/my-namespace/my-topic");
//create topic on one broker
admin.topics().createNonPartitionedTopic(topicName);
PulsarService pulsar2 = startBroker(getDefaultConf());
doReturn(false).when(topics).isRequestHttps();
UriInfo uriInfo = mock(UriInfo.class);
- doReturn(requestPath).when(uriInfo).getPath(anyBoolean());
+ doReturn(requestPath).when(uriInfo).getRequestUri();
Whitebox.setInternalState(topics, "uri", uriInfo);
//do produce on another broker
topics.setPulsar(pulsar2);
@@ -336,8 +337,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest {
// Verify got redirect response
Assert.assertEquals(responseCaptor.getValue().getStatusInfo(), Response.Status.TEMPORARY_REDIRECT);
// Verify URI point to address of broker the topic was created on
- Assert.assertEquals(responseCaptor.getValue().getLocation().toString(),
- pulsar.getWebServiceAddress() + requestPath);
+ Assert.assertEquals(responseCaptor.getValue().getLocation().toString(), requestPath.toString());
}
@Test