You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/26 06:58:08 UTC

[pulsar] branch master updated: [improve] [admin] [PIP-179] Dynamic configuration for check unknown request parameters (#16781)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2330f6e42d7 [improve] [admin] [PIP-179] Dynamic configuration for check unknown request parameters (#16781)
2330f6e42d7 is described below

commit 2330f6e42d743d98b7f5bc46e2450c2a8090e2fd
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Tue Jul 26 14:58:01 2022 +0800

    [improve] [admin] [PIP-179] Dynamic configuration for check unknown request parameters (#16781)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 ++++
 .../web/UnrecognizedPropertyExceptionMapper.java   | 34 ++++++++++++++++
 .../pulsar/broker/service/BrokerService.java       |  5 +++
 .../org/apache/pulsar/broker/web/WebService.java   |  8 ++++
 .../apache/pulsar/broker/admin/AdminRestTest.java  | 47 ++++++++++++++++++----
 site2/docs/reference-configuration.md              |  1 +
 6 files changed, 94 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f8c041c79b7..2dfa122d83d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1494,6 +1494,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
         )
     private double httpRequestsMaxPerSecond = 100.0;
 
+    @FieldContext(
+            category =  CATEGORY_HTTP,
+            dynamic = true,
+            doc = "Admin API fail on unknown request parameter in request-body. see PIP-179. Default false."
+        )
+    private boolean httpRequestsFailOnUnknownPropertiesEnabled = false;
+
     @FieldContext(
         category = CATEGORY_SASL_AUTH,
         doc = "This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL.\n"
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/UnrecognizedPropertyExceptionMapper.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/UnrecognizedPropertyExceptionMapper.java
new file mode 100644
index 00000000000..63111570024
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/UnrecognizedPropertyExceptionMapper.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+
+public class UnrecognizedPropertyExceptionMapper implements ExceptionMapper<UnrecognizedPropertyException> {
+
+    @Override
+    public Response toResponse(UnrecognizedPropertyException exception) {
+        String response = String.format("Unknown property %s, perhaps you want to use one of these: %s",
+                exception.getPropertyName(), String.valueOf(exception.getKnownPropertyIds()));
+        return Response.status(Response.Status.BAD_REQUEST).entity(response).type(MediaType.TEXT_PLAIN).build();
+    }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8e701ebf5bf..c8efd2335c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2344,6 +2344,11 @@ public class BrokerService implements Closeable {
             this.updateMaxNumPartitionsPerPartitionedTopic((int) maxNumPartitions);
         });
 
+        // add listener to notify the web service when httpRequestsFailOnUnknownPropertiesEnabled changes.
+        registerConfigurationListener("httpRequestsFailOnUnknownPropertiesEnabled", enabled -> {
+            pulsar.getWebService().updateHttpRequestsFailOnUnknownPropertiesEnabled((boolean) enabled);
+        });
+
         // add more listeners here
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 508146da63f..57da8df5e73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -80,6 +80,11 @@ public class WebService implements AutoCloseable {
     private static final DynamicSkipUnknownPropertyHandler sharedUnknownPropertyHandler =
             new DynamicSkipUnknownPropertyHandler();
 
+    public void updateHttpRequestsFailOnUnknownPropertiesEnabled(boolean httpRequestsFailOnUnknownPropertiesEnabled){
+        sharedUnknownPropertyHandler
+                .setSkipUnknownProperty(!httpRequestsFailOnUnknownPropertiesEnabled);
+    }
+
     public WebService(PulsarService pulsar) throws PulsarServerException {
         this.handlers = Lists.newArrayList();
         this.pulsar = pulsar;
@@ -152,6 +157,8 @@ public class WebService implements AutoCloseable {
         server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
 
         filterInitializer = new FilterInitializer(pulsar);
+        // Initialize from the configuration whether requests with unknown properties are rejected.
+        sharedUnknownPropertyHandler.setSkipUnknownProperty(!config.isHttpRequestsFailOnUnknownPropertiesEnabled());
     }
 
     public void addRestResources(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
@@ -177,6 +184,7 @@ public class WebService implements AutoCloseable {
         if (useSharedJsonMapperProvider){
             JsonMapperProvider jsonMapperProvider = new JsonMapperProvider(sharedUnknownPropertyHandler);
             config.register(jsonMapperProvider);
+            config.register(UnrecognizedPropertyExceptionMapper.class);
         } else {
             config.register(JsonMapperProvider.class);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminRestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminRestTest.java
index 176063ba0ff..c04063f7779 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminRestTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminRestTest.java
@@ -18,9 +18,13 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.Entity;
@@ -31,6 +35,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -57,16 +62,42 @@ public class AdminRestTest extends MockedPulsarServiceBaseTest {
         data.put("retention_size_in_mb", -1);
         data.put("retention_time_in_minutes", 40320);
         // Configuration default, response success.
-        Response response = target.request(MediaType.APPLICATION_JSON_TYPE).buildPost(Entity.json(data)).invoke();
-        Assert.assertTrue(response.getStatus() / 200 == 1);
+        Response response1 = target.request(MediaType.APPLICATION_JSON_TYPE).buildPost(Entity.json(data)).invoke();
+        Assert.assertTrue(response1.getStatus() / 200 == 1);
         // Enabled feature, bad request response.
-        pulsar.getWebService().getSharedUnknownPropertyHandler().setSkipUnknownProperty(false);
-        response = target.request(MediaType.APPLICATION_JSON_TYPE).buildPost(Entity.json(data)).invoke();
-        Assert.assertEquals(response.getStatus(), 400);
+        admin.brokers().updateDynamicConfiguration("httpRequestsFailOnUnknownPropertiesEnabled", "true");
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> !pulsar.getWebService().getSharedUnknownPropertyHandler().isSkipUnknownProperty()
+        );
+        Response response2 = target.request(MediaType.APPLICATION_JSON_TYPE).buildPost(Entity.json(data)).invoke();
+        Assert.assertEquals(MediaType.valueOf(MediaType.TEXT_PLAIN), response2.getMediaType());
+        String responseBody = parseResponseEntity(response2.getEntity());
+        Assert.assertEquals(responseBody, "Unknown property retention_time_in_minutes, perhaps you want to use"
+                + " one of these: [retentionSizeInMB, retentionTimeInMinutes]");
+        Assert.assertEquals(response2.getStatus(), 400);
         // Disabled feature, response success.
-        pulsar.getWebService().getSharedUnknownPropertyHandler().setSkipUnknownProperty(true);
-        response = target.request(MediaType.APPLICATION_JSON_TYPE).buildPost(Entity.json(data)).invoke();
-        Assert.assertTrue(response.getStatus() / 200 == 1);
+        admin.brokers().updateDynamicConfiguration("httpRequestsFailOnUnknownPropertiesEnabled", "false");
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(
+                () -> pulsar.getWebService().getSharedUnknownPropertyHandler().isSkipUnknownProperty()
+        );
+        Response response3 = target.request(MediaType.APPLICATION_JSON_TYPE).buildPost(Entity.json(data)).invoke();
+        Assert.assertTrue(response3.getStatus() / 200 == 1);
+        // cleanup.
+        response1.close();
+        response2.close();
+        response3.close();
+        client.close();
+    }
+
+    private String parseResponseEntity(Object entity) throws Exception {
+        InputStream in = (InputStream) entity;
+        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
+        StringBuilder stringBuilder = new StringBuilder();
+        String line = null;
+        while ((line = bufferedReader.readLine()) != null){
+            stringBuilder.append(line);
+        }
+        return stringBuilder.toString();
     }
 
     @BeforeMethod
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 705c512dd7d..1a7f52b1d09 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -735,6 +735,7 @@ You can set the log level and configuration in the  [log4j2.yaml](https://github
 |transactionLogBatchedWriteEnabled| Provide a mechanism allowing the Transaction Log Store to aggregate multiple records into a batched record and persist into a single BK entry. This will make Pulsar transactions work more  efficiently, aka batched log. see: https://github.com/apache/pulsar/issues/15370  |false|
 |transactionLogBatchedWriteMaxRecords| If enabled the feature that transaction log batch, this attribute means maximum log records count in a batch  |512|
 |transactionLogBatchedWriteMaxSize| If enabled the feature that transaction log batch, this attribute means bytes size in a batch. |4m|
+|httpRequestsFailOnUnknownPropertiesEnabled| Whether Admin API requests fail when the request body contains an unknown parameter. See PIP-179. |false|
 |transactionLogBatchedWriteMaxDelayInMillis| If enabled the feature that transaction log batch, this attribute means maximum wait time(in millis) for the first record in a batch |1|
 |transactionPendingAckBatchedWriteEnabled| Provide a mechanism allowing the Pending Ack Store to aggregate multiple records into a batched record and persist into a single BK entry. This will make Pulsar transactions work more efficiently, aka batched log. see: https://github.com/apache/pulsar/issues/15370 |false|
 |transactionPendingAckBatchedWriteMaxRecords| If enabled the feature that transaction pending ack log batch, this attribute means maximum log records count in a batch. |512|