You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/07/11 03:02:23 UTC

[incubator-pinot] 01/01: Adding controller APIs to fetch brokers information

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch controller-api-brokers
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 1a2f8fff33a5dccce1135ca13d00ad55b46786a5
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Fri Jul 10 20:01:58 2020 -0700

    Adding controller APIs to fetch brokers information
---
 .../pinot/controller/api/resources/Constants.java  |   1 +
 .../api/resources/PinotBrokerRestletResource.java  | 150 +++++++++++++++++++++
 2 files changed, 151 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 13d5f88..1d910fd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -35,6 +35,7 @@ public class Constants {
   public static final String INSTANCE_TAG = "Instance";
   public static final String SCHEMA_TAG = "Schema";
   public static final String TENANT_TAG = "Tenant";
+  public static final String BROKER_TAG = "Broker";
   public static final String SEGMENT_TAG = "Segment";
   public static final String TASK_TAG = "Task";
   public static final String LEAD_CONTROLLER_TAG = "Leader";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
new file mode 100644
index 0000000..5f5cc34
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.ImmutableList;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.common.exception.SchemaNotFoundException;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
+import org.apache.pinot.controller.api.events.SchemaEventType;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.util.SchemaUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.glassfish.jersey.media.multipart.FormDataBodyPart;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.BROKER_TAG)
+@Path("/")
+public class PinotBrokerRestletResource {
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotBrokerRestletResource.class);
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/brokers")
+  @ApiOperation(value = "List all broker instances", notes = "List all broker instances")
+  public Map<String, Map<String, List<String>>> listBrokersMapping(
+      @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+    Map<String, Map<String, List<String>>> resultMap = new HashMap<>();
+    resultMap.put("tenants", getTenantsToBrokersMapping(state));
+    resultMap.put("tables", getTenantsToBrokersMapping(state));
+    return resultMap;
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/brokers/tenants")
+  @ApiOperation(value = "List all segments", notes = "List all segments")
+  public Map<String, List<String>> getTenantsToBrokersMapping(
+      @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+    Map<String, List<String>> resultMap = new HashMap<>();
+    _pinotHelixResourceManager.getAllBrokerTenantNames().stream()
+        .forEach(tenant -> resultMap.put(tenant, getBrokersForTenant(tenant, state)));
+    return resultMap;
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/brokers/tenants/{tenantName}")
+  @ApiOperation(value = "List all brokers for a given tenant", notes = "List all brokers for a given tenant")
+  public List<String> getBrokersForTenant(
+      @ApiParam(value = "Name of the tenant", required = true) @PathParam("tenantName") String tenantName,
+      @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+    Set<String> tenantBrokers = new HashSet<>();
+    tenantBrokers.addAll(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(tenantName));
+    applyStateChanges(tenantBrokers, state);
+    return ImmutableList.copyOf(tenantBrokers);
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/brokers/tables")
+  @ApiOperation(value = "List all tables to corresponding brokers mapping", notes = "List all tables to corresponding brokers mapping")
+  public Map<String, List<String>> getTablesToBrokersMapping(
+      @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+    Map<String, List<String>> resultMap = new HashMap<>();
+    _pinotHelixResourceManager.getAllTables().stream()
+        .forEach(table -> resultMap.put(table, getBrokersForTable(table, state)));
+    return resultMap;
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/brokers/tables/{tableName}")
+  @ApiOperation(value = "List all brokers for a given table", notes = "List all brokers for a given table")
+  public List<String> getBrokersForTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+    Set<String> tableBrokers = new HashSet<>();
+    tableBrokers.addAll(_pinotHelixResourceManager.getBrokerInstancesForTable(tableName, TableType.OFFLINE));
+    tableBrokers.addAll(_pinotHelixResourceManager.getBrokerInstancesForTable(tableName, TableType.REALTIME));
+    applyStateChanges(tableBrokers, state);
+    return ImmutableList.copyOf(tableBrokers);
+  }
+
+  private void applyStateChanges(Set<String> brokers, String state) {
+    switch (state) {
+      case "ONLINE":
+        brokers.retainAll(_pinotHelixResourceManager.getOnlineInstanceList());
+        break;
+      case "OFFLINE":
+        brokers.removeAll(_pinotHelixResourceManager.getOnlineInstanceList());
+        break;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org