You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/07/20 11:17:31 UTC

[incubator-pinot] branch pinot_client_controller_constructor created (now 532b0ea)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch pinot_client_controller_constructor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 532b0ea  Adding pinot java client constructor only by pinot controller

This branch includes the following new commits:

     new 532b0ea  Adding pinot java client constructor only by pinot controller

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding pinot java client constructor only by pinot controller

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch pinot_client_controller_constructor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 532b0ea53940644b9e307545e4e22f9fa7b360fc
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Jul 20 04:17:05 2020 -0700

    Adding pinot java client constructor only by pinot controller
---
 pinot-clients/pinot-java-client/pom.xml            |   4 +
 .../org/apache/pinot/client/ConnectionFactory.java |  19 ++++
 .../client/ControllerBasedBrokerSelector.java      | 122 +++++++++++++++++++++
 .../tests/BaseClusterIntegrationTest.java          |   6 +-
 4 files changed, 150 insertions(+), 1 deletion(-)

diff --git a/pinot-clients/pinot-java-client/pom.xml b/pinot-clients/pinot-java-client/pom.xml
index 6a98e3d..eb457ff 100644
--- a/pinot-clients/pinot-java-client/pom.xml
+++ b/pinot-clients/pinot-java-client/pom.xml
@@ -85,6 +85,10 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
index 666ca7c..d81a04c 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
@@ -49,6 +49,25 @@ public class ConnectionFactory {
   }
 
   /**
+   * Creates a connection to a Pinot cluster, given its Controller URL(s).
+   * Please note that this client requires the Pinot Controller to support the getBroker APIs,
+   * which are available from Pinot 0.5.0 onwards.
+   *
+   * @param controllerUrls Comma-separated Pinot Controller URLs; a VIP hostname or k8s service is recommended.
+   *                      E.g. http://pinot-controller:9000 or
+   *                      http://pinot-controller-0:9000,http://pinot-controller-1:9000,http://pinot-controller-2:9000
+   * @return A connection that connects to the brokers in the given Pinot cluster
+   */
+  public static Connection fromController(String controllerUrls) {
+    try {
+      BrokerSelector brokerSelector = new ControllerBasedBrokerSelector(controllerUrls);
+      return new Connection(brokerSelector, _transportFactory.buildTransport(null));
+    } catch (Exception e) {
+      throw new PinotClientException(e);
+    }
+  }
+
+  /**
    * Creates a connection from properties containing the connection parameters.
    *
    * @param properties The properties to use for the connection
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
new file mode 100644
index 0000000..269475f
--- /dev/null
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
@@ -0,0 +1,122 @@
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.client.ExternalViewReader.OFFLINE_SUFFIX;
+import static org.apache.pinot.client.ExternalViewReader.REALTIME_SUFFIX;
+
+
+public class ControllerBasedBrokerSelector implements BrokerSelector {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ControllerBasedBrokerSelector.class);
+
+  public static final int DEFAULT_CONTROLLER_REQUEST_RETRIES = 3;
+  public static final long DEFAULT_CONTROLLER_REQUEST_RETRIES_INTERVAL_IN_MILLS = 5000; // 5 seconds
+  public static final long DEFAULT_CONTROLLER_REQUEST_SCHEDULE_INTERVAL_IN_MILLS = 300000; // 5 minutes
+
+  private static final String CONTROLLER_ONLINE_TABLE_BROKERS_MAP_URL_TEMPLATE = "%s/brokers/tables?state=online";
+  private static final Random RANDOM = new Random();
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private final String[] controllerUrls;
+  private final AtomicReference<Map<String, List<String>>> tableToBrokerListMapRef =
+      new AtomicReference<Map<String, List<String>>>();
+  private final AtomicReference<List<String>> allBrokerListRef = new AtomicReference<List<String>>();
+  private final int controllerFetchRetries;
+  private final long controllerFetchRetriesIntervalMills;
+  private final long controllerFetchScheduleIntervalMills;
+
+  public ControllerBasedBrokerSelector(String controllerUrl) {
+    this(controllerUrl, DEFAULT_CONTROLLER_REQUEST_RETRIES, DEFAULT_CONTROLLER_REQUEST_RETRIES_INTERVAL_IN_MILLS,
+        DEFAULT_CONTROLLER_REQUEST_SCHEDULE_INTERVAL_IN_MILLS);
+  }
+
+  public ControllerBasedBrokerSelector(String controllerUrl, int controllerFetchRetries,
+      long controllerFetchRetriesIntervalMills, long controllerFetchScheduleIntervalMills) {
+    this.controllerUrls = controllerUrl.split(",");
+    this.controllerFetchRetries = controllerFetchRetries;
+    this.controllerFetchRetriesIntervalMills = controllerFetchRetriesIntervalMills;
+    this.controllerFetchScheduleIntervalMills = controllerFetchScheduleIntervalMills;
+    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    scheduledExecutorService.scheduleAtFixedRate(() -> refreshBroker(), this.controllerFetchScheduleIntervalMills,
+        this.controllerFetchScheduleIntervalMills, TimeUnit.MILLISECONDS);
+    refreshBroker();
+  }
+
+  private void refreshBroker() {
+    Map<String, List<String>> tableToBrokerListMap = getTableToBrokersMap();
+    if (tableToBrokerListMap == null) {
+      throw new RuntimeException("Unable to fetch broker information from controller");
+    }
+    tableToBrokerListMapRef.set(ImmutableMap.copyOf(tableToBrokerListMap));
+    Set<String> brokerSet = new HashSet<>();
+    for (List<String> brokerList : tableToBrokerListMap.values()) {
+      brokerSet.addAll(brokerList);
+    }
+    allBrokerListRef.set(ImmutableList.copyOf(brokerSet));
+  }
+
+  private Map<String, List<String>> getTableToBrokersMap() {
+    int controllerIdx = RANDOM.nextInt(controllerUrls.length);
+    for (int i = 0; i < controllerFetchRetries; i++) {
+      try {
+        String brokerTableQueryResp = IOUtils.toString(
+            new URL(String.format(CONTROLLER_ONLINE_TABLE_BROKERS_MAP_URL_TEMPLATE, controllerUrls[controllerIdx])));
+        Map<String, List<String>> tablesToBrokersMap = OBJECT_MAPPER.readValue(brokerTableQueryResp, Map.class);
+        for (String table : tablesToBrokersMap.keySet()) {
+          List<String> brokerHostPorts = new ArrayList<>();
+          for (String broker : tablesToBrokersMap.get(table)) {
+            brokerHostPorts.add(broker.replace("Broker_", "").replace("_", ":"));
+          }
+          tablesToBrokersMap.put(table, brokerHostPorts);
+        }
+        return tablesToBrokersMap;
+
+      } catch (Exception e) {
+        LOGGER.warn(String.format("Unable to fetch controller broker information from '%s', retry in %d millseconds",
+            controllerUrls[controllerIdx], this.controllerFetchRetriesIntervalMills), e);
+        controllerIdx = (controllerIdx + 1) % controllerUrls.length;
+        try {
+          Thread.sleep(controllerFetchRetriesIntervalMills);
+        } catch (InterruptedException interruptedException) {
+          // Swallow
+        }
+      }
+    }
+    LOGGER.warn(
+        String.format("Failed to fetch controller broker information with %d retries", this.controllerFetchRetries));
+    return null;
+  }
+
+  @Override
+  public String selectBroker(String table) {
+    if (table == null) {
+      List<String> list = allBrokerListRef.get();
+      if (list != null && !list.isEmpty()) {
+        return list.get(RANDOM.nextInt(list.size()));
+      } else {
+        return null;
+      }
+    }
+    String tableName = table.replace(OFFLINE_SUFFIX, "").replace(REALTIME_SUFFIX, "");
+    List<String> list = tableToBrokerListMapRef.get().get(tableName);
+    if (list != null && !list.isEmpty()) {
+      return list.get(RANDOM.nextInt(list.size()));
+    }
+    return null;
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 6fd5065..0a1bea2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -350,7 +350,11 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
    */
   protected org.apache.pinot.client.Connection getPinotConnection() {
     if (_pinotConnection == null) {
-      _pinotConnection = ConnectionFactory.fromZookeeper(ZkStarter.DEFAULT_ZK_STR + "/" + getHelixClusterName());
+      if (System.currentTimeMillis() % 2 == 0) {
+        _pinotConnection = ConnectionFactory.fromZookeeper(ZkStarter.DEFAULT_ZK_STR + "/" + getHelixClusterName());
+      } else {
+        _pinotConnection = ConnectionFactory.fromController(_controllerBaseApiUrl);
+      }
     }
     return _pinotConnection;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org