You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2020/10/06 00:08:11 UTC

[incubator-pinot] branch master updated: [Issue 6068] Fixing the calls to Helix to throw exception if zk conne… (#6069)

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 24147dd  [Issue 6068] Fixing the calls to Helix to throw exception if zk conne… (#6069)
24147dd is described below

commit 24147dd5b7dc188e01af283bd0d025a6cec3527b
Author: Subbu Subramaniam <mc...@users.noreply.github.com>
AuthorDate: Mon Oct 5 17:07:53 2020 -0700

    [Issue 6068] Fixing the calls to Helix to throw exception if zk conne… (#6069)
    
    * [Issue 6068] Fixing the calls to Helix to throw exception if zk connection is broken
    
    See Issue #6068
    
    * Update zk metadata APIs to use the zk calls that take a retry count and retry interval
    
    These APIs will ensure that if there is a zk disconnect
    we will get an exception after a minimal number of retries.
    We can change this to retry once and implement a backoff retry
    if needed later on.
    
    Note that, for now, the underlying helix library still ends up calling the
    previous API, but we will soon upgrade to a helix version that actually
    implements these.
    
    * Fixing a unit test mock call
    
    * Updating more calls to getChildren
---
 .../java/org/apache/pinot/broker/routing/RoutingManager.java   |  3 ++-
 .../org/apache/pinot/common/metadata/ZKMetadataProvider.java   | 10 +++++++---
 .../java/org/apache/pinot/common/utils/CommonConstants.java    |  3 +++
 .../java/org/apache/pinot/common/utils/helix/HelixHelper.java  |  2 +-
 .../pinot/controller/helix/core/PinotHelixResourceManager.java |  2 +-
 .../pinot/controller/helix/core/rebalance/TableRebalancer.java |  2 +-
 .../segment/OfflineReplicaGroupSegmentAssignmentTest.java      |  2 +-
 .../main/java/org/apache/pinot/tools/UpdateSegmentState.java   |  4 +++-
 8 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
index 8fe3f52..940e40c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
@@ -213,7 +213,8 @@ public class RoutingManager implements ClusterChangeHandler {
     long startTimeMs = System.currentTimeMillis();
 
     List<ZNRecord> instanceConfigZNRecords =
-        _zkDataAccessor.getChildren(_instanceConfigsPath, null, AccessOption.PERSISTENT);
+        _zkDataAccessor.getChildren(_instanceConfigsPath, null, AccessOption.PERSISTENT,
+            CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
     long fetchInstanceConfigsEndTimeMs = System.currentTimeMillis();
 
     // Calculate new enabled and disabled instances
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 8278777..af5a5d6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.common.utils.StringUtil;
@@ -305,7 +306,8 @@ public class ZKMetadataProvider {
       ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) {
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
     String parentPath = constructPropertyStorePathForResource(offlineTableName);
-    List<ZNRecord> znRecords = propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT);
+    List<ZNRecord> znRecords = propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT,
+        CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
     if (znRecords != null) {
       int numZNRecords = znRecords.size();
       List<OfflineSegmentZKMetadata> offlineSegmentZKMetadataList = new ArrayList<>(numZNRecords);
@@ -335,7 +337,8 @@ public class ZKMetadataProvider {
       ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) {
     String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     String parentPath = constructPropertyStorePathForResource(realtimeTableName);
-    List<ZNRecord> znRecords = propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT);
+    List<ZNRecord> znRecords = propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT,
+        CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
     if (znRecords != null) {
       int numZNRecords = znRecords.size();
       List<RealtimeSegmentZKMetadata> realtimeSegmentZKMetadataList = new ArrayList<>(numZNRecords);
@@ -365,7 +368,8 @@ public class ZKMetadataProvider {
       ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) {
     String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     String parentPath = constructPropertyStorePathForResource(realtimeTableName);
-    List<ZNRecord> znRecords = propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT);
+    List<ZNRecord> znRecords = propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT,
+        CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
     if (znRecords != null) {
       int numZNRecords = znRecords.size();
       List<LLCRealtimeSegmentZKMetadata> llcRealtimeSegmentZKMetadataList = new ArrayList<>(numZNRecords);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 9b18af7..2ed1259 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -90,6 +90,9 @@ public class CommonConstants {
 
     public static class ZkClient {
       public static final long DEFAULT_CONNECT_TIMEOUT_SEC = 60L;
+      // Retry interval and count for ZK operations where we would rather fail than get an empty (wrong) result back
+      public static final int RETRY_INTERVAL_MS = 50;
+      public static final int RETRY_COUNT = 2;
     }
 
     public static class DataSource {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index 064a699..7430723 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -405,7 +405,7 @@ public class HelixHelper {
    */
   public static List<InstanceConfig> getInstanceConfigs(HelixManager helixManager) {
     HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
-    return helixDataAccessor.getChildValues(helixDataAccessor.keyBuilder().instanceConfigs());
+    return helixDataAccessor.getChildValues(helixDataAccessor.keyBuilder().instanceConfigs(), true);
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 0cc0a34..b631a01 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2159,7 +2159,7 @@ public class PinotHelixResourceManager {
         boolean toggleSucceeded = true;
         // Checks all the current states fall into the target states
         PropertyKey instanceCurrentStatesKey = _keyBuilder.currentStates(instanceName, liveInstance.getSessionId());
-        List<CurrentState> instanceCurrentStates = _helixDataAccessor.getChildValues(instanceCurrentStatesKey);
+        List<CurrentState> instanceCurrentStates = _helixDataAccessor.getChildValues(instanceCurrentStatesKey, true);
         if (instanceCurrentStates.isEmpty()) {
           return PinotResourceManagerResponse.SUCCESS;
         } else {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index e23aab6..644e990 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -421,7 +421,7 @@ public class TableRebalancer {
         LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType);
         InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
         InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType,
-            _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs()));
+            _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true));
         if (!dryRun) {
           LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
           InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
index 340a46a..f6728c4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
@@ -128,7 +128,7 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
     }
     when(propertyStoreWithPartitions
         .getChildren(eq(ZKMetadataProvider.constructPropertyStorePathForResource(OFFLINE_TABLE_NAME_WITH_PARTITION)),
-            any(), anyInt())).thenReturn(segmentZKMetadataZNRecords);
+            any(), anyInt(), anyInt(), anyInt())).thenReturn(segmentZKMetadataZNRecords);
     HelixManager helixManagerWithPartitions = mock(HelixManager.class);
     when(helixManagerWithPartitions.getHelixPropertyStore()).thenReturn(propertyStoreWithPartitions);
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpdateSegmentState.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpdateSegmentState.java
index fe7b090..e4bcc8a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpdateSegmentState.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpdateSegmentState.java
@@ -28,6 +28,7 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.kohsuke.args4j.Option;
@@ -133,7 +134,8 @@ public class UpdateSegmentState extends AbstractBaseCommand implements Command {
   public List<String> getAllTenantTables()
       throws Exception {
     String tableConfigPath = "/CONFIGS/TABLE";
-    List<ZNRecord> tableConfigs = _propertyStore.getChildren(tableConfigPath, null, 0);
+    List<ZNRecord> tableConfigs = _propertyStore.getChildren(tableConfigPath, null, 0,
+        CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
     List<String> tables = new ArrayList<>(128);
     for (ZNRecord znRecord : tableConfigs) {
       TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org