You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/09 02:20:11 UTC
[pinot] branch master updated: support large payload in zk put API (#7364)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8627819 support large payload in zk put API (#7364)
8627819 is described below
commit 86278194233781d8001498f0c2be4a8a3299cd9d
Author: Mohammed Galalen <mo...@gmail.com>
AuthorDate: Thu Dec 9 04:19:48 2021 +0200
support large payload in zk put API (#7364)
---
.../api/resources/ZookeeperResource.java | 87 ++++++++++++----------
.../helix/ControllerRequestURLBuilder.java | 4 +
.../pinot/controller/ControllerTestUtils.java | 20 +++++
.../controller/api/ZookeeperResourceTest.java | 86 +++++++++++++++++++++
4 files changed, 156 insertions(+), 41 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java
index 1f52709..6695f6c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java
@@ -30,7 +30,9 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import javax.inject.Inject;
+import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@@ -40,6 +42,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.commons.lang.StringUtils;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.util.GZipCompressionUtil;
@@ -66,7 +69,7 @@ public class ZookeeperResource {
@GET
@Path("/zk/get")
- @Produces(MediaType.TEXT_PLAIN)
+ @Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get content of the znode")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
@@ -75,10 +78,9 @@ public class ZookeeperResource {
@ApiResponse(code = 500, message = "Internal server error")
})
public String getData(
- @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path")
- @DefaultValue("") String path) {
+ @ApiParam(value = "Zookeeper Path, must start with /", required = true) @QueryParam("path") String path) {
- path = validateAndNormalizeZKPath(path);
+ path = validateAndNormalizeZKPath(path, true);
ZNRecord znRecord = _pinotHelixResourceManager.readZKData(path);
if (znRecord != null) {
@@ -107,10 +109,9 @@ public class ZookeeperResource {
@ApiResponse(code = 500, message = "Internal server error")
})
public SuccessResponse delete(
- @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path")
- @DefaultValue("") String path) {
+ @ApiParam(value = "Zookeeper Path, must start with /", required = true) @QueryParam("path") String path) {
- path = validateAndNormalizeZKPath(path);
+ path = validateAndNormalizeZKPath(path, true);
boolean success = _pinotHelixResourceManager.deleteZKPath(path);
if (success) {
@@ -125,6 +126,7 @@ public class ZookeeperResource {
@Path("/zk/put")
@Authenticate(AccessType.UPDATE)
@Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Update the content of the node")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
@@ -133,23 +135,34 @@ public class ZookeeperResource {
@ApiResponse(code = 500, message = "Internal server error")
})
public SuccessResponse putData(
- @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path")
- @DefaultValue("") String path,
- @ApiParam(value = "Content", required = true) @QueryParam("data") @DefaultValue("") String content,
- @ApiParam(value = "expectedVersion", required = true, defaultValue = "-1") @QueryParam("expectedVersion")
- @DefaultValue("-1") String expectedVersion,
- @ApiParam(value = "accessOption", required = true, defaultValue = "1") @QueryParam("accessOption")
- @DefaultValue("1") String accessOption) {
- path = validateAndNormalizeZKPath(path);
- ZNRecord record = null;
- if (content != null) {
- record = (ZNRecord) _znRecordSerializer.deserialize(content.getBytes(Charsets.UTF_8));
+ @ApiParam(value = "Zookeeper Path, must start with /", required = true) @QueryParam("path") String path,
+ @ApiParam(value = "Content") @QueryParam("data") @Nullable String data,
+ @ApiParam(value = "expectedVersion", defaultValue = "-1") @QueryParam("expectedVersion") @DefaultValue("-1")
+ int expectedVersion,
+ @ApiParam(value = "accessOption", defaultValue = "1") @QueryParam("accessOption") @DefaultValue("1")
+ int accessOption,
+ @Nullable String payload) {
+
+ path = validateAndNormalizeZKPath(path, false);
+
+ if (StringUtils.isEmpty(data)) {
+ data = payload;
+ }
+ if (StringUtils.isEmpty(data)) {
+ throw new ControllerApplicationException(LOGGER, "Must provide data through query parameter or payload",
+ Response.Status.BAD_REQUEST);
+ }
+ ZNRecord znRecord;
+ try {
+ znRecord = (ZNRecord) _znRecordSerializer.deserialize(data.getBytes(Charsets.UTF_8));
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to deserialize the data", Response.Status.BAD_REQUEST,
+ e);
}
try {
- boolean result = _pinotHelixResourceManager
- .setZKData(path, record, Integer.parseInt(expectedVersion), Integer.parseInt(accessOption));
+ boolean result = _pinotHelixResourceManager.setZKData(path, znRecord, expectedVersion, accessOption);
if (result) {
- return new SuccessResponse("Successfully Updated path: " + path);
+ return new SuccessResponse("Successfully updated path: " + path);
} else {
throw new ControllerApplicationException(LOGGER, "Failed to update path: " + path,
Response.Status.INTERNAL_SERVER_ERROR);
@@ -170,10 +183,9 @@ public class ZookeeperResource {
@ApiResponse(code = 500, message = "Internal server error")
})
public String ls(
- @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path")
- @DefaultValue("") String path) {
+ @ApiParam(value = "Zookeeper Path, must start with /", required = true) @QueryParam("path") String path) {
- path = validateAndNormalizeZKPath(path);
+ path = validateAndNormalizeZKPath(path, true);
List<String> children = _pinotHelixResourceManager.getZKChildren(path);
try {
@@ -193,10 +205,9 @@ public class ZookeeperResource {
@ApiResponse(code = 500, message = "Internal server error")
})
public String lsl(
- @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path")
- @DefaultValue("") String path) {
+ @ApiParam(value = "Zookeeper Path, must start with /", required = true) @QueryParam("path") String path) {
- path = validateAndNormalizeZKPath(path);
+ path = validateAndNormalizeZKPath(path, true);
Map<String, Stat> childrenStats = _pinotHelixResourceManager.getZKChildrenStats(path);
@@ -209,7 +220,7 @@ public class ZookeeperResource {
@GET
@Path("/zk/stat")
- @Produces(MediaType.TEXT_PLAIN)
+ @Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get the stat",
notes = " Use this api to fetch additional details of a znode such as creation time, modified time, numChildren"
+ " etc ")
@@ -219,10 +230,9 @@ public class ZookeeperResource {
@ApiResponse(code = 500, message = "Internal server error")
})
public String stat(
- @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path")
- @DefaultValue("") String path) {
+ @ApiParam(value = "Zookeeper Path, must start with /", required = true) @QueryParam("path") String path) {
- path = validateAndNormalizeZKPath(path);
+ path = validateAndNormalizeZKPath(path, true);
Stat stat = _pinotHelixResourceManager.getZKStat(path);
try {
@@ -232,12 +242,9 @@ public class ZookeeperResource {
}
}
- private String validateAndNormalizeZKPath(@DefaultValue("") @QueryParam("path")
- @ApiParam(value = "Zookeeper Path, must start with /", required = false, defaultValue = "/") String path) {
-
- if (path == null || path.trim().isEmpty()) {
- throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " cannot be null or empty",
- Response.Status.BAD_REQUEST);
+ private String validateAndNormalizeZKPath(String path, boolean shouldExist) {
+ if (path == null) {
+ throw new ControllerApplicationException(LOGGER, "ZKPath cannot be null", Response.Status.BAD_REQUEST);
}
path = path.trim();
if (!path.startsWith("/")) {
@@ -248,10 +255,8 @@ public class ZookeeperResource {
throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " cannot end with /",
Response.Status.BAD_REQUEST);
}
-
- if (!_pinotHelixResourceManager.getHelixZkManager().getHelixDataAccessor().getBaseDataAccessor().exists(path, -1)) {
- throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " does not exist:",
- Response.Status.NOT_FOUND);
+ if (shouldExist && _pinotHelixResourceManager.getZKStat(path) == null) {
+ throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " does not exist", Response.Status.NOT_FOUND);
}
return path;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
index f4ae93b..05a70ac 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -385,4 +385,8 @@ public class ControllerRequestURLBuilder {
public String forAppConfigs() {
return StringUtil.join("/", _baseUrl, "appconfigs");
}
+
+ public String forZkPut() {
+ return StringUtil.join("/", _baseUrl, "zk/put");
+ }
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerTestUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerTestUtils.java
index 3df0fb5..4e82807 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerTestUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerTestUtils.java
@@ -567,6 +567,26 @@ public abstract class ControllerTestUtils {
return constructResponse(httpConnection.getInputStream());
}
+ public static String sendPutRequest(String urlString, Map<String, String> headers, String payload)
+ throws IOException {
+ HttpURLConnection httpConnection = (HttpURLConnection) new URL(urlString).openConnection();
+ httpConnection.setDoOutput(true);
+ httpConnection.setRequestMethod("PUT");
+ if (headers != null) {
+ for (Map.Entry<String, String> kv : headers.entrySet()) {
+ httpConnection.setRequestProperty(kv.getKey(), kv.getValue());
+ }
+ }
+
+ try (BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(httpConnection.getOutputStream(), StandardCharsets.UTF_8))) {
+ writer.write(payload);
+ writer.flush();
+ }
+
+ return constructResponse(httpConnection.getInputStream());
+ }
+
public static String sendPutRequest(String urlString)
throws IOException {
HttpURLConnection httpConnection = (HttpURLConnection) new URL(urlString).openConnection();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ZookeeperResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ZookeeperResourceTest.java
new file mode 100644
index 0000000..c078da4
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ZookeeperResourceTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.controller.ControllerTestUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class ZookeeperResourceTest {
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ ControllerTestUtils.setupClusterAndValidate();
+ }
+
+ @Test
+ public void testZkPutData()
+ throws IOException {
+ String url = ControllerTestUtils.getControllerRequestURLBuilder().forZkPut();
+ String path = "/zookeeper";
+ int expectedVersion = -1;
+ int accessOption = 1;
+ String data = "{\"id\" : \"QuickStartCluster\"," + " \"data\" : { }\n" + "}";
+
+ // CASE 1: Send data in query params form using HTTP PUT
+ String params =
+ "path=" + path + "&data=" + URIUtils.encode(data) + "&expectedVersion=" + expectedVersion + "&accessOption="
+ + accessOption;
+ String result = ControllerTestUtils.sendPutRequest(url + "?" + params);
+ Assert.assertTrue(result.toLowerCase().contains("successfully updated"));
+
+ String lorem = "Loremipsumdolorsitametconsecteturadipisicingelitseddoeiusmod"
+ + "temporincididuntutlaboreetdoloremagnaaliquaUtenimadminimveniam"
+ + "quisnostrudexercitationullamcolaborisnisiutaliquipexeacommodo"
+ + "consequatDuisauteiruredolorinreprehenderitinvoluptatevelitesse"
+ + "cillumdoloreeufugiatnullapariaturExcepteursintoccaecatcupidatatnon"
+ + "proidentsuntinculpaquiofficiadeseruntmollitanimidestlaborum";
+
+ // make the content even more larger
+ for (int i = 0; i < 5; i++) {
+ lorem += lorem;
+ }
+
+ String largeConfig = "{\n" + " \"id\" : \"QuickStartCluster\",\n" + " \"data\" : " + "\"" + lorem + "\"\n" + "}";
+
+ // CASE 2: Fail when sending large data in query params
+ try {
+ params = "path=" + path + "&data=" + URIUtils.encode(largeConfig) + "&expectedVersion=" + expectedVersion
+ + "&accessOption=" + accessOption;
+ ControllerTestUtils.sendPutRequest(url + "?" + params);
+ Assert.fail("Should not get here, large payload");
+ } catch (IOException e) {
+ // Expected
+ }
+
+ // CASE 3: Send large content data should return success
+ params = "path=" + path + "&expectedVersion=" + expectedVersion + "&accessOption=" + accessOption;
+ Map<String, String> headers = new HashMap<>();
+ headers.put("Content-Type", "application/json");
+ result = ControllerTestUtils.sendPutRequest(url + "?" + params, headers, largeConfig);
+ Assert.assertTrue(result.toLowerCase().contains("successfully updated"));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org