You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/29 07:16:50 UTC

(pinot) branch master updated: update ControllerJobType from enum to string (#12518)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 704f73d4f9 update ControllerJobType from enum to string (#12518)
704f73d4f9 is described below

commit 704f73d4f91e369a8a3fb2081ee5a3ba50a0daa5
Author: Haitao Zhang <ha...@startree.ai>
AuthorDate: Wed Feb 28 23:16:43 2024 -0800

    update ControllerJobType from enum to string (#12518)
---
 .../pinot/common/metadata/ZKMetadataProvider.java  |  5 ++---
 .../metadata/controllerjob/ControllerJobType.java  | 15 +++++++++++++--
 .../api/resources/PinotTableRestletResource.java   | 15 +++++----------
 .../helix/core/PinotHelixResourceManager.java      | 22 +++++++++++-----------
 .../rebalance/ZkBasedTableRebalanceObserver.java   |  2 +-
 .../tenant/ZkBasedTenantRebalanceObserver.java     |  2 +-
 6 files changed, 33 insertions(+), 28 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index eba5806365..d69d386a26 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -32,7 +32,6 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -174,8 +173,8 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_SEGMENTS_PREFIX, resourceName);
   }
 
-  public static String constructPropertyStorePathForControllerJob(ControllerJobType jobType) {
-    return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, jobType.name());
+  public static String constructPropertyStorePathForControllerJob(String jobType) {
+    return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, jobType);
   }
 
   public static String constructPropertyStorePathForResourceConfig(String resourceName) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
index 025d7b6c39..e1a8efb8af 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
@@ -18,6 +18,17 @@
  */
 package org.apache.pinot.common.metadata.controllerjob;
 
-public enum ControllerJobType {
-  RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TENANT_REBALANCE
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+
+
+public class ControllerJobType {
+  private ControllerJobType() {
+  }
+  public static final String RELOAD_SEGMENT = "RELOAD_SEGMENT";
+  public static final String FORCE_COMMIT = "FORCE_COMMIT";
+  public static final String TABLE_REBALANCE = "TABLE_REBALANCE";
+  public static final String TENANT_REBALANCE = "TENANT_REBALANCE";
+  public static final Set<String>
+      VALID_CONTROLLER_JOB_TYPE = ImmutableSet.of(RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TENANT_REBALANCE);
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 23a405514b..8fa1d3b1a9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -1082,20 +1082,15 @@ public class PinotTableRestletResource {
     List<String> tableNamesWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
             LOGGER);
-    Set<ControllerJobType> validJobTypes =
-        java.util.Arrays.stream(ControllerJobType.values()).collect(Collectors.toSet());
-    Set<ControllerJobType> jobTypesToFilter = null;
+    Set<String> jobTypesToFilter = null;
     if (StringUtils.isNotEmpty(jobTypesString)) {
-      try {
-        jobTypesToFilter = new HashSet<>(java.util.Arrays.asList(StringUtils.split(jobTypesString, ','))).stream()
-            .map(type -> ControllerJobType.valueOf(type)).collect(Collectors.toSet());
-      } catch (IllegalArgumentException e) {
-        throw new IllegalArgumentException("Valid Types are: " + validJobTypes);
-      }
+      jobTypesToFilter = new HashSet<>(java.util.Arrays.asList(StringUtils.split(jobTypesString, ',')))
+          .stream().collect(Collectors.toSet());
     }
     Map<String, Map<String, String>> result = new HashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
-      result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter == null ? validJobTypes : jobTypesToFilter,
+      result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter == null
+              ? ControllerJobType.VALID_CONTROLLER_JOB_TYPE : jobTypesToFilter,
           jobMetadata -> jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)
               .equals(tableNameWithType)));
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 22ba2de5a7..5e19ea9cf6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2036,7 +2036,7 @@ public class PinotHelixResourceManager {
    * @return Map representing the job's ZK properties
    */
   @Nullable
-  public Map<String, String> getControllerJobZKMetadata(String jobId, ControllerJobType jobType) {
+  public Map<String, String> getControllerJobZKMetadata(String jobId, String jobType) {
     String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
     ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null, AccessOption.PERSISTENT);
     return jobsZnRecord != null ? jobsZnRecord.getMapFields().get(jobId) : null;
@@ -2046,10 +2046,10 @@ public class PinotHelixResourceManager {
    * Returns a Map of jobId to job's ZK metadata that passes the checker, like for specific tables.
    * @return A Map of jobId to job properties
    */
-  public Map<String, Map<String, String>> getAllJobs(Set<ControllerJobType> jobTypes,
+  public Map<String, Map<String, String>> getAllJobs(Set<String> jobTypes,
       Predicate<Map<String, String>> jobMetadataChecker) {
     Map<String, Map<String, String>> controllerJobs = new HashMap<>();
-    for (ControllerJobType jobType : jobTypes) {
+    for (String jobType : jobTypes) {
       String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
       ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null, AccessOption.PERSISTENT);
       if (jobsZnRecord == null) {
@@ -2059,8 +2059,8 @@ public class PinotHelixResourceManager {
       for (Map.Entry<String, Map<String, String>> jobMetadataEntry : jobMetadataMap.entrySet()) {
         String jobId = jobMetadataEntry.getKey();
         Map<String, String> jobMetadata = jobMetadataEntry.getValue();
-        Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType.name()),
-            "Got unexpected jobType: %s at jobResourcePath: %s with jobId: %s", jobType.name(), jobResourcePath, jobId);
+        Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType),
+            "Got unexpected jobType: %s at jobResourcePath: %s with jobId: %s", jobType, jobResourcePath, jobId);
         if (jobMetadataChecker.test(jobMetadata)) {
           controllerJobs.put(jobId, jobMetadata);
         }
@@ -2083,7 +2083,7 @@ public class PinotHelixResourceManager {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numMessagesSent));
     jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
@@ -2096,7 +2096,7 @@ public class PinotHelixResourceManager {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
         JsonUtils.objectToString(consumingSegmentsCommitted));
@@ -2116,7 +2116,7 @@ public class PinotHelixResourceManager {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numberOfMessagesSent));
     return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.RELOAD_SEGMENT);
@@ -2129,7 +2129,7 @@ public class PinotHelixResourceManager {
    * @param jobType the type of the job to figure out where job metadata is kept in ZK
    * @return boolean representing success / failure of the ZK write step
    */
-  public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, ControllerJobType jobType) {
+  public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, String jobType) {
     return addControllerJobToZK(jobId, jobMetadata, jobType, prev -> true);
   }
 
@@ -2141,7 +2141,7 @@ public class PinotHelixResourceManager {
    * @param prevJobMetadataChecker to check the previous job metadata before adding new one
    * @return boolean representing success / failure of the ZK write step
    */
-  public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, ControllerJobType jobType,
+  public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, String jobType,
       Predicate<Map<String, String>> prevJobMetadataChecker) {
     Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS) != null,
         "Submission Time in JobMetadata record not set. Cannot expire these records");
@@ -2178,7 +2178,7 @@ public class PinotHelixResourceManager {
    * @param updater to modify the job metadata in place
    * @return boolean representing success / failure of the ZK write step
    */
-  public boolean updateJobsForTable(String tableNameWithType, ControllerJobType jobType,
+  public boolean updateJobsForTable(String tableNameWithType, String jobType,
       Consumer<Map<String, String>> updater) {
     String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
     Stat stat = new Stat();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index f02a62cdee..8386544f3c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -206,7 +206,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver {
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name());
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE);
     try {
       jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
           JsonUtils.objectToString(tableRebalanceProgressStats));
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
index 9fabc64d9d..81962f90f2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -100,7 +100,7 @@ public class ZkBasedTenantRebalanceObserver implements TenantRebalanceObserver {
     jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE.name());
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE);
     try {
       jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
           JsonUtils.objectToString(_progressStats));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org