You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ma...@apache.org on 2021/07/24 19:14:12 UTC
[pinot] branch master updated: Fixed pinot java client to add
zkClient close (#7196)
This is an automated email from the ASF dual-hosted git repository.
mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ac7e5f9 Fixed pinot java client to add zkClient close (#7196)
ac7e5f9 is described below
commit ac7e5f9c298d13c90ff31521e128a981543cfb42
Author: ramabme <84...@users.noreply.github.com>
AuthorDate: Sat Jul 24 12:13:54 2021 -0700
Fixed pinot java client to add zkClient close (#7196)
Fixed the issue where zkClient connection was not being closed in pinot java client's connection close
---
.../org/apache/pinot/client/BrokerSelector.java | 5 +++++
.../java/org/apache/pinot/client/Connection.java | 1 +
.../apache/pinot/client/DynamicBrokerSelector.java | 24 ++++++++++++++--------
.../apache/pinot/client/SimpleBrokerSelector.java | 4 ++++
4 files changed, 25 insertions(+), 9 deletions(-)
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
index 329a1ca..4356435 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
@@ -25,4 +25,9 @@ public interface BrokerSelector {
* @return
*/
String selectBroker(String table);
+
+ /**
+ * Close any resources
+ */
+ void close();
}
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
index d408ad5..e7e9c16 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
@@ -182,6 +182,7 @@ public class Connection {
public void close()
throws PinotClientException {
_transport.close();
+ _brokerSelector.close();
}
private static class ResultSetGroupFuture implements Future<ResultSetGroup> {
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
index 2cccffb..ec8b5cd 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
@@ -39,17 +39,18 @@ import static org.apache.pinot.client.ExternalViewReader.REALTIME_SUFFIX;
* Maintains a mapping between table name and list of brokers
*/
public class DynamicBrokerSelector implements BrokerSelector, IZkDataListener {
- AtomicReference<Map<String, List<String>>> tableToBrokerListMapRef = new AtomicReference<Map<String, List<String>>>();
- AtomicReference<List<String>> allBrokerListRef = new AtomicReference<List<String>>();
+ AtomicReference<Map<String, List<String>>> tableToBrokerListMapRef = new AtomicReference<>();
+ AtomicReference<List<String>> allBrokerListRef = new AtomicReference<>();
+ private final ZkClient _zkClient;
private final Random _random = new Random();
- private ExternalViewReader evReader;
+ private final ExternalViewReader _evReader;
public DynamicBrokerSelector(String zkServers) {
- ZkClient zkClient = getZkClient(zkServers);
- zkClient.setZkSerializer(new BytesPushThroughSerializer());
- zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
- zkClient.subscribeDataChanges(ExternalViewReader.BROKER_EXTERNAL_VIEW_PATH, this);
- evReader = getEvReader(zkClient);
+ _zkClient = getZkClient(zkServers);
+ _zkClient.setZkSerializer(new BytesPushThroughSerializer());
+ _zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
+ _zkClient.subscribeDataChanges(ExternalViewReader.BROKER_EXTERNAL_VIEW_PATH, this);
+ _evReader = getEvReader(_zkClient);
refresh();
}
@@ -62,7 +63,7 @@ public class DynamicBrokerSelector implements BrokerSelector, IZkDataListener {
}
private void refresh() {
- Map<String, List<String>> tableToBrokerListMap = evReader.getTableToBrokersMap();
+ Map<String, List<String>> tableToBrokerListMap = _evReader.getTableToBrokersMap();
tableToBrokerListMapRef.set(tableToBrokerListMap);
Set<String> brokerSet = new HashSet<>();
for (List<String> brokerList : tableToBrokerListMap.values()) {
@@ -91,6 +92,11 @@ public class DynamicBrokerSelector implements BrokerSelector, IZkDataListener {
}
@Override
+ public void close() {
+ _zkClient.close();
+ }
+
+ @Override
public void handleDataChange(String dataPath, Object data)
throws Exception {
refresh();
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
index d171c31..e5ab363 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
@@ -39,4 +39,8 @@ public class SimpleBrokerSelector implements BrokerSelector {
public String selectBroker(String table) {
return _brokerList.get(_random.nextInt(_brokerList.size()));
}
+
+ @Override
+ public void close() {
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org