You are viewing a plain text version of this content. (The canonical link to the original is not preserved in this plain-text copy.)
Posted to commits@pinot.apache.org by xi...@apache.org on 2022/12/13 12:05:57 UTC

[pinot] branch master updated: Adding configs for zk client timeout (#9975)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f643b1df8 Adding configs for zk client timeout (#9975)
7f643b1df8 is described below

commit 7f643b1df8ca4d9696e3876efcbd82e280b2886e
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Tue Dec 13 04:05:49 2022 -0800

    Adding configs for zk client timeout (#9975)
    
    * Adding configs for zk client timeout
    
    * Revert "[multistage][testing] Add test for null handling, round func and some other cases (#9950)"
    
    This reverts commit 6a5f58b8e194bee7211bb9bdc97ad855486b296d.
---
 .../pinot/common/utils/ServiceStartableUtils.java  | 19 +++++---
 .../pinot/controller/BaseControllerStarter.java    |  5 ++-
 .../core/realtime/PinotRealtimeSegmentManager.java | 19 ++++++--
 .../helix/core/util/HelixSetupUtils.java           | 20 +++++++--
 .../src/test/resources/queries/Cast.json           | 20 ---------
 .../src/test/resources/queries/MathFuncs.json      | 50 ----------------------
 .../src/test/resources/queries/NullHanlding.json   | 40 -----------------
 .../test/resources/queries/TableExpressions.json   |  2 -
 .../apache/pinot/spi/utils/CommonConstants.java    |  5 ++-
 .../admin/command/ShowClusterInfoCommand.java      |  2 +-
 10 files changed, 54 insertions(+), 128 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java
index 6b38a4e808..38c9870709 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java
@@ -25,6 +25,7 @@ import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.services.ServiceRole;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +38,6 @@ public class ServiceStartableUtils {
   private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/CLUSTER/%s";
   private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
   private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = "pinot.%s.";
-  private static final int ZK_TIMEOUT_MS = 30_000;
 
   /**
    * Applies the ZK cluster config to the given instance config if it does not already exist.
@@ -47,10 +47,19 @@ public class ServiceStartableUtils {
    */
   public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
       ServiceRole serviceRole) {
-
-    ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new ZNRecordSerializer()).setZkServer(zkAddress)
-        .setConnectionTimeout(ZK_TIMEOUT_MS).build();
-    zkClient.waitUntilConnected(ZK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    int zkClientSessionConfig =
+        instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
+            CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
+    int zkClientConnectionTimeoutMs =
+        instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
+            CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
+    ZkClient zkClient = new ZkClient.Builder()
+        .setZkSerializer(new ZNRecordSerializer())
+        .setZkServer(zkAddress)
+        .setConnectionTimeout(zkClientConnectionTimeoutMs)
+        .setSessionTimeout(zkClientSessionConfig)
+        .build();
+    zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);
 
     try {
       ZNRecord clusterConfigZNRecord =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index b3334617c0..f6ebb5006d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -373,7 +373,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
 
     // Set up Pinot cluster in Helix if needed
     HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, _isUpdateStateModel, _enableBatchMessageMode,
-        _config.getLeadControllerResourceRebalanceStrategy());
+        _config);
 
     // Start all components
     initPinotFSFactory();
@@ -422,7 +422,8 @@ public abstract class BaseControllerStarter implements ServiceStartable {
 
     if (_config.getHLCTablesAllowed()) {
       LOGGER.info("Realtime tables with High Level consumers will be supported");
-      _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager, _leadControllerManager);
+      _realtimeSegmentsManager =
+          new PinotRealtimeSegmentManager(_helixResourceManager, _leadControllerManager, _config);
       _realtimeSegmentsManager.start(_controllerMetrics);
     } else {
       LOGGER.info("Realtime tables with High Level consumers will NOT be supported");
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
index 9d559fd88c..6e7e9281a6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
@@ -46,6 +46,7 @@ import org.apache.pinot.common.utils.HLCSegmentName;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
@@ -81,11 +82,13 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
   private ZkClient _zkClient;
   private ControllerMetrics _controllerMetrics;
   private final LeadControllerManager _leadControllerManager;
+  private final ControllerConf _controllerConf;
 
   public PinotRealtimeSegmentManager(PinotHelixResourceManager pinotManager,
-      LeadControllerManager leadControllerManager) {
+      LeadControllerManager leadControllerManager, ControllerConf controllerConf) {
     _pinotHelixResourceManager = pinotManager;
     _leadControllerManager = leadControllerManager;
+    _controllerConf = controllerConf;
     String clusterName = _pinotHelixResourceManager.getHelixClusterName();
     _propertyStorePath = PropertyPathBuilder.propertyStore(clusterName);
     _tableConfigPath = _propertyStorePath + TABLE_CONFIG;
@@ -96,9 +99,19 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
 
     LOGGER.info("Starting realtime segments manager, adding a listener on the property store table configs path.");
     String zkUrl = _pinotHelixResourceManager.getHelixZkURL();
-    _zkClient = new ZkClient(zkUrl, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+    int zkClientSessionTimeoutMs =
+        _controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
+            CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
+    int zkClientConnectionTimeoutMs =
+        _controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
+            CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
+    _zkClient = new ZkClient.Builder()
+        .setZkServer(zkUrl)
+        .setSessionTimeout(zkClientSessionTimeoutMs)
+        .setConnectionTimeout(zkClientConnectionTimeoutMs)
+        .build();
     _zkClient.setZkSerializer(new ZNRecordSerializer());
-    _zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
+    _zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);
 
     // Subscribe to any data/child changes to property
     _zkClient.subscribeChildChanges(_tableConfigPath, this);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index adde0d2f4d..d42fe6a36b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -43,6 +43,7 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.pinot.common.utils.helix.LeadControllerUtils;
+import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -92,11 +93,22 @@ public class HelixSetupUtils {
   }
 
   public static void setupPinotCluster(String helixClusterName, String zkPath, boolean isUpdateStateModel,
-      boolean enableBatchMessageMode, String leadControllerResourceRebalanceStrategy) {
+      boolean enableBatchMessageMode, ControllerConf controllerConf) {
     ZkClient zkClient = null;
+    int zkClientSessionConfig =
+        controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
+            CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
+    int zkClientConnectionTimeoutMs =
+        controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
+            CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
     try {
-      zkClient = new ZkClient.Builder().setZkServer(zkPath).setZkSerializer(new ZNRecordSerializer()).build();
-      zkClient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+      zkClient = new ZkClient.Builder()
+          .setZkServer(zkPath)
+          .setSessionTimeout(zkClientSessionConfig)
+          .setConnectionTimeout(zkClientConnectionTimeoutMs)
+          .setZkSerializer(new ZNRecordSerializer())
+          .build();
+      zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);
       HelixAdmin helixAdmin = new ZKHelixAdmin(zkClient);
       HelixDataAccessor helixDataAccessor =
           new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient));
@@ -113,7 +125,7 @@ public class HelixSetupUtils {
 
       // Add lead controller resource if needed
       createLeadControllerResourceIfNeeded(helixClusterName, helixAdmin, configAccessor, enableBatchMessageMode,
-          leadControllerResourceRebalanceStrategy);
+          controllerConf.getLeadControllerResourceRebalanceStrategy());
     } finally {
       if (zkClient != null) {
         zkClient.close();
diff --git a/pinot-query-runtime/src/test/resources/queries/Cast.json b/pinot-query-runtime/src/test/resources/queries/Cast.json
deleted file mode 100644
index f640148a48..0000000000
--- a/pinot-query-runtime/src/test/resources/queries/Cast.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-  "select_expression_test": {
-    "tables": {
-      "cast_as_int": {
-        "schema":[
-          {"name": "strCol", "type": "STRING"}
-        ],
-        "inputs": [
-          ["123"],
-          ["456"]
-        ]
-      }
-    },
-    "queries": [
-      {
-        "sql":"SELECT cast strCol as INT FROM {cast_as_int};"
-      }
-    ]
-  }
-}
diff --git a/pinot-query-runtime/src/test/resources/queries/MathFuncs.json b/pinot-query-runtime/src/test/resources/queries/MathFuncs.json
index 8915696b05..2a84c4b1d7 100644
--- a/pinot-query-runtime/src/test/resources/queries/MathFuncs.json
+++ b/pinot-query-runtime/src/test/resources/queries/MathFuncs.json
@@ -535,55 +535,5 @@
         "sql": "SELECT longCol / 1e20 FROM {numTbl}"
       }
     ]
