You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ma...@apache.org on 2021/07/24 19:14:12 UTC

[pinot] branch master updated: Fixed pinot java client to add zkClient close (#7196)

This is an automated email from the ASF dual-hosted git repository.

mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ac7e5f9  Fixed pinot java client to add zkClient close (#7196)
ac7e5f9 is described below

commit ac7e5f9c298d13c90ff31521e128a981543cfb42
Author: ramabme <84...@users.noreply.github.com>
AuthorDate: Sat Jul 24 12:13:54 2021 -0700

    Fixed pinot java client to add zkClient close (#7196)
    
    Fixed the issue where zkClient connection was not being closed in pinot java client's connection close
---
 .../org/apache/pinot/client/BrokerSelector.java    |  5 +++++
 .../java/org/apache/pinot/client/Connection.java   |  1 +
 .../apache/pinot/client/DynamicBrokerSelector.java | 24 ++++++++++++++--------
 .../apache/pinot/client/SimpleBrokerSelector.java  |  4 ++++
 4 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
index 329a1ca..4356435 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
@@ -25,4 +25,9 @@ public interface BrokerSelector {
    * @return
    */
   String selectBroker(String table);
+
+  /**
+   * Close any resources
+   */
+  void close();
 }
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
index d408ad5..e7e9c16 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
@@ -182,6 +182,7 @@ public class Connection {
   public void close()
       throws PinotClientException {
     _transport.close();
+    _brokerSelector.close();
   }
 
   private static class ResultSetGroupFuture implements Future<ResultSetGroup> {
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
index 2cccffb..ec8b5cd 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
@@ -39,17 +39,18 @@ import static org.apache.pinot.client.ExternalViewReader.REALTIME_SUFFIX;
  * Maintains a mapping between table name and list of brokers
  */
 public class DynamicBrokerSelector implements BrokerSelector, IZkDataListener {
-  AtomicReference<Map<String, List<String>>> tableToBrokerListMapRef = new AtomicReference<Map<String, List<String>>>();
-  AtomicReference<List<String>> allBrokerListRef = new AtomicReference<List<String>>();
+  AtomicReference<Map<String, List<String>>> tableToBrokerListMapRef = new AtomicReference<>();
+  AtomicReference<List<String>> allBrokerListRef = new AtomicReference<>();
+  private final ZkClient _zkClient;
   private final Random _random = new Random();
-  private ExternalViewReader evReader;
+  private final ExternalViewReader _evReader;
 
   public DynamicBrokerSelector(String zkServers) {
-    ZkClient zkClient = getZkClient(zkServers);
-    zkClient.setZkSerializer(new BytesPushThroughSerializer());
-    zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
-    zkClient.subscribeDataChanges(ExternalViewReader.BROKER_EXTERNAL_VIEW_PATH, this);
-    evReader = getEvReader(zkClient);
+    _zkClient = getZkClient(zkServers);
+    _zkClient.setZkSerializer(new BytesPushThroughSerializer());
+    _zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
+    _zkClient.subscribeDataChanges(ExternalViewReader.BROKER_EXTERNAL_VIEW_PATH, this);
+    _evReader = getEvReader(_zkClient);
     refresh();
   }
 
@@ -62,7 +63,7 @@ public class DynamicBrokerSelector implements BrokerSelector, IZkDataListener {
   }
 
   private void refresh() {
-    Map<String, List<String>> tableToBrokerListMap = evReader.getTableToBrokersMap();
+    Map<String, List<String>> tableToBrokerListMap = _evReader.getTableToBrokersMap();
     tableToBrokerListMapRef.set(tableToBrokerListMap);
     Set<String> brokerSet = new HashSet<>();
     for (List<String> brokerList : tableToBrokerListMap.values()) {
@@ -91,6 +92,11 @@ public class DynamicBrokerSelector implements BrokerSelector, IZkDataListener {
   }
 
   @Override
+  public void close() {
+    _zkClient.close();
+  }
+
+  @Override
   public void handleDataChange(String dataPath, Object data)
       throws Exception {
     refresh();
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
index d171c31..e5ab363 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
@@ -39,4 +39,8 @@ public class SimpleBrokerSelector implements BrokerSelector {
   public String selectBroker(String table) {
     return _brokerList.get(_random.nextInt(_brokerList.size()));
   }
+
+  @Override
+  public void close() {
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org