You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2020/07/12 06:14:52 UTC
[incubator-pinot] 01/01: Adding Zookeeper resource to browse ZK
data via controller API
This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch zk-browser
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit b00a65444d8e3d99e496d44e7950122208022b00
Author: kishoreg <g....@gmail.com>
AuthorDate: Sat Jul 11 23:14:13 2020 -0700
Adding Zookeeper resource to browse ZK data via controller API
---
.../pinot/controller/api/resources/Constants.java | 1 +
.../api/resources/PinotClusterConfigs.java | 17 ++-
.../api/resources/ZookeeperResource.java | 165 +++++++++++++++++++++
.../helix/core/PinotHelixResourceManager.java | 36 ++++-
4 files changed, 214 insertions(+), 5 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 13d5f88..05de8b8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -39,6 +39,7 @@ public class Constants {
public static final String TASK_TAG = "Task";
public static final String LEAD_CONTROLLER_TAG = "Leader";
public static final String TABLE_NAME = "tableName";
+ public static final String ZOOKEEPER = "zookeeper";
public static TableType validateTableType(String tableTypeStr) {
if (tableTypeStr == null || tableTypeStr.isEmpty()) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotClusterConfigs.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotClusterConfigs.java
index a9c20ed..3fbccdd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotClusterConfigs.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotClusterConfigs.java
@@ -58,6 +58,17 @@ public class PinotClusterConfigs {
PinotHelixResourceManager pinotHelixResourceManager;
@GET
+ @Path("/cluster/info")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get cluster Info", notes = "Get cluster Info")
+ @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error")})
+ public String getClusterInfo() { // returns a JSON object string with a single "clusterName" field
+ ObjectNode ret = JsonUtils.newObjectNode();
+ ret.put("clusterName", pinotHelixResourceManager.getHelixClusterName());
+ return ret.toString();
+ }
+
+ @GET
@Path("/cluster/configs")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "List cluster configurations", notes = "List cluster level configurations")
@@ -94,9 +105,11 @@ public class PinotClusterConfigs {
}
return new SuccessResponse("Updated cluster config.");
} catch (IOException e) {
- throw new ControllerApplicationException(LOGGER, "Error converting request to cluster config.", Response.Status.BAD_REQUEST, e);
+ throw new ControllerApplicationException(LOGGER, "Error converting request to cluster config.",
+ Response.Status.BAD_REQUEST, e);
} catch (Exception e) {
- throw new ControllerApplicationException(LOGGER, "Failed to update cluster config.", Response.Status.INTERNAL_SERVER_ERROR, e);
+ throw new ControllerApplicationException(LOGGER, "Failed to update cluster config.",
+ Response.Status.INTERNAL_SERVER_ERROR, e);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java
new file mode 100644
index 0000000..4ed1ba1
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.ZOOKEEPER)
+@Path("/")
+public class ZookeeperResource {
+
+ public static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(ZookeeperResource.class);
+
+ @Inject
+ PinotHelixResourceManager pinotHelixResourceManager;
+
+ ZNRecordSerializer _znRecordSerializer = new ZNRecordSerializer();
+
+ @GET
+ @Path("/zookeeper/get")
+ @Produces(MediaType.TEXT_PLAIN)
+ @ApiOperation(value = "Get the data of the specific path", notes = "Get the data of the specific path")
+ @ApiResponses(value = { //
+ @ApiResponse(code = 200, message = "Success"), //
+ @ApiResponse(code = 404, message = "ZK Path not found"), //
+ @ApiResponse(code = 204, message = "No Content"), //
+ @ApiResponse(code = 500, message = "Internal server error")})
+ public String getData(
+ @ApiParam(value = "Zookeeper Path, must start with /", required = false, defaultValue = "/") @QueryParam("path") @DefaultValue("/") String path) {
+
+ path = validateAndNormalizeZKPath(path); // throws 400/404 for malformed or missing paths
+
+ ZNRecord znRecord = pinotHelixResourceManager.readZKData(path);
+ if (znRecord != null) {
+ return new String(_znRecordSerializer.serialize(znRecord), StandardCharsets.UTF_8);
+ }
+ return null; // no record at this path: a null entity is answered with 204 No Content (documented above)
+ }
+
+ @GET
+ @Path("/zookeeper/ls")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Listing the child nodes of one path", notes = "Listing the child nodes of one path")
+ @ApiResponses(value = { //
+ @ApiResponse(code = 200, message = "Success"), //
+ @ApiResponse(code = 404, message = "ZK Path not found"), //
+ @ApiResponse(code = 500, message = "Internal server error")})
+ public String ls(
+ @ApiParam(value = "Zookeeper Path, must start with /", required = false, defaultValue = "/") @QueryParam("path") @DefaultValue("/") String path) {
+
+ path = validateAndNormalizeZKPath(path); // throws 400/404 for malformed or missing paths
+
+ List<String> children = pinotHelixResourceManager.getZKChildren(path);
+ try {
+ return JsonUtils.objectToString(children); // JSON array of child node names
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @GET
+ @Path("/zookeeper/lsl")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Listing the child nodes of one path along with stats", notes = "Listing the child nodes of one path along with stats")
+ @ApiResponses(value = { //
+ @ApiResponse(code = 200, message = "Success"), //
+ @ApiResponse(code = 404, message = "ZK Path not found"), //
+ @ApiResponse(code = 500, message = "Internal server error")})
+ public String lsl(
+ @ApiParam(value = "Zookeeper Path, must start with /", required = false, defaultValue = "/") @QueryParam("path") @DefaultValue("/") String path) {
+
+ path = validateAndNormalizeZKPath(path); // throws 400/404 for malformed or missing paths
+
+ Map<String, Stat> childrenStats = pinotHelixResourceManager.getZKChildrenStats(path);
+
+ try {
+ return JsonUtils.objectToString(childrenStats); // JSON object keyed by child node name
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @GET
+ @Path("/zookeeper/stat")
+ @Produces(MediaType.TEXT_PLAIN) // NOTE(review): the body is JSON (JsonUtils.objectToString); consider APPLICATION_JSON
+ @ApiOperation(value = "Get the stat of one path", notes = "Get the stat of one path")
+ @ApiResponses(value = { //
+ @ApiResponse(code = 200, message = "Success"), //
+ @ApiResponse(code = 404, message = "ZK Path not found"), //
+ @ApiResponse(code = 500, message = "Internal server error")})
+ public String stat(
+ @ApiParam(value = "Zookeeper Path, must start with /", required = false, defaultValue = "/") @QueryParam("path") @DefaultValue("/") String path) {
+
+ path = validateAndNormalizeZKPath(path); // throws 400/404 for malformed or missing paths
+
+ Stat stat = pinotHelixResourceManager.getZKStat(path);
+ try {
+ return JsonUtils.objectToString(stat);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String validateAndNormalizeZKPath(
+ String path) { // private helper, not a resource method: returns the trimmed path or throws 400/404
+
+ if (path == null || path.trim().isEmpty()) {
+ throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " cannot be null or empty",
+ Response.Status.BAD_REQUEST);
+ }
+ path = path.trim();
+ if (!path.startsWith("/")) {
+ throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " must start with /",
+ Response.Status.BAD_REQUEST);
+ }
+ if (!path.equals("/") && path.endsWith("/")) { // the root "/" is the only path allowed to end with a slash
+ throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " cannot end with /",
+ Response.Status.BAD_REQUEST);
+ }
+
+ if (!pinotHelixResourceManager.getHelixZkManager().getHelixDataAccessor().getBaseDataAccessor().exists(path, -1)) {
+ throw new ControllerApplicationException(LOGGER, "ZKPath " + path + " does not exist",
+ Response.Status.NOT_FOUND);
+ }
+ return path;
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 55216ba..d044ad5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -25,13 +25,14 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
-import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -39,7 +40,9 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
@@ -52,6 +55,9 @@ import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -71,6 +77,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
@@ -194,7 +201,6 @@ public class PinotHelixResourceManager {
// Add instance group tag for controller
addInstanceGroupTagIfNeeded();
-
_segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore);
ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);
_tableCache = new TableCache(_propertyStore);
@@ -369,7 +375,7 @@ public class PinotHelixResourceManager {
return PinotResourceManagerResponse.failure("Instance " + instanceIdToUpdate + " does not exists");
} else {
InstanceConfig newConfig = InstanceUtils.toHelixInstanceConfig(newInstance);
- if(!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceIdToUpdate), newConfig)) {
+ if (!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceIdToUpdate), newConfig)) {
return PinotResourceManagerResponse.failure("Unable to update instance: " + instanceIdToUpdate);
}
return PinotResourceManagerResponse.SUCCESS;
@@ -1173,6 +1179,30 @@ public class PinotHelixResourceManager {
}
}
+ public ZNRecord readZKData(String path) { // reads the record at path via the base data accessor
+ return _helixDataAccessor.getBaseDataAccessor().get(path, null, -1); // ZookeeperResource.getData null-checks this result
+ }
+
+ public List<String> getZKChildren(String path) { // child names under path, as returned by the base data accessor
+ return _helixDataAccessor.getBaseDataAccessor().getChildNames(path, -1);
+ }
+
+ public Map<String, Stat> getZKChildrenStats(String path) { // child name -> Stat, in getChildNames order
+ List<String> childNames = _helixDataAccessor.getBaseDataAccessor().getChildNames(path, -1);
+ List<String> childPaths = // literal replace (no regex) collapses the "//" formed when path is the root "/"
+ childNames.stream().map(name -> (path + "/" + name).replace("//", "/")).collect(Collectors.toList());
+ Stat[] stats = _helixDataAccessor.getBaseDataAccessor().getStats(childPaths, -1);
+ Map<String, Stat> statsMap = new LinkedHashMap<>(childNames.size());
+ for (int i = 0; i < childNames.size(); i++) {
+ statsMap.put(childNames.get(i), stats[i]); // stats[i] is the entry returned for childPaths.get(i)
+ }
+ return statsMap;
+ }
+
+ public Stat getZKStat(String path) { // Stat for path, as returned by the base data accessor
+ return _helixDataAccessor.getBaseDataAccessor().getStat(path, -1);
+ }
+
public static class InvalidTableConfigException extends RuntimeException {
public InvalidTableConfigException(String message) {
super(message);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org