-  },
-  "round": {
-    "tables": {
-      "numTbl": {
-        "schema": [
-          {"name": "intCol", "type": "INT"},
-          {"name": "longCol", "type": "LONG"},
-          {"name": "doubleCol", "type": "DOUBLE"},
-          {"name": "floatCol", "type": "FLOAT"}
-        ],
-        "inputs": [
-          [0, 3, 0.123, 3.2],
-          [123, 321, 4.242, 3.03],
-          [-456, -2, 1.134, 7.722],
-          [123, -456, 3.634, 9.12]
-        ]
-      }
-    },
-    "queries": [
-      {
-        "description": "test round on integer columns",
-        "ignored": true,
-        "comment": "we round the number up somehow",
-        "sql": "SELECT round(intCol, 2) FROM {numTbl}"
-      },
-      {
-        "description": "test round on long columns",
-        "ignored": true,
-        "comment": "we round the number up somehow",
-        "sql": "SELECT round(longCol, 2) FROM {numTbl}"
-      },
-      {
-        "description": "test round on double columns",
-        "ignored": true,
-        "comment": "double is rounded to 0",
-        "sql": "SELECT round(doubleCol, 2) FROM {numTbl}"
-      },
-      {
-        "description": "test ceil on float columns",
-        "ignored": true,
-        "comment": "float is rounded to 0",
-        "sql": "SELECT round(floatCol, 2) FROM {numTbl}"
-      },
-      {
-        "ignored": true,
-        "comment": "Caught exception while initializing transform function: round",
-        "description": "test round on literal",
-        "sql": "SELECT round(2.0, 0) FROM {numTbl}"
-      }
-    ]
   }
 }
diff --git a/pinot-query-runtime/src/test/resources/queries/NullHanlding.json b/pinot-query-runtime/src/test/resources/queries/NullHanlding.json
deleted file mode 100644
index d02234b8b0..0000000000
--- a/pinot-query-runtime/src/test/resources/queries/NullHanlding.json
+++ /dev/null
@@ -1,40 +0,0 @@
-{
-  "select_expression_test": {
-    "tables": {
-      "null_handling": {
-        "schema":[
-          {"name": "a", "type": "INT"},
-          {"name": "b", "type": "INT"},
-          {"name": "c", "type": "STRING"},
-          {"name": "d", "type": "BOOLEAN"}
-        ],
-        "inputs": [
-          [0, 1, null, true],
-          [1, 2, "AAAA", false],
-          [2, null, "AAAA", true],
-          [3, 3, "BBBB", false],
-          [4, 3, null,  true],
-          [null, 3, "bbbb", false],
-          [6, 4, "cccc", null],
-          [null, null, null, null],
-          [null, 4, "CCCC", null],
-          [9, 4, "CCCC", true]
-        ]
-      }
-    },
-    "queries": [
-      {
-        "sql":"SELECT is_null(a), is_null(b), is_null(c), is_null(d) FROM {null_handling};"
-      },
-      {
-        "sql":"SELECT is_not_null(a), is_not_null(b), is_not_null(c), is_not_null(d) FROM {null_handling};"
-      },
-      {
-        "sql":"SELECT is_null(null), is_not_null(null) FROM {null_handling};"
-      },
-      {
-        "sql":"SELECT is_null('a'), is_not_null('b') FROM {null_handling};"
-      }
-    ]
-  }
-}
diff --git a/pinot-query-runtime/src/test/resources/queries/TableExpressions.json b/pinot-query-runtime/src/test/resources/queries/TableExpressions.json
index a25970901d..a722a7bd93 100644
--- a/pinot-query-runtime/src/test/resources/queries/TableExpressions.json
+++ b/pinot-query-runtime/src/test/resources/queries/TableExpressions.json
@@ -17,10 +17,8 @@
     },
     "queries": [
       { "sql": "SELECT * FROM {tbl} WHERE intCol > 5" },
-      { "sql": "SELECT * FROM {tbl} WHERE strCol = 'foo'" },
       { "sql": "SELECT * FROM {tbl} WHERE strCol IN ('foo', 'bar')" },
       { "sql": "SELECT * FROM {tbl} WHERE intCol IN (196883, 42)" },
-      { "sql": "SELECT * FROM {tbl} WHERE intCol IN (111, 222)" },
       { "sql": "SELECT * FROM {tbl} WHERE intCol NOT IN (196883, 42) AND strCol IN ('alice')" },
       { "sql": "SELECT * FROM {tbl} WHERE strCol IN (SELECT strCol FROM {tbl} WHERE intCol > 100)" },
       { "sql": "SELECT * FROM {tbl} WHERE intCol < (SELECT SUM(intCol) FROM {tbl} AS b WHERE strCol BETWEEN 'bar' AND 'foo')" },
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index c0f8a26972..85dce1f421 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -120,10 +120,13 @@ public class CommonConstants {
     }
 
     public static class ZkClient {
-      public static final long DEFAULT_CONNECT_TIMEOUT_SEC = 60L;
+      public static final int DEFAULT_CONNECT_TIMEOUT_MS = 60_000;
+      public static final int DEFAULT_SESSION_TIMEOUT_MS = 30_000;
       // Retry interval and count for ZK operations where we would rather fail than get an empty (wrong) result back
       public static final int RETRY_INTERVAL_MS = 50;
       public static final int RETRY_COUNT = 2;
+      public static final String ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG = "zk.client.connection.timeout.ms";
+      public static final String ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG = "zk.client.session.timeout.ms";
     }
 
     public static class DataSource {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ShowClusterInfoCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ShowClusterInfoCommand.java
index 8f01b1d73c..ce365bfac1 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ShowClusterInfoCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ShowClusterInfoCommand.java
@@ -103,7 +103,7 @@ public class ShowClusterInfoCommand extends AbstractBaseAdminCommand implements
     ZkClient zkClient = new ZkClient(_zkAddress);
     zkClient.setZkSerializer(new ZNRecordStreamingSerializer());
     LOGGER.info("Connecting to Zookeeper at: {}", _zkAddress);
-    zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
+    zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
     ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
     ZKHelixDataAccessor zkHelixDataAccessor = new ZKHelixDataAccessor(_clusterName, baseDataAccessor);
     PropertyKey property = zkHelixDataAccessor.keyBuilder().liveInstances();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org