You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/10/15 05:58:41 UTC

[pinot] branch master updated: Adding new generic method in ClusterInfoAccessor for getting ZNRecord (#7580)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5106a92  Adding new generic method in ClusterInfoAccessor for getting ZNRecord (#7580)
5106a92 is described below

commit 5106a92d84ae29c3c02320fba5f1e516f9cb01a1
Author: Tim Santos <ti...@cortexdata.io>
AuthorDate: Thu Oct 14 22:58:08 2021 -0700

    Adding new generic method in ClusterInfoAccessor for getting ZNRecord (#7580)
    
    Adding new generic method in ClusterInfoAccessor for getting a minion task metadata ZNRecord. Deleted the existing ZNRecord methods (getMinionRealtimeToOfflineSegmentsTaskMetadata and getMinionMergeRollupTaskZNRecord) that do the same thing and migrated them to use the new generic method.
    
    This new method just wraps the MinionTaskMetadataUtils.fetchTaskMetadata static method so there isn't much to test. Existing unit tests for the task generators should provide enough coverage.
---
 .../helix/core/minion/ClusterInfoAccessor.java     |  23 ++--
 .../MergeRollupMinionClusterIntegrationTest.java   |  15 +--
 ...fflineSegmentsMinionClusterIntegrationTest.java |   5 +-
 .../mergerollup/MergeRollupTaskGenerator.java      |   3 +-
 .../RealtimeToOfflineSegmentsTaskGenerator.java    |   6 +-
 .../mergerollup/MergeRollupTaskGeneratorTest.java  | 116 ++++++++++-----------
 ...RealtimeToOfflineSegmentsTaskGeneratorTest.java |  38 ++++---
 7 files changed, 106 insertions(+), 100 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index e68a99d..db666da 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -97,12 +97,14 @@ public class ClusterInfoAccessor {
   }
 
   /**
-   * Fetches the ZNRecord under MINION_TASK_METADATA/MergeRollupTask for the given tableNameWithType
-   * @param tableNameWithType table name with type
+   * Fetches the ZNRecord under MINION_TASK_METADATA/${taskType} for the given taskType and tableNameWithType
+   *
+   * @param taskType The type of the minion task
+   * @param tableNameWithType Table name with type
    */
-  public ZNRecord getMinionMergeRollupTaskZNRecord(String tableNameWithType) {
-    return MinionTaskMetadataUtils.fetchTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
-        MinionConstants.MergeRollupTask.TASK_TYPE, tableNameWithType);
+  public ZNRecord getMinionTaskMetadataZNRecord(String taskType, String tableNameWithType) {
+    return MinionTaskMetadataUtils.fetchTaskMetadata(_pinotHelixResourceManager.getPropertyStore(), taskType,
+        tableNameWithType);
   }
 
   /**
@@ -127,17 +129,6 @@ public class ClusterInfoAccessor {
   }
 
   /**
-   * Fetches the {@link RealtimeToOfflineSegmentsTaskMetadata} from MINION_TASK_METADATA for given realtime table
-   * @param tableNameWithType realtime table name
-   */
-  public RealtimeToOfflineSegmentsTaskMetadata getMinionRealtimeToOfflineSegmentsTaskMetadata(
-      String tableNameWithType) {
-    ZNRecord znRecord = MinionTaskMetadataUtils.fetchTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
-        MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, tableNameWithType);
-    return znRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord) : null;
-  }
-
-  /**
    * Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into MINION_TASK_METADATA
    * This call will override any previous metadata node
    */
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index d86e00a..f26cfc4 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -278,8 +278,9 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
       waitForTaskToComplete();
 
       // Check watermark
-      MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata
-          .fromZNRecord(_taskManager.getClusterInfoAccessor().getMinionMergeRollupTaskZNRecord(offlineTableName));
+      MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata.fromZNRecord(
+          _taskManager.getClusterInfoAccessor()
+              .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, offlineTableName));
       assertNotNull(minionTaskMetadata);
       assertEquals((long) minionTaskMetadata.getWatermarkMap().get("100days"), expectedWatermark);
       expectedWatermark += 100 * 86_400_000L;
@@ -373,8 +374,9 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
       waitForTaskToComplete();
 
       // Check watermark
-      MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata
-          .fromZNRecord(_taskManager.getClusterInfoAccessor().getMinionMergeRollupTaskZNRecord(offlineTableName));
+      MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata.fromZNRecord(
+          _taskManager.getClusterInfoAccessor()
+              .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, offlineTableName));
       assertNotNull(minionTaskMetadata);
       assertEquals((long) minionTaskMetadata.getWatermarkMap().get("150days"), expectedWatermark);
       expectedWatermark += 150 * 86_400_000L;
@@ -512,8 +514,9 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
       waitForTaskToComplete();
 
       // Check watermark
-      MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata
-          .fromZNRecord(_taskManager.getClusterInfoAccessor().getMinionMergeRollupTaskZNRecord(offlineTableName));
+      MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata.fromZNRecord(
+          _taskManager.getClusterInfoAccessor()
+              .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, offlineTableName));
       assertNotNull(minionTaskMetadata);
       assertEquals(minionTaskMetadata.getWatermarkMap().get("45days"), expectedWatermarks45Days[numTasks]);
       assertEquals(minionTaskMetadata.getWatermarkMap().get("90days"), expectedWatermarks90Days[numTasks]);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 6ce2d05..f7710e2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
@@ -154,8 +155,10 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends Realt
     }, 600_000L, "Failed to complete task");
 
     // Check segment ZK metadata
+    ZNRecord znRecord = _taskManager.getClusterInfoAccessor()
+        .getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, _realtimeTableName);
     RealtimeToOfflineSegmentsTaskMetadata minionTaskMetadata =
-        _taskManager.getClusterInfoAccessor().getMinionRealtimeToOfflineSegmentsTaskMetadata(_realtimeTableName);
+        znRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord) : null;
     Assert.assertNotNull(minionTaskMetadata);
     Assert.assertEquals(minionTaskMetadata.getWatermarkMs(), expectedWatermark);
   }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 8e9265c..42457ef 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -199,7 +199,8 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
         }
       }
 
-      ZNRecord mergeRollupTaskZNRecord = _clusterInfoAccessor.getMinionMergeRollupTaskZNRecord(offlineTableName);
+      ZNRecord mergeRollupTaskZNRecord = _clusterInfoAccessor
+          .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, offlineTableName);
       int expectedVersion = mergeRollupTaskZNRecord != null ? mergeRollupTaskZNRecord.getVersion() : -1;
       MergeRollupTaskMetadata mergeRollupTaskMetadata =
           mergeRollupTaskZNRecord != null ? MergeRollupTaskMetadata.fromZNRecord(mergeRollupTaskZNRecord)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index 6fd6f3c..a72bac0 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
@@ -290,8 +291,11 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
    */
   private long getWatermarkMs(String realtimeTableName, List<SegmentZKMetadata> completedSegmentsZKMetadata,
       long bucketMs) {
+    ZNRecord realtimeToOfflineZNRecord = _clusterInfoAccessor
+        .getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, realtimeTableName);
     RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
-        _clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(realtimeTableName);
+        realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata
+            .fromZNRecord(realtimeToOfflineZNRecord) : null;
 
     if (realtimeToOfflineSegmentsTaskMetadata == null) {
       // No ZNode exists. Cold-start.
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 898a07b..6eb2c31 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -132,10 +132,10 @@ public class MergeRollupTaskGeneratorTest {
       return null;
     }).when(mockClusterInfoProvide).setMergeRollupTaskMetadata(any(MergeRollupTaskMetadata.class), anyInt());
 
-    when(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(anyString())).thenAnswer(invocation -> {
+    when(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(anyString(), anyString())).thenAnswer(invocation -> {
       Object[] arguments = invocation.getArguments();
-      if (arguments != null && arguments.length > 0 && arguments[0] != null) {
-        String tableNameWithType = (String) arguments[0];
+      if (arguments != null && arguments.length == 2 && arguments[1] != null) {
+        String tableNameWithType = (String) arguments[1];
         if (mockMergeRollupTaskMetadataMap.containsKey(tableNameWithType)) {
           return mockMergeRollupTaskMetadataMap.get(tableNameWithType).toZNRecord();
         }
@@ -165,7 +165,8 @@ public class MergeRollupTaskGeneratorTest {
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
-    assertNull(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME));
+    assertNull(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME));
     assertEquals(pinotTaskConfigs.size(), 0);
   }
 
@@ -194,7 +195,8 @@ public class MergeRollupTaskGeneratorTest {
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
-    assertNull(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME));
+    assertNull(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME));
     assertEquals(pinotTaskConfigs.size(), 0);
   }
 
@@ -257,8 +259,8 @@ public class MergeRollupTaskGeneratorTest {
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
-        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat", "1d",
+        null, "1000000");
     assertEquals(pinotTaskConfigs.get(0).getConfigs().get(MinionConstants.DOWNLOAD_URL_KEY), "download1,download2");
 
     // Multiple tasks
@@ -270,8 +272,8 @@ public class MergeRollupTaskGeneratorTest {
         .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
-        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat", "1d",
+        null, "1000000");
     checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat", "1d", null, "1000000");
   }
 
@@ -314,12 +316,10 @@ public class MergeRollupTaskGeneratorTest {
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 3);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
-        "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat",
-        "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName4, DAILY, "concat",
-        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat", "1d",
+        null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName4, DAILY, "concat", "1d", null, "1000000");
 
     // Has spilled over data
     String segmentName6 = "testTable__6";
@@ -329,22 +329,20 @@ public class MergeRollupTaskGeneratorTest {
         .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadata6));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
-        "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3 + "," + segmentName6, DAILY, "concat",
-        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat", "1d",
+        null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3 + "," + segmentName6, DAILY, "concat", "1d",
+        null, "1000000");
 
     // Has time bucket without overlapping segments
     when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
         .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata4, metadata5));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 3);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
-        "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName4, DAILY, "concat",
-        "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName5, DAILY, "concat",
-        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat", "1d",
+        null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName4, DAILY, "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName5, DAILY, "concat", "1d", null, "1000000");
 
     // Has un-merged buckets
     metadata6 = getSegmentZKMetadata(segmentName6, 432_000_000L, 432_100_000L, TimeUnit.MILLISECONDS, null);
@@ -355,12 +353,9 @@ public class MergeRollupTaskGeneratorTest {
         .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadata6));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 3);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName3, DAILY, "concat",
-        "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName5, DAILY, "concat",
-        "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName6, DAILY, "concat",
-        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName3, DAILY, "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName5, DAILY, "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName6, DAILY, "concat", "1d", null, "1000000");
 
     // Test number of scheduled buckets < numParallelBuckets
     tableTaskConfigs.put("monthly.mergeType", "concat");
@@ -371,8 +366,9 @@ public class MergeRollupTaskGeneratorTest {
     TreeMap<String, Long> waterMarkMap = new TreeMap<>();
     // Watermark for daily is at 30 days since epoch
     waterMarkMap.put(DAILY, 2_592_000_000L);
-    when(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).thenReturn(
-        new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap).toZNRecord());
+    when(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME))
+        .thenReturn(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap).toZNRecord());
 
     String segmentName7 = "testTable__7";
     String segmentName8 = "testTable__8";
@@ -386,8 +382,7 @@ public class MergeRollupTaskGeneratorTest {
         .thenReturn(Lists.newArrayList(metadata7, metadata8));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 1);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName7, MONTHLY, "concat", "30d",
-        null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName7, MONTHLY, "concat", "30d", null, "1000000");
   }
 
   /**
@@ -436,10 +431,10 @@ public class MergeRollupTaskGeneratorTest {
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
-        "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3 + "," + segmentName4, DAILY, "concat",
-        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat", "1d",
+        null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3 + "," + segmentName4, DAILY, "concat", "1d",
+        null, "1000000");
 
     // With numMaxRecordsPerTask constraints
     tableTaskConfigs.put("daily.maxNumRecordsPerTask", "5000000");
@@ -450,8 +445,8 @@ public class MergeRollupTaskGeneratorTest {
 
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 3);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
-        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat", "1d",
+        null, "1000000");
     checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat", "1d", null, "1000000");
     checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName4, DAILY, "concat", "1d", null, "1000000");
   }
@@ -486,8 +481,8 @@ public class MergeRollupTaskGeneratorTest {
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
-    assertEquals(MergeRollupTaskMetadata
-        .fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap()
         .get(DAILY).longValue(), 86_400_000L);
     assertEquals(pinotTaskConfigs.size(), 1);
     checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1, DAILY, "concat", "1d", null, "1000000");
@@ -499,8 +494,8 @@ public class MergeRollupTaskGeneratorTest {
         .setMergeRollupTaskMetadata(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap), -1);
     metadata1.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
-    assertEquals(MergeRollupTaskMetadata
-        .fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap()
         .get(DAILY).longValue(), 345_600_000L);
     assertEquals(pinotTaskConfigs.size(), 1);
     checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName2, DAILY, "concat", "1d", null, "1000000");
@@ -511,8 +506,8 @@ public class MergeRollupTaskGeneratorTest {
         .setMergeRollupTaskMetadata(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap), -1);
     metadata2.setCustomMap(ImmutableMap.of(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY, DAILY));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
-    assertEquals(MergeRollupTaskMetadata
-        .fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap()
         .get(DAILY).longValue(), 345_600_000L);
     assertEquals(pinotTaskConfigs.size(), 0);
   }
@@ -544,7 +539,8 @@ public class MergeRollupTaskGeneratorTest {
     ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
     Map<String, Long> waterMarkMap = new TreeMap<>();
     waterMarkMap.put(DAILY, 86_400_000L);
-    when(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME))
+    when(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME))
         .thenReturn(new MergeRollupTaskMetadata(OFFLINE_TABLE_NAME, waterMarkMap).toZNRecord());
 
     Map<String, TaskState> taskStatesMap = new HashMap<>();
@@ -641,8 +637,8 @@ public class MergeRollupTaskGeneratorTest {
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
     List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
-    assertEquals(MergeRollupTaskMetadata
-        .fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap()
         .get(DAILY).longValue(), 86_400_000L);
     assertEquals(pinotTaskConfigs.size(), 1);
     Map<String, String> taskConfigsDaily1 = pinotTaskConfigs.get(0).getConfigs();
@@ -674,8 +670,8 @@ public class MergeRollupTaskGeneratorTest {
 
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
 
-    assertEquals(MergeRollupTaskMetadata
-        .fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap()
         .get(DAILY).longValue(), 2_505_600_000L);
     assertEquals(pinotTaskConfigs.size(), 1);
     Map<String, String> taskConfigsDaily2 = pinotTaskConfigs.get(0).getConfigs();
@@ -707,11 +703,11 @@ public class MergeRollupTaskGeneratorTest {
 
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
 
-    assertEquals(MergeRollupTaskMetadata
-        .fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap()
         .get(DAILY).longValue(), 2_592_000_000L);
-    assertEquals(MergeRollupTaskMetadata
-        .fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap()
         .get(MONTHLY).longValue(), 0L);
     assertEquals(pinotTaskConfigs.size(), 2);
     Map<String, String> taskConfigsDaily3 = pinotTaskConfigs.get(0).getConfigs();
@@ -755,11 +751,11 @@ public class MergeRollupTaskGeneratorTest {
 
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
 
-    assertEquals(MergeRollupTaskMetadata
-        .fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap()
         .get(DAILY).longValue(), 2_592_000_000L); // 30 days since epoch
-    assertEquals(MergeRollupTaskMetadata
-        .fromZNRecord(mockClusterInfoProvide.getMinionMergeRollupTaskZNRecord(OFFLINE_TABLE_NAME)).getWatermarkMap()
+    assertEquals(MergeRollupTaskMetadata.fromZNRecord(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, OFFLINE_TABLE_NAME)).getWatermarkMap()
         .get(MONTHLY).longValue(), 0L);
     assertEquals(pinotTaskConfigs.size(), 0);
   }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index 2584d7b..fc71b5b 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -134,8 +134,9 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(taskStatesMap);
     when(mockClusterInfoProvide.getTaskConfigs(taskName))
         .thenReturn(Lists.newArrayList(new PinotTaskConfig(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs)));
-    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
-        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 100_000L));
+    when(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE, REALTIME_TABLE_NAME))
+        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 100_000L).toZNRecord());
     SegmentZKMetadata segmentZKMetadata =
         getSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 80_000_000, 90_000_000, TimeUnit.MILLISECONDS,
             null);
@@ -215,7 +216,8 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
   public void testGenerateTasksNoMinionMetadata() {
     ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
     when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
-    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME)).thenReturn(null);
+    when(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE, REALTIME_TABLE_NAME)).thenReturn(null);
     SegmentZKMetadata segmentZKMetadata1 =
         getSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 1590048000000L, 1590134400000L,
             TimeUnit.MILLISECONDS, "download1"); // 21 May 2020 8am to 22 May 2020 8am UTC
@@ -269,8 +271,9 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
   public void testGenerateTasksWithMinionMetadata() {
     ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
     when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
-    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
-        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L)); // 21 May 2020 UTC
+    when(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE, REALTIME_TABLE_NAME)).thenReturn(
+        new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L).toZNRecord()); // 21 May 2020 UTC
     SegmentZKMetadata segmentZKMetadata1 =
         getSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 1589972400000L, 1590048000000L,
             TimeUnit.MILLISECONDS, "download1"); // 05-20-2020T11:00:00 to 05-21-2020T08:00:00 UTC
@@ -298,16 +301,18 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), "1590105600000"); // 5-22-2020
 
     // No segments match
-    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
-        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590490800000L)); // 26 May 2020 UTC
+    when(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE, REALTIME_TABLE_NAME)).thenReturn(
+        new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590490800000L).toZNRecord()); // 26 May 2020 UTC
     generator = new RealtimeToOfflineSegmentsTaskGenerator();
     generator.init(mockClusterInfoProvide);
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertEquals(pinotTaskConfigs.size(), 0);
 
     // Some segments match
-    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
-        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L)); // 21 May 2020 UTC
+    when(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE, REALTIME_TABLE_NAME)).thenReturn(
+        new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L).toZNRecord()); // 21 May 2020 UTC
     taskConfigsMap = new HashMap<>();
     Map<String, String> taskConfigs = new HashMap<>();
     taskConfigs.put(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "2h");
@@ -361,8 +366,9 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
     when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
 
-    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
-        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 100_000L));
+    when(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE, REALTIME_TABLE_NAME))
+        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 100_000L).toZNRecord());
     SegmentZKMetadata segmentZKMetadata1 =
         getSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 50_000, 150_000, TimeUnit.MILLISECONDS, null);
     SegmentZKMetadata segmentZKMetadata2 =
@@ -408,8 +414,9 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     long watermarkMs = now - TimeUnit.DAYS.toMillis(1);
     ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
     when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
-    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
-        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, watermarkMs));
+    when(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE, REALTIME_TABLE_NAME))
+        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, watermarkMs).toZNRecord());
     SegmentZKMetadata segmentZKMetadata =
         getSegmentZKMetadata("testTable__0__0__12345", Status.DONE, watermarkMs - 100, watermarkMs + 100,
             TimeUnit.MILLISECONDS, null);
@@ -429,8 +436,9 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
 
     watermarkMs = now - TimeUnit.DAYS.toMillis(10);
-    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
-        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, watermarkMs));
+    when(mockClusterInfoProvide
+        .getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE, REALTIME_TABLE_NAME))
+        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, watermarkMs).toZNRecord());
     segmentZKMetadata =
         getSegmentZKMetadata("testTable__0__0__12345", Status.DONE, watermarkMs - 100, watermarkMs + 100,
             TimeUnit.MILLISECONDS, null);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org