You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/07/11 03:02:23 UTC
[incubator-pinot] 01/01: Adding controller APIs to fetch brokers
information
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch controller-api-brokers
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 1a2f8fff33a5dccce1135ca13d00ad55b46786a5
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Fri Jul 10 20:01:58 2020 -0700
Adding controller APIs to fetch brokers information
---
.../pinot/controller/api/resources/Constants.java | 1 +
.../api/resources/PinotBrokerRestletResource.java | 150 +++++++++++++++++++++
2 files changed, 151 insertions(+)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 13d5f88..1d910fd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -35,6 +35,7 @@ public class Constants {
public static final String INSTANCE_TAG = "Instance";
public static final String SCHEMA_TAG = "Schema";
public static final String TENANT_TAG = "Tenant";
+ public static final String BROKER_TAG = "Broker";
public static final String SEGMENT_TAG = "Segment";
public static final String TASK_TAG = "Task";
public static final String LEAD_CONTROLLER_TAG = "Leader";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
new file mode 100644
index 0000000..5f5cc34
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.ImmutableList;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.common.exception.SchemaNotFoundException;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
+import org.apache.pinot.controller.api.events.SchemaEventType;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.util.SchemaUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.glassfish.jersey.media.multipart.FormDataBodyPart;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.BROKER_TAG)
+@Path("/")
+public class PinotBrokerRestletResource {
+ public static final Logger LOGGER = LoggerFactory.getLogger(PinotBrokerRestletResource.class);
+
+ @Inject
+ PinotHelixResourceManager _pinotHelixResourceManager;
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers")
+ @ApiOperation(value = "List all broker instances", notes = "List all broker instances")
+ public Map<String, Map<String, List<String>>> listBrokersMapping(
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ Map<String, Map<String, List<String>>> resultMap = new HashMap<>();
+ resultMap.put("tenants", getTenantsToBrokersMapping(state));
+ resultMap.put("tables", getTenantsToBrokersMapping(state));
+ return resultMap;
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers/tenants")
+ @ApiOperation(value = "List all segments", notes = "List all segments")
+ public Map<String, List<String>> getTenantsToBrokersMapping(
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ Map<String, List<String>> resultMap = new HashMap<>();
+ _pinotHelixResourceManager.getAllBrokerTenantNames().stream()
+ .forEach(tenant -> resultMap.put(tenant, getBrokersForTenant(tenant, state)));
+ return resultMap;
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers/tenants/{tenantName}")
+ @ApiOperation(value = "List all brokers for a given tenant", notes = "List all brokers for a given tenant")
+ public List<String> getBrokersForTenant(
+ @ApiParam(value = "Name of the tenant", required = true) @PathParam("tenantName") String tenantName,
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ Set<String> tenantBrokers = new HashSet<>();
+ tenantBrokers.addAll(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(tenantName));
+ applyStateChanges(tenantBrokers, state);
+ return ImmutableList.copyOf(tenantBrokers);
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers/tables")
+ @ApiOperation(value = "List all tables to corresponding brokers mapping", notes = "List all tables to corresponding brokers mapping")
+ public Map<String, List<String>> getTablesToBrokersMapping(
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ Map<String, List<String>> resultMap = new HashMap<>();
+ _pinotHelixResourceManager.getAllTables().stream()
+ .forEach(table -> resultMap.put(table, getBrokersForTable(table, state)));
+ return resultMap;
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers/tables/{tableName}")
+ @ApiOperation(value = "List all brokers for a given table", notes = "List all brokers for a given table")
+ public List<String> getBrokersForTable(
+ @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ Set<String> tableBrokers = new HashSet<>();
+ tableBrokers.addAll(_pinotHelixResourceManager.getBrokerInstancesForTable(tableName, TableType.OFFLINE));
+ tableBrokers.addAll(_pinotHelixResourceManager.getBrokerInstancesForTable(tableName, TableType.REALTIME));
+ applyStateChanges(tableBrokers, state);
+ return ImmutableList.copyOf(tableBrokers);
+ }
+
+ private void applyStateChanges(Set<String> brokers, String state) {
+ switch (state) {
+ case "ONLINE":
+ brokers.retainAll(_pinotHelixResourceManager.getOnlineInstanceList());
+ break;
+ case "OFFLINE":
+ brokers.removeAll(_pinotHelixResourceManager.getOnlineInstanceList());
+ break;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org