You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2020/07/14 05:34:53 UTC

[incubator-pinot] branch master updated: Adding Controller API to explore Zookeeper (#5687)

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4378e87  Adding Controller API to explore Zookeeper (#5687)
4378e87 is described below

commit 4378e87aba652a727076b535f77f4c0bbd4473d2
Author: Kishore Gopalakrishna <g....@gmail.com>
AuthorDate: Mon Jul 13 22:34:38 2020 -0700

    Adding Controller API to explore Zookeeper (#5687)
    
    * Adding Zookeeper resource to browse ZK data via controller API
    
    * Fixing api documentation
    
    * Fixing api documentation
    
    * Fixing api documentation
    
    * Addressing comments
---
 .../pinot/controller/api/resources/Constants.java  |   1 +
 .../api/resources/PinotClusterConfigs.java         |  17 ++-
 .../api/resources/ZookeeperResource.java           | 165 +++++++++++++++++++++
 .../helix/core/PinotHelixResourceManager.java      |  36 ++++-
 4 files changed, 214 insertions(+), 5 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 13d5f88..5e463e0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -39,6 +39,7 @@ public class Constants {
   public static final String TASK_TAG = "Task";
   public static final String LEAD_CONTROLLER_TAG = "Leader";
   public static final String TABLE_NAME = "tableName";
+  public static final String ZOOKEEPER = "Zookeeper";
 
   public static TableType validateTableType(String tableTypeStr) {
     if (tableTypeStr == null || tableTypeStr.isEmpty()) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotClusterConfigs.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotClusterConfigs.java
index a9c20ed..2825f55 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotClusterConfigs.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotClusterConfigs.java
@@ -58,6 +58,17 @@ public class PinotClusterConfigs {
   PinotHelixResourceManager pinotHelixResourceManager;
 
   @GET
+  @Path("/cluster/info")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get cluster Info", notes = "Get cluster Info")
+  @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error")})
+  public String getClusterInfo() {
+    ObjectNode ret = JsonUtils.newObjectNode();
+    ret.put("clusterName", pinotHelixResourceManager.getHelixClusterName());
+    return ret.toString();
+  }
+
+  @GET
   @Path("/cluster/configs")
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation(value = "List cluster configurations", notes = "List cluster level configurations")
@@ -94,9 +105,11 @@ public class PinotClusterConfigs {
       }
       return new SuccessResponse("Updated cluster config.");
     } catch (IOException e) {
-      throw new ControllerApplicationException(LOGGER, "Error converting request to cluster config.", Response.Status.BAD_REQUEST, e);
+      throw new ControllerApplicationException(LOGGER, "Error converting request to cluster config.",
+          Response.Status.BAD_REQUEST, e);
     } catch (Exception e) {
-      throw new ControllerApplicationException(LOGGER, "Failed to update cluster config.", Response.Status.INTERNAL_SERVER_ERROR, e);
+      throw new ControllerApplicationException(LOGGER, "Failed to update cluster config.",
+          Response.Status.INTERNAL_SERVER_ERROR, e);
     }
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java
new file mode 100644
index 0000000..3093052
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.ZOOKEEPER)
+@Path("/")
+public class ZookeeperResource {
+
+  public static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(ZookeeperResource.class);
+
+  @Inject
+  PinotHelixResourceManager pinotHelixResourceManager;
+
+  ZNRecordSerializer _znRecordSerializer = new ZNRecordSerializer();
+
+  @GET
+  @Path("/zk/get")
+  @Produces(MediaType.TEXT_PLAIN)
+  @ApiOperation(value = "Get content of the znode")
+  @ApiResponses(value = { //
+      @ApiResponse(code = 200, message = "Success"), //
+      @ApiResponse(code = 404, message = "ZK Path not found"), //
+      @ApiResponse(code = 204, message = "No Content"), //
+      @ApiResponse(code = 500, message = "Internal server error")})
+  public String getData(
+      @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path") @DefaultValue("") String path) {
+
+    path = validateAndNormalizeZKPath(path);
+
+    ZNRecord znRecord = pinotHelixResourceManager.readZKData(path);
+    if (znRecord != null) {
+      return new String(_znRecordSerializer.serialize(znRecord), StandardCharsets.UTF_8);
+    }
+    return null;
+  }
+
+  @GET
+  @Path("/zk/ls")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "List the child znodes")
+  @ApiResponses(value = { //
+      @ApiResponse(code = 200, message = "Success"), //
+      @ApiResponse(code = 404, message = "ZK Path not found"), //
+      @ApiResponse(code = 500, message = "Internal server error")})
+  public String ls(
+      @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path") @DefaultValue("") String path) {
+
+    path = validateAndNormalizeZKPath(path);
+
+    List<String> children = pinotHelixResourceManager.getZKChildren(path);
+    try {
+      return JsonUtils.objectToString(children);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @GET
+  @Path("/zk/lsl")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "List the child znodes along with Stats")
+  @ApiResponses(value = { //
+      @ApiResponse(code = 200, message = "Success"), //
+      @ApiResponse(code = 404, message = "ZK Path not found"), //
+      @ApiResponse(code = 500, message = "Internal server error")})
+  public String lsl(
+      @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path") @DefaultValue("") String path) {
+
+    path = validateAndNormalizeZKPath(path);
+
+    Map<String, Stat> childrenStats = pinotHelixResourceManager.getZKChildrenStats(path);
+
+    try {
+      return JsonUtils.objectToString(childrenStats);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @GET
+  @Path("/zk/stat")
+  @Produces(MediaType.TEXT_PLAIN)
+  @ApiOperation(value = "Get the stat", notes = " Use this api to fetch additional details of a znode such as creation time, modified time, numChildren etc ")
+  @ApiResponses(value = { //
+      @ApiResponse(code = 200, message = "Success"), //
+      @ApiResponse(code = 404, message = "Table not found"), //
+      @ApiResponse(code = 500, message = "Internal server error")})
+  public String stat(
+      @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path") @DefaultValue("") String path) {
+
+    path = validateAndNormalizeZKPath(path);
+
+    Stat stat = pinotHelixResourceManager.getZKStat(path);
+    try {
+      return JsonUtils.objectToString(stat);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String validateAndNormalizeZKPath(
+      @DefaultValue("") @QueryParam("path") @ApiParam(value = "Zookeeper Path, must start with /", required = false, defaultValue = "/") String path) {
+
+    if (path == null || path.trim().isEmpty()) {
+      throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " cannot be null or empty",
+          Response.Status.BAD_REQUEST);
+    }
+    path = path.trim();
+    if (!path.startsWith("/")) {
+      throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " must start with /",
+          Response.Status.BAD_REQUEST);
+    }
+    if (!path.equals("/") && path.endsWith("/")) {
+      throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " cannot end with /",
+          Response.Status.BAD_REQUEST);
+    }
+
+    if (!pinotHelixResourceManager.getHelixZkManager().getHelixDataAccessor().getBaseDataAccessor().exists(path, -1)) {
+      throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " does not exist:",
+          Response.Status.NOT_FOUND);
+    }
+    return path;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 55216ba..d044ad5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -25,13 +25,14 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -39,7 +40,9 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
@@ -52,6 +55,9 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -71,6 +77,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
@@ -194,7 +201,6 @@ public class PinotHelixResourceManager {
 
     // Add instance group tag for controller
     addInstanceGroupTagIfNeeded();
-
     _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore);
     ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);
     _tableCache = new TableCache(_propertyStore);
@@ -369,7 +375,7 @@ public class PinotHelixResourceManager {
       return PinotResourceManagerResponse.failure("Instance " + instanceIdToUpdate + " does not exists");
     } else {
       InstanceConfig newConfig = InstanceUtils.toHelixInstanceConfig(newInstance);
-      if(!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceIdToUpdate), newConfig)) {
+      if (!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceIdToUpdate), newConfig)) {
         return PinotResourceManagerResponse.failure("Unable to update instance: " + instanceIdToUpdate);
       }
       return PinotResourceManagerResponse.SUCCESS;
@@ -1173,6 +1179,30 @@ public class PinotHelixResourceManager {
     }
   }
 
+  public ZNRecord readZKData(String path) {
+    return _helixDataAccessor.getBaseDataAccessor().get(path, null, -1);
+  }
+
+  public List<String> getZKChildren(String path) {
+    return _helixDataAccessor.getBaseDataAccessor().getChildNames(path, -1);
+  }
+
+  public Map<String, Stat> getZKChildrenStats(String path) {
+    List<String> childNames = _helixDataAccessor.getBaseDataAccessor().getChildNames(path, -1);
+    List<String> childPaths =
+        childNames.stream().map(name -> (path + "/" + name).replaceAll("//", "/")).collect(Collectors.toList());
+    Stat[] stats = _helixDataAccessor.getBaseDataAccessor().getStats(childPaths, -1);
+    Map<String, Stat> statsMap = new LinkedHashMap<>(childNames.size());
+    for (int i = 0; i < childNames.size(); i++) {
+      statsMap.put(childNames.get(i), stats[i]);
+    }
+    return statsMap;
+  }
+
+  public Stat getZKStat(String path) {
+    return _helixDataAccessor.getBaseDataAccessor().getStat(path, -1);
+  }
+
   public static class InvalidTableConfigException extends RuntimeException {
     public InvalidTableConfigException(String message) {
       super(message);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org