You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/29 07:16:50 UTC
(pinot) branch master updated: update ControllerJobType from enum to string (#12518)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 704f73d4f9 update ControllerJobType from enum to string (#12518)
704f73d4f9 is described below
commit 704f73d4f91e369a8a3fb2081ee5a3ba50a0daa5
Author: Haitao Zhang <ha...@startree.ai>
AuthorDate: Wed Feb 28 23:16:43 2024 -0800
update ControllerJobType from enum to string (#12518)
---
.../pinot/common/metadata/ZKMetadataProvider.java | 5 ++---
.../metadata/controllerjob/ControllerJobType.java | 15 +++++++++++++--
.../api/resources/PinotTableRestletResource.java | 15 +++++----------
.../helix/core/PinotHelixResourceManager.java | 22 +++++++++++-----------
.../rebalance/ZkBasedTableRebalanceObserver.java | 2 +-
.../tenant/ZkBasedTenantRebalanceObserver.java | 2 +-
6 files changed, 33 insertions(+), 28 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index eba5806365..d69d386a26 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -32,7 +32,6 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.common.assignment.InstancePartitions;
-import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
@@ -174,8 +173,8 @@ public class ZKMetadataProvider {
return StringUtil.join("/", PROPERTYSTORE_SEGMENTS_PREFIX, resourceName);
}
- public static String constructPropertyStorePathForControllerJob(ControllerJobType jobType) {
- return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, jobType.name());
+ public static String constructPropertyStorePathForControllerJob(String jobType) {
+ return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, jobType);
}
public static String constructPropertyStorePathForResourceConfig(String resourceName) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
index 025d7b6c39..e1a8efb8af 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
@@ -18,6 +18,17 @@
*/
package org.apache.pinot.common.metadata.controllerjob;
-public enum ControllerJobType {
- RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TENANT_REBALANCE
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+
+
+public class ControllerJobType {
+ private ControllerJobType() {
+ }
+ public static final String RELOAD_SEGMENT = "RELOAD_SEGMENT";
+ public static final String FORCE_COMMIT = "FORCE_COMMIT";
+ public static final String TABLE_REBALANCE = "TABLE_REBALANCE";
+ public static final String TENANT_REBALANCE = "TENANT_REBALANCE";
+ public static final Set<String>
+ VALID_CONTROLLER_JOB_TYPE = ImmutableSet.of(RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TENANT_REBALANCE);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 23a405514b..8fa1d3b1a9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -1082,20 +1082,15 @@ public class PinotTableRestletResource {
List<String> tableNamesWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
LOGGER);
- Set<ControllerJobType> validJobTypes =
- java.util.Arrays.stream(ControllerJobType.values()).collect(Collectors.toSet());
- Set<ControllerJobType> jobTypesToFilter = null;
+ Set<String> jobTypesToFilter = null;
if (StringUtils.isNotEmpty(jobTypesString)) {
- try {
- jobTypesToFilter = new HashSet<>(java.util.Arrays.asList(StringUtils.split(jobTypesString, ','))).stream()
- .map(type -> ControllerJobType.valueOf(type)).collect(Collectors.toSet());
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Valid Types are: " + validJobTypes);
- }
+ jobTypesToFilter = new HashSet<>(java.util.Arrays.asList(StringUtils.split(jobTypesString, ',')))
+ .stream().collect(Collectors.toSet());
}
Map<String, Map<String, String>> result = new HashMap<>();
for (String tableNameWithType : tableNamesWithType) {
- result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter == null ? validJobTypes : jobTypesToFilter,
+ result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter == null
+ ? ControllerJobType.VALID_CONTROLLER_JOB_TYPE : jobTypesToFilter,
jobMetadata -> jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)
.equals(tableNameWithType)));
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 22ba2de5a7..5e19ea9cf6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2036,7 +2036,7 @@ public class PinotHelixResourceManager {
* @return Map representing the job's ZK properties
*/
@Nullable
- public Map<String, String> getControllerJobZKMetadata(String jobId, ControllerJobType jobType) {
+ public Map<String, String> getControllerJobZKMetadata(String jobId, String jobType) {
String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null, AccessOption.PERSISTENT);
return jobsZnRecord != null ? jobsZnRecord.getMapFields().get(jobId) : null;
@@ -2046,10 +2046,10 @@ public class PinotHelixResourceManager {
* Returns a Map of jobId to job's ZK metadata that passes the checker, like for specific tables.
* @return A Map of jobId to job properties
*/
- public Map<String, Map<String, String>> getAllJobs(Set<ControllerJobType> jobTypes,
+ public Map<String, Map<String, String>> getAllJobs(Set<String> jobTypes,
Predicate<Map<String, String>> jobMetadataChecker) {
Map<String, Map<String, String>> controllerJobs = new HashMap<>();
- for (ControllerJobType jobType : jobTypes) {
+ for (String jobType : jobTypes) {
String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null, AccessOption.PERSISTENT);
if (jobsZnRecord == null) {
@@ -2059,8 +2059,8 @@ public class PinotHelixResourceManager {
for (Map.Entry<String, Map<String, String>> jobMetadataEntry : jobMetadataMap.entrySet()) {
String jobId = jobMetadataEntry.getKey();
Map<String, String> jobMetadata = jobMetadataEntry.getValue();
- Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType.name()),
- "Got unexpected jobType: %s at jobResourcePath: %s with jobId: %s", jobType.name(), jobResourcePath, jobId);
+ Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType),
+ "Got unexpected jobType: %s at jobResourcePath: %s with jobId: %s", jobType, jobResourcePath, jobId);
if (jobMetadataChecker.test(jobMetadata)) {
controllerJobs.put(jobId, jobMetadata);
}
@@ -2083,7 +2083,7 @@ public class PinotHelixResourceManager {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numMessagesSent));
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
@@ -2096,7 +2096,7 @@ public class PinotHelixResourceManager {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT.toString());
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
JsonUtils.objectToString(consumingSegmentsCommitted));
@@ -2116,7 +2116,7 @@ public class PinotHelixResourceManager {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numberOfMessagesSent));
return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.RELOAD_SEGMENT);
@@ -2129,7 +2129,7 @@ public class PinotHelixResourceManager {
* @param jobType the type of the job to figure out where job metadata is kept in ZK
* @return boolean representing success / failure of the ZK write step
*/
- public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, ControllerJobType jobType) {
+ public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, String jobType) {
return addControllerJobToZK(jobId, jobMetadata, jobType, prev -> true);
}
@@ -2141,7 +2141,7 @@ public class PinotHelixResourceManager {
* @param prevJobMetadataChecker to check the previous job metadata before adding new one
* @return boolean representing success / failure of the ZK write step
*/
- public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, ControllerJobType jobType,
+ public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, String jobType,
Predicate<Map<String, String>> prevJobMetadataChecker) {
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS) != null,
"Submission Time in JobMetadata record not set. Cannot expire these records");
@@ -2178,7 +2178,7 @@ public class PinotHelixResourceManager {
* @param updater to modify the job metadata in place
* @return boolean representing success / failure of the ZK write step
*/
- public boolean updateJobsForTable(String tableNameWithType, ControllerJobType jobType,
+ public boolean updateJobsForTable(String tableNameWithType, String jobType,
Consumer<Map<String, String>> updater) {
String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
Stat stat = new Stat();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index f02a62cdee..8386544f3c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -206,7 +206,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver {
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name());
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE);
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(tableRebalanceProgressStats));
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
index 9fabc64d9d..81962f90f2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -100,7 +100,7 @@ public class ZkBasedTenantRebalanceObserver implements TenantRebalanceObserver {
jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE.name());
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE);
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(_progressStats));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org