You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/09 02:20:11 UTC

[pinot] branch master updated: support large payload in zk put API (#7364)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8627819  support large payload in zk put API (#7364)
8627819 is described below

commit 86278194233781d8001498f0c2be4a8a3299cd9d
Author: Mohammed Galalen <mo...@gmail.com>
AuthorDate: Thu Dec 9 04:19:48 2021 +0200

    support large payload in zk put API (#7364)
---
 .../api/resources/ZookeeperResource.java           | 87 ++++++++++++----------
 .../helix/ControllerRequestURLBuilder.java         |  4 +
 .../pinot/controller/ControllerTestUtils.java      | 20 +++++
 .../controller/api/ZookeeperResourceTest.java      | 86 +++++++++++++++++++++
 4 files changed, 156 insertions(+), 41 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java
index 1f52709..6695f6c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java
@@ -30,7 +30,9 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import javax.inject.Inject;
+import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -40,6 +42,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.lang.StringUtils;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.util.GZipCompressionUtil;
@@ -66,7 +69,7 @@ public class ZookeeperResource {
 
   @GET
   @Path("/zk/get")
-  @Produces(MediaType.TEXT_PLAIN)
+  @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation(value = "Get content of the znode")
   @ApiResponses(value = {
       @ApiResponse(code = 200, message = "Success"),
@@ -75,10 +78,9 @@ public class ZookeeperResource {
       @ApiResponse(code = 500, message = "Internal server error")
   })
   public String getData(
-      @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path")
-      @DefaultValue("") String path) {
+      @ApiParam(value = "Zookeeper Path, must start with /", required = true) @QueryParam("path") String path) {
 
-    path = validateAndNormalizeZKPath(path);
+    path = validateAndNormalizeZKPath(path, true);
 
     ZNRecord znRecord = _pinotHelixResourceManager.readZKData(path);
     if (znRecord != null) {
@@ -107,10 +109,9 @@ public class ZookeeperResource {
       @ApiResponse(code = 500, message = "Internal server error")
   })
   public SuccessResponse delete(
-      @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path")
-      @DefaultValue("") String path) {
+      @ApiParam(value = "Zookeeper Path, must start with /", required = true) @QueryParam("path") String path) {
 
-    path = validateAndNormalizeZKPath(path);
+    path = validateAndNormalizeZKPath(path, true);
 
     boolean success = _pinotHelixResourceManager.deleteZKPath(path);
     if (success) {
@@ -125,6 +126,7 @@ public class ZookeeperResource {
   @Path("/zk/put")
   @Authenticate(AccessType.UPDATE)
   @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
   @ApiOperation(value = "Update the content of the node")
   @ApiResponses(value = {
       @ApiResponse(code = 200, message = "Success"),
@@ -133,23 +135,34 @@ public class ZookeeperResource {
       @ApiResponse(code = 500, message = "Internal server error")
   })
   public SuccessResponse putData(
-      @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path")
-      @DefaultValue("") String path,
-      @ApiParam(value = "Content", required = true) @QueryParam("data") @DefaultValue("") String content,
-      @ApiParam(value = "expectedVersion", required = true, defaultValue = "-1") @QueryParam("expectedVersion")
-      @DefaultValue("-1") String expectedVersion,
-      @ApiParam(value = "accessOption", required = true, defaultValue = "1") @QueryParam("accessOption")
-      @DefaultValue("1") String accessOption) {
-    path = validateAndNormalizeZKPath(path);
-    ZNRecord record = null;
-    if (content != null) {
-      record = (ZNRecord) _znRecordSerializer.deserialize(content.getBytes(Charsets.UTF_8));
+      @ApiParam(value = "Zookeeper Path, must start with /", required = true) @QueryParam("path") String path,
+      @ApiParam(value = "Content") @QueryParam("data") @Nullable String data,
+      @ApiParam(value = "expectedVersion", defaultValue = "-1") @QueryParam("expectedVersion") @DefaultValue("-1")
+          int expectedVersion,
+      @ApiParam(value = "accessOption", defaultValue = "1") @QueryParam("accessOption") @DefaultValue("1")
+          int accessOption,
+      @Nullable String payload) {
+
+    path = validateAndNormalizeZKPath(path, false);
+
+    if (StringUtils.isEmpty(data)) {
+      data = payload;
+    }
+    if (StringUtils.isEmpty(data)) {
+      throw new ControllerApplicationException(LOGGER, "Must provide data through query parameter or payload",
+          Response.Status.BAD_REQUEST);
+    }
+    ZNRecord znRecord;
+    try {
+      znRecord = (ZNRecord) _znRecordSerializer.deserialize(data.getBytes(Charsets.UTF_8));
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to deserialize the data", Response.Status.BAD_REQUEST,
+          e);
     }
     try {
-      boolean result = _pinotHelixResourceManager
-          .setZKData(path, record, Integer.parseInt(expectedVersion), Integer.parseInt(accessOption));
+      boolean result = _pinotHelixResourceManager.setZKData(path, znRecord, expectedVersion, accessOption);
       if (result) {
-        return new SuccessResponse("Successfully Updated path: " + path);
+        return new SuccessResponse("Successfully updated path: " + path);
       } else {
         throw new ControllerApplicationException(LOGGER, "Failed to update path: " + path,
             Response.Status.INTERNAL_SERVER_ERROR);
@@ -170,10 +183,9 @@ public class ZookeeperResource {
       @ApiResponse(code = 500, message = "Internal server error")
   })
   public String ls(
-      @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path")
-      @DefaultValue("") String path) {
+      @ApiParam(value = "Zookeeper Path, must start with /", required = true) @QueryParam("path") String path) {
 
-    path = validateAndNormalizeZKPath(path);
+    path = validateAndNormalizeZKPath(path, true);
 
     List<String> children = _pinotHelixResourceManager.getZKChildren(path);
     try {
@@ -193,10 +205,9 @@ public class ZookeeperResource {
       @ApiResponse(code = 500, message = "Internal server error")
   })
   public String lsl(
-      @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path")
-      @DefaultValue("") String path) {
+      @ApiParam(value = "Zookeeper Path, must start with /", required = true) @QueryParam("path") String path) {
 
-    path = validateAndNormalizeZKPath(path);
+    path = validateAndNormalizeZKPath(path, true);
 
     Map<String, Stat> childrenStats = _pinotHelixResourceManager.getZKChildrenStats(path);
 
@@ -209,7 +220,7 @@ public class ZookeeperResource {
 
   @GET
   @Path("/zk/stat")
-  @Produces(MediaType.TEXT_PLAIN)
+  @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation(value = "Get the stat",
       notes = " Use this api to fetch additional details of a znode such as creation time, modified time, numChildren"
           + " etc ")
@@ -219,10 +230,9 @@ public class ZookeeperResource {
       @ApiResponse(code = 500, message = "Internal server error")
   })
   public String stat(
-      @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path")
-      @DefaultValue("") String path) {
+      @ApiParam(value = "Zookeeper Path, must start with /", required = true) @QueryParam("path") String path) {
 
-    path = validateAndNormalizeZKPath(path);
+    path = validateAndNormalizeZKPath(path, true);
 
     Stat stat = _pinotHelixResourceManager.getZKStat(path);
     try {
@@ -232,12 +242,9 @@ public class ZookeeperResource {
     }
   }
 
-  private String validateAndNormalizeZKPath(@DefaultValue("") @QueryParam("path")
-  @ApiParam(value = "Zookeeper Path, must start with /", required = false, defaultValue = "/") String path) {
-
-    if (path == null || path.trim().isEmpty()) {
-      throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " cannot be null or empty",
-          Response.Status.BAD_REQUEST);
+  private String validateAndNormalizeZKPath(String path, boolean shouldExist) {
+    if (path == null) {
+      throw new ControllerApplicationException(LOGGER, "ZKPath cannot be null", Response.Status.BAD_REQUEST);
     }
     path = path.trim();
     if (!path.startsWith("/")) {
@@ -248,10 +255,8 @@ public class ZookeeperResource {
       throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " cannot end with /",
           Response.Status.BAD_REQUEST);
     }
-
-    if (!_pinotHelixResourceManager.getHelixZkManager().getHelixDataAccessor().getBaseDataAccessor().exists(path, -1)) {
-      throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " does not exist:",
-          Response.Status.NOT_FOUND);
+    if (shouldExist && _pinotHelixResourceManager.getZKStat(path) == null) {
+      throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " does not exist", Response.Status.NOT_FOUND);
     }
     return path;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
index f4ae93b..05a70ac 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -385,4 +385,8 @@ public class ControllerRequestURLBuilder {
   public String forAppConfigs() {
     return StringUtil.join("/", _baseUrl, "appconfigs");
   }
+
+  public String forZkPut() {
+    return StringUtil.join("/", _baseUrl, "zk/put");
+  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerTestUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerTestUtils.java
index 3df0fb5..4e82807 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerTestUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerTestUtils.java
@@ -567,6 +567,26 @@ public abstract class ControllerTestUtils {
     return constructResponse(httpConnection.getInputStream());
   }
 
+  public static String sendPutRequest(String urlString, Map<String, String> headers, String payload)
+      throws IOException {
+    HttpURLConnection httpConnection = (HttpURLConnection) new URL(urlString).openConnection();
+    httpConnection.setDoOutput(true);
+    httpConnection.setRequestMethod("PUT");
+    if (headers != null) {
+      for (Map.Entry<String, String> kv : headers.entrySet()) {
+        httpConnection.setRequestProperty(kv.getKey(), kv.getValue());
+      }
+    }
+
+    try (BufferedWriter writer = new BufferedWriter(
+        new OutputStreamWriter(httpConnection.getOutputStream(), StandardCharsets.UTF_8))) {
+      writer.write(payload);
+      writer.flush();
+    }
+
+    return constructResponse(httpConnection.getInputStream());
+  }
+
   public static String sendPutRequest(String urlString)
       throws IOException {
     HttpURLConnection httpConnection = (HttpURLConnection) new URL(urlString).openConnection();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ZookeeperResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ZookeeperResourceTest.java
new file mode 100644
index 0000000..c078da4
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ZookeeperResourceTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.controller.ControllerTestUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class ZookeeperResourceTest {
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    ControllerTestUtils.setupClusterAndValidate();
+  }
+
+  @Test
+  public void testZkPutData()
+      throws IOException {
+    String url = ControllerTestUtils.getControllerRequestURLBuilder().forZkPut();
+    String path = "/zookeeper";
+    int expectedVersion = -1;
+    int accessOption = 1;
+    String data = "{\"id\" : \"QuickStartCluster\"," + "  \"data\" : { }\n" + "}";
+
+    // CASE 1: Send data in query params form using HTTP PUT
+    String params =
+        "path=" + path + "&data=" + URIUtils.encode(data) + "&expectedVersion=" + expectedVersion + "&accessOption="
+            + accessOption;
+    String result = ControllerTestUtils.sendPutRequest(url + "?" + params);
+    Assert.assertTrue(result.toLowerCase().contains("successfully updated"));
+
+    String lorem = "Loremipsumdolorsitametconsecteturadipisicingelitseddoeiusmod"
+        + "temporincididuntutlaboreetdoloremagnaaliquaUtenimadminimveniam"
+        + "quisnostrudexercitationullamcolaborisnisiutaliquipexeacommodo"
+        + "consequatDuisauteiruredolorinreprehenderitinvoluptatevelitesse"
+        + "cillumdoloreeufugiatnullapariaturExcepteursintoccaecatcupidatatnon"
+        + "proidentsuntinculpaquiofficiadeseruntmollitanimidestlaborum";
+
+    // make the content even larger
+    for (int i = 0; i < 5; i++) {
+      lorem += lorem;
+    }
+
+    String largeConfig = "{\n" + "  \"id\" : \"QuickStartCluster\",\n" + "  \"data\" : " + "\"" + lorem + "\"\n" + "}";
+
+    // CASE 2: Fail when sending large data in query params
+    try {
+      params = "path=" + path + "&data=" + URIUtils.encode(largeConfig) + "&expectedVersion=" + expectedVersion
+          + "&accessOption=" + accessOption;
+      ControllerTestUtils.sendPutRequest(url + "?" + params);
+      Assert.fail("Should not get here, large payload");
+    } catch (IOException e) {
+      // Expected
+    }
+
+    // CASE 3: Send large content data should return success
+    params = "path=" + path + "&expectedVersion=" + expectedVersion + "&accessOption=" + accessOption;
+    Map<String, String> headers = new HashMap<>();
+    headers.put("Content-Type", "application/json");
+    result = ControllerTestUtils.sendPutRequest(url + "?" + params, headers, largeConfig);
+    Assert.assertTrue(result.toLowerCase().contains("successfully updated"));
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org