You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by gu...@apache.org on 2022/12/19 06:31:13 UTC
[pulsar] branch branch-2.10 updated: [fix][broker]Update interceptor handler exception (#18940)
This is an automated email from the ASF dual-hosted git repository.
guangning pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new ad9c133a5de [fix][broker]Update interceptor handler exception (#18940)
ad9c133a5de is described below
commit ad9c133a5ded46c9adad099ea77630cdec8bb9bf
Author: Guangning E <gu...@streamnative.io>
AuthorDate: Mon Dec 19 14:10:28 2022 +0800
[fix][broker]Update interceptor handler exception (#18940)
(cherry picked from commit e07b67fd8c5c5cc0fd41ca0c0d956b0cb514c96e)
---
.../apache/pulsar/broker/web/ExceptionHandler.java | 27 +++++++++++++------
.../broker/intercept/BrokerInterceptorTest.java | 30 ++++++++++++++++++++++
.../broker/intercept/CounterBrokerInterceptor.java | 20 ++++++++++++---
3 files changed, 66 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java
index 8b200a8b9f6..b70168853a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java
@@ -25,6 +25,11 @@ import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import org.apache.pulsar.common.intercept.InterceptException;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
@@ -35,16 +40,22 @@ public class ExceptionHandler {
public void handle(ServletResponse response, Exception ex) throws IOException {
if (ex instanceof InterceptException) {
- String reason = ex.getMessage();
- byte[] content = reason.getBytes(StandardCharsets.UTF_8);
- MetaData.Response info = new MetaData.Response();
- info.setHttpVersion(HttpVersion.HTTP_1_1);
- info.setReason(reason);
- info.setStatus(((InterceptException) ex).getErrorCode());
- info.setContentLength(content.length);
if (response instanceof org.eclipse.jetty.server.Response) {
+ String errorData = ObjectMapperFactory
+ .getThreadLocal().writeValueAsString(new ErrorData(ex.getMessage()));
+ byte[] errorBytes = errorData.getBytes(StandardCharsets.UTF_8);
+ int errorCode = ((InterceptException) ex).getErrorCode();
+ HttpFields httpFields = new HttpFields();
+ HttpField httpField = new HttpField(HttpHeader.CONTENT_TYPE, "application/json;charset=utf-8");
+ httpFields.add(httpField);
+ MetaData.Response info = new MetaData.Response(HttpVersion.HTTP_1_1, errorCode, httpFields);
+ info.setHttpVersion(HttpVersion.HTTP_1_1);
+ info.setReason(errorData);
+ info.setStatus(errorCode);
+ info.setContentLength(errorBytes.length);
((org.eclipse.jetty.server.Response) response).getHttpChannel().sendResponse(info,
- ByteBuffer.wrap(content), true);
+ ByteBuffer.wrap(errorBytes),
+ true);
} else {
((HttpServletResponse) response).sendError(((InterceptException) ex).getErrorCode(),
ex.getMessage());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index 0e46b147e7b..37c0f8fddc1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -40,7 +41,9 @@ import org.testng.annotations.Test;
import java.io.IOException;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.mockito.ArgumentMatchers.same;
@@ -209,8 +212,35 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
Awaitility.await().until(() -> !interceptor.getResponseList().isEmpty());
CounterBrokerInterceptor.ResponseEvent responseEvent = interceptor.getResponseList().get(0);
Assert.assertEquals(responseEvent.getRequestUri(), "/admin/v3/test/asyncGet/my-topic/1000");
+
Assert.assertEquals(responseEvent.getResponseStatus(),
javax.ws.rs.core.Response.noContent().build().getStatus());
}
+ public void requestInterceptorFailedTest() {
+ Set<String> allowedClusters = new HashSet<>();
+ allowedClusters.add(configClusterName);
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(new HashSet<>(), allowedClusters);
+ try {
+ admin.tenants().createTenant("test-interceptor-failed-tenant", tenantInfo);
+ Assert.fail("Create tenant because interceptor should fail");
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getHttpError(), "Create tenant failed");
+ }
+
+ try {
+ admin.namespaces().createNamespace("public/test-interceptor-failed-namespace");
+ Assert.fail("Create namespace because interceptor should fail");
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getHttpError(), "Create namespace failed");
+ }
+
+ try {
+ admin.topics().createNonPartitionedTopic("persistent://public/default/test-interceptor-failed-topic");
+ Assert.fail("Create topic because interceptor should fail");
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getHttpError(), "Create topic failed");
+ }
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 930bebf5e47..42d3740a548 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -33,6 +33,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
@@ -42,6 +43,7 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.CommandAck;
+import org.apache.pulsar.common.intercept.InterceptException;
import org.eclipse.jetty.server.Response;
@@ -159,10 +161,20 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
}
@Override
- public void onWebserviceRequest(ServletRequest request) {
+ public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException {
count.incrementAndGet();
+ String url = ((HttpServletRequest) request).getRequestURL().toString();
if (log.isDebugEnabled()) {
- log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest) request).getRequestURL().toString());
+ log.debug("[{}] On [{}] Webservice request", count, url);
+ }
+ if (url.contains("/admin/v2/tenants/test-interceptor-failed-tenant")) {
+ throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create tenant failed");
+ }
+ if (url.contains("/admin/v2/namespaces/public/test-interceptor-failed-namespace")) {
+ throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create namespace failed");
+ }
+ if (url.contains("/admin/v2/persistent/public/default/test-interceptor-failed-topic")) {
+ throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create topic failed");
}
}
@@ -170,7 +182,8 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
count.incrementAndGet();
if (log.isDebugEnabled()) {
- log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest) request).getRequestURL().toString(), response);
+ log.debug("[{}] On [{}] Webservice response {}",
+ count, ((HttpServletRequest) request).getRequestURL().toString(), response);
}
if (response instanceof Response) {
Response res = (Response) response;
@@ -178,6 +191,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
}
}
+
@Override
public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {