You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by gu...@apache.org on 2022/12/19 06:31:13 UTC

[pulsar] branch branch-2.10 updated: [fix][broker]Update interceptor handler exception (#18940)

This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new ad9c133a5de [fix][broker]Update interceptor handler exception (#18940)
ad9c133a5de is described below

commit ad9c133a5ded46c9adad099ea77630cdec8bb9bf
Author: Guangning E <gu...@streamnative.io>
AuthorDate: Mon Dec 19 14:10:28 2022 +0800

    [fix][broker]Update interceptor handler exception (#18940)
    
    (cherry picked from commit e07b67fd8c5c5cc0fd41ca0c0d956b0cb514c96e)
---
 .../apache/pulsar/broker/web/ExceptionHandler.java | 27 +++++++++++++------
 .../broker/intercept/BrokerInterceptorTest.java    | 30 ++++++++++++++++++++++
 .../broker/intercept/CounterBrokerInterceptor.java | 20 ++++++++++++---
 3 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java
index 8b200a8b9f6..b70168853a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java
@@ -25,6 +25,11 @@ import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.Response;
 import org.apache.pulsar.common.intercept.InterceptException;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.http.HttpVersion;
 import org.eclipse.jetty.http.MetaData;
 
@@ -35,16 +40,22 @@ public class ExceptionHandler {
 
     public void handle(ServletResponse response, Exception ex) throws IOException {
         if (ex instanceof InterceptException) {
-            String reason = ex.getMessage();
-            byte[] content = reason.getBytes(StandardCharsets.UTF_8);
-            MetaData.Response info = new MetaData.Response();
-            info.setHttpVersion(HttpVersion.HTTP_1_1);
-            info.setReason(reason);
-            info.setStatus(((InterceptException) ex).getErrorCode());
-            info.setContentLength(content.length);
             if (response instanceof org.eclipse.jetty.server.Response) {
+                String errorData = ObjectMapperFactory
+                        .getThreadLocal().writeValueAsString(new ErrorData(ex.getMessage()));
+                byte[] errorBytes = errorData.getBytes(StandardCharsets.UTF_8);
+                int errorCode = ((InterceptException) ex).getErrorCode();
+                HttpFields httpFields = new HttpFields();
+                HttpField httpField = new HttpField(HttpHeader.CONTENT_TYPE, "application/json;charset=utf-8");
+                httpFields.add(httpField);
+                MetaData.Response info = new MetaData.Response(HttpVersion.HTTP_1_1, errorCode, httpFields);
+                info.setHttpVersion(HttpVersion.HTTP_1_1);
+                info.setReason(errorData);
+                info.setStatus(errorCode);
+                info.setContentLength(errorBytes.length);
                 ((org.eclipse.jetty.server.Response) response).getHttpChannel().sendResponse(info,
-                        ByteBuffer.wrap(content), true);
+                        ByteBuffer.wrap(errorBytes),
+                        true);
             } else {
                 ((HttpServletResponse) response).sendError(((InterceptException) ex).getErrorCode(),
                         ex.getMessage());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index 0e46b147e7b..37c0f8fddc1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -40,7 +41,9 @@ import org.testng.annotations.Test;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.ArgumentMatchers.same;
@@ -209,8 +212,35 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         Awaitility.await().until(() -> !interceptor.getResponseList().isEmpty());
         CounterBrokerInterceptor.ResponseEvent responseEvent = interceptor.getResponseList().get(0);
         Assert.assertEquals(responseEvent.getRequestUri(), "/admin/v3/test/asyncGet/my-topic/1000");
+
         Assert.assertEquals(responseEvent.getResponseStatus(),
                 javax.ws.rs.core.Response.noContent().build().getStatus());
     }
 
+    public void requestInterceptorFailedTest() {
+        Set<String> allowedClusters = new HashSet<>();
+        allowedClusters.add(configClusterName);
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(new HashSet<>(), allowedClusters);
+        try {
+            admin.tenants().createTenant("test-interceptor-failed-tenant", tenantInfo);
+            Assert.fail("Create tenant because interceptor should fail");
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getHttpError(), "Create tenant failed");
+        }
+
+        try {
+            admin.namespaces().createNamespace("public/test-interceptor-failed-namespace");
+            Assert.fail("Create namespace because interceptor should fail");
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getHttpError(), "Create namespace failed");
+        }
+
+        try {
+            admin.topics().createNonPartitionedTopic("persistent://public/default/test-interceptor-failed-topic");
+            Assert.fail("Create topic because interceptor should fail");
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getHttpError(), "Create topic failed");
+        }
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 930bebf5e47..42d3740a548 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -33,6 +33,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.http.HttpStatus;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Producer;
@@ -42,6 +43,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.CommandAck;
+import org.apache.pulsar.common.intercept.InterceptException;
 import org.eclipse.jetty.server.Response;
 
 
@@ -159,10 +161,20 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
     }
 
     @Override
-    public void onWebserviceRequest(ServletRequest request) {
+    public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException {
         count.incrementAndGet();
+        String url = ((HttpServletRequest) request).getRequestURL().toString();
         if (log.isDebugEnabled()) {
-            log.debug("[{}] On [{}] Webservice request", count, ((HttpServletRequest) request).getRequestURL().toString());
+            log.debug("[{}] On [{}] Webservice request", count, url);
+        }
+        if (url.contains("/admin/v2/tenants/test-interceptor-failed-tenant")) {
+            throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create tenant failed");
+        }
+        if (url.contains("/admin/v2/namespaces/public/test-interceptor-failed-namespace")) {
+            throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create namespace failed");
+        }
+        if (url.contains("/admin/v2/persistent/public/default/test-interceptor-failed-topic")) {
+            throw new InterceptException(HttpStatus.SC_PRECONDITION_FAILED, "Create topic failed");
         }
     }
 
@@ -170,7 +182,8 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
     public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
         count.incrementAndGet();
         if (log.isDebugEnabled()) {
-            log.debug("[{}] On [{}] Webservice response {}", count, ((HttpServletRequest) request).getRequestURL().toString(), response);
+            log.debug("[{}] On [{}] Webservice response {}",
+                    count, ((HttpServletRequest) request).getRequestURL().toString(), response);
         }
         if (response instanceof Response) {
             Response res = (Response) response;
@@ -178,6 +191,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
         }
     }
 
+
     @Override
     public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
             throws IOException, ServletException